Skip to main content

uni_query/query/df_graph/
unwind.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! UNWIND execution plan for DataFusion.
5//!
6//! This module provides [`GraphUnwindExec`], a DataFusion [`ExecutionPlan`] that
7//! expands list values into multiple rows (similar to SQL `UNNEST`).
8//!
9//! # Supported Expressions
10//!
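//! Currently supports:
//! - Literal lists: `UNWIND [1, 2, 3] AS x`
//! - Variable references: `UNWIND list AS item` (where `list` is a column)
//! - Property access: `UNWIND n.items AS item`
//! - Parameters: `UNWIND $items AS item`
//! - Selected function calls (`range`, `keys`, `size`/`length`, `split`, and the
//!   temporal constructors), binary operations, map literals and list indexing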
15//!
16//! # Example
17//!
18//! ```text
19//! Input:   [{"list": [1, 2, 3]}]
20//! UNWIND:  list AS item
21//! Output:  [{"list": [1,2,3], "item": 1},
22//!           {"list": [1,2,3], "item": 2},
23//!           {"list": [1,2,3], "item": 3}]
24//! ```
25
26use crate::query::df_graph::common::{arrow_err, compute_plan_properties};
27use arrow::compute::take;
28use arrow_array::builder::{
29    BooleanBuilder, Float64Builder, Int64Builder, LargeBinaryBuilder, StringBuilder,
30};
31use arrow_array::{Array, ArrayRef, RecordBatch, UInt64Array};
32use arrow_schema::{DataType, Field, Schema, SchemaRef};
33use datafusion::common::Result as DFResult;
34use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
35use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
36use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
37use futures::{Stream, StreamExt};
38use std::any::Any;
39use std::collections::HashMap;
40use std::fmt;
41use std::pin::Pin;
42use std::sync::Arc;
43use std::task::{Context, Poll};
44use uni_common::Value;
45use uni_cypher::ast::{CypherLiteral, Expr};
46
47/// Result of UNWIND element type inference.
48struct ElementTypeInfo {
49    /// Arrow data type for the unwind variable column.
50    data_type: DataType,
51    /// Whether values need JSON encoding metadata.
52    is_cv_encoded: bool,
53}
54
55/// UNWIND execution plan that expands list values into multiple rows.
56///
57/// Takes an input plan and an expression that evaluates to a list. For each
58/// row in the input, if the expression evaluates to a list, produces multiple
59/// output rows (one per list element) with the list element bound to a new
60/// variable.
61pub struct GraphUnwindExec {
62    /// Input execution plan.
63    input: Arc<dyn ExecutionPlan>,
64
65    /// Expression to evaluate (should produce a list).
66    expr: Expr,
67
68    /// Variable name to bind list elements to.
69    variable: String,
70
71    /// Query parameters for expression evaluation.
72    params: HashMap<String, Value>,
73
74    /// Output schema.
75    schema: SchemaRef,
76
77    /// Cached plan properties.
78    properties: PlanProperties,
79
80    /// Execution metrics.
81    metrics: ExecutionPlanMetricsSet,
82}
83
84impl fmt::Debug for GraphUnwindExec {
85    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
86        f.debug_struct("GraphUnwindExec")
87            .field("expr", &self.expr)
88            .field("variable", &self.variable)
89            .finish()
90    }
91}
92
93impl GraphUnwindExec {
94    /// Create a new UNWIND execution plan.
95    ///
96    /// # Arguments
97    ///
98    /// * `input` - Input plan providing rows to expand
99    /// * `expr` - Expression that evaluates to a list
100    /// * `variable` - Variable name for list elements
101    /// * `params` - Query parameters for expression evaluation
102    pub fn new(
103        input: Arc<dyn ExecutionPlan>,
104        expr: Expr,
105        variable: impl Into<String>,
106        params: HashMap<String, Value>,
107    ) -> Self {
108        let variable = variable.into();
109
110        // Build output schema: input schema + new variable column
111        let schema = Self::build_schema(input.schema(), &variable, &expr);
112        let properties = compute_plan_properties(schema.clone());
113
114        Self {
115            input,
116            expr,
117            variable,
118            params,
119            schema,
120            properties,
121            metrics: ExecutionPlanMetricsSet::new(),
122        }
123    }
124
125    /// Infer the native Arrow `DataType` for the elements of an UNWIND expression.
126    ///
127    /// For literal lists with homogeneous element types (ignoring nulls), returns
128    /// the native type. For heterogeneous or non-inferrable expressions, falls back
129    /// to JSON-encoded Utf8.
130    fn infer_element_type(expr: &Expr) -> ElementTypeInfo {
131        let json_fallback = || ElementTypeInfo {
132            data_type: DataType::LargeBinary,
133            is_cv_encoded: true,
134        };
135
136        let Expr::List(items) = expr else {
137            return json_fallback();
138        };
139
140        // Infer type from first non-null literal
141        let first_type = items.iter().find_map(|item| match item {
142            Expr::Literal(CypherLiteral::Null) => None,
143            Expr::Literal(CypherLiteral::Bool(_)) => Some(DataType::Boolean),
144            Expr::Literal(CypherLiteral::Integer(_)) => Some(DataType::Int64),
145            Expr::Literal(CypherLiteral::Float(_)) => Some(DataType::Float64),
146            Expr::Literal(CypherLiteral::String(_)) => Some(DataType::Utf8),
147            _ => Some(DataType::Utf8), // Sentinel for non-literal: forces fallback
148        });
149
150        let Some(expected) = first_type else {
151            return json_fallback(); // All nulls or empty
152        };
153
154        // Verify all remaining non-null items match the expected type
155        let all_match = items.iter().all(|item| match item {
156            Expr::Literal(CypherLiteral::Null) => true,
157            Expr::Literal(CypherLiteral::Bool(_)) => expected == DataType::Boolean,
158            Expr::Literal(CypherLiteral::Integer(_)) => expected == DataType::Int64,
159            Expr::Literal(CypherLiteral::Float(_)) => expected == DataType::Float64,
160            Expr::Literal(CypherLiteral::String(_)) => expected == DataType::Utf8,
161            _ => false, // Non-literal
162        });
163
164        if all_match {
165            ElementTypeInfo {
166                data_type: expected,
167                is_cv_encoded: false,
168            }
169        } else {
170            json_fallback()
171        }
172    }
173
174    /// Build output schema by adding the unwind variable column.
175    ///
176    /// Uses type inference on the UNWIND expression to emit natively-typed
177    /// columns when possible. Falls back to JSON-encoded `Utf8` for
178    /// heterogeneous or non-inferrable expressions.
179    fn build_schema(input_schema: SchemaRef, variable: &str, expr: &Expr) -> SchemaRef {
180        let mut fields: Vec<Arc<Field>> = input_schema.fields().to_vec();
181
182        let type_info = Self::infer_element_type(expr);
183
184        let mut field = Field::new(variable, type_info.data_type, true);
185        if type_info.is_cv_encoded {
186            field = field.with_metadata(HashMap::from([("cv_encoded".into(), "true".into())]));
187        }
188        fields.push(Arc::new(field));
189
190        Arc::new(Schema::new(fields))
191    }
192}
193
194impl DisplayAs for GraphUnwindExec {
195    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
196        write!(
197            f,
198            "GraphUnwindExec: {} AS {}",
199            self.expr.to_string_repr(),
200            self.variable
201        )
202    }
203}
204
205impl ExecutionPlan for GraphUnwindExec {
206    fn name(&self) -> &str {
207        "GraphUnwindExec"
208    }
209
210    fn as_any(&self) -> &dyn Any {
211        self
212    }
213
214    fn schema(&self) -> SchemaRef {
215        Arc::clone(&self.schema)
216    }
217
218    fn properties(&self) -> &PlanProperties {
219        &self.properties
220    }
221
222    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
223        vec![&self.input]
224    }
225
226    fn with_new_children(
227        self: Arc<Self>,
228        children: Vec<Arc<dyn ExecutionPlan>>,
229    ) -> DFResult<Arc<dyn ExecutionPlan>> {
230        if children.len() != 1 {
231            return Err(datafusion::error::DataFusionError::Plan(
232                "GraphUnwindExec requires exactly one child".to_string(),
233            ));
234        }
235
236        Ok(Arc::new(Self::new(
237            Arc::clone(&children[0]),
238            self.expr.clone(),
239            self.variable.clone(),
240            self.params.clone(),
241        )))
242    }
243
244    fn execute(
245        &self,
246        partition: usize,
247        context: Arc<TaskContext>,
248    ) -> DFResult<SendableRecordBatchStream> {
249        let input_stream = self.input.execute(partition, context)?;
250        let metrics = BaselineMetrics::new(&self.metrics, partition);
251
252        Ok(Box::pin(GraphUnwindStream {
253            input: input_stream,
254            expr: self.expr.clone(),
255            params: self.params.clone(),
256            schema: Arc::clone(&self.schema),
257            metrics,
258        }))
259    }
260
261    fn metrics(&self) -> Option<MetricsSet> {
262        Some(self.metrics.clone_inner())
263    }
264}
265
266/// Stream that performs the UNWIND operation.
267struct GraphUnwindStream {
268    /// Input stream.
269    input: SendableRecordBatchStream,
270
271    /// Expression to evaluate.
272    expr: Expr,
273
274    /// Query parameters.
275    params: HashMap<String, Value>,
276
277    /// Output schema.
278    schema: SchemaRef,
279
280    /// Metrics.
281    metrics: BaselineMetrics,
282}
283
284impl GraphUnwindStream {
285    /// Process a single input batch.
286    fn process_batch(&self, batch: RecordBatch) -> DFResult<RecordBatch> {
287        // For each row, evaluate the expression and expand if it's a list
288        let mut expansions: Vec<(usize, Value)> = Vec::new(); // (input_row_idx, list_element)
289
290        for row_idx in 0..batch.num_rows() {
291            // Evaluate expression for this row
292            let list_value = self.evaluate_expr_for_row(&batch, row_idx)?;
293
294            match list_value {
295                Value::List(items) => {
296                    for item in items {
297                        expansions.push((row_idx, item));
298                    }
299                }
300                Value::Null => {
301                    // UNWIND on null produces no rows (Cypher semantics)
302                }
303                other => {
304                    // Non-list values: treat as single-element list
305                    expansions.push((row_idx, other));
306                }
307            }
308        }
309
310        self.build_output_batch(&batch, &expansions)
311    }
312
313    /// Evaluate the expression for a specific row.
314    fn evaluate_expr_for_row(&self, batch: &RecordBatch, row_idx: usize) -> DFResult<Value> {
315        self.evaluate_expr_impl(&self.expr, batch, row_idx)
316    }
317
318    /// Evaluate an expression recursively.
319    fn evaluate_expr_impl(
320        &self,
321        expr: &Expr,
322        batch: &RecordBatch,
323        row_idx: usize,
324    ) -> DFResult<Value> {
325        match expr {
326            // Literal list: [1, 2, 3]
327            Expr::List(items) => {
328                let mut values = Vec::with_capacity(items.len());
329                for item in items {
330                    values.push(self.evaluate_expr_impl(item, batch, row_idx)?);
331                }
332                Ok(Value::List(values))
333            }
334
335            // Literal value
336            Expr::Literal(lit) => Ok(lit.to_value()),
337
338            // Parameter reference: $param
339            Expr::Parameter(name) => self.params.get(name).cloned().ok_or_else(|| {
340                datafusion::error::DataFusionError::Execution(format!(
341                    "Parameter '{}' not found",
342                    name
343                ))
344            }),
345
346            // Variable reference: look up column
347            Expr::Variable(var_name) => self.get_column_value(batch, var_name, row_idx),
348
349            // Property access: n.prop
350            Expr::Property(base_expr, prop_name) => {
351                // Try looking up as column name first: var.prop
352                if let Expr::Variable(var_name) = base_expr.as_ref() {
353                    let col_name = format!("{}.{}", var_name, prop_name);
354                    if batch.schema().column_with_name(&col_name).is_some() {
355                        return self.get_column_value(batch, &col_name, row_idx);
356                    }
357                }
358
359                // Fall back to evaluating base as a map
360                let base_value = self.evaluate_expr_impl(base_expr, batch, row_idx)?;
361                if let Value::Map(map) = base_value {
362                    Ok(map.get(prop_name).cloned().unwrap_or(Value::Null))
363                } else {
364                    Ok(Value::Null)
365                }
366            }
367
368            // Function call: range(1, 10)
369            Expr::FunctionCall { name, args, .. } => {
370                let name_lower = name.to_lowercase();
371                match name_lower.as_str() {
372                    "range" => {
373                        if args.len() >= 2 {
374                            let start = self.evaluate_expr_impl(&args[0], batch, row_idx)?;
375                            let end = self.evaluate_expr_impl(&args[1], batch, row_idx)?;
376                            let step = if args.len() >= 3 {
377                                self.evaluate_expr_impl(&args[2], batch, row_idx)?
378                            } else {
379                                Value::Int(1)
380                            };
381
382                            if let (Some(s), Some(e), Some(st)) =
383                                (start.as_i64(), end.as_i64(), step.as_i64())
384                            {
385                                let mut result = Vec::new();
386                                let mut i = s;
387                                while (st > 0 && i <= e) || (st < 0 && i >= e) {
388                                    result.push(Value::Int(i));
389                                    i += st;
390                                }
391                                return Ok(Value::List(result));
392                            }
393                        }
394                        Ok(Value::List(vec![]))
395                    }
396                    "keys" => {
397                        if args.len() == 1 {
398                            let val = self.evaluate_expr_impl(&args[0], batch, row_idx)?;
399                            if let Value::Map(map) = val {
400                                // Use _all_props sub-map for schemaless entities
401                                // when present; otherwise use the top-level map.
402                                let source = match map.get("_all_props") {
403                                    Some(Value::Map(all)) => all,
404                                    _ => &map,
405                                };
406                                let mut key_strings: Vec<String> = source
407                                    .iter()
408                                    .filter(|(k, v)| !v.is_null() && !k.starts_with('_'))
409                                    .map(|(k, _)| k.clone())
410                                    .collect();
411                                key_strings.sort();
412                                let keys: Vec<Value> =
413                                    key_strings.into_iter().map(Value::String).collect();
414                                return Ok(Value::List(keys));
415                            }
416                            if val.is_null() {
417                                return Ok(Value::Null);
418                            }
419                        }
420                        Ok(Value::List(vec![]))
421                    }
422                    "size" | "length" => {
423                        if args.len() == 1 {
424                            let val = self.evaluate_expr_impl(&args[0], batch, row_idx)?;
425                            let sz = match &val {
426                                Value::List(arr) => arr.len() as i64,
427                                Value::String(s) => s.len() as i64,
428                                Value::Map(m) => m.len() as i64,
429                                _ => 0,
430                            };
431                            return Ok(Value::Int(sz));
432                        }
433                        Ok(Value::Null)
434                    }
435                    // Temporal constructors: date(), time(), localtime(), datetime(), localdatetime(), duration()
436                    "date" | "time" | "localtime" | "datetime" | "localdatetime" | "duration" => {
437                        let mut eval_args = Vec::with_capacity(args.len());
438                        for arg in args {
439                            eval_args.push(self.evaluate_expr_impl(arg, batch, row_idx)?);
440                        }
441                        crate::query::datetime::eval_datetime_function(
442                            &name.to_uppercase(),
443                            &eval_args,
444                        )
445                        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))
446                    }
447                    "split" => {
448                        let mut eval_args = Vec::with_capacity(args.len());
449                        for arg in args {
450                            eval_args.push(self.evaluate_expr_impl(arg, batch, row_idx)?);
451                        }
452                        crate::query::expr_eval::eval_split(&eval_args).map_err(|e| {
453                            datafusion::error::DataFusionError::Execution(e.to_string())
454                        })
455                    }
456                    _ => {
457                        // Unsupported function - return empty list
458                        Ok(Value::List(vec![]))
459                    }
460                }
461            }
462
463            // Binary operations: e.g. size(types) - 1
464            Expr::BinaryOp { left, op, right } => {
465                let l = self.evaluate_expr_impl(left, batch, row_idx)?;
466                let r = self.evaluate_expr_impl(right, batch, row_idx)?;
467                crate::query::expr_eval::eval_binary_op(&l, op, &r)
468                    .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))
469            }
470
471            // Map literal: {a: 1, b: 'x'}
472            Expr::Map(entries) => {
473                let mut map = HashMap::new();
474                for (key, val_expr) in entries {
475                    let val = self.evaluate_expr_impl(val_expr, batch, row_idx)?;
476                    map.insert(key.clone(), val);
477                }
478                Ok(Value::Map(map))
479            }
480
481            // Array index: qrows[p]
482            Expr::ArrayIndex { array, index } => {
483                let arr_val = self.evaluate_expr_impl(array, batch, row_idx)?;
484                let idx_val = self.evaluate_expr_impl(index, batch, row_idx)?;
485                match (&arr_val, idx_val.as_i64()) {
486                    (Value::List(list), Some(i)) => {
487                        // Cypher uses 0-based indexing; negative indices count from end
488                        let len = list.len() as i64;
489                        let resolved = if i < 0 { len + i } else { i };
490                        if resolved >= 0 && (resolved as usize) < list.len() {
491                            Ok(list[resolved as usize].clone())
492                        } else {
493                            Ok(Value::Null)
494                        }
495                    }
496                    _ => Ok(Value::Null),
497                }
498            }
499
500            // Unsupported expressions return null
501            _ => Ok(Value::Null),
502        }
503    }
504
505    /// Get a column value as JSON for a specific row.
506    fn get_column_value(
507        &self,
508        batch: &RecordBatch,
509        col_name: &str,
510        row_idx: usize,
511    ) -> DFResult<Value> {
512        let col = batch.column_by_name(col_name).ok_or_else(|| {
513            datafusion::error::DataFusionError::Execution(format!(
514                "Column '{}' not found for UNWIND",
515                col_name
516            ))
517        })?;
518
519        Ok(arrow_to_json_value(col.as_ref(), row_idx))
520    }
521
522    /// Build output batch from expansions.
523    fn build_output_batch(
524        &self,
525        input: &RecordBatch,
526        expansions: &[(usize, Value)],
527    ) -> DFResult<RecordBatch> {
528        if expansions.is_empty() {
529            return Ok(RecordBatch::new_empty(Arc::clone(&self.schema)));
530        }
531
532        let num_rows = expansions.len();
533
534        // Build index array for take operation
535        let indices: Vec<u64> = expansions.iter().map(|(idx, _)| *idx as u64).collect();
536        let indices_array = UInt64Array::from(indices);
537
538        // Expand input columns
539        let mut columns: Vec<ArrayRef> = Vec::new();
540        for col in input.columns() {
541            let expanded = take(col.as_ref(), &indices_array, None)?;
542            columns.push(expanded);
543        }
544
545        // Add the unwind variable column using the appropriate typed builder
546        let unwind_field = self.schema.field(self.schema.fields().len() - 1);
547        let is_cv_encoded = unwind_field
548            .metadata()
549            .get("cv_encoded")
550            .is_some_and(|v| v == "true");
551
552        let unwind_col: ArrayRef = match (unwind_field.data_type(), is_cv_encoded) {
553            (DataType::Boolean, false) => {
554                let mut builder = BooleanBuilder::with_capacity(num_rows);
555                for (_, value) in expansions {
556                    if let Value::Bool(b) = value {
557                        builder.append_value(*b);
558                    } else {
559                        builder.append_null();
560                    }
561                }
562                Arc::new(builder.finish())
563            }
564            (DataType::Int64, false) => {
565                let mut builder = Int64Builder::with_capacity(num_rows);
566                for (_, value) in expansions {
567                    if let Value::Int(i) = value {
568                        builder.append_value(*i);
569                    } else {
570                        builder.append_null();
571                    }
572                }
573                Arc::new(builder.finish())
574            }
575            (DataType::Float64, false) => {
576                let mut builder = Float64Builder::with_capacity(num_rows);
577                for (_, value) in expansions {
578                    if let Value::Float(f) = value {
579                        builder.append_value(*f);
580                    } else {
581                        builder.append_null();
582                    }
583                }
584                Arc::new(builder.finish())
585            }
586            (DataType::Utf8, false) => {
587                let mut builder = StringBuilder::new();
588                for (_, value) in expansions {
589                    if let Value::String(s) = value {
590                        builder.append_value(s);
591                    } else {
592                        builder.append_null();
593                    }
594                }
595                Arc::new(builder.finish())
596            }
597            (DataType::LargeBinary, _) => {
598                // CypherValue-encoded: preserves exact types through UNWIND
599                let mut builder = LargeBinaryBuilder::with_capacity(num_rows, num_rows * 16);
600                for (_, value) in expansions {
601                    if value.is_null() {
602                        builder.append_null();
603                    } else {
604                        let encoded = uni_common::cypher_value_codec::encode(value);
605                        builder.append_value(&encoded);
606                    }
607                }
608                Arc::new(builder.finish())
609            }
610            _ => {
611                // Fallback: JSON-encoded Utf8 (heterogeneous or non-inferrable types)
612                let mut builder = StringBuilder::new();
613                for (_, value) in expansions {
614                    if value.is_null() {
615                        builder.append_null();
616                    } else {
617                        let json_val: serde_json::Value = value.clone().into();
618                        let json_str =
619                            serde_json::to_string(&json_val).unwrap_or_else(|_| "null".to_string());
620                        builder.append_value(&json_str);
621                    }
622                }
623                Arc::new(builder.finish())
624            }
625        };
626        columns.push(unwind_col);
627
628        self.metrics.record_output(num_rows);
629
630        RecordBatch::try_new(Arc::clone(&self.schema), columns).map_err(arrow_err)
631    }
632}
633
634impl Stream for GraphUnwindStream {
635    type Item = DFResult<RecordBatch>;
636
637    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
638        match self.input.poll_next_unpin(cx) {
639            Poll::Ready(Some(Ok(batch))) => {
640                let result = self.process_batch(batch);
641                Poll::Ready(Some(result))
642            }
643            other => other,
644        }
645    }
646}
647
648impl RecordBatchStream for GraphUnwindStream {
649    fn schema(&self) -> SchemaRef {
650        Arc::clone(&self.schema)
651    }
652}
653
654/// Convert an Arrow array value at a specific row to `uni_common::Value`.
655pub(crate) fn arrow_to_json_value(array: &dyn Array, row: usize) -> Value {
656    use arrow_array::{
657        BooleanArray, Float32Array, Float64Array, Int8Array, Int16Array, Int32Array, Int64Array,
658        LargeStringArray, ListArray, StringArray, UInt8Array, UInt16Array, UInt32Array,
659        UInt64Array,
660    };
661
662    if array.is_null(row) {
663        return Value::Null;
664    }
665
666    let any = array.as_any();
667
668    // String types
669    if let Some(arr) = any.downcast_ref::<StringArray>() {
670        return Value::String(arr.value(row).to_string());
671    }
672    if let Some(arr) = any.downcast_ref::<LargeStringArray>() {
673        return Value::String(arr.value(row).to_string());
674    }
675
676    // Integer types - use a macro to reduce repetition
677    macro_rules! try_int {
678        ($arr_type:ty) => {
679            if let Some(arr) = any.downcast_ref::<$arr_type>() {
680                return Value::Int(arr.value(row) as i64);
681            }
682        };
683    }
684    try_int!(Int64Array);
685    try_int!(Int32Array);
686    try_int!(Int16Array);
687    try_int!(Int8Array);
688    try_int!(UInt64Array);
689    try_int!(UInt32Array);
690    try_int!(UInt16Array);
691    try_int!(UInt8Array);
692
693    // Float types
694    if let Some(arr) = any.downcast_ref::<Float64Array>() {
695        return Value::Float(arr.value(row));
696    }
697    if let Some(arr) = any.downcast_ref::<Float32Array>() {
698        return Value::Float(arr.value(row) as f64);
699    }
700
701    // Boolean
702    if let Some(arr) = any.downcast_ref::<BooleanArray>() {
703        return Value::Bool(arr.value(row));
704    }
705
706    // List (recursive)
707    if let Some(arr) = any.downcast_ref::<ListArray>() {
708        let values = arr.value(row);
709        let result: Vec<Value> = (0..values.len())
710            .map(|i| arrow_to_json_value(values.as_ref(), i))
711            .collect();
712        return Value::List(result);
713    }
714
715    // LargeBinary (CypherValue) — decode to Value
716    if let Some(arr) = any.downcast_ref::<arrow_array::LargeBinaryArray>() {
717        let bytes = arr.value(row);
718        if let Ok(uni_val) = uni_common::cypher_value_codec::decode(bytes) {
719            return uni_val;
720        }
721        // Fallback: try plain JSON text
722        if let Ok(parsed) = serde_json::from_slice::<serde_json::Value>(bytes) {
723            return Value::from(parsed);
724        }
725        return Value::Null;
726    }
727
728    // Struct — convert fields to a Map so keys()/properties() UDFs work
729    if let Some(s) = any.downcast_ref::<arrow_array::StructArray>() {
730        let mut map = HashMap::new();
731        for (field, child) in s.fields().iter().zip(s.columns()) {
732            map.insert(
733                field.name().clone(),
734                arrow_to_json_value(child.as_ref(), row),
735            );
736        }
737        return Value::Map(map);
738    }
739
740    // Fallback
741    Value::Null
742}
743
744#[cfg(test)]
745mod tests {
746    use super::*;
747    use arrow_array::{LargeBinaryArray, UInt64Array};
748    use uni_cypher::ast::CypherLiteral;
749
750    #[test]
751    fn test_build_schema() {
752        let input_schema = Arc::new(Schema::new(vec![
753            Field::new("n._vid", DataType::UInt64, false),
754            Field::new("n.name", DataType::Utf8, true),
755        ]));
756
757        // Variable reference -> falls back to JSON-encoded Utf8
758        let expr = Expr::Variable("some_list".to_string());
759        let output_schema = GraphUnwindExec::build_schema(input_schema, "item", &expr);
760
761        assert_eq!(output_schema.fields().len(), 3);
762        assert_eq!(output_schema.field(0).name(), "n._vid");
763        assert_eq!(output_schema.field(1).name(), "n.name");
764        assert_eq!(output_schema.field(2).name(), "item");
765        assert_eq!(output_schema.field(2).data_type(), &DataType::LargeBinary);
766        assert_eq!(
767            output_schema
768                .field(2)
769                .metadata()
770                .get("cv_encoded")
771                .map(String::as_str),
772            Some("true")
773        );
774    }
775
776    #[test]
777    fn test_build_schema_boolean_list() {
778        let input_schema = Arc::new(Schema::new(vec![Field::new(
779            "n._vid",
780            DataType::UInt64,
781            false,
782        )]));
783
784        let expr = Expr::List(vec![
785            Expr::Literal(CypherLiteral::Bool(true)),
786            Expr::Literal(CypherLiteral::Bool(false)),
787            Expr::Literal(CypherLiteral::Null),
788        ]);
789        let output_schema = GraphUnwindExec::build_schema(input_schema, "a", &expr);
790
791        let field = output_schema.field(1);
792        assert_eq!(field.name(), "a");
793        assert_eq!(field.data_type(), &DataType::Boolean);
794        assert!(field.metadata().is_empty());
795    }
796
797    #[test]
798    fn test_build_schema_integer_list() {
799        let input_schema = Arc::new(Schema::new(vec![Field::new(
800            "n._vid",
801            DataType::UInt64,
802            false,
803        )]));
804
805        let expr = Expr::List(vec![
806            Expr::Literal(CypherLiteral::Integer(1)),
807            Expr::Literal(CypherLiteral::Integer(2)),
808            Expr::Literal(CypherLiteral::Integer(3)),
809        ]);
810        let output_schema = GraphUnwindExec::build_schema(input_schema, "x", &expr);
811
812        let field = output_schema.field(1);
813        assert_eq!(field.name(), "x");
814        assert_eq!(field.data_type(), &DataType::Int64);
815        assert!(field.metadata().is_empty());
816    }
817
818    #[test]
819    fn test_build_schema_float_list() {
820        let input_schema = Arc::new(Schema::new(vec![Field::new(
821            "n._vid",
822            DataType::UInt64,
823            false,
824        )]));
825
826        let expr = Expr::List(vec![
827            Expr::Literal(CypherLiteral::Float(1.5)),
828            Expr::Literal(CypherLiteral::Float(2.5)),
829        ]);
830        let output_schema = GraphUnwindExec::build_schema(input_schema, "x", &expr);
831
832        let field = output_schema.field(1);
833        assert_eq!(field.name(), "x");
834        assert_eq!(field.data_type(), &DataType::Float64);
835        assert!(field.metadata().is_empty());
836    }
837
838    #[test]
839    fn test_build_schema_string_list() {
840        let input_schema = Arc::new(Schema::new(vec![Field::new(
841            "n._vid",
842            DataType::UInt64,
843            false,
844        )]));
845
846        let expr = Expr::List(vec![
847            Expr::Literal(CypherLiteral::String("hello".to_string())),
848            Expr::Literal(CypherLiteral::String("world".to_string())),
849        ]);
850        let output_schema = GraphUnwindExec::build_schema(input_schema, "x", &expr);
851
852        let field = output_schema.field(1);
853        assert_eq!(field.name(), "x");
854        assert_eq!(field.data_type(), &DataType::Utf8);
855        // Plain string, no cv_encoded metadata
856        assert!(field.metadata().is_empty());
857    }
858
859    #[test]
860    fn test_build_schema_mixed_list() {
861        let input_schema = Arc::new(Schema::new(vec![Field::new(
862            "n._vid",
863            DataType::UInt64,
864            false,
865        )]));
866
867        let expr = Expr::List(vec![
868            Expr::Literal(CypherLiteral::Integer(1)),
869            Expr::Literal(CypherLiteral::String("hello".to_string())),
870        ]);
871        let output_schema = GraphUnwindExec::build_schema(input_schema, "x", &expr);
872
873        let field = output_schema.field(1);
874        assert_eq!(field.name(), "x");
875        assert_eq!(field.data_type(), &DataType::LargeBinary);
876        assert_eq!(
877            field.metadata().get("cv_encoded").map(String::as_str),
878            Some("true")
879        );
880    }
881
882    #[test]
883    fn test_evaluate_literal_list() {
884        use arrow_array::builder::UInt64Builder;
885        use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
886
887        // Create a simple batch
888        let mut vid_builder = UInt64Builder::new();
889        vid_builder.append_value(1);
890
891        let batch = RecordBatch::try_new(
892            Arc::new(Schema::new(vec![Field::new(
893                "n._vid",
894                DataType::UInt64,
895                false,
896            )])),
897            vec![Arc::new(vid_builder.finish())],
898        )
899        .unwrap();
900
901        // Create a schema for the empty input stream
902        let input_schema = Arc::new(Schema::new(vec![Field::new(
903            "n._vid",
904            DataType::UInt64,
905            false,
906        )]));
907
908        // Create empty input stream using RecordBatchStreamAdapter
909        let empty_stream = RecordBatchStreamAdapter::new(input_schema, futures::stream::empty());
910
911        // Create stream with literal list expression
912        let stream = GraphUnwindStream {
913            input: Box::pin(empty_stream),
914            expr: Expr::List(vec![
915                Expr::Literal(CypherLiteral::Integer(1)),
916                Expr::Literal(CypherLiteral::Integer(2)),
917                Expr::Literal(CypherLiteral::Integer(3)),
918            ]),
919            params: HashMap::new(),
920            schema: Arc::new(Schema::new(vec![
921                Field::new("n._vid", DataType::UInt64, false),
922                Field::new("x", DataType::Utf8, true),
923            ])),
924            metrics: BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0),
925        };
926
927        let result = stream.evaluate_expr_for_row(&batch, 0).unwrap();
928        match result {
929            Value::List(items) => {
930                assert_eq!(items.len(), 3);
931                assert_eq!(items[0], Value::Int(1));
932                assert_eq!(items[1], Value::Int(2));
933                assert_eq!(items[2], Value::Int(3));
934            }
935            _ => panic!("Expected list"),
936        }
937    }
938
939    #[test]
940    fn test_evaluate_map_literal() {
941        use arrow_array::builder::UInt64Builder;
942        use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
943
944        let mut vid_builder = UInt64Builder::new();
945        vid_builder.append_value(1);
946
947        let batch = RecordBatch::try_new(
948            Arc::new(Schema::new(vec![Field::new(
949                "n._vid",
950                DataType::UInt64,
951                false,
952            )])),
953            vec![Arc::new(vid_builder.finish())],
954        )
955        .unwrap();
956
957        let input_schema = Arc::new(Schema::new(vec![Field::new(
958            "n._vid",
959            DataType::UInt64,
960            false,
961        )]));
962
963        let empty_stream = RecordBatchStreamAdapter::new(input_schema, futures::stream::empty());
964
965        let stream = GraphUnwindStream {
966            input: Box::pin(empty_stream),
967            expr: Expr::Map(vec![
968                ("a".to_string(), Expr::Literal(CypherLiteral::Integer(1))),
969                (
970                    "b".to_string(),
971                    Expr::Literal(CypherLiteral::String("hello".to_string())),
972                ),
973            ]),
974            params: HashMap::new(),
975            schema: Arc::new(Schema::new(vec![
976                Field::new("n._vid", DataType::UInt64, false),
977                Field::new("x", DataType::LargeBinary, true),
978            ])),
979            metrics: BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0),
980        };
981
982        let result = stream.evaluate_expr_for_row(&batch, 0).unwrap();
983        match result {
984            Value::Map(map) => {
985                assert_eq!(map.get("a"), Some(&Value::Int(1)));
986                assert_eq!(map.get("b"), Some(&Value::String("hello".to_string())));
987            }
988            _ => panic!("Expected Map, got {:?}", result),
989        }
990    }
991
992    #[test]
993    fn test_evaluate_map_property_access() {
994        use arrow_array::builder::UInt64Builder;
995        use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
996
997        let mut vid_builder = UInt64Builder::new();
998        vid_builder.append_value(1);
999
1000        let batch = RecordBatch::try_new(
1001            Arc::new(Schema::new(vec![Field::new(
1002                "n._vid",
1003                DataType::UInt64,
1004                false,
1005            )])),
1006            vec![Arc::new(vid_builder.finish())],
1007        )
1008        .unwrap();
1009
1010        let input_schema = Arc::new(Schema::new(vec![Field::new(
1011            "n._vid",
1012            DataType::UInt64,
1013            false,
1014        )]));
1015
1016        let empty_stream = RecordBatchStreamAdapter::new(input_schema, futures::stream::empty());
1017
1018        // Test: {a: 1, b: 'x'}.a should return 1
1019        let map_expr = Expr::Map(vec![
1020            ("a".to_string(), Expr::Literal(CypherLiteral::Integer(1))),
1021            (
1022                "b".to_string(),
1023                Expr::Literal(CypherLiteral::String("x".to_string())),
1024            ),
1025        ]);
1026        let prop_expr = Expr::Property(Box::new(map_expr), "a".to_string());
1027
1028        let stream = GraphUnwindStream {
1029            input: Box::pin(empty_stream),
1030            expr: prop_expr.clone(),
1031            params: HashMap::new(),
1032            schema: Arc::new(Schema::new(vec![
1033                Field::new("n._vid", DataType::UInt64, false),
1034                Field::new("x", DataType::LargeBinary, true),
1035            ])),
1036            metrics: BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0),
1037        };
1038
1039        let result = stream.evaluate_expr_impl(&prop_expr, &batch, 0).unwrap();
1040        assert_eq!(result, Value::Int(1));
1041    }
1042
1043    #[test]
1044    fn test_arrow_to_json_value_uint64_is_coerced_to_int() {
1045        let arr = UInt64Array::from(vec![Some(42u64)]);
1046        let value = arrow_to_json_value(&arr, 0);
1047        assert_eq!(value, Value::Int(42));
1048    }
1049
1050    #[test]
1051    fn test_arrow_to_json_value_largebinary_decodes_cypher_map() {
1052        let encoded = uni_common::cypher_value_codec::encode(&Value::Map(HashMap::new()));
1053        let arr = LargeBinaryArray::from(vec![Some(encoded.as_slice())]);
1054        let value = arrow_to_json_value(&arr, 0);
1055        assert_eq!(value, Value::Map(HashMap::new()));
1056    }
1057}