1use crate::query::df_graph::common::{arrow_err, compute_plan_properties};
27use arrow::compute::take;
28use arrow_array::builder::{
29 BooleanBuilder, Float64Builder, Int64Builder, LargeBinaryBuilder, StringBuilder,
30};
31use arrow_array::{Array, ArrayRef, RecordBatch, UInt64Array};
32use arrow_schema::{DataType, Field, Schema, SchemaRef};
33use datafusion::common::Result as DFResult;
34use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
35use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
36use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
37use futures::{Stream, StreamExt};
38use std::any::Any;
39use std::collections::HashMap;
40use std::fmt;
41use std::pin::Pin;
42use std::sync::Arc;
43use std::task::{Context, Poll};
44use uni_common::Value;
45use uni_cypher::ast::{CypherLiteral, Expr};
46
/// Outcome of statically inferring the element type of an UNWIND expression.
struct ElementTypeInfo {
    /// Arrow data type assigned to the unwound variable's output column.
    data_type: DataType,
    /// When `true`, the output field is tagged with `cv_encoded = "true"` metadata.
    is_cv_encoded: bool,
}
54
/// Physical plan node that evaluates an expression per input row and emits one
/// output row per element of the resulting list.
pub struct GraphUnwindExec {
    /// Child plan that supplies the input rows.
    input: Arc<dyn ExecutionPlan>,

    /// Expression evaluated for every input row; its elements become output rows.
    expr: Expr,

    /// Name of the output column that receives each unwound element.
    variable: String,

    /// Query parameter values looked up when `expr` references a parameter.
    params: HashMap<String, Value>,

    /// Output schema: the input fields followed by the `variable` field.
    schema: SchemaRef,

    /// Plan properties computed from `schema` when the node is constructed.
    properties: Arc<PlanProperties>,

