// File: uni_query/query/df_graph/unwind.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! UNWIND execution plan for DataFusion.
5//!
6//! This module provides [`GraphUnwindExec`], a DataFusion [`ExecutionPlan`] that
7//! expands list values into multiple rows (similar to SQL `UNNEST`).
8//!
9//! # Supported Expressions
10//!
11//! Currently supports:
12//! - Literal lists: `UNWIND [1, 2, 3] AS x`
13//! - Variable references: `UNWIND list AS item` (where `list` is a column)
14//! - Property access: `UNWIND n.items AS item`
15//!
16//! # Example
17//!
18//! ```text
19//! Input:   [{"list": [1, 2, 3]}]
20//! UNWIND:  list AS item
21//! Output:  [{"list": [1,2,3], "item": 1},
22//!           {"list": [1,2,3], "item": 2},
23//!           {"list": [1,2,3], "item": 3}]
24//! ```
25
26use crate::query::df_graph::common::compute_plan_properties;
27use arrow::compute::take;
28use arrow_array::builder::{
29    BooleanBuilder, Float64Builder, Int64Builder, LargeBinaryBuilder, StringBuilder,
30};
31use arrow_array::{Array, ArrayRef, RecordBatch, UInt64Array};
32use arrow_schema::{DataType, Field, Schema, SchemaRef};
33use datafusion::common::Result as DFResult;
34use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
35use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
36use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
37use futures::{Stream, StreamExt};
38use std::any::Any;
39use std::collections::HashMap;
40use std::fmt;
41use std::pin::Pin;
42use std::sync::Arc;
43use std::task::{Context, Poll};
44use uni_common::Value;
45use uni_cypher::ast::{CypherLiteral, Expr};
46
/// Result of UNWIND element type inference.
///
/// Produced by `GraphUnwindExec::infer_element_type` and consumed by
/// `build_schema` when declaring the unwind variable column.
struct ElementTypeInfo {
    /// Arrow data type for the unwind variable column.
    data_type: DataType,
    /// Whether values are CypherValue-encoded (marked with `cv_encoded`
    /// field metadata). Used for heterogeneous or non-inferrable element
    /// types, where the column is emitted as `LargeBinary`.
    is_cv_encoded: bool,
}
54
/// UNWIND execution plan that expands list values into multiple rows.
///
/// Takes an input plan and an expression that evaluates to a list. For each
/// row in the input, if the expression evaluates to a list, produces multiple
/// output rows (one per list element) with the list element bound to a new
/// variable. Null expands to zero rows; non-list values expand to one row.
pub struct GraphUnwindExec {
    /// Input execution plan.
    input: Arc<dyn ExecutionPlan>,

    /// Expression to evaluate (should produce a list).
    expr: Expr,

    /// Variable name to bind list elements to.
    variable: String,

    /// Query parameters for `$param` lookups during expression evaluation.
    params: HashMap<String, Value>,

    /// Output schema: input schema plus one trailing column named `variable`.
    schema: SchemaRef,

    /// Cached plan properties, derived from `schema` at construction.
    properties: PlanProperties,

    /// Execution metrics (output row counts).
    metrics: ExecutionPlanMetricsSet,
}
83
84impl fmt::Debug for GraphUnwindExec {
85    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
86        f.debug_struct("GraphUnwindExec")
87            .field("expr", &self.expr)
88            .field("variable", &self.variable)
89            .finish()
90    }
91}
92
impl GraphUnwindExec {
    /// Create a new UNWIND execution plan.
    ///
    /// The output schema and plan properties are computed eagerly here so
    /// that later `schema()`/`properties()` calls are cheap clones.
    ///
    /// # Arguments
    ///
    /// * `input` - Input plan providing rows to expand
    /// * `expr` - Expression that evaluates to a list
    /// * `variable` - Variable name for list elements
    /// * `params` - Query parameters for expression evaluation
    pub fn new(
        input: Arc<dyn ExecutionPlan>,
        expr: Expr,
        variable: impl Into<String>,
        params: HashMap<String, Value>,
    ) -> Self {
        let variable = variable.into();

        // Build output schema: input schema + new variable column
        let schema = Self::build_schema(input.schema(), &variable, &expr);
        let properties = compute_plan_properties(schema.clone());

        Self {
            input,
            expr,
            variable,
            params,
            schema,
            properties,
            metrics: ExecutionPlanMetricsSet::new(),
        }
    }

