Skip to main content

uni_query/query/df_graph/
unwind.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! UNWIND execution plan for DataFusion.
5//!
6//! This module provides [`GraphUnwindExec`], a DataFusion [`ExecutionPlan`] that
7//! expands list values into multiple rows (similar to SQL `UNNEST`).
8//!
9//! # Supported Expressions
10//!
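//! Currently supports:
//! - Literal lists: `UNWIND [1, 2, 3] AS x`
//! - Variable references: `UNWIND list AS item` (where `list` is a column)
//! - Property access: `UNWIND n.items AS item`
//! - Query parameters: `UNWIND $ids AS id`
//! - A subset of functions: `range`, `keys`, `size`/`length`, `split`, and the
//!   temporal constructors `date`, `time`, `localtime`, `datetime`,
//!   `localdatetime` and `duration`
//! - Binary operators, map literals, and list indexing (`list[i]`)
//!
//! An expression that evaluates to `null` produces no rows; a non-list,
//! non-null value is unwound as a single-element list. Unsupported expression
//! forms evaluate to `null`, and unsupported functions evaluate to an empty list.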
15//!
16//! # Example
17//!
18//! ```text
19//! Input:   [{"list": [1, 2, 3]}]
20//! UNWIND:  list AS item
21//! Output:  [{"list": [1,2,3], "item": 1},
22//!           {"list": [1,2,3], "item": 2},
23//!           {"list": [1,2,3], "item": 3}]
24//! ```
25
26use crate::query::df_graph::common::{arrow_err, compute_plan_properties};
27use arrow::compute::take;
28use arrow_array::builder::{
29    BooleanBuilder, Float64Builder, Int64Builder, LargeBinaryBuilder, StringBuilder,
30};
31use arrow_array::{Array, ArrayRef, RecordBatch, UInt64Array};
32use arrow_schema::{DataType, Field, Schema, SchemaRef};
33use datafusion::common::Result as DFResult;
34use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
35use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
36use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
37use futures::{Stream, StreamExt};
38use std::any::Any;
39use std::collections::HashMap;
40use std::fmt;
41use std::pin::Pin;
42use std::sync::Arc;
43use std::task::{Context, Poll};
44use uni_common::Value;
45use uni_cypher::ast::{CypherLiteral, Expr};
46
47/// Result of UNWIND element type inference.
48struct ElementTypeInfo {
49    /// Arrow data type for the unwind variable column.
50    data_type: DataType,
51    /// Whether values need JSON encoding metadata.
52    is_cv_encoded: bool,
53}
54
55/// UNWIND execution plan that expands list values into multiple rows.
56///
57/// Takes an input plan and an expression that evaluates to a list. For each
58/// row in the input, if the expression evaluates to a list, produces multiple
59/// output rows (one per list element) with the list element bound to a new
60/// variable.
61pub struct GraphUnwindExec {
62    /// Input execution plan.
63    input: Arc<dyn ExecutionPlan>,
64
65    /// Expression to evaluate (should produce a list).
66    expr: Expr,
67
68    /// Variable name to bind list elements to.
69    variable: String,
70
71    /// Query parameters for expression evaluation.
72    params: HashMap<String, Value>,
73
74    /// Output schema.
75    schema: SchemaRef,
76
77    /// Cached plan properties.
78    properties: Arc<PlanProperties>,
79
80    /// Execution metrics.
81    metrics: ExecutionPlanMetricsSet,
82}
83
84impl fmt::Debug for GraphUnwindExec {
85    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
86        f.debug_struct("GraphUnwindExec")
87            .field("expr", &self.expr)
88            .field("variable", &self.variable)
89            .finish()
90    }
91}
92
93impl GraphUnwindExec {
94    /// Create a new UNWIND execution plan.
95    ///
96    /// # Arguments
97    ///
98    /// * `input` - Input plan providing rows to expand
99    /// * `expr` - Expression that evaluates to a list
100    /// * `variable` - Variable name for list elements
101    /// * `params` - Query parameters for expression evaluation
102    pub fn new(
103        input: Arc<dyn ExecutionPlan>,
104        expr: Expr,
105        variable: impl Into<String>,
106        params: HashMap<String, Value>,
107    ) -> Self {
108        let variable = variable.into();
109
110        // Build output schema: input schema + new variable column
111        let schema = Self::build_schema(input.schema(), &variable, &expr);
112        let properties = compute_plan_properties(schema.clone());
113
114        Self {
115            input,
116            expr,
117            variable,
118            params,
119            schema,
120            properties,
121            metrics: ExecutionPlanMetricsSet::new(),
122        }
123    }
124
125    /// Infer the native Arrow `DataType` for the elements of an UNWIND expression.
126    ///
127    /// For literal lists with homogeneous element types (ignoring nulls), returns
128    /// the native type. For heterogeneous or non-inferrable expressions, falls back
129    /// to JSON-encoded Utf8.
130    fn infer_element_type(expr: &Expr) -> ElementTypeInfo {
131        let json_fallback = || ElementTypeInfo {
132            data_type: DataType::LargeBinary,
133            is_cv_encoded: true,
134        };
135
136        let Expr::List(items) = expr else {
137            return json_fallback();
138        };
139
140        // Infer type from first non-null literal
141        let first_type = items.iter().find_map(|item| match item {
142            Expr::Literal(CypherLiteral::Null) => None,
143            Expr::Literal(CypherLiteral::Bool(_)) => Some(DataType::Boolean),
144            Expr::Literal(CypherLiteral::Integer(_)) => Some(DataType::Int64),
145            Expr::Literal(CypherLiteral::Float(_)) => Some(DataType::Float64),
146            Expr::Literal(CypherLiteral::String(_)) => Some(DataType::Utf8),
147            _ => Some(DataType::Utf8), // Sentinel for non-literal: forces fallback
148        });
149
150        let Some(expected) = first_type else {
151            return json_fallback(); // All nulls or empty
152        };
153
154        // Verify all remaining non-null items match the expected type
155        let all_match = items.iter().all(|item| match item {
156            Expr::Literal(CypherLiteral::Null) => true,
157            Expr::Literal(CypherLiteral::Bool(_)) => expected == DataType::Boolean,
158            Expr::Literal(CypherLiteral::Integer(_)) => expected == DataType::Int64,
159            Expr::Literal(CypherLiteral::Float(_)) => expected == DataType::Float64,
160            Expr::Literal(CypherLiteral::String(_)) => expected == DataType::Utf8,
161            _ => false, // Non-literal
162        });
163
164        if all_match {
165            ElementTypeInfo {
166                data_type: expected,
167                is_cv_encoded: false,
168            }
169        } else {
170            json_fallback()
171        }
172    }
173
174    /// Build output schema by adding the unwind variable column.
175    ///
176    /// Uses type inference on the UNWIND expression to emit natively-typed
177    /// columns when possible. Falls back to JSON-encoded `Utf8` for
178    /// heterogeneous or non-inferrable expressions.
179    fn build_schema(input_schema: SchemaRef, variable: &str, expr: &Expr) -> SchemaRef {
180        let mut fields: Vec<Arc<Field>> = input_schema.fields().to_vec();
181
182        let type_info = Self::infer_element_type(expr);
183
184        let mut field = Field::new(variable, type_info.data_type, true);
185        if type_info.is_cv_encoded {
186            field = field.with_metadata(HashMap::from([("cv_encoded".into(), "true".into())]));
187        }
188        fields.push(Arc::new(field));
189
190        Arc::new(Schema::new(fields))
191    }
192}
193
194impl DisplayAs for GraphUnwindExec {
195    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
196        write!(
197            f,
198            "GraphUnwindExec: {} AS {}",
199            self.expr.to_string_repr(),
200            self.variable
201        )
202    }
203}
204
205impl ExecutionPlan for GraphUnwindExec {
206    fn name(&self) -> &str {
207        "GraphUnwindExec"
208    }
209
210    fn as_any(&self) -> &dyn Any {
211        self
212    }
213
214    fn schema(&self) -> SchemaRef {
215        Arc::clone(&self.schema)
216    }
217
218    fn properties(&self) -> &Arc<PlanProperties> {
219        &self.properties
220    }
221
222    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
223        vec![&self.input]
224    }
225
226    fn with_new_children(
227        self: Arc<Self>,
228        children: Vec<Arc<dyn ExecutionPlan>>,
229    ) -> DFResult<Arc<dyn ExecutionPlan>> {
230        if children.len() != 1 {
231            return Err(datafusion::error::DataFusionError::Plan(
232                "GraphUnwindExec requires exactly one child".to_string(),
233            ));
234        }
235
236        Ok(Arc::new(Self::new(
237            Arc::clone(&children[0]),
238            self.expr.clone(),
239            self.variable.clone(),
240            self.params.clone(),
241        )))
242    }
243
244    fn execute(
245        &self,
246        partition: usize,
247        context: Arc<TaskContext>,
248    ) -> DFResult<SendableRecordBatchStream> {
249        let input_stream = self.input.execute(partition, context)?;
250        let metrics = BaselineMetrics::new(&self.metrics, partition);
251
252        Ok(Box::pin(GraphUnwindStream {
253            input: input_stream,
254            expr: self.expr.clone(),
255            params: self.params.clone(),
256            schema: Arc::clone(&self.schema),
257            metrics,
258        }))
259    }
260
261    fn metrics(&self) -> Option<MetricsSet> {
262        Some(self.metrics.clone_inner())
263    }
264}
265
266/// Stream that performs the UNWIND operation.
267struct GraphUnwindStream {
268    /// Input stream.
269    input: SendableRecordBatchStream,
270
271    /// Expression to evaluate.
272    expr: Expr,
273
274    /// Query parameters.
275    params: HashMap<String, Value>,
276
277    /// Output schema.
278    schema: SchemaRef,
279
280    /// Metrics.
281    metrics: BaselineMetrics,
282}
283
284impl GraphUnwindStream {
285    /// Process a single input batch.
286    fn process_batch(&self, batch: RecordBatch) -> DFResult<RecordBatch> {
287        // For each row, evaluate the expression and expand if it's a list
288        let mut expansions: Vec<(usize, Value)> = Vec::new(); // (input_row_idx, list_element)
289
290        for row_idx in 0..batch.num_rows() {
291            // Evaluate expression for this row
292            let list_value = self.evaluate_expr_for_row(&batch, row_idx)?;
293
294            match list_value {
295                Value::List(items) => {
296                    for item in items {
297                        expansions.push((row_idx, item));
298                    }
299                }
300                Value::Null => {
301                    // UNWIND on null produces no rows (Cypher semantics)
302                }
303                other => {
304                    // Non-list values: treat as single-element list
305                    expansions.push((row_idx, other));
306                }
307            }
308        }
309
310        self.build_output_batch(&batch, &expansions)
311    }
312
313    /// Evaluate the expression for a specific row.
314    fn evaluate_expr_for_row(&self, batch: &RecordBatch, row_idx: usize) -> DFResult<Value> {
315        self.evaluate_expr_impl(&self.expr, batch, row_idx)
316    }
317
318    /// Evaluate an expression recursively.
319    fn evaluate_expr_impl(
320        &self,
321        expr: &Expr,
322        batch: &RecordBatch,
323        row_idx: usize,
324    ) -> DFResult<Value> {
325        match expr {
326            // Literal list: [1, 2, 3]
327            Expr::List(items) => {
328                let mut values = Vec::with_capacity(items.len());
329                for item in items {
330                    values.push(self.evaluate_expr_impl(item, batch, row_idx)?);
331                }
332                Ok(Value::List(values))
333            }
334
335            // Literal value
336            Expr::Literal(lit) => Ok(lit.to_value()),
337
338            // Parameter reference: $param
339            Expr::Parameter(name) => self.params.get(name).cloned().ok_or_else(|| {
340                datafusion::error::DataFusionError::Execution(format!(
341                    "Parameter '{}' not found",
342                    name
343                ))
344            }),
345
346            // Variable reference: look up column
347            Expr::Variable(var_name) => self.get_column_value(batch, var_name, row_idx),
348
349            // Property access: n.prop
350            Expr::Property(base_expr, prop_name) => {
351                // Try looking up as column name first: var.prop
352                if let Expr::Variable(var_name) = base_expr.as_ref() {
353                    let col_name = format!("{}.{}", var_name, prop_name);
354                    if batch.schema().column_with_name(&col_name).is_some() {
355                        return self.get_column_value(batch, &col_name, row_idx);
356                    }
357                }
358
359                // Fall back to evaluating base as a map
360                let base_value = self.evaluate_expr_impl(base_expr, batch, row_idx)?;
361                if let Value::Map(map) = base_value {
362                    Ok(map.get(prop_name).cloned().unwrap_or(Value::Null))
363                } else {
364                    Ok(Value::Null)
365                }
366            }
367
368            // Function call: range(1, 10)
369            Expr::FunctionCall { name, args, .. } => {
370                let name_lower = name.to_lowercase();
371                match name_lower.as_str() {
372                    "range" => {
373                        if args.len() >= 2 {
374                            let start = self.evaluate_expr_impl(&args[0], batch, row_idx)?;
375                            let end = self.evaluate_expr_impl(&args[1], batch, row_idx)?;
376                            let step = if args.len() >= 3 {
377                                self.evaluate_expr_impl(&args[2], batch, row_idx)?
378                            } else {
379                                Value::Int(1)
380                            };
381
382                            // Cypher null-propagation: any null bound yields null.
383                            if start.is_null() || end.is_null() || step.is_null() {
384                                return Ok(Value::Null);
385                            }
386
387                            // range() requires integer arguments. `as_i64` returns
388                            // None for floats, so a non-integer numeric (or any
389                            // non-numeric) is a type error — NOT a silent empty list
390                            // (openCypher requires an error). (review H5)
391                            let (s, e, st) = match (start.as_i64(), end.as_i64(), step.as_i64()) {
392                                (Some(s), Some(e), Some(st)) => (s, e, st),
393                                _ => {
394                                    return Err(datafusion::error::DataFusionError::Execution(
395                                        format!(
396                                            "range() requires integer arguments, got start={start:?}, end={end:?}, step={step:?}"
397                                        ),
398                                    ));
399                                }
400                            };
401                            if st == 0 {
402                                return Err(datafusion::error::DataFusionError::Execution(
403                                    "range() step argument cannot be 0".to_string(),
404                                ));
405                            }
406                            let mut result = Vec::new();
407                            let mut i = s;
408                            while (st > 0 && i <= e) || (st < 0 && i >= e) {
409                                result.push(Value::Int(i));
410                                // Checked step: stop at the i64 boundary instead of
411                                // panicking (debug) or wrapping into an infinite loop
412                                // (release). (review H5)
413                                match i.checked_add(st) {
414                                    Some(next) => i = next,
415                                    None => break,
416                                }
417                            }
418                            return Ok(Value::List(result));
419                        }
420                        Ok(Value::List(vec![]))
421                    }
422                    "keys" => {
423                        if args.len() == 1 {
424                            let val = self.evaluate_expr_impl(&args[0], batch, row_idx)?;
425                            if let Value::Map(map) = val {
426                                // Use _all_props sub-map for schemaless entities
427                                // when present; otherwise use the top-level map.
428                                let source = match map.get("_all_props") {
429                                    Some(Value::Map(all)) => all,
430                                    _ => &map,
431                                };
432                                let mut key_strings: Vec<String> = source
433                                    .iter()
434                                    .filter(|(k, v)| !v.is_null() && !k.starts_with('_'))
435                                    .map(|(k, _)| k.clone())
436                                    .collect();
437                                key_strings.sort();
438                                let keys: Vec<Value> =
439                                    key_strings.into_iter().map(Value::String).collect();
440                                return Ok(Value::List(keys));
441                            }
442                            if val.is_null() {
443                                return Ok(Value::Null);
444                            }
445                        }
446                        Ok(Value::List(vec![]))
447                    }
448                    "size" | "length" => {
449                        if args.len() == 1 {
450                            let val = self.evaluate_expr_impl(&args[0], batch, row_idx)?;
451                            let sz = match &val {
452                                Value::List(arr) => arr.len() as i64,
453                                // Unicode character count, not byte length:
454                                // openCypher size('héllo') == 5, not 6. (review H6)
455                                Value::String(s) => s.chars().count() as i64,
456                                Value::Map(m) => m.len() as i64,
457                                _ => 0,
458                            };
459                            return Ok(Value::Int(sz));
460                        }
461                        Ok(Value::Null)
462                    }
463                    // Temporal constructors: date(), time(), localtime(), datetime(), localdatetime(), duration()
464                    "date" | "time" | "localtime" | "datetime" | "localdatetime" | "duration" => {
465                        let mut eval_args = Vec::with_capacity(args.len());
466                        for arg in args {
467                            eval_args.push(self.evaluate_expr_impl(arg, batch, row_idx)?);
468                        }
469                        crate::query::datetime::eval_datetime_function(
470                            &name.to_uppercase(),
471                            &eval_args,
472                        )
473                        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))
474                    }
475                    "split" => {
476                        let mut eval_args = Vec::with_capacity(args.len());
477                        for arg in args {
478                            eval_args.push(self.evaluate_expr_impl(arg, batch, row_idx)?);
479                        }
480                        crate::query::expr_eval::eval_split(&eval_args).map_err(|e| {
481                            datafusion::error::DataFusionError::Execution(e.to_string())
482                        })
483                    }
484                    _ => {
485                        // Unsupported function - return empty list
486                        Ok(Value::List(vec![]))
487                    }
488                }
489            }
490
491            // Binary operations: e.g. size(types) - 1
492            Expr::BinaryOp { left, op, right } => {
493                let l = self.evaluate_expr_impl(left, batch, row_idx)?;
494                let r = self.evaluate_expr_impl(right, batch, row_idx)?;
495                crate::query::expr_eval::eval_binary_op(&l, op, &r)
496                    .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))
497            }
498
499            // Map literal: {a: 1, b: 'x'}
500            Expr::Map(entries) => {
501                let mut map = HashMap::new();
502                for (key, val_expr) in entries {
503                    let val = self.evaluate_expr_impl(val_expr, batch, row_idx)?;
504                    map.insert(key.clone(), val);
505                }
506                Ok(Value::Map(map))
507            }
508
509            // Array index: qrows[p]
510            Expr::ArrayIndex { array, index } => {
511                let arr_val = self.evaluate_expr_impl(array, batch, row_idx)?;
512                let idx_val = self.evaluate_expr_impl(index, batch, row_idx)?;
513                match (&arr_val, idx_val.as_i64()) {
514                    (Value::List(list), Some(i)) => {
515                        // Cypher uses 0-based indexing; negative indices count from end
516                        let len = list.len() as i64;
517                        let resolved = if i < 0 { len + i } else { i };
518                        if resolved >= 0 && (resolved as usize) < list.len() {
519                            Ok(list[resolved as usize].clone())
520                        } else {
521                            Ok(Value::Null)
522                        }
523                    }
524                    _ => Ok(Value::Null),
525                }
526            }
527
528            // Unsupported expressions return null
529            _ => Ok(Value::Null),
530        }
531    }
532
533    /// Get a column value as JSON for a specific row.
534    fn get_column_value(
535        &self,
536        batch: &RecordBatch,
537        col_name: &str,
538        row_idx: usize,
539    ) -> DFResult<Value> {
540        let col = batch.column_by_name(col_name).ok_or_else(|| {
541            datafusion::error::DataFusionError::Execution(format!(
542                "Column '{}' not found for UNWIND",
543                col_name
544            ))
545        })?;
546
547        Ok(arrow_to_json_value(col.as_ref(), row_idx))
548    }
549
550    /// Build output batch from expansions.
551    fn build_output_batch(
552        &self,
553        input: &RecordBatch,
554        expansions: &[(usize, Value)],
555    ) -> DFResult<RecordBatch> {
556        if expansions.is_empty() {
557            return Ok(RecordBatch::new_empty(Arc::clone(&self.schema)));
558        }
559
560        let num_rows = expansions.len();
561
562        // Build index array for take operation
563        let indices: Vec<u64> = expansions.iter().map(|(idx, _)| *idx as u64).collect();
564        let indices_array = UInt64Array::from(indices);
565
566        // Expand input columns
567        let mut columns: Vec<ArrayRef> = Vec::new();
568        for col in input.columns() {
569            let expanded = take(col.as_ref(), &indices_array, None)?;
570            columns.push(expanded);
571        }
572
573        // Add the unwind variable column using the appropriate typed builder
574        let unwind_field = self.schema.field(self.schema.fields().len() - 1);
575        let is_cv_encoded = unwind_field
576            .metadata()
577            .get("cv_encoded")
578            .is_some_and(|v| v == "true");
579
580        let unwind_col: ArrayRef = match (unwind_field.data_type(), is_cv_encoded) {
581            (DataType::Boolean, false) => {
582                let mut builder = BooleanBuilder::with_capacity(num_rows);
583                for (_, value) in expansions {
584                    if let Value::Bool(b) = value {
585                        builder.append_value(*b);
586                    } else {
587                        builder.append_null();
588                    }
589                }
590                Arc::new(builder.finish())
591            }
592            (DataType::Int64, false) => {
593                let mut builder = Int64Builder::with_capacity(num_rows);
594                for (_, value) in expansions {
595                    if let Value::Int(i) = value {
596                        builder.append_value(*i);
597                    } else {
598                        builder.append_null();
599                    }
600                }
601                Arc::new(builder.finish())
602            }
603            (DataType::Float64, false) => {
604                let mut builder = Float64Builder::with_capacity(num_rows);
605                for (_, value) in expansions {
606                    if let Value::Float(f) = value {
607                        builder.append_value(*f);
608                    } else {
609                        builder.append_null();
610                    }
611                }
612                Arc::new(builder.finish())
613            }
614            (DataType::Utf8, false) => {
615                let mut builder = StringBuilder::new();
616                for (_, value) in expansions {
617                    if let Value::String(s) = value {
618                        builder.append_value(s);
619                    } else {
620                        builder.append_null();
621                    }
622                }
623                Arc::new(builder.finish())
624            }
625            (DataType::LargeBinary, _) => {
626                // CypherValue-encoded: preserves exact types through UNWIND
627                let mut builder = LargeBinaryBuilder::with_capacity(num_rows, num_rows * 16);
628                for (_, value) in expansions {
629                    if value.is_null() {
630                        builder.append_null();
631                    } else {
632                        let encoded = uni_common::cypher_value_codec::encode(value);
633                        builder.append_value(&encoded);
634                    }
635                }
636                Arc::new(builder.finish())
637            }
638            _ => {
639                // Fallback: JSON-encoded Utf8 (heterogeneous or non-inferrable types)
640                let mut builder = StringBuilder::new();
641                for (_, value) in expansions {
642                    if value.is_null() {
643                        builder.append_null();
644                    } else {
645                        let json_val: serde_json::Value = value.clone().into();
646                        let json_str =
647                            serde_json::to_string(&json_val).unwrap_or_else(|_| "null".to_string());
648                        builder.append_value(&json_str);
649                    }
650                }
651                Arc::new(builder.finish())
652            }
653        };
654        columns.push(unwind_col);
655
656        self.metrics.record_output(num_rows);
657
658        RecordBatch::try_new(Arc::clone(&self.schema), columns).map_err(arrow_err)
659    }
660}
661
662impl Stream for GraphUnwindStream {
663    type Item = DFResult<RecordBatch>;
664
665    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
666        let metrics = self.metrics.clone();
667        let _timer = metrics.elapsed_compute().timer();
668        match self.input.poll_next_unpin(cx) {
669            Poll::Ready(Some(Ok(batch))) => {
670                let result = self.process_batch(batch);
671                Poll::Ready(Some(result))
672            }
673            other => other,
674        }
675    }
676}
677
678impl RecordBatchStream for GraphUnwindStream {
679    fn schema(&self) -> SchemaRef {
680        Arc::clone(&self.schema)
681    }
682}
683
684/// Convert an Arrow array value at a specific row to `uni_common::Value`.
685pub(crate) fn arrow_to_json_value(array: &dyn Array, row: usize) -> Value {
686    use arrow_array::{
687        BooleanArray, Float32Array, Float64Array, Int8Array, Int16Array, Int32Array, Int64Array,
688        LargeStringArray, ListArray, StringArray, UInt8Array, UInt16Array, UInt32Array,
689        UInt64Array,
690    };
691
692    if array.is_null(row) {
693        return Value::Null;
694    }
695
696    let any = array.as_any();
697
698    // String types
699    if let Some(arr) = any.downcast_ref::<StringArray>() {
700        return Value::String(arr.value(row).to_string());
701    }
702    if let Some(arr) = any.downcast_ref::<LargeStringArray>() {
703        return Value::String(arr.value(row).to_string());
704    }
705
706    // Integer types - use a macro to reduce repetition
707    macro_rules! try_int {
708        ($arr_type:ty) => {
709            if let Some(arr) = any.downcast_ref::<$arr_type>() {
710                return Value::Int(arr.value(row) as i64);
711            }
712        };
713    }
714    try_int!(Int64Array);
715    try_int!(Int32Array);
716    try_int!(Int16Array);
717    try_int!(Int8Array);
718    try_int!(UInt64Array);
719    try_int!(UInt32Array);
720    try_int!(UInt16Array);
721    try_int!(UInt8Array);
722
723    // Float types
724    if let Some(arr) = any.downcast_ref::<Float64Array>() {
725        return Value::Float(arr.value(row));
726    }
727    if let Some(arr) = any.downcast_ref::<Float32Array>() {
728        return Value::Float(arr.value(row) as f64);
729    }
730
731    // Boolean
732    if let Some(arr) = any.downcast_ref::<BooleanArray>() {
733        return Value::Bool(arr.value(row));
734    }
735
736    // List (recursive)
737    if let Some(arr) = any.downcast_ref::<ListArray>() {
738        let values = arr.value(row);
739        let result: Vec<Value> = (0..values.len())
740            .map(|i| arrow_to_json_value(values.as_ref(), i))
741            .collect();
742        return Value::List(result);
743    }
744
745    // LargeBinary (CypherValue) — decode to Value
746    if let Some(arr) = any.downcast_ref::<arrow_array::LargeBinaryArray>() {
747        let bytes = arr.value(row);
748        if let Ok(uni_val) = uni_common::cypher_value_codec::decode(bytes) {
749            return uni_val;
750        }
751        // Fallback: try plain JSON text
752        if let Ok(parsed) = serde_json::from_slice::<serde_json::Value>(bytes) {
753            return Value::from(parsed);
754        }
755        return Value::Null;
756    }
757
758    // Struct — convert fields to a Map so keys()/properties() UDFs work
759    if let Some(s) = any.downcast_ref::<arrow_array::StructArray>() {
760        let mut map = HashMap::new();
761        for (field, child) in s.fields().iter().zip(s.columns()) {
762            map.insert(
763                field.name().clone(),
764                arrow_to_json_value(child.as_ref(), row),
765            );
766        }
767        return Value::Map(map);
768    }
769
770    // Fallback
771    Value::Null
772}
773
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{LargeBinaryArray, StructArray, UInt64Array};
    use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
    use uni_cypher::ast::CypherLiteral;

    /// Single non-nullable `n._vid: UInt64` column, the input schema most tests share.
    fn vid_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![Field::new(
            "n._vid",
            DataType::UInt64,
            false,
        )]))
    }

    /// One-row batch (`n._vid = 1`) used as the per-row evaluation context.
    fn one_row_batch() -> RecordBatch {
        RecordBatch::try_new(
            vid_schema(),
            vec![Arc::new(UInt64Array::from(vec![1u64])) as ArrayRef],
        )
        .unwrap()
    }

    /// Builds a `GraphUnwindStream` for `expr` on top of an empty input stream.
    ///
    /// These tests never poll the input; they only call the expression-evaluation
    /// methods directly. `schema` is the stream's declared output schema.
    fn unwind_stream(expr: Expr, schema: SchemaRef) -> GraphUnwindStream {
        let empty_stream = RecordBatchStreamAdapter::new(vid_schema(), futures::stream::empty());
        GraphUnwindStream {
            input: Box::pin(empty_stream),
            expr,
            params: HashMap::new(),
            schema,
            metrics: BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0),
        }
    }

    #[test]
    fn test_build_schema() {
        let input_schema = Arc::new(Schema::new(vec![
            Field::new("n._vid", DataType::UInt64, false),
            Field::new("n.name", DataType::Utf8, true),
        ]));

        // Variable reference: there are no literal elements to infer a type from,
        // so the column falls back to CypherValue-encoded LargeBinary, tagged
        // with `cv_encoded` metadata.
        let expr = Expr::Variable("some_list".to_string());
        let output_schema = GraphUnwindExec::build_schema(input_schema, "item", &expr);

        assert_eq!(output_schema.fields().len(), 3);
        assert_eq!(output_schema.field(0).name(), "n._vid");
        assert_eq!(output_schema.field(1).name(), "n.name");
        assert_eq!(output_schema.field(2).name(), "item");
        assert_eq!(output_schema.field(2).data_type(), &DataType::LargeBinary);
        assert_eq!(
            output_schema
                .field(2)
                .metadata()
                .get("cv_encoded")
                .map(String::as_str),
            Some("true")
        );
    }

    #[test]
    fn test_build_schema_boolean_list() {
        let expr = Expr::List(vec![
            Expr::Literal(CypherLiteral::Bool(true)),
            Expr::Literal(CypherLiteral::Bool(false)),
            Expr::Literal(CypherLiteral::Null),
        ]);
        let output_schema = GraphUnwindExec::build_schema(vid_schema(), "a", &expr);

        let field = output_schema.field(1);
        assert_eq!(field.name(), "a");
        assert_eq!(field.data_type(), &DataType::Boolean);
        assert!(field.metadata().is_empty());
    }

    #[test]
    fn test_build_schema_integer_list() {
        let expr = Expr::List(vec![
            Expr::Literal(CypherLiteral::Integer(1)),
            Expr::Literal(CypherLiteral::Integer(2)),
            Expr::Literal(CypherLiteral::Integer(3)),
        ]);
        let output_schema = GraphUnwindExec::build_schema(vid_schema(), "x", &expr);

        let field = output_schema.field(1);
        assert_eq!(field.name(), "x");
        assert_eq!(field.data_type(), &DataType::Int64);
        assert!(field.metadata().is_empty());
    }

    #[test]
    fn test_build_schema_float_list() {
        let expr = Expr::List(vec![
            Expr::Literal(CypherLiteral::Float(1.5)),
            Expr::Literal(CypherLiteral::Float(2.5)),
        ]);
        let output_schema = GraphUnwindExec::build_schema(vid_schema(), "x", &expr);

        let field = output_schema.field(1);
        assert_eq!(field.name(), "x");
        assert_eq!(field.data_type(), &DataType::Float64);
        assert!(field.metadata().is_empty());
    }

    #[test]
    fn test_build_schema_string_list() {
        let expr = Expr::List(vec![
            Expr::Literal(CypherLiteral::String("hello".to_string())),
            Expr::Literal(CypherLiteral::String("world".to_string())),
        ]);
        let output_schema = GraphUnwindExec::build_schema(vid_schema(), "x", &expr);

        let field = output_schema.field(1);
        assert_eq!(field.name(), "x");
        assert_eq!(field.data_type(), &DataType::Utf8);
        // Plain string, no cv_encoded metadata
        assert!(field.metadata().is_empty());
    }

    #[test]
    fn test_build_schema_mixed_list() {
        // Heterogeneous elements cannot share one Arrow type, so the column is
        // CypherValue-encoded.
        let expr = Expr::List(vec![
            Expr::Literal(CypherLiteral::Integer(1)),
            Expr::Literal(CypherLiteral::String("hello".to_string())),
        ]);
        let output_schema = GraphUnwindExec::build_schema(vid_schema(), "x", &expr);

        let field = output_schema.field(1);
        assert_eq!(field.name(), "x");
        assert_eq!(field.data_type(), &DataType::LargeBinary);
        assert_eq!(
            field.metadata().get("cv_encoded").map(String::as_str),
            Some("true")
        );
    }

    #[test]
    fn test_evaluate_literal_list() {
        let batch = one_row_batch();
        let stream = unwind_stream(
            Expr::List(vec![
                Expr::Literal(CypherLiteral::Integer(1)),
                Expr::Literal(CypherLiteral::Integer(2)),
                Expr::Literal(CypherLiteral::Integer(3)),
            ]),
            Arc::new(Schema::new(vec![
                Field::new("n._vid", DataType::UInt64, false),
                Field::new("x", DataType::Utf8, true),
            ])),
        );

        let result = stream.evaluate_expr_for_row(&batch, 0).unwrap();
        match result {
            Value::List(items) => {
                assert_eq!(items.len(), 3);
                assert_eq!(items[0], Value::Int(1));
                assert_eq!(items[1], Value::Int(2));
                assert_eq!(items[2], Value::Int(3));
            }
            _ => panic!("Expected list"),
        }
    }

    #[test]
    fn test_evaluate_map_literal() {
        let batch = one_row_batch();
        let stream = unwind_stream(
            Expr::Map(vec![
                ("a".to_string(), Expr::Literal(CypherLiteral::Integer(1))),
                (
                    "b".to_string(),
                    Expr::Literal(CypherLiteral::String("hello".to_string())),
                ),
            ]),
            Arc::new(Schema::new(vec![
                Field::new("n._vid", DataType::UInt64, false),
                Field::new("x", DataType::LargeBinary, true),
            ])),
        );

        let result = stream.evaluate_expr_for_row(&batch, 0).unwrap();
        match result {
            Value::Map(map) => {
                assert_eq!(map.get("a"), Some(&Value::Int(1)));
                assert_eq!(map.get("b"), Some(&Value::String("hello".to_string())));
            }
            _ => panic!("Expected Map, got {:?}", result),
        }
    }

    #[test]
    fn test_evaluate_map_property_access() {
        let batch = one_row_batch();

        // Test: {a: 1, b: 'x'}.a should return 1
        let map_expr = Expr::Map(vec![
            ("a".to_string(), Expr::Literal(CypherLiteral::Integer(1))),
            (
                "b".to_string(),
                Expr::Literal(CypherLiteral::String("x".to_string())),
            ),
        ]);
        let prop_expr = Expr::Property(Box::new(map_expr), "a".to_string());

        let stream = unwind_stream(
            prop_expr.clone(),
            Arc::new(Schema::new(vec![
                Field::new("n._vid", DataType::UInt64, false),
                Field::new("x", DataType::LargeBinary, true),
            ])),
        );

        let result = stream.evaluate_expr_impl(&prop_expr, &batch, 0).unwrap();
        assert_eq!(result, Value::Int(1));
    }

    #[test]
    fn test_arrow_to_json_value_uint64_is_coerced_to_int() {
        let arr = UInt64Array::from(vec![Some(42u64)]);
        let value = arrow_to_json_value(&arr, 0);
        assert_eq!(value, Value::Int(42));
    }

    #[test]
    fn test_arrow_to_json_value_largebinary_decodes_cypher_map() {
        let encoded = uni_common::cypher_value_codec::encode(&Value::Map(HashMap::new()));
        let arr = LargeBinaryArray::from(vec![Some(encoded.as_slice())]);
        let value = arrow_to_json_value(&arr, 0);
        assert_eq!(value, Value::Map(HashMap::new()));
    }

    /// A struct row becomes a `Value::Map` keyed by field name, with each child
    /// converted recursively.
    #[test]
    fn test_arrow_to_json_value_struct_becomes_map() {
        let child: ArrayRef = Arc::new(UInt64Array::from(vec![Some(42u64)]));
        let arr = StructArray::from(vec![(
            Arc::new(Field::new("a", DataType::UInt64, true)),
            child,
        )]);
        let value = arrow_to_json_value(&arr, 0);

        let expected: HashMap<String, Value> = [("a".to_string(), Value::Int(42))]
            .into_iter()
            .collect();
        assert_eq!(value, Value::Map(expected));
    }

    /// A list row becomes a `Value::List`, with each element converted recursively.
    #[test]
    fn test_arrow_to_json_value_list_recurses_into_elements() {
        let arr = arrow_array::ListArray::from_iter_primitive::<
            arrow_array::types::UInt64Type,
            _,
            _,
        >(vec![Some(vec![Some(1u64), Some(2u64)])]);
        let value = arrow_to_json_value(&arr, 0);
        assert_eq!(value, Value::List(vec![Value::Int(1), Value::Int(2)]));
    }

    /// Evaluate a scalar function-call expression (e.g. `range(...)`, `size(...)`)
    /// over a one-row batch — the harness for the H5/H6 regression tests.
    fn eval_scalar_fn(name: &str, args: Vec<Expr>) -> DFResult<Value> {
        let batch = one_row_batch();
        let expr = Expr::FunctionCall {
            name: name.to_string(),
            args,
            distinct: false,
            window_spec: None,
        };
        let stream = unwind_stream(expr.clone(), vid_schema());
        stream.evaluate_expr_impl(&expr, &batch, 0)
    }

    fn int_lit(v: i64) -> Expr {
        Expr::Literal(CypherLiteral::Integer(v))
    }

    /// H5: `i += st` near i64::MAX must terminate at the boundary, not panic
    /// (debug) or wrap into an infinite loop (release).
    #[test]
    fn test_range_overflow_terminates() {
        let result = eval_scalar_fn(
            "range",
            vec![int_lit(i64::MAX - 1), int_lit(i64::MAX), int_lit(2)],
        )
        .unwrap();
        // Only i64::MAX-1 fits; the next step would overflow and is not emitted.
        assert_eq!(result, Value::List(vec![Value::Int(i64::MAX - 1)]));
    }

    /// H5: a zero step is an error, not an infinite loop.
    #[test]
    fn test_range_zero_step_errors() {
        let err = eval_scalar_fn("range", vec![int_lit(1), int_lit(5), int_lit(0)]);
        assert!(err.is_err(), "range(1, 5, 0) must error, got {err:?}");
    }

    /// H5: float bounds are a type error (openCypher), NOT a silent empty list.
    #[test]
    fn test_range_float_args_error() {
        let err = eval_scalar_fn(
            "range",
            vec![
                Expr::Literal(CypherLiteral::Float(1.0)),
                Expr::Literal(CypherLiteral::Float(3.0)),
            ],
        );
        assert!(err.is_err(), "range(1.0, 3.0) must error, got {err:?}");
    }

    /// H6: size()/length() of a multi-byte string counts characters, not bytes.
    #[test]
    fn test_size_string_counts_chars_not_bytes() {
        let result = eval_scalar_fn(
            "size",
            vec![Expr::Literal(CypherLiteral::String("héllo".to_string()))],
        )
        .unwrap();
        // 5 chars, but 6 UTF-8 bytes — must be 5.
        assert_eq!(result, Value::Int(5));
    }
}