    /// Metric set from which per-partition baseline metrics are created.
    metrics: ExecutionPlanMetricsSet,
}
83
84impl fmt::Debug for GraphUnwindExec {
85 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
86 f.debug_struct("GraphUnwindExec")
87 .field("expr", &self.expr)
88 .field("variable", &self.variable)
89 .finish()
90 }
91}
92
93impl GraphUnwindExec {
94 pub fn new(
103 input: Arc<dyn ExecutionPlan>,
104 expr: Expr,
105 variable: impl Into<String>,
106 params: HashMap<String, Value>,
107 ) -> Self {
108 let variable = variable.into();
109
110 let schema = Self::build_schema(input.schema(), &variable, &expr);
112 let properties = compute_plan_properties(schema.clone());
113
114 Self {
115 input,
116 expr,
117 variable,
118 params,
119 schema,
120 properties,
121 metrics: ExecutionPlanMetricsSet::new(),
122 }
123 }
124
125 fn infer_element_type(expr: &Expr) -> ElementTypeInfo {
131 let json_fallback = || ElementTypeInfo {
132 data_type: DataType::LargeBinary,
133 is_cv_encoded: true,
134 };
135
136 let Expr::List(items) = expr else {
137 return json_fallback();
138 };
139
140 let first_type = items.iter().find_map(|item| match item {
142 Expr::Literal(CypherLiteral::Null) => None,
143 Expr::Literal(CypherLiteral::Bool(_)) => Some(DataType::Boolean),
144 Expr::Literal(CypherLiteral::Integer(_)) => Some(DataType::Int64),
145 Expr::Literal(CypherLiteral::Float(_)) => Some(DataType::Float64),
146 Expr::Literal(CypherLiteral::String(_)) => Some(DataType::Utf8),
147 _ => Some(DataType::Utf8), });
149
150 let Some(expected) = first_type else {
151 return json_fallback(); };
153
154 let all_match = items.iter().all(|item| match item {
156 Expr::Literal(CypherLiteral::Null) => true,
157 Expr::Literal(CypherLiteral::Bool(_)) => expected == DataType::Boolean,
158 Expr::Literal(CypherLiteral::Integer(_)) => expected == DataType::Int64,
159 Expr::Literal(CypherLiteral::Float(_)) => expected == DataType::Float64,
160 Expr::Literal(CypherLiteral::String(_)) => expected == DataType::Utf8,
161 _ => false, });
163
164 if all_match {
165 ElementTypeInfo {
166 data_type: expected,
167 is_cv_encoded: false,
168 }
169 } else {
170 json_fallback()
171 }
172 }
173
174 fn build_schema(input_schema: SchemaRef, variable: &str, expr: &Expr) -> SchemaRef {
180 let mut fields: Vec<Arc<Field>> = input_schema.fields().to_vec();
181
182 let type_info = Self::infer_element_type(expr);
183
184 let mut field = Field::new(variable, type_info.data_type, true);
185 if type_info.is_cv_encoded {
186 field = field.with_metadata(HashMap::from([("cv_encoded".into(), "true".into())]));
187 }
188 fields.push(Arc::new(field));
189
190 Arc::new(Schema::new(fields))
191 }
192}
193
194impl DisplayAs for GraphUnwindExec {
195 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
196 write!(
197 f,
198 "GraphUnwindExec: {} AS {}",
199 self.expr.to_string_repr(),
200 self.variable
201 )
202 }
203}
204
205impl ExecutionPlan for GraphUnwindExec {
206 fn name(&self) -> &str {
207 "GraphUnwindExec"
208 }
209
210 fn as_any(&self) -> &dyn Any {
211 self
212 }
213
214 fn schema(&self) -> SchemaRef {
215 Arc::clone(&self.schema)
216 }
217
218 fn properties(&self) -> &Arc<PlanProperties> {
219 &self.properties
220 }
221
222 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
223 vec![&self.input]
224 }
225
226 fn with_new_children(
227 self: Arc<Self>,
228 children: Vec<Arc<dyn ExecutionPlan>>,
229 ) -> DFResult<Arc<dyn ExecutionPlan>> {
230 if children.len() != 1 {
231 return Err(datafusion::error::DataFusionError::Plan(
232 "GraphUnwindExec requires exactly one child".to_string(),
233 ));
234 }
235
236 Ok(Arc::new(Self::new(
237 Arc::clone(&children[0]),
238 self.expr.clone(),
239 self.variable.clone(),
240 self.params.clone(),
241 )))
242 }
243
244 fn execute(
245 &self,
246 partition: usize,
247 context: Arc<TaskContext>,
248 ) -> DFResult<SendableRecordBatchStream> {
249 let input_stream = self.input.execute(partition, context)?;
250 let metrics = BaselineMetrics::new(&self.metrics, partition);
251
252 Ok(Box::pin(GraphUnwindStream {
253 input: input_stream,
254 expr: self.expr.clone(),
255 params: self.params.clone(),
256 schema: Arc::clone(&self.schema),
257 metrics,
258 }))
259 }
260
261 fn metrics(&self) -> Option<MetricsSet> {
262 Some(self.metrics.clone_inner())
263 }
264}
265
/// Stream that expands each batch from `input` into one row per unwound element.
struct GraphUnwindStream {
    /// Upstream batches to expand.
    input: SendableRecordBatchStream,

    /// Expression evaluated for every input row.
    expr: Expr,

    /// Query parameter values looked up when the expression references a parameter.
    params: HashMap<String, Value>,

    /// Schema of the batches this stream yields; its last field is the unwound column.
    schema: SchemaRef,