    /// Infer the native Arrow `DataType` for the elements of an UNWIND expression.
    ///
    /// For literal lists with homogeneous element types (ignoring nulls), returns
    /// the native type. For heterogeneous or non-inferrable expressions, falls back
    /// to CypherValue-encoded `LargeBinary` (flagged via `cv_encoded` metadata).
    fn infer_element_type(expr: &Expr) -> ElementTypeInfo {
        // Fallback: CypherValue-encoded LargeBinary. (The closure name is
        // historical — decoding happens in `arrow_to_json_value`.)
        let json_fallback = || ElementTypeInfo {
            data_type: DataType::LargeBinary,
            is_cv_encoded: true,
        };

        // Only literal lists are statically inferrable.
        let Expr::List(items) = expr else {
            return json_fallback();
        };

        // Infer type from first non-null literal
        let first_type = items.iter().find_map(|item| match item {
            Expr::Literal(CypherLiteral::Null) => None,
            Expr::Literal(CypherLiteral::Bool(_)) => Some(DataType::Boolean),
            Expr::Literal(CypherLiteral::Integer(_)) => Some(DataType::Int64),
            Expr::Literal(CypherLiteral::Float(_)) => Some(DataType::Float64),
            Expr::Literal(CypherLiteral::String(_)) => Some(DataType::Utf8),
            // Sentinel for non-literal items: the all_match pass below
            // rejects them, which forces the fallback.
            _ => Some(DataType::Utf8),
        });

        let Some(expected) = first_type else {
            return json_fallback(); // All nulls or empty list
        };

        // Verify all remaining non-null items match the expected type
        let all_match = items.iter().all(|item| match item {
            Expr::Literal(CypherLiteral::Null) => true,
            Expr::Literal(CypherLiteral::Bool(_)) => expected == DataType::Boolean,
            Expr::Literal(CypherLiteral::Integer(_)) => expected == DataType::Int64,
            Expr::Literal(CypherLiteral::Float(_)) => expected == DataType::Float64,
            Expr::Literal(CypherLiteral::String(_)) => expected == DataType::Utf8,
            _ => false, // Non-literal (or unsupported literal kind)
        });

        if all_match {
            ElementTypeInfo {
                data_type: expected,
                is_cv_encoded: false,
            }
        } else {
            json_fallback()
        }
    }

    /// Build output schema by adding the unwind variable column.
    ///
    /// Uses type inference on the UNWIND expression to emit natively-typed
    /// columns when possible. Falls back to a CypherValue-encoded
    /// `LargeBinary` column (with `cv_encoded` metadata) for heterogeneous
    /// or non-inferrable expressions. The unwind column is always appended
    /// last — `build_output_batch` relies on this ordering.
    fn build_schema(input_schema: SchemaRef, variable: &str, expr: &Expr) -> SchemaRef {
        let mut fields: Vec<Arc<Field>> = input_schema.fields().to_vec();

        let type_info = Self::infer_element_type(expr);

        // The column is always nullable: type-mismatched or null elements
        // become nulls at execution time.
        let mut field = Field::new(variable, type_info.data_type, true);
        if type_info.is_cv_encoded {
            field = field.with_metadata(HashMap::from([("cv_encoded".into(), "true".into())]));
        }
        fields.push(Arc::new(field));

        Arc::new(Schema::new(fields))
    }
}
193
194impl DisplayAs for GraphUnwindExec {
195    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
196        write!(
197            f,
198            "GraphUnwindExec: {} AS {}",
199            self.expr.to_string_repr(),
200            self.variable
201        )
202    }
203}
204
205impl ExecutionPlan for GraphUnwindExec {
206    fn name(&self) -> &str {
207        "GraphUnwindExec"
208    }
209
210    fn as_any(&self) -> &dyn Any {
211        self
212    }
213
214    fn schema(&self) -> SchemaRef {
215        Arc::clone(&self.schema)
216    }
217
218    fn properties(&self) -> &PlanProperties {
219        &self.properties
220    }
221
222    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
223        vec![&self.input]
224    }
225
226    fn with_new_children(
227        self: Arc<Self>,
228        children: Vec<Arc<dyn ExecutionPlan>>,
229    ) -> DFResult<Arc<dyn ExecutionPlan>> {
230        if children.len() != 1 {
231            return Err(datafusion::error::DataFusionError::Plan(
232                "GraphUnwindExec requires exactly one child".to_string(),
233            ));
234        }
235
236        Ok(Arc::new(Self::new(
237            Arc::clone(&children[0]),
238            self.expr.clone(),
239            self.variable.clone(),
240            self.params.clone(),
241        )))
242    }
243
244    fn execute(
245        &self,
246        partition: usize,
247        context: Arc<TaskContext>,
248    ) -> DFResult<SendableRecordBatchStream> {
249        let input_stream = self.input.execute(partition, context)?;
250        let metrics = BaselineMetrics::new(&self.metrics, partition);
251
252        Ok(Box::pin(GraphUnwindStream {
253            input: input_stream,
254            expr: self.expr.clone(),
255            params: self.params.clone(),
256            schema: Arc::clone(&self.schema),
257            metrics,
258        }))
259    }
260
261    fn metrics(&self) -> Option<MetricsSet> {
262        Some(self.metrics.clone_inner())
263    }
264}
265
/// Stream that performs the UNWIND operation.
///
/// Produces exactly one output batch per input batch (possibly empty when
/// every row unwinds to nothing).
struct GraphUnwindStream {
    /// Input stream.
    input: SendableRecordBatchStream,