    /// Baseline metrics (compute time, output rows) for this partition.
    metrics: BaselineMetrics,
}
283
284impl GraphUnwindStream {
285 fn process_batch(&self, batch: RecordBatch) -> DFResult<RecordBatch> {
287 let mut expansions: Vec<(usize, Value)> = Vec::new(); for row_idx in 0..batch.num_rows() {
291 let list_value = self.evaluate_expr_for_row(&batch, row_idx)?;
293
294 match list_value {
295 Value::List(items) => {
296 for item in items {
297 expansions.push((row_idx, item));
298 }
299 }
300 Value::Null => {
301 }
303 other => {
304 expansions.push((row_idx, other));
306 }
307 }
308 }
309
310 self.build_output_batch(&batch, &expansions)
311 }
312
313 fn evaluate_expr_for_row(&self, batch: &RecordBatch, row_idx: usize) -> DFResult<Value> {
315 self.evaluate_expr_impl(&self.expr, batch, row_idx)
316 }
317
318 fn evaluate_expr_impl(
320 &self,
321 expr: &Expr,
322 batch: &RecordBatch,
323 row_idx: usize,
324 ) -> DFResult<Value> {
325 match expr {
326 Expr::List(items) => {
328 let mut values = Vec::with_capacity(items.len());
329 for item in items {
330 values.push(self.evaluate_expr_impl(item, batch, row_idx)?);
331 }
332 Ok(Value::List(values))
333 }
334
335 Expr::Literal(lit) => Ok(lit.to_value()),
337
338 Expr::Parameter(name) => self.params.get(name).cloned().ok_or_else(|| {
340 datafusion::error::DataFusionError::Execution(format!(
341 "Parameter '{}' not found",
342 name
343 ))
344 }),
345
346 Expr::Variable(var_name) => self.get_column_value(batch, var_name, row_idx),
348
349 Expr::Property(base_expr, prop_name) => {
351 if let Expr::Variable(var_name) = base_expr.as_ref() {
353 let col_name = format!("{}.{}", var_name, prop_name);
354 if batch.schema().column_with_name(&col_name).is_some() {
355 return self.get_column_value(batch, &col_name, row_idx);
356 }
357 }
358
359 let base_value = self.evaluate_expr_impl(base_expr, batch, row_idx)?;
361 if let Value::Map(map) = base_value {
362 Ok(map.get(prop_name).cloned().unwrap_or(Value::Null))
363 } else {
364 Ok(Value::Null)
365 }
366 }
367
368 Expr::FunctionCall { name, args, .. } => {
370 let name_lower = name.to_lowercase();
371 match name_lower.as_str() {
372 "range" => {
373 if args.len() >= 2 {
374 let start = self.evaluate_expr_impl(&args[0], batch, row_idx)?;
375 let end = self.evaluate_expr_impl(&args[1], batch, row_idx)?;
376 let step = if args.len() >= 3 {
377 self.evaluate_expr_impl(&args[2], batch, row_idx)?
378 } else {
379 Value::Int(1)
380 };
381
382 if start.is_null() || end.is_null() || step.is_null() {
384 return Ok(Value::Null);
385 }
386
387 let (s, e, st) = match (start.as_i64(), end.as_i64(), step.as_i64()) {
392 (Some(s), Some(e), Some(st)) => (s, e, st),
393 _ => {
394 return Err(datafusion::error::DataFusionError::Execution(
395 format!(
396 "range() requires integer arguments, got start={start:?}, end={end:?}, step={step:?}"
397 ),
398 ));
399 }
400 };
401 if st == 0 {
402 return Err(datafusion::error::DataFusionError::Execution(
403 "range() step argument cannot be 0".to_string(),
404 ));
405 }
406 let mut result = Vec::new();
407 let mut i = s;
408 while (st > 0 && i <= e) || (st < 0 && i >= e) {
409 result.push(Value::Int(i));
410 match i.checked_add(st) {
414 Some(next) => i = next,
415 None => break,
416 }
417 }
418 return Ok(Value::List(result));
419 }
420 Ok(Value::List(vec![]))
421 }
422 "keys" => {
423 if args.len() == 1 {
424 let val = self.evaluate_expr_impl(&args[0], batch, row_idx)?;
425 if let Value::Map(map) = val {
426 let source = match map.get("_all_props") {
429 Some(Value::Map(all)) => all,
430 _ => &map,
431 };
432 let mut key_strings: Vec<String> = source
433 .iter()
434 .filter(|(k, v)| !v.is_null() && !k.starts_with('_'))
435 .map(|(k, _)| k.clone())
436 .collect();
437 key_strings.sort();
438 let keys: Vec<Value> =
439 key_strings.into_iter().map(Value::String).collect();
440 return Ok(Value::List(keys));
441 }
442 if val.is_null() {
443 return Ok(Value::Null);
444 }
445 }
446 Ok(Value::List(vec![]))
447 }
448 "size" | "length" => {
449 if args.len() == 1 {
450 let val = self.evaluate_expr_impl(&args[0], batch, row_idx)?;
451 let sz = match &val {
452 Value::List(arr) => arr.len() as i64,
453 Value::String(s) => s.chars().count() as i64,
456 Value::Map(m) => m.len() as i64,
457 _ => 0,
458 };
459 return Ok(Value::Int(sz));
460 }
461 Ok(Value::Null)
462 }
463 "date" | "time" | "localtime" | "datetime" | "localdatetime" | "duration" => {
465 let mut eval_args = Vec::with_capacity(args.len());
466 for arg in args {
467 eval_args.push(self.evaluate_expr_impl(arg, batch, row_idx)?);
468 }
469 crate::query::datetime::eval_datetime_function(
470 &name.to_uppercase(),
471 &eval_args,
472 )
473 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))
474 }
475 "split" => {
476 let mut eval_args = Vec::with_capacity(args.len());
477 for arg in args {
478 eval_args.push(self.evaluate_expr_impl(arg, batch, row_idx)?);
479 }
480 crate::query::expr_eval::eval_split(&eval_args).map_err(|e| {
481 datafusion::error::DataFusionError::Execution(e.to_string())
482 })
483 }
484 _ => {
485 Ok(Value::List(vec![]))
487 }
488 }
489 }
490
491 Expr::BinaryOp { left, op, right } => {
493 let l = self.evaluate_expr_impl(left, batch, row_idx)?;
494 let r = self.evaluate_expr_impl(right, batch, row_idx)?;
495 crate::query::expr_eval::eval_binary_op(&l, op, &r)
496 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))
497 }
498
499 Expr::Map(entries) => {
501 let mut map = HashMap::new();
502 for (key, val_expr) in entries {
503 let val = self.evaluate_expr_impl(val_expr, batch, row_idx)?;
504 map.insert(key.clone(), val);
505 }
506 Ok(Value::Map(map))
507 }
508
509 Expr::ArrayIndex { array, index } => {
511 let arr_val = self.evaluate_expr_impl(array, batch, row_idx)?;
512 let idx_val = self.evaluate_expr_impl(index, batch, row_idx)?;
513 match (&arr_val, idx_val.as_i64()) {
514 (Value::List(list), Some(i)) => {
515 let len = list.len() as i64;
517 let resolved = if i < 0 { len + i } else { i };
518 if resolved >= 0 && (resolved as usize) < list.len() {
519 Ok(list[resolved as usize].clone())
520 } else {
521 Ok(Value::Null)
522 }
523 }
524 _ => Ok(Value::Null),
525 }
526 }
527
528 _ => Ok(Value::Null),
530 }
531 }
532
533 fn get_column_value(
535 &self,
536 batch: &RecordBatch,
537 col_name: &str,
538 row_idx: usize,
539 ) -> DFResult<Value> {
540 let col = batch.column_by_name(col_name).ok_or_else(|| {
541 datafusion::error::DataFusionError::Execution(format!(
542 "Column '{}' not found for UNWIND",
543 col_name
544 ))
545 })?;
546
547 Ok(arrow_to_json_value(col.as_ref(), row_idx))
548 }
549
550 fn build_output_batch(
552 &self,
553 input: &RecordBatch,
554 expansions: &[(usize, Value)],
555 ) -> DFResult<RecordBatch> {
556 if expansions.is_empty() {
557 return Ok(RecordBatch::new_empty(Arc::clone(&self.schema)));
558 }
559
560 let num_rows = expansions.len();
561
562 let indices: Vec<u64> = expansions.iter().map(|(idx, _)| *idx as u64).collect();
564 let indices_array = UInt64Array::from(indices);
565
566 let mut columns: Vec<ArrayRef> = Vec::new();
568 for col in input.columns() {
569 let expanded = take(col.as_ref(), &indices_array, None)?;
570 columns.push(expanded);
571 }
572
573 let unwind_field = self.schema.field(self.schema.fields().len() - 1);
575 let is_cv_encoded = unwind_field
576 .metadata()
577 .get("cv_encoded")
578 .is_some_and(|v| v == "true");
579
580 let unwind_col: ArrayRef = match (unwind_field.data_type(), is_cv_encoded) {
581 (DataType::Boolean, false) => {
582 let mut builder = BooleanBuilder::with_capacity(num_rows);
583 for (_, value) in expansions {
584 if let Value::Bool(b) = value {
585 builder.append_value(*b);
586 } else {
587 builder.append_null();
588 }
589 }
590 Arc::new(builder.finish())
591 }
592 (DataType::Int64, false) => {
593 let mut builder = Int64Builder::with_capacity(num_rows);
594 for (_, value) in expansions {
595 if let Value::Int(i) = value {
596 builder.append_value(*i);
597 } else {
598 builder.append_null();
599 }
600 }
601 Arc::new(builder.finish())
602 }
603 (DataType::Float64, false) => {
604 let mut builder = Float64Builder::with_capacity(num_rows);
605 for (_, value) in expansions {
606 if let Value::Float(f) = value {
607 builder.append_value(*f);
608 } else {
609 builder.append_null();
610 }
611 }
612 Arc::new(builder.finish())
613 }
614 (DataType::Utf8, false) => {
615 let mut builder = StringBuilder::new();
616 for (_, value) in expansions {
617 if let Value::String(s) = value {
618 builder.append_value(s);
619 } else {
620 builder.append_null();
621 }
622 }
623 Arc::new(builder.finish())
624 }
625 (DataType::LargeBinary, _) => {
626 let mut builder = LargeBinaryBuilder::with_capacity(num_rows, num_rows * 16);
628 for (_, value) in expansions {
629 if value.is_null() {
630 builder.append_null();
631 } else {
632 let encoded = uni_common::cypher_value_codec::encode(value);
633 builder.append_value(&encoded);
634 }
635 }
636 Arc::new(builder.finish())
637 }
638 _ => {
639 let mut builder = StringBuilder::new();
641 for (_, value) in expansions {
642 if value.is_null() {
643 builder.append_null();
644 } else {
645 let json_val: serde_json::Value = value.clone().into();
646 let json_str =
647 serde_json::to_string(&json_val).unwrap_or_else(|_| "null".to_string());
648 builder.append_value(&json_str);
649 }
650 }
651 Arc::new(builder.finish())
652 }
653 };
654 columns.push(unwind_col);
655
656 self.metrics.record_output(num_rows);
657
658 RecordBatch::try_new(Arc::clone(&self.schema), columns).map_err(arrow_err)
659 }
660}
661
662impl Stream for GraphUnwindStream {
663 type Item = DFResult<RecordBatch>;
664
665 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
666 let metrics = self.metrics.clone();
667 let _timer = metrics.elapsed_compute().timer();
668 match self.input.poll_next_unpin(cx) {
669 Poll::Ready(Some(Ok(batch))) => {
670 let result = self.process_batch(batch);
671 Poll::Ready(Some(result))
672 }
673 other => other,
674 }
675 }
676}
677
678impl RecordBatchStream for GraphUnwindStream {
679 fn schema(&self) -> SchemaRef {
680 Arc::clone(&self.schema)
681 }
682}
683
684pub(crate) fn arrow_to_json_value(array: &dyn Array, row: usize) -> Value {
686 use arrow_array::{
687 BooleanArray, Float32Array, Float64Array, Int8Array, Int16Array, Int32Array, Int64Array,
688 LargeStringArray, ListArray, StringArray, UInt8Array, UInt16Array, UInt32Array,
689 UInt64Array,
690 };
691
692 if array.is_null(row) {
693 return Value::Null;
694 }
695
696 let any = array.as_any();
697
698 if let Some(arr) = any.downcast_ref::<StringArray>() {
700 return Value::String(arr.value(row).to_string());
701 }
702 if let Some(arr) = any.downcast_ref::<LargeStringArray>() {
703 return Value::String(arr.value(row).to_string());
704 }
705
706 macro_rules! try_int {
708 ($arr_type:ty) => {
709 if let Some(arr) = any.downcast_ref::<$arr_type>() {
710 return Value::Int(arr.value(row) as i64);
711 }
712 };
713 }
714 try_int!(Int64Array);
715 try_int!(Int32Array);
716 try_int!(Int16Array);
717 try_int!(Int8Array);
718 try_int!(UInt64Array);
719 try_int!(UInt32Array);
720 try_int!(UInt16Array);
721 try_int!(UInt8Array);
722
723 if let Some(arr) = any.downcast_ref::<Float64Array>() {
725 return Value::Float(arr.value(row));
726 }
727 if let Some(arr) = any.downcast_ref::<Float32Array>() {
728 return Value::Float(arr.value(row) as f64);
729 }
730
731 if let Some(arr) = any.downcast_ref::<BooleanArray>() {
733 return Value::Bool(arr.value(row));
734 }
735
736 if let Some(arr) = any.downcast_ref::<ListArray>() {
738 let values = arr.value(row);
739 let result: Vec<Value> = (0..values.len())
740 .map(|i| arrow_to_json_value(values.as_ref(), i))
741 .collect();
742 return Value::List(result);
743 }
744
745 if let Some(arr) = any.downcast_ref::<arrow_array::LargeBinaryArray>() {
747 let bytes = arr.value(row);
748 if let Ok(uni_val) = uni_common::cypher_value_codec::decode(bytes) {
749 return uni_val;
750 }
751 if let Ok(parsed) = serde_json::from_slice::<serde_json::Value>(bytes) {
753 return Value::from(parsed);
754 }
755 return Value::Null;
756 }
757
758 if let Some(s) = any.downcast_ref::<arrow_array::StructArray>() {
760 let mut map = HashMap::new();
761 for (field, child) in s.fields().iter().zip(s.columns()) {
762 map.insert(
763 field.name().clone(),
764 arrow_to_json_value(child.as_ref(), row),
765 );
766 }
767 return Value::Map(map);
768 }
769
770 Value::Null
772}
773
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{LargeBinaryArray, UInt64Array};
    use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
    use uni_cypher::ast::CypherLiteral;

    /// Schema with the single non-nullable `n._vid` column shared by the fixtures.
    fn vid_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![Field::new(
            "n._vid",
            DataType::UInt64,
            false,
        )]))
    }

    /// One-row batch over [`vid_schema`] whose only value is `1`.
    fn single_row_batch() -> RecordBatch {
        let vids: ArrayRef = Arc::new(UInt64Array::from(vec![1u64]));
        RecordBatch::try_new(vid_schema(), vec![vids]).unwrap()
    }

    /// Output schema made of `n._vid` plus a nullable `x` column of `unwind_type`.
    fn output_schema_with_x(unwind_type: DataType) -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("n._vid", DataType::UInt64, false),
            Field::new("x", unwind_type, true),
        ]))
    }

    /// Stream over an empty input that evaluates `expr` and reports `schema`.
    fn stream_for(expr: Expr, schema: SchemaRef) -> GraphUnwindStream {
        let empty_input = RecordBatchStreamAdapter::new(vid_schema(), futures::stream::empty());
        GraphUnwindStream {
            input: Box::pin(empty_input),
            expr,
            params: HashMap::new(),
            schema,
            metrics: BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0),
        }
    }

    /// Builds the output schema for a list literal over [`vid_schema`] and
    /// returns the appended field.
    fn unwind_field_for(variable: &str, items: Vec<Expr>) -> Field {
        let output_schema =
            GraphUnwindExec::build_schema(vid_schema(), variable, &Expr::List(items));
        output_schema.field(1).clone()
    }

    fn int_lit(v: i64) -> Expr {
        Expr::Literal(CypherLiteral::Integer(v))
    }

    fn str_lit(text: &str) -> Expr {
        Expr::Literal(CypherLiteral::String(text.to_string()))
    }

    /// Evaluates the scalar function `name(args...)` against a one-row batch.
    fn eval_scalar_fn(name: &str, args: Vec<Expr>) -> DFResult<Value> {
        let batch = single_row_batch();
        let expr = Expr::FunctionCall {
            name: name.to_string(),
            args,
            distinct: false,
            window_spec: None,
        };
        let stream = stream_for(expr.clone(), vid_schema());
        stream.evaluate_expr_impl(&expr, &batch, 0)
    }

    #[test]
    fn test_build_schema() {
        let input_schema = Arc::new(Schema::new(vec![
            Field::new("n._vid", DataType::UInt64, false),
            Field::new("n.name", DataType::Utf8, true),
        ]));

        // A non-list expression cannot be typed statically.
        let expr = Expr::Variable("some_list".to_string());
        let output_schema = GraphUnwindExec::build_schema(input_schema, "item", &expr);

        assert_eq!(output_schema.fields().len(), 3);
        assert_eq!(output_schema.field(0).name(), "n._vid");
        assert_eq!(output_schema.field(1).name(), "n.name");
        assert_eq!(output_schema.field(2).name(), "item");
        assert_eq!(output_schema.field(2).data_type(), &DataType::LargeBinary);
        assert_eq!(
            output_schema
                .field(2)
                .metadata()
                .get("cv_encoded")
                .map(String::as_str),
            Some("true")
        );
    }

    #[test]
    fn test_build_schema_boolean_list() {
        let field = unwind_field_for(
            "a",
            vec![
                Expr::Literal(CypherLiteral::Bool(true)),
                Expr::Literal(CypherLiteral::Bool(false)),
                Expr::Literal(CypherLiteral::Null),
            ],
        );

        assert_eq!(field.name(), "a");
        assert_eq!(field.data_type(), &DataType::Boolean);
        assert!(field.metadata().is_empty());
    }

    #[test]
    fn test_build_schema_integer_list() {
        let field = unwind_field_for("x", vec![int_lit(1), int_lit(2), int_lit(3)]);

        assert_eq!(field.name(), "x");
        assert_eq!(field.data_type(), &DataType::Int64);
        assert!(field.metadata().is_empty());
    }

    #[test]
    fn test_build_schema_float_list() {
        let field = unwind_field_for(
            "x",
            vec![
                Expr::Literal(CypherLiteral::Float(1.5)),
                Expr::Literal(CypherLiteral::Float(2.5)),
            ],
        );

        assert_eq!(field.name(), "x");
        assert_eq!(field.data_type(), &DataType::Float64);
        assert!(field.metadata().is_empty());
    }

    #[test]
    fn test_build_schema_string_list() {
        let field = unwind_field_for("x", vec![str_lit("hello"), str_lit("world")]);

        assert_eq!(field.name(), "x");
        assert_eq!(field.data_type(), &DataType::Utf8);
        assert!(field.metadata().is_empty());
    }

    #[test]
    fn test_build_schema_mixed_list() {
        let field = unwind_field_for("x", vec![int_lit(1), str_lit("hello")]);

        assert_eq!(field.name(), "x");
        assert_eq!(field.data_type(), &DataType::LargeBinary);
        assert_eq!(
            field.metadata().get("cv_encoded").map(String::as_str),
            Some("true")
        );
    }

    #[test]
    fn test_evaluate_literal_list() {
        let batch = single_row_batch();
        let stream = stream_for(
            Expr::List(vec![int_lit(1), int_lit(2), int_lit(3)]),
            output_schema_with_x(DataType::Utf8),
        );

        let result = stream.evaluate_expr_for_row(&batch, 0).unwrap();
        match result {
            Value::List(items) => {
                assert_eq!(items.len(), 3);
                assert_eq!(items[0], Value::Int(1));
                assert_eq!(items[1], Value::Int(2));
                assert_eq!(items[2], Value::Int(3));
            }
            _ => panic!("Expected list"),
        }
    }

    #[test]
    fn test_evaluate_map_literal() {
        let batch = single_row_batch();
        let stream = stream_for(
            Expr::Map(vec![
                ("a".to_string(), int_lit(1)),
                ("b".to_string(), str_lit("hello")),
            ]),
            output_schema_with_x(DataType::LargeBinary),
        );

        let result = stream.evaluate_expr_for_row(&batch, 0).unwrap();
        match result {
            Value::Map(map) => {
                assert_eq!(map.get("a"), Some(&Value::Int(1)));
                assert_eq!(map.get("b"), Some(&Value::String("hello".to_string())));
            }
            _ => panic!("Expected Map, got {:?}", result),
        }
    }

    #[test]
    fn test_evaluate_map_property_access() {
        let batch = single_row_batch();

        // `{a: 1, b: 'x'}.a`
        let map_expr = Expr::Map(vec![
            ("a".to_string(), int_lit(1)),
            ("b".to_string(), str_lit("x")),
        ]);
        let prop_expr = Expr::Property(Box::new(map_expr), "a".to_string());

        let stream = stream_for(
            prop_expr.clone(),
            output_schema_with_x(DataType::LargeBinary),
        );

        let result = stream.evaluate_expr_impl(&prop_expr, &batch, 0).unwrap();
        assert_eq!(result, Value::Int(1));
    }

    #[test]
    fn test_arrow_to_json_value_uint64_is_coerced_to_int() {
        let arr = UInt64Array::from(vec![Some(42u64)]);
        let value = arrow_to_json_value(&arr, 0);
        assert_eq!(value, Value::Int(42));
    }

    #[test]
    fn test_arrow_to_json_value_largebinary_decodes_cypher_map() {
        let encoded = uni_common::cypher_value_codec::encode(&Value::Map(HashMap::new()));
        let arr = LargeBinaryArray::from(vec![Some(encoded.as_slice())]);
        let value = arrow_to_json_value(&arr, 0);
        assert_eq!(value, Value::Map(HashMap::new()));
    }

    #[test]
    fn test_range_overflow_terminates() {
        // Stepping past i64::MAX must end the range instead of wrapping around.
        let result = eval_scalar_fn(
            "range",
            vec![int_lit(i64::MAX - 1), int_lit(i64::MAX), int_lit(2)],
        )
        .unwrap();
        assert_eq!(result, Value::List(vec![Value::Int(i64::MAX - 1)]));
    }

    #[test]
    fn test_range_zero_step_errors() {
        let err = eval_scalar_fn("range", vec![int_lit(1), int_lit(5), int_lit(0)]);
        assert!(err.is_err(), "range(1, 5, 0) must error, got {err:?}");
    }

    #[test]
    fn test_range_float_args_error() {
        let err = eval_scalar_fn(
            "range",
            vec![
                Expr::Literal(CypherLiteral::Float(1.0)),
                Expr::Literal(CypherLiteral::Float(3.0)),
            ],
        );
        assert!(err.is_err(), "range(1.0, 3.0) must error, got {err:?}");
    }

    #[test]
    fn test_size_string_counts_chars_not_bytes() {
        // "héllo" is 6 bytes in UTF-8 but 5 characters.
        let result = eval_scalar_fn("size", vec![str_lit("héllo")]).unwrap();
        assert_eq!(result, Value::Int(5));
    }
}