    /// Expression to evaluate per input row.
    expr: Expr,

    /// Query parameters for `$param` lookups.
    params: HashMap<String, Value>,

    /// Output schema (input columns + trailing unwind column).
    schema: SchemaRef,

    /// Metrics (output row counts).
    metrics: BaselineMetrics,
}
283
impl GraphUnwindStream {
    /// Process a single input batch.
    ///
    /// Evaluates the UNWIND expression for every input row, collecting
    /// (input row index, element) pairs, then materializes them into one
    /// output batch via [`Self::build_output_batch`]. Lists expand to one
    /// row per element, null to zero rows, any other value to one row.
    fn process_batch(&self, batch: RecordBatch) -> DFResult<RecordBatch> {
        // For each row, evaluate the expression and expand if it's a list
        let mut expansions: Vec<(usize, Value)> = Vec::new(); // (input_row_idx, list_element)

        for row_idx in 0..batch.num_rows() {
            // Evaluate expression for this row
            let list_value = self.evaluate_expr_for_row(&batch, row_idx)?;

            match list_value {
                Value::List(items) => {
                    for item in items {
                        expansions.push((row_idx, item));
                    }
                }
                Value::Null => {
                    // UNWIND on null produces no rows (Cypher semantics)
                }
                other => {
                    // Non-list values: treat as single-element list
                    expansions.push((row_idx, other));
                }
            }
        }

        self.build_output_batch(&batch, &expansions)
    }

    /// Evaluate the UNWIND expression for a specific row of `batch`.
    fn evaluate_expr_for_row(&self, batch: &RecordBatch, row_idx: usize) -> DFResult<Value> {
        self.evaluate_expr_impl(&self.expr, batch, row_idx)
    }

    /// Evaluate an expression recursively against one row.
    ///
    /// Supports literals, literal lists, `$param` references, variable and
    /// property references (resolved against batch columns), a small set of
    /// functions (`range`, `keys`, `size`/`length`, temporal constructors,
    /// `split`), binary operations, map literals, and array indexing.
    /// Unsupported expression forms evaluate to `Value::Null`; unsupported
    /// functions evaluate to an empty list.
    fn evaluate_expr_impl(
        &self,
        expr: &Expr,
        batch: &RecordBatch,
        row_idx: usize,
    ) -> DFResult<Value> {
        match expr {
            // Literal list: [1, 2, 3] — each element may itself be any
            // supported expression, evaluated recursively.
            Expr::List(items) => {
                let mut values = Vec::with_capacity(items.len());
                for item in items {
                    values.push(self.evaluate_expr_impl(item, batch, row_idx)?);
                }
                Ok(Value::List(values))
            }

            // Literal value
            Expr::Literal(lit) => Ok(lit.to_value()),

            // Parameter reference: $param — missing parameters are an
            // execution error, unlike missing properties (which are null).
            Expr::Parameter(name) => self.params.get(name).cloned().ok_or_else(|| {
                datafusion::error::DataFusionError::Execution(format!(
                    "Parameter '{}' not found",
                    name
                ))
            }),

            // Variable reference: look up column
            Expr::Variable(var_name) => self.get_column_value(batch, var_name, row_idx),

            // Property access: n.prop
            Expr::Property(base_expr, prop_name) => {
                // Try looking up as a flattened column name first: "var.prop"
                if let Expr::Variable(var_name) = base_expr.as_ref() {
                    let col_name = format!("{}.{}", var_name, prop_name);
                    if batch.schema().column_with_name(&col_name).is_some() {
                        return self.get_column_value(batch, &col_name, row_idx);
                    }
                }

                // Fall back to evaluating base as a map
                let base_value = self.evaluate_expr_impl(base_expr, batch, row_idx)?;
                if let Value::Map(map) = base_value {
                    Ok(map.get(prop_name).cloned().unwrap_or(Value::Null))
                } else {
                    // Property access on a non-map base is null, not an error
                    Ok(Value::Null)
                }
            }

            // Function call: range(1, 10)
            Expr::FunctionCall { name, args, .. } => {
                let name_lower = name.to_lowercase();
                match name_lower.as_str() {
                    "range" => {
                        if args.len() >= 2 {
                            let start = self.evaluate_expr_impl(&args[0], batch, row_idx)?;
                            let end = self.evaluate_expr_impl(&args[1], batch, row_idx)?;
                            // Step defaults to 1 when omitted.
                            let step = if args.len() >= 3 {
                                self.evaluate_expr_impl(&args[2], batch, row_idx)?
                            } else {
                                Value::Int(1)
                            };

                            if let (Some(s), Some(e), Some(st)) =
                                (start.as_i64(), end.as_i64(), step.as_i64())
                            {
                                let mut result = Vec::new();
                                let mut i = s;
                                // A zero step satisfies neither condition, so
                                // the loop never runs (no infinite loop).
                                // NOTE(review): `i += st` can overflow for
                                // extreme bounds — wraps in release builds.
                                while (st > 0 && i <= e) || (st < 0 && i >= e) {
                                    result.push(Value::Int(i));
                                    i += st;
                                }
                                return Ok(Value::List(result));
                            }
                        }
                        // Wrong arity or non-integer bounds: empty list
                        Ok(Value::List(vec![]))
                    }
                    "keys" => {
                        if args.len() == 1 {
                            let val = self.evaluate_expr_impl(&args[0], batch, row_idx)?;
                            if let Value::Map(map) = val {
                                // Use _all_props sub-map for schemaless entities
                                // when present; otherwise use the top-level map.
                                let source = match map.get("_all_props") {
                                    Some(Value::Map(all)) => all,
                                    _ => &map,
                                };
                                // Skip nulls and internal keys (underscore
                                // prefix); sort for deterministic output.
                                let mut key_strings: Vec<String> = source
                                    .iter()
                                    .filter(|(k, v)| !v.is_null() && !k.starts_with('_'))
                                    .map(|(k, _)| k.clone())
                                    .collect();
                                key_strings.sort();
                                let keys: Vec<Value> =
                                    key_strings.into_iter().map(Value::String).collect();
                                return Ok(Value::List(keys));
                            }
                            if val.is_null() {
                                return Ok(Value::Null);
                            }
                        }
                        Ok(Value::List(vec![]))
                    }
                    "size" | "length" => {
                        if args.len() == 1 {
                            let val = self.evaluate_expr_impl(&args[0], batch, row_idx)?;
                            // NOTE(review): string size uses byte length
                            // (`str::len`), not character count — confirm this
                            // is intended for non-ASCII input.
                            let sz = match &val {
                                Value::List(arr) => arr.len() as i64,
                                Value::String(s) => s.len() as i64,
                                Value::Map(m) => m.len() as i64,
                                _ => 0,
                            };
                            return Ok(Value::Int(sz));
                        }
                        Ok(Value::Null)
                    }
                    // Temporal constructors: date(), time(), localtime(), datetime(), localdatetime(), duration()
                    "date" | "time" | "localtime" | "datetime" | "localdatetime" | "duration" => {
                        let mut eval_args = Vec::with_capacity(args.len());
                        for arg in args {
                            eval_args.push(self.evaluate_expr_impl(arg, batch, row_idx)?);
                        }
                        // Delegates to the shared datetime evaluator, which
                        // expects upper-cased function names.
                        crate::query::datetime::eval_datetime_function(
                            &name.to_uppercase(),
                            &eval_args,
                        )
                        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))
                    }
                    "split" => {
                        let mut eval_args = Vec::with_capacity(args.len());
                        for arg in args {
                            eval_args.push(self.evaluate_expr_impl(arg, batch, row_idx)?);
                        }
                        crate::query::expr_eval::eval_split(&eval_args).map_err(|e| {
                            datafusion::error::DataFusionError::Execution(e.to_string())
                        })
                    }
                    _ => {
                        // Unsupported function - return empty list
                        Ok(Value::List(vec![]))
                    }
                }
            }

            // Binary operations: e.g. size(types) - 1
            Expr::BinaryOp { left, op, right } => {
                let l = self.evaluate_expr_impl(left, batch, row_idx)?;
                let r = self.evaluate_expr_impl(right, batch, row_idx)?;
                crate::query::expr_eval::eval_binary_op(&l, op, &r)
                    .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))
            }

            // Map literal: {a: 1, b: 'x'}
            Expr::Map(entries) => {
                let mut map = HashMap::new();
                for (key, val_expr) in entries {
                    let val = self.evaluate_expr_impl(val_expr, batch, row_idx)?;
                    map.insert(key.clone(), val);
                }
                Ok(Value::Map(map))
            }

            // Array index: qrows[p]
            Expr::ArrayIndex { array, index } => {
                let arr_val = self.evaluate_expr_impl(array, batch, row_idx)?;
                let idx_val = self.evaluate_expr_impl(index, batch, row_idx)?;
                match (&arr_val, idx_val.as_i64()) {
                    (Value::List(list), Some(i)) => {
                        // Cypher uses 0-based indexing; negative indices count from end
                        let len = list.len() as i64;
                        let resolved = if i < 0 { len + i } else { i };
                        if resolved >= 0 && (resolved as usize) < list.len() {
                            Ok(list[resolved as usize].clone())
                        } else {
                            // Out-of-bounds index yields null, not an error
                            Ok(Value::Null)
                        }
                    }
                    _ => Ok(Value::Null),
                }
            }

            // Unsupported expressions return null
            _ => Ok(Value::Null),
        }
    }

    /// Get a column value as a `Value` for a specific row.
    ///
    /// Errors when the column does not exist; per-row conversion is done by
    /// [`arrow_to_json_value`].
    fn get_column_value(
        &self,
        batch: &RecordBatch,
        col_name: &str,
        row_idx: usize,
    ) -> DFResult<Value> {
        let col = batch.column_by_name(col_name).ok_or_else(|| {
            datafusion::error::DataFusionError::Execution(format!(
                "Column '{}' not found for UNWIND",
                col_name
            ))
        })?;

        Ok(arrow_to_json_value(col.as_ref(), row_idx))
    }

    /// Build output batch from expansions.
    ///
    /// Input columns are replicated per expansion via the Arrow `take`
    /// kernel; the unwind variable column is appended last, built with the
    /// typed builder matching the data type inferred at plan time. Values
    /// whose runtime type does not match the inferred column type become
    /// nulls.
    fn build_output_batch(
        &self,
        input: &RecordBatch,
        expansions: &[(usize, Value)],
    ) -> DFResult<RecordBatch> {
        if expansions.is_empty() {
            return Ok(RecordBatch::new_empty(Arc::clone(&self.schema)));
        }

        let num_rows = expansions.len();

        // Build index array for take operation (one entry per output row,
        // pointing at the source input row to duplicate)
        let indices: Vec<u64> = expansions.iter().map(|(idx, _)| *idx as u64).collect();
        let indices_array = UInt64Array::from(indices);

        // Expand input columns
        let mut columns: Vec<ArrayRef> = Vec::new();
        for col in input.columns() {
            let expanded = take(col.as_ref(), &indices_array, None)?;
            columns.push(expanded);
        }

        // Add the unwind variable column using the appropriate typed builder.
        // The unwind column is always the last field (see build_schema).
        let unwind_field = self.schema.field(self.schema.fields().len() - 1);
        let is_cv_encoded = unwind_field
            .metadata()
            .get("cv_encoded")
            .is_some_and(|v| v == "true");

        let unwind_col: ArrayRef = match (unwind_field.data_type(), is_cv_encoded) {
            (DataType::Boolean, false) => {
                let mut builder = BooleanBuilder::with_capacity(num_rows);
                for (_, value) in expansions {
                    if let Value::Bool(b) = value {
                        builder.append_value(*b);
                    } else {
                        builder.append_null();
                    }
                }
                Arc::new(builder.finish())
            }
            (DataType::Int64, false) => {
                let mut builder = Int64Builder::with_capacity(num_rows);
                for (_, value) in expansions {
                    if let Value::Int(i) = value {
                        builder.append_value(*i);
                    } else {
                        builder.append_null();
                    }
                }
                Arc::new(builder.finish())
            }
            (DataType::Float64, false) => {
                let mut builder = Float64Builder::with_capacity(num_rows);
                for (_, value) in expansions {
                    if let Value::Float(f) = value {
                        builder.append_value(*f);
                    } else {
                        builder.append_null();
                    }
                }
                Arc::new(builder.finish())
            }
            (DataType::Utf8, false) => {
                let mut builder = StringBuilder::new();
                for (_, value) in expansions {
                    if let Value::String(s) = value {
                        builder.append_value(s);
                    } else {
                        builder.append_null();
                    }
                }
                Arc::new(builder.finish())
            }
            (DataType::LargeBinary, _) => {
                // CypherValue-encoded: preserves exact types through UNWIND.
                // 16 bytes per value is a heuristic initial capacity.
                let mut builder = LargeBinaryBuilder::with_capacity(num_rows, num_rows * 16);
                for (_, value) in expansions {
                    if value.is_null() {
                        builder.append_null();
                    } else {
                        let encoded = uni_common::cypher_value_codec::encode(value);
                        builder.append_value(&encoded);
                    }
                }
                Arc::new(builder.finish())
            }
            _ => {
                // Defensive fallback: JSON-encoded Utf8. Not produced by the
                // current schema inference, but kept for forward compatibility.
                let mut builder = StringBuilder::new();
                for (_, value) in expansions {
                    if value.is_null() {
                        builder.append_null();
                    } else {
                        let json_val: serde_json::Value = value.clone().into();
                        let json_str =
                            serde_json::to_string(&json_val).unwrap_or_else(|_| "null".to_string());
                        builder.append_value(&json_str);
                    }
                }
                Arc::new(builder.finish())
            }
        };
        columns.push(unwind_col);

        self.metrics.record_output(num_rows);

        RecordBatch::try_new(Arc::clone(&self.schema), columns)
            .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))
    }
}
634
635impl Stream for GraphUnwindStream {
636    type Item = DFResult<RecordBatch>;
637
638    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
639        match self.input.poll_next_unpin(cx) {
640            Poll::Ready(Some(Ok(batch))) => {
641                let result = self.process_batch(batch);
642                Poll::Ready(Some(result))
643            }
644            other => other,
645        }
646    }
647}
648
649impl RecordBatchStream for GraphUnwindStream {
650    fn schema(&self) -> SchemaRef {
651        Arc::clone(&self.schema)
652    }
653}
654
/// Convert an Arrow array value at a specific row to `uni_common::Value`.
///
/// Handles strings, integers, floats, booleans, lists (recursively),
/// CypherValue-encoded `LargeBinary`, and structs. Nulls and unrecognized
/// array types map to `Value::Null`.
pub(crate) fn arrow_to_json_value(array: &dyn Array, row: usize) -> Value {
    use arrow_array::{
        BooleanArray, Float32Array, Float64Array, Int8Array, Int16Array, Int32Array, Int64Array,
        LargeStringArray, ListArray, StringArray, UInt8Array, UInt16Array, UInt32Array,
        UInt64Array,
    };

    if array.is_null(row) {
        return Value::Null;
    }

    let any = array.as_any();

    // String types
    if let Some(arr) = any.downcast_ref::<StringArray>() {
        return Value::String(arr.value(row).to_string());
    }
    if let Some(arr) = any.downcast_ref::<LargeStringArray>() {
        return Value::String(arr.value(row).to_string());
    }

    // Integer types - use a macro to reduce repetition.
    // NOTE(review): the `as i64` cast wraps for UInt64 values above
    // i64::MAX — confirm u64 ID columns never exceed that range.
    macro_rules! try_int {
        ($arr_type:ty) => {
            if let Some(arr) = any.downcast_ref::<$arr_type>() {
                return Value::Int(arr.value(row) as i64);
            }
        };
    }
    try_int!(Int64Array);
    try_int!(Int32Array);
    try_int!(Int16Array);
    try_int!(Int8Array);
    try_int!(UInt64Array);
    try_int!(UInt32Array);
    try_int!(UInt16Array);
    try_int!(UInt8Array);

    // Float types (Float32 is widened to f64)
    if let Some(arr) = any.downcast_ref::<Float64Array>() {
        return Value::Float(arr.value(row));
    }
    if let Some(arr) = any.downcast_ref::<Float32Array>() {
        return Value::Float(arr.value(row) as f64);
    }

    // Boolean
    if let Some(arr) = any.downcast_ref::<BooleanArray>() {
        return Value::Bool(arr.value(row));
    }

    // List (recursive)
    if let Some(arr) = any.downcast_ref::<ListArray>() {
        let values = arr.value(row);
        let result: Vec<Value> = (0..values.len())
            .map(|i| arrow_to_json_value(values.as_ref(), i))
            .collect();
        return Value::List(result);
    }

    // LargeBinary (CypherValue) — decode to Value
    if let Some(arr) = any.downcast_ref::<arrow_array::LargeBinaryArray>() {
        let bytes = arr.value(row);
        if let Ok(uni_val) = uni_common::cypher_value_codec::decode(bytes) {
            return uni_val;
        }
        // Fallback: try plain JSON text (legacy encoding)
        if let Ok(parsed) = serde_json::from_slice::<serde_json::Value>(bytes) {
            return Value::from(parsed);
        }
        // Undecodable bytes degrade to null rather than erroring
        return Value::Null;
    }

    // Struct — convert fields to a Map so keys()/properties() UDFs work
    if let Some(s) = any.downcast_ref::<arrow_array::StructArray>() {
        let mut map = HashMap::new();
        for (field, child) in s.fields().iter().zip(s.columns()) {
            map.insert(
                field.name().clone(),
                arrow_to_json_value(child.as_ref(), row),
            );
        }
        return Value::Map(map);
    }

    // Fallback for unrecognized array types
    Value::Null
}
744
745#[cfg(test)]
746mod tests {
747    use super::*;
748    use arrow_array::{LargeBinaryArray, UInt64Array};
749    use uni_cypher::ast::CypherLiteral;
750
    // A non-inferrable expression (plain variable reference) must fall back
    // to a CypherValue-encoded LargeBinary column with `cv_encoded` metadata,
    // appended after the input columns.
    #[test]
    fn test_build_schema() {
        let input_schema = Arc::new(Schema::new(vec![
            Field::new("n._vid", DataType::UInt64, false),
            Field::new("n.name", DataType::Utf8, true),
        ]));

        // Variable reference -> falls back to CypherValue-encoded LargeBinary
        let expr = Expr::Variable("some_list".to_string());
        let output_schema = GraphUnwindExec::build_schema(input_schema, "item", &expr);

        assert_eq!(output_schema.fields().len(), 3);
        assert_eq!(output_schema.field(0).name(), "n._vid");
        assert_eq!(output_schema.field(1).name(), "n.name");
        assert_eq!(output_schema.field(2).name(), "item");
        assert_eq!(output_schema.field(2).data_type(), &DataType::LargeBinary);
        assert_eq!(
            output_schema
                .field(2)
                .metadata()
                .get("cv_encoded")
                .map(String::as_str),
            Some("true")
        );
    }
776
    // Homogeneous boolean literal list (nulls ignored) infers a native
    // Boolean column with no encoding metadata.
    #[test]
    fn test_build_schema_boolean_list() {
        let input_schema = Arc::new(Schema::new(vec![Field::new(
            "n._vid",
            DataType::UInt64,
            false,
        )]));

        let expr = Expr::List(vec![
            Expr::Literal(CypherLiteral::Bool(true)),
            Expr::Literal(CypherLiteral::Bool(false)),
            Expr::Literal(CypherLiteral::Null),
        ]);
        let output_schema = GraphUnwindExec::build_schema(input_schema, "a", &expr);

        let field = output_schema.field(1);
        assert_eq!(field.name(), "a");
        assert_eq!(field.data_type(), &DataType::Boolean);
        assert!(field.metadata().is_empty());
    }
797
    // Homogeneous integer literal list infers a native Int64 column.
    #[test]
    fn test_build_schema_integer_list() {
        let input_schema = Arc::new(Schema::new(vec![Field::new(
            "n._vid",
            DataType::UInt64,
            false,
        )]));

        let expr = Expr::List(vec![
            Expr::Literal(CypherLiteral::Integer(1)),
            Expr::Literal(CypherLiteral::Integer(2)),
            Expr::Literal(CypherLiteral::Integer(3)),
        ]);
        let output_schema = GraphUnwindExec::build_schema(input_schema, "x", &expr);

        let field = output_schema.field(1);
        assert_eq!(field.name(), "x");
        assert_eq!(field.data_type(), &DataType::Int64);
        assert!(field.metadata().is_empty());
    }
818
819    #[test]
820    fn test_build_schema_float_list() {
821        let input_schema = Arc::new(Schema::new(vec![Field::new(
822            "n._vid",
823            DataType::UInt64,
824            false,
825        )]));
826
827        let expr = Expr::List(vec![
828            Expr::Literal(CypherLiteral::Float(1.5)),
829            Expr::Literal(CypherLiteral::Float(2.5)),
830        ]);
831        let output_schema = GraphUnwindExec::build_schema(input_schema, "x", &expr);
832
833        let field = output_schema.field(1);
834        assert_eq!(field.name(), "x");
835        assert_eq!(field.data_type(), &DataType::Float64);
836        assert!(field.metadata().is_empty());
837    }
838
839    #[test]
840    fn test_build_schema_string_list() {
841        let input_schema = Arc::new(Schema::new(vec![Field::new(
842            "n._vid",
843            DataType::UInt64,
844            false,
845        )]));
846
847        let expr = Expr::List(vec![
848            Expr::Literal(CypherLiteral::String("hello".to_string())),
849            Expr::Literal(CypherLiteral::String("world".to_string())),
850        ]);
851        let output_schema = GraphUnwindExec::build_schema(input_schema, "x", &expr);
852
853        let field = output_schema.field(1);
854        assert_eq!(field.name(), "x");
855        assert_eq!(field.data_type(), &DataType::Utf8);
856        // Plain string, no cv_encoded metadata
857        assert!(field.metadata().is_empty());
858    }
859
860    #[test]
861    fn test_build_schema_mixed_list() {
862        let input_schema = Arc::new(Schema::new(vec![Field::new(
863            "n._vid",
864            DataType::UInt64,
865            false,
866        )]));
867
868        let expr = Expr::List(vec![
869            Expr::Literal(CypherLiteral::Integer(1)),
870            Expr::Literal(CypherLiteral::String("hello".to_string())),
871        ]);
872        let output_schema = GraphUnwindExec::build_schema(input_schema, "x", &expr);
873
874        let field = output_schema.field(1);
875        assert_eq!(field.name(), "x");
876        assert_eq!(field.data_type(), &DataType::LargeBinary);
877        assert_eq!(
878            field.metadata().get("cv_encoded").map(String::as_str),
879            Some("true")
880        );
881    }
882
883    #[test]
884    fn test_evaluate_literal_list() {
885        use arrow_array::builder::UInt64Builder;
886        use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
887
888        // Create a simple batch
889        let mut vid_builder = UInt64Builder::new();
890        vid_builder.append_value(1);
891
892        let batch = RecordBatch::try_new(
893            Arc::new(Schema::new(vec![Field::new(
894                "n._vid",
895                DataType::UInt64,
896                false,
897            )])),
898            vec![Arc::new(vid_builder.finish())],
899        )
900        .unwrap();
901
902        // Create a schema for the empty input stream
903        let input_schema = Arc::new(Schema::new(vec![Field::new(
904            "n._vid",
905            DataType::UInt64,
906            false,
907        )]));
908
909        // Create empty input stream using RecordBatchStreamAdapter
910        let empty_stream = RecordBatchStreamAdapter::new(input_schema, futures::stream::empty());
911
912        // Create stream with literal list expression
913        let stream = GraphUnwindStream {
914            input: Box::pin(empty_stream),
915            expr: Expr::List(vec![
916                Expr::Literal(CypherLiteral::Integer(1)),
917                Expr::Literal(CypherLiteral::Integer(2)),
918                Expr::Literal(CypherLiteral::Integer(3)),
919            ]),
920            params: HashMap::new(),
921            schema: Arc::new(Schema::new(vec![
922                Field::new("n._vid", DataType::UInt64, false),
923                Field::new("x", DataType::Utf8, true),
924            ])),
925            metrics: BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0),
926        };
927
928        let result = stream.evaluate_expr_for_row(&batch, 0).unwrap();
929        match result {
930            Value::List(items) => {
931                assert_eq!(items.len(), 3);
932                assert_eq!(items[0], Value::Int(1));
933                assert_eq!(items[1], Value::Int(2));
934                assert_eq!(items[2], Value::Int(3));
935            }
936            _ => panic!("Expected list"),
937        }
938    }
939
940    #[test]
941    fn test_evaluate_map_literal() {
942        use arrow_array::builder::UInt64Builder;
943        use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
944
945        let mut vid_builder = UInt64Builder::new();
946        vid_builder.append_value(1);
947
948        let batch = RecordBatch::try_new(
949            Arc::new(Schema::new(vec![Field::new(
950                "n._vid",
951                DataType::UInt64,
952                false,
953            )])),
954            vec![Arc::new(vid_builder.finish())],
955        )
956        .unwrap();
957
958        let input_schema = Arc::new(Schema::new(vec![Field::new(
959            "n._vid",
960            DataType::UInt64,
961            false,
962        )]));
963
964        let empty_stream = RecordBatchStreamAdapter::new(input_schema, futures::stream::empty());
965
966        let stream = GraphUnwindStream {
967            input: Box::pin(empty_stream),
968            expr: Expr::Map(vec![
969                ("a".to_string(), Expr::Literal(CypherLiteral::Integer(1))),
970                (
971                    "b".to_string(),
972                    Expr::Literal(CypherLiteral::String("hello".to_string())),
973                ),
974            ]),
975            params: HashMap::new(),
976            schema: Arc::new(Schema::new(vec![
977                Field::new("n._vid", DataType::UInt64, false),
978                Field::new("x", DataType::LargeBinary, true),
979            ])),
980            metrics: BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0),
981        };
982
983        let result = stream.evaluate_expr_for_row(&batch, 0).unwrap();
984        match result {
985            Value::Map(map) => {
986                assert_eq!(map.get("a"), Some(&Value::Int(1)));
987                assert_eq!(map.get("b"), Some(&Value::String("hello".to_string())));
988            }
989            _ => panic!("Expected Map, got {:?}", result),
990        }
991    }
992
993    #[test]
994    fn test_evaluate_map_property_access() {
995        use arrow_array::builder::UInt64Builder;
996        use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
997
998        let mut vid_builder = UInt64Builder::new();
999        vid_builder.append_value(1);
1000
1001        let batch = RecordBatch::try_new(
1002            Arc::new(Schema::new(vec![Field::new(
1003                "n._vid",
1004                DataType::UInt64,
1005                false,
1006            )])),
1007            vec![Arc::new(vid_builder.finish())],
1008        )
1009        .unwrap();
1010
1011        let input_schema = Arc::new(Schema::new(vec![Field::new(
1012            "n._vid",
1013            DataType::UInt64,
1014            false,
1015        )]));
1016
1017        let empty_stream = RecordBatchStreamAdapter::new(input_schema, futures::stream::empty());
1018
1019        // Test: {a: 1, b: 'x'}.a should return 1
1020        let map_expr = Expr::Map(vec![
1021            ("a".to_string(), Expr::Literal(CypherLiteral::Integer(1))),
1022            (
1023                "b".to_string(),
1024                Expr::Literal(CypherLiteral::String("x".to_string())),
1025            ),
1026        ]);
1027        let prop_expr = Expr::Property(Box::new(map_expr), "a".to_string());
1028
1029        let stream = GraphUnwindStream {
1030            input: Box::pin(empty_stream),
1031            expr: prop_expr.clone(),
1032            params: HashMap::new(),
1033            schema: Arc::new(Schema::new(vec![
1034                Field::new("n._vid", DataType::UInt64, false),
1035                Field::new("x", DataType::LargeBinary, true),
1036            ])),
1037            metrics: BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0),
1038        };
1039
1040        let result = stream.evaluate_expr_impl(&prop_expr, &batch, 0).unwrap();
1041        assert_eq!(result, Value::Int(1));
1042    }
1043
1044    #[test]
1045    fn test_arrow_to_json_value_uint64_is_coerced_to_int() {
1046        let arr = UInt64Array::from(vec![Some(42u64)]);
1047        let value = arrow_to_json_value(&arr, 0);
1048        assert_eq!(value, Value::Int(42));
1049    }
1050
1051    #[test]
1052    fn test_arrow_to_json_value_largebinary_decodes_cypher_map() {
1053        let encoded = uni_common::cypher_value_codec::encode(&Value::Map(HashMap::new()));
1054        let arr = LargeBinaryArray::from(vec![Some(encoded.as_slice())]);
1055        let value = arrow_to_json_value(&arr, 0);
1056        assert_eq!(value, Value::Map(HashMap::new()));
1057    }
1058}