1use crate::query::df_graph::common::{arrow_err, compute_plan_properties};
27use arrow::compute::take;
28use arrow_array::builder::{
29 BooleanBuilder, Float64Builder, Int64Builder, LargeBinaryBuilder, StringBuilder,
30};
31use arrow_array::{Array, ArrayRef, RecordBatch, UInt64Array};
32use arrow_schema::{DataType, Field, Schema, SchemaRef};
33use datafusion::common::Result as DFResult;
34use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
35use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
36use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
37use futures::{Stream, StreamExt};
38use std::any::Any;
39use std::collections::HashMap;
40use std::fmt;
41use std::pin::Pin;
42use std::sync::Arc;
43use std::task::{Context, Poll};
44use uni_common::Value;
45use uni_cypher::ast::{CypherLiteral, Expr};
46
/// Result of inferring how the UNWIND output column should be typed in Arrow.
struct ElementTypeInfo {
    /// Arrow data type used for the unwound column.
    data_type: DataType,
    /// Whether the column's field is tagged with the `cv_encoded = "true"`
    /// metadata entry (see `GraphUnwindExec::build_schema`).
    is_cv_encoded: bool,
}
54
/// Physical plan node that evaluates an expression per input row and emits
/// one output row per element of the resulting list.
pub struct GraphUnwindExec {
    /// Child plan whose batches are expanded.
    input: Arc<dyn ExecutionPlan>,

    /// Expression evaluated for every input row to obtain the values to unwind.
    expr: Expr,

    /// Name of the output column that receives each unwound element.
    variable: String,

    /// Values looked up when the expression contains `Expr::Parameter`.
    params: HashMap<String, Value>,

    /// Output schema: the input fields followed by the unwind column.
    schema: SchemaRef,

    /// Plan properties computed from `schema` by `compute_plan_properties`.
    properties: Arc<PlanProperties>,

    /// Metrics registry from which per-partition `BaselineMetrics` are created.
    metrics: ExecutionPlanMetricsSet,
}
83
84impl fmt::Debug for GraphUnwindExec {
85 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
86 f.debug_struct("GraphUnwindExec")
87 .field("expr", &self.expr)
88 .field("variable", &self.variable)
89 .finish()
90 }
91}
92
93impl GraphUnwindExec {
94 pub fn new(
103 input: Arc<dyn ExecutionPlan>,
104 expr: Expr,
105 variable: impl Into<String>,
106 params: HashMap<String, Value>,
107 ) -> Self {
108 let variable = variable.into();
109
110 let schema = Self::build_schema(input.schema(), &variable, &expr);
112 let properties = compute_plan_properties(schema.clone());
113
114 Self {
115 input,
116 expr,
117 variable,
118 params,
119 schema,
120 properties,
121 metrics: ExecutionPlanMetricsSet::new(),
122 }
123 }
124
125 fn infer_element_type(expr: &Expr) -> ElementTypeInfo {
131 let json_fallback = || ElementTypeInfo {
132 data_type: DataType::LargeBinary,
133 is_cv_encoded: true,
134 };
135
136 let Expr::List(items) = expr else {
137 return json_fallback();
138 };
139
140 let first_type = items.iter().find_map(|item| match item {
142 Expr::Literal(CypherLiteral::Null) => None,
143 Expr::Literal(CypherLiteral::Bool(_)) => Some(DataType::Boolean),
144 Expr::Literal(CypherLiteral::Integer(_)) => Some(DataType::Int64),
145 Expr::Literal(CypherLiteral::Float(_)) => Some(DataType::Float64),
146 Expr::Literal(CypherLiteral::String(_)) => Some(DataType::Utf8),
147 _ => Some(DataType::Utf8), });
149
150 let Some(expected) = first_type else {
151 return json_fallback(); };
153
154 let all_match = items.iter().all(|item| match item {
156 Expr::Literal(CypherLiteral::Null) => true,
157 Expr::Literal(CypherLiteral::Bool(_)) => expected == DataType::Boolean,
158 Expr::Literal(CypherLiteral::Integer(_)) => expected == DataType::Int64,
159 Expr::Literal(CypherLiteral::Float(_)) => expected == DataType::Float64,
160 Expr::Literal(CypherLiteral::String(_)) => expected == DataType::Utf8,
161 _ => false, });
163
164 if all_match {
165 ElementTypeInfo {
166 data_type: expected,
167 is_cv_encoded: false,
168 }
169 } else {
170 json_fallback()
171 }
172 }
173
174 fn build_schema(input_schema: SchemaRef, variable: &str, expr: &Expr) -> SchemaRef {
180 let mut fields: Vec<Arc<Field>> = input_schema.fields().to_vec();
181
182 let type_info = Self::infer_element_type(expr);
183
184 let mut field = Field::new(variable, type_info.data_type, true);
185 if type_info.is_cv_encoded {
186 field = field.with_metadata(HashMap::from([("cv_encoded".into(), "true".into())]));
187 }
188 fields.push(Arc::new(field));
189
190 Arc::new(Schema::new(fields))
191 }
192}
193
194impl DisplayAs for GraphUnwindExec {
195 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
196 write!(
197 f,
198 "GraphUnwindExec: {} AS {}",
199 self.expr.to_string_repr(),
200 self.variable
201 )
202 }
203}
204
205impl ExecutionPlan for GraphUnwindExec {
206 fn name(&self) -> &str {
207 "GraphUnwindExec"
208 }
209
210 fn as_any(&self) -> &dyn Any {
211 self
212 }
213
214 fn schema(&self) -> SchemaRef {
215 Arc::clone(&self.schema)
216 }
217
218 fn properties(&self) -> &Arc<PlanProperties> {
219 &self.properties
220 }
221
222 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
223 vec![&self.input]
224 }
225
226 fn with_new_children(
227 self: Arc<Self>,
228 children: Vec<Arc<dyn ExecutionPlan>>,
229 ) -> DFResult<Arc<dyn ExecutionPlan>> {
230 if children.len() != 1 {
231 return Err(datafusion::error::DataFusionError::Plan(
232 "GraphUnwindExec requires exactly one child".to_string(),
233 ));
234 }
235
236 Ok(Arc::new(Self::new(
237 Arc::clone(&children[0]),
238 self.expr.clone(),
239 self.variable.clone(),
240 self.params.clone(),
241 )))
242 }
243
244 fn execute(
245 &self,
246 partition: usize,
247 context: Arc<TaskContext>,
248 ) -> DFResult<SendableRecordBatchStream> {
249 let input_stream = self.input.execute(partition, context)?;
250 let metrics = BaselineMetrics::new(&self.metrics, partition);
251
252 Ok(Box::pin(GraphUnwindStream {
253 input: input_stream,
254 expr: self.expr.clone(),
255 params: self.params.clone(),
256 schema: Arc::clone(&self.schema),
257 metrics,
258 }))
259 }
260
261 fn metrics(&self) -> Option<MetricsSet> {
262 Some(self.metrics.clone_inner())
263 }
264}
265
/// Stream that expands each batch pulled from `input` by unwinding `expr`.
struct GraphUnwindStream {
    /// Upstream batches to expand.
    input: SendableRecordBatchStream,

    /// Expression evaluated once per input row.
    expr: Expr,

    /// Values for `Expr::Parameter` lookups.
    params: HashMap<String, Value>,

    /// Output schema; its last field is the unwind column.
    schema: SchemaRef,

    /// Per-partition metrics (output rows, compute time).
    metrics: BaselineMetrics,
}
283
284impl GraphUnwindStream {
285 fn process_batch(&self, batch: RecordBatch) -> DFResult<RecordBatch> {
287 let mut expansions: Vec<(usize, Value)> = Vec::new(); for row_idx in 0..batch.num_rows() {
291 let list_value = self.evaluate_expr_for_row(&batch, row_idx)?;
293
294 match list_value {
295 Value::List(items) => {
296 for item in items {
297 expansions.push((row_idx, item));
298 }
299 }
300 Value::Null => {
301 }
303 other => {
304 expansions.push((row_idx, other));
306 }
307 }
308 }
309
310 self.build_output_batch(&batch, &expansions)
311 }
312
313 fn evaluate_expr_for_row(&self, batch: &RecordBatch, row_idx: usize) -> DFResult<Value> {
315 self.evaluate_expr_impl(&self.expr, batch, row_idx)
316 }
317
318 fn evaluate_expr_impl(
320 &self,
321 expr: &Expr,
322 batch: &RecordBatch,
323 row_idx: usize,
324 ) -> DFResult<Value> {
325 match expr {
326 Expr::List(items) => {
328 let mut values = Vec::with_capacity(items.len());
329 for item in items {
330 values.push(self.evaluate_expr_impl(item, batch, row_idx)?);
331 }
332 Ok(Value::List(values))
333 }
334
335 Expr::Literal(lit) => Ok(lit.to_value()),
337
338 Expr::Parameter(name) => self.params.get(name).cloned().ok_or_else(|| {
340 datafusion::error::DataFusionError::Execution(format!(
341 "Parameter '{}' not found",
342 name
343 ))
344 }),
345
346 Expr::Variable(var_name) => self.get_column_value(batch, var_name, row_idx),
348
349 Expr::Property(base_expr, prop_name) => {
351 if let Expr::Variable(var_name) = base_expr.as_ref() {
353 let col_name = format!("{}.{}", var_name, prop_name);
354 if batch.schema().column_with_name(&col_name).is_some() {
355 return self.get_column_value(batch, &col_name, row_idx);
356 }
357 }
358
359 let base_value = self.evaluate_expr_impl(base_expr, batch, row_idx)?;
361 if let Value::Map(map) = base_value {
362 Ok(map.get(prop_name).cloned().unwrap_or(Value::Null))
363 } else {
364 Ok(Value::Null)
365 }
366 }
367
368 Expr::FunctionCall { name, args, .. } => {
370 let name_lower = name.to_lowercase();
371 match name_lower.as_str() {
372 "range" => {
373 if args.len() >= 2 {
374 let start = self.evaluate_expr_impl(&args[0], batch, row_idx)?;
375 let end = self.evaluate_expr_impl(&args[1], batch, row_idx)?;
376 let step = if args.len() >= 3 {
377 self.evaluate_expr_impl(&args[2], batch, row_idx)?
378 } else {
379 Value::Int(1)
380 };
381
382 if let (Some(s), Some(e), Some(st)) =
383 (start.as_i64(), end.as_i64(), step.as_i64())
384 {
385 let mut result = Vec::new();
386 let mut i = s;
387 while (st > 0 && i <= e) || (st < 0 && i >= e) {
388 result.push(Value::Int(i));
389 i += st;
390 }
391 return Ok(Value::List(result));
392 }
393 }
394 Ok(Value::List(vec![]))
395 }
396 "keys" => {
397 if args.len() == 1 {
398 let val = self.evaluate_expr_impl(&args[0], batch, row_idx)?;
399 if let Value::Map(map) = val {
400 let source = match map.get("_all_props") {
403 Some(Value::Map(all)) => all,
404 _ => &map,
405 };
406 let mut key_strings: Vec<String> = source
407 .iter()
408 .filter(|(k, v)| !v.is_null() && !k.starts_with('_'))
409 .map(|(k, _)| k.clone())
410 .collect();
411 key_strings.sort();
412 let keys: Vec<Value> =
413 key_strings.into_iter().map(Value::String).collect();
414 return Ok(Value::List(keys));
415 }
416 if val.is_null() {
417 return Ok(Value::Null);
418 }
419 }
420 Ok(Value::List(vec![]))
421 }
422 "size" | "length" => {
423 if args.len() == 1 {
424 let val = self.evaluate_expr_impl(&args[0], batch, row_idx)?;
425 let sz = match &val {
426 Value::List(arr) => arr.len() as i64,
427 Value::String(s) => s.len() as i64,
428 Value::Map(m) => m.len() as i64,
429 _ => 0,
430 };
431 return Ok(Value::Int(sz));
432 }
433 Ok(Value::Null)
434 }
435 "date" | "time" | "localtime" | "datetime" | "localdatetime" | "duration" => {
437 let mut eval_args = Vec::with_capacity(args.len());
438 for arg in args {
439 eval_args.push(self.evaluate_expr_impl(arg, batch, row_idx)?);
440 }
441 crate::query::datetime::eval_datetime_function(
442 &name.to_uppercase(),
443 &eval_args,
444 )
445 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))
446 }
447 "split" => {
448 let mut eval_args = Vec::with_capacity(args.len());
449 for arg in args {
450 eval_args.push(self.evaluate_expr_impl(arg, batch, row_idx)?);
451 }
452 crate::query::expr_eval::eval_split(&eval_args).map_err(|e| {
453 datafusion::error::DataFusionError::Execution(e.to_string())
454 })
455 }
456 _ => {
457 Ok(Value::List(vec![]))
459 }
460 }
461 }
462
463 Expr::BinaryOp { left, op, right } => {
465 let l = self.evaluate_expr_impl(left, batch, row_idx)?;
466 let r = self.evaluate_expr_impl(right, batch, row_idx)?;
467 crate::query::expr_eval::eval_binary_op(&l, op, &r)
468 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))
469 }
470
471 Expr::Map(entries) => {
473 let mut map = HashMap::new();
474 for (key, val_expr) in entries {
475 let val = self.evaluate_expr_impl(val_expr, batch, row_idx)?;
476 map.insert(key.clone(), val);
477 }
478 Ok(Value::Map(map))
479 }
480
481 Expr::ArrayIndex { array, index } => {
483 let arr_val = self.evaluate_expr_impl(array, batch, row_idx)?;
484 let idx_val = self.evaluate_expr_impl(index, batch, row_idx)?;
485 match (&arr_val, idx_val.as_i64()) {
486 (Value::List(list), Some(i)) => {
487 let len = list.len() as i64;
489 let resolved = if i < 0 { len + i } else { i };
490 if resolved >= 0 && (resolved as usize) < list.len() {
491 Ok(list[resolved as usize].clone())
492 } else {
493 Ok(Value::Null)
494 }
495 }
496 _ => Ok(Value::Null),
497 }
498 }
499
500 _ => Ok(Value::Null),
502 }
503 }
504
505 fn get_column_value(
507 &self,
508 batch: &RecordBatch,
509 col_name: &str,
510 row_idx: usize,
511 ) -> DFResult<Value> {
512 let col = batch.column_by_name(col_name).ok_or_else(|| {
513 datafusion::error::DataFusionError::Execution(format!(
514 "Column '{}' not found for UNWIND",
515 col_name
516 ))
517 })?;
518
519 Ok(arrow_to_json_value(col.as_ref(), row_idx))
520 }
521
522 fn build_output_batch(
524 &self,
525 input: &RecordBatch,
526 expansions: &[(usize, Value)],
527 ) -> DFResult<RecordBatch> {
528 if expansions.is_empty() {
529 return Ok(RecordBatch::new_empty(Arc::clone(&self.schema)));
530 }
531
532 let num_rows = expansions.len();
533
534 let indices: Vec<u64> = expansions.iter().map(|(idx, _)| *idx as u64).collect();
536 let indices_array = UInt64Array::from(indices);
537
538 let mut columns: Vec<ArrayRef> = Vec::new();
540 for col in input.columns() {
541 let expanded = take(col.as_ref(), &indices_array, None)?;
542 columns.push(expanded);
543 }
544
545 let unwind_field = self.schema.field(self.schema.fields().len() - 1);
547 let is_cv_encoded = unwind_field
548 .metadata()
549 .get("cv_encoded")
550 .is_some_and(|v| v == "true");
551
552 let unwind_col: ArrayRef = match (unwind_field.data_type(), is_cv_encoded) {
553 (DataType::Boolean, false) => {
554 let mut builder = BooleanBuilder::with_capacity(num_rows);
555 for (_, value) in expansions {
556 if let Value::Bool(b) = value {
557 builder.append_value(*b);
558 } else {
559 builder.append_null();
560 }
561 }
562 Arc::new(builder.finish())
563 }
564 (DataType::Int64, false) => {
565 let mut builder = Int64Builder::with_capacity(num_rows);
566 for (_, value) in expansions {
567 if let Value::Int(i) = value {
568 builder.append_value(*i);
569 } else {
570 builder.append_null();
571 }
572 }
573 Arc::new(builder.finish())
574 }
575 (DataType::Float64, false) => {
576 let mut builder = Float64Builder::with_capacity(num_rows);
577 for (_, value) in expansions {
578 if let Value::Float(f) = value {
579 builder.append_value(*f);
580 } else {
581 builder.append_null();
582 }
583 }
584 Arc::new(builder.finish())
585 }
586 (DataType::Utf8, false) => {
587 let mut builder = StringBuilder::new();
588 for (_, value) in expansions {
589 if let Value::String(s) = value {
590 builder.append_value(s);
591 } else {
592 builder.append_null();
593 }
594 }
595 Arc::new(builder.finish())
596 }
597 (DataType::LargeBinary, _) => {
598 let mut builder = LargeBinaryBuilder::with_capacity(num_rows, num_rows * 16);
600 for (_, value) in expansions {
601 if value.is_null() {
602 builder.append_null();
603 } else {
604 let encoded = uni_common::cypher_value_codec::encode(value);
605 builder.append_value(&encoded);
606 }
607 }
608 Arc::new(builder.finish())
609 }
610 _ => {
611 let mut builder = StringBuilder::new();
613 for (_, value) in expansions {
614 if value.is_null() {
615 builder.append_null();
616 } else {
617 let json_val: serde_json::Value = value.clone().into();
618 let json_str =
619 serde_json::to_string(&json_val).unwrap_or_else(|_| "null".to_string());
620 builder.append_value(&json_str);
621 }
622 }
623 Arc::new(builder.finish())
624 }
625 };
626 columns.push(unwind_col);
627
628 self.metrics.record_output(num_rows);
629
630 RecordBatch::try_new(Arc::clone(&self.schema), columns).map_err(arrow_err)
631 }
632}
633
634impl Stream for GraphUnwindStream {
635 type Item = DFResult<RecordBatch>;
636
637 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
638 let metrics = self.metrics.clone();
639 let _timer = metrics.elapsed_compute().timer();
640 match self.input.poll_next_unpin(cx) {
641 Poll::Ready(Some(Ok(batch))) => {
642 let result = self.process_batch(batch);
643 Poll::Ready(Some(result))
644 }
645 other => other,
646 }
647 }
648}
649
650impl RecordBatchStream for GraphUnwindStream {
651 fn schema(&self) -> SchemaRef {
652 Arc::clone(&self.schema)
653 }
654}
655
656pub(crate) fn arrow_to_json_value(array: &dyn Array, row: usize) -> Value {
658 use arrow_array::{
659 BooleanArray, Float32Array, Float64Array, Int8Array, Int16Array, Int32Array, Int64Array,
660 LargeStringArray, ListArray, StringArray, UInt8Array, UInt16Array, UInt32Array,
661 UInt64Array,
662 };
663
664 if array.is_null(row) {
665 return Value::Null;
666 }
667
668 let any = array.as_any();
669
670 if let Some(arr) = any.downcast_ref::<StringArray>() {
672 return Value::String(arr.value(row).to_string());
673 }
674 if let Some(arr) = any.downcast_ref::<LargeStringArray>() {
675 return Value::String(arr.value(row).to_string());
676 }
677
678 macro_rules! try_int {
680 ($arr_type:ty) => {
681 if let Some(arr) = any.downcast_ref::<$arr_type>() {
682 return Value::Int(arr.value(row) as i64);
683 }
684 };
685 }
686 try_int!(Int64Array);
687 try_int!(Int32Array);
688 try_int!(Int16Array);
689 try_int!(Int8Array);
690 try_int!(UInt64Array);
691 try_int!(UInt32Array);
692 try_int!(UInt16Array);
693 try_int!(UInt8Array);
694
695 if let Some(arr) = any.downcast_ref::<Float64Array>() {
697 return Value::Float(arr.value(row));
698 }
699 if let Some(arr) = any.downcast_ref::<Float32Array>() {
700 return Value::Float(arr.value(row) as f64);
701 }
702
703 if let Some(arr) = any.downcast_ref::<BooleanArray>() {
705 return Value::Bool(arr.value(row));
706 }
707
708 if let Some(arr) = any.downcast_ref::<ListArray>() {
710 let values = arr.value(row);
711 let result: Vec<Value> = (0..values.len())
712 .map(|i| arrow_to_json_value(values.as_ref(), i))
713 .collect();
714 return Value::List(result);
715 }
716
717 if let Some(arr) = any.downcast_ref::<arrow_array::LargeBinaryArray>() {
719 let bytes = arr.value(row);
720 if let Ok(uni_val) = uni_common::cypher_value_codec::decode(bytes) {
721 return uni_val;
722 }
723 if let Ok(parsed) = serde_json::from_slice::<serde_json::Value>(bytes) {
725 return Value::from(parsed);
726 }
727 return Value::Null;
728 }
729
730 if let Some(s) = any.downcast_ref::<arrow_array::StructArray>() {
732 let mut map = HashMap::new();
733 for (field, child) in s.fields().iter().zip(s.columns()) {
734 map.insert(
735 field.name().clone(),
736 arrow_to_json_value(child.as_ref(), row),
737 );
738 }
739 return Value::Map(map);
740 }
741
742 Value::Null
744}
745
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{LargeBinaryArray, UInt64Array};
    use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
    use uni_cypher::ast::CypherLiteral;

    /// Schema with the single non-nullable `n._vid` column shared by most tests.
    fn vid_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![Field::new(
            "n._vid",
            DataType::UInt64,
            false,
        )]))
    }

    /// One-row batch over `vid_schema()` holding vid `1`.
    fn single_vid_batch() -> RecordBatch {
        let vids: ArrayRef = Arc::new(UInt64Array::from(vec![1u64]));
        RecordBatch::try_new(vid_schema(), vec![vids]).unwrap()
    }

    /// Stream over an empty input that unwinds `expr` into a column `x` of `unwind_type`.
    fn stream_for(expr: Expr, unwind_type: DataType) -> GraphUnwindStream {
        let empty_input = RecordBatchStreamAdapter::new(vid_schema(), futures::stream::empty());

        GraphUnwindStream {
            input: Box::pin(empty_input),
            expr,
            params: HashMap::new(),
            schema: Arc::new(Schema::new(vec![
                Field::new("n._vid", DataType::UInt64, false),
                Field::new("x", unwind_type, true),
            ])),
            metrics: BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0),
        }
    }

    /// Unwind field produced when `items` is unwound as a list literal over `vid_schema()`.
    fn unwind_field_for(items: Vec<Expr>, variable: &str) -> Field {
        let schema = GraphUnwindExec::build_schema(vid_schema(), variable, &Expr::List(items));
        schema.field(1).clone()
    }

    /// Value of the `cv_encoded` metadata entry, if any.
    fn cv_encoded_flag(field: &Field) -> Option<&str> {
        field.metadata().get("cv_encoded").map(String::as_str)
    }

    fn int_lit(value: i64) -> Expr {
        Expr::Literal(CypherLiteral::Integer(value))
    }

    fn str_lit(value: &str) -> Expr {
        Expr::Literal(CypherLiteral::String(value.to_string()))
    }

    #[test]
    fn test_build_schema() {
        let input_schema = Arc::new(Schema::new(vec![
            Field::new("n._vid", DataType::UInt64, false),
            Field::new("n.name", DataType::Utf8, true),
        ]));

        // A non-literal expression falls back to the encoded representation.
        let expr = Expr::Variable("some_list".to_string());
        let output_schema = GraphUnwindExec::build_schema(input_schema, "item", &expr);

        let names: Vec<&str> = output_schema
            .fields()
            .iter()
            .map(|f| f.name().as_str())
            .collect();
        assert_eq!(names, ["n._vid", "n.name", "item"]);

        let unwind = output_schema.field(2);
        assert_eq!(unwind.data_type(), &DataType::LargeBinary);
        assert_eq!(cv_encoded_flag(unwind), Some("true"));
    }

    #[test]
    fn test_build_schema_boolean_list() {
        let field = unwind_field_for(
            vec![
                Expr::Literal(CypherLiteral::Bool(true)),
                Expr::Literal(CypherLiteral::Bool(false)),
                Expr::Literal(CypherLiteral::Null),
            ],
            "a",
        );

        assert_eq!(field.name(), "a");
        assert_eq!(field.data_type(), &DataType::Boolean);
        assert!(field.metadata().is_empty());
    }

    #[test]
    fn test_build_schema_integer_list() {
        let field = unwind_field_for(vec![int_lit(1), int_lit(2), int_lit(3)], "x");

        assert_eq!(field.name(), "x");
        assert_eq!(field.data_type(), &DataType::Int64);
        assert!(field.metadata().is_empty());
    }

    #[test]
    fn test_build_schema_float_list() {
        let field = unwind_field_for(
            vec![
                Expr::Literal(CypherLiteral::Float(1.5)),
                Expr::Literal(CypherLiteral::Float(2.5)),
            ],
            "x",
        );

        assert_eq!(field.name(), "x");
        assert_eq!(field.data_type(), &DataType::Float64);
        assert!(field.metadata().is_empty());
    }

    #[test]
    fn test_build_schema_string_list() {
        let field = unwind_field_for(vec![str_lit("hello"), str_lit("world")], "x");

        assert_eq!(field.name(), "x");
        assert_eq!(field.data_type(), &DataType::Utf8);
        assert!(field.metadata().is_empty());
    }

    #[test]
    fn test_build_schema_mixed_list() {
        let field = unwind_field_for(vec![int_lit(1), str_lit("hello")], "x");

        assert_eq!(field.name(), "x");
        assert_eq!(field.data_type(), &DataType::LargeBinary);
        assert_eq!(cv_encoded_flag(&field), Some("true"));
    }

    #[test]
    fn test_evaluate_literal_list() {
        let batch = single_vid_batch();
        let stream = stream_for(
            Expr::List(vec![int_lit(1), int_lit(2), int_lit(3)]),
            DataType::Utf8,
        );

        let result = stream.evaluate_expr_for_row(&batch, 0).unwrap();
        match result {
            Value::List(items) => {
                assert_eq!(items.len(), 3);
                assert_eq!(items[0], Value::Int(1));
                assert_eq!(items[1], Value::Int(2));
                assert_eq!(items[2], Value::Int(3));
            }
            _ => panic!("Expected list"),
        }
    }

    #[test]
    fn test_evaluate_map_literal() {
        let batch = single_vid_batch();
        let stream = stream_for(
            Expr::Map(vec![
                ("a".to_string(), int_lit(1)),
                ("b".to_string(), str_lit("hello")),
            ]),
            DataType::LargeBinary,
        );

        let result = stream.evaluate_expr_for_row(&batch, 0).unwrap();
        match result {
            Value::Map(map) => {
                assert_eq!(map.get("a"), Some(&Value::Int(1)));
                assert_eq!(map.get("b"), Some(&Value::String("hello".to_string())));
            }
            _ => panic!("Expected Map, got {:?}", result),
        }
    }

    #[test]
    fn test_evaluate_map_property_access() {
        let batch = single_vid_batch();

        // `{a: 1, b: 'x'}.a`
        let map_expr = Expr::Map(vec![
            ("a".to_string(), int_lit(1)),
            ("b".to_string(), str_lit("x")),
        ]);
        let prop_expr = Expr::Property(Box::new(map_expr), "a".to_string());

        let stream = stream_for(prop_expr.clone(), DataType::LargeBinary);

        let result = stream.evaluate_expr_impl(&prop_expr, &batch, 0).unwrap();
        assert_eq!(result, Value::Int(1));
    }

    #[test]
    fn test_arrow_to_json_value_uint64_is_coerced_to_int() {
        let arr = UInt64Array::from(vec![Some(42u64)]);
        let value = arrow_to_json_value(&arr, 0);
        assert_eq!(value, Value::Int(42));
    }

    #[test]
    fn test_arrow_to_json_value_largebinary_decodes_cypher_map() {
        let encoded = uni_common::cypher_value_codec::encode(&Value::Map(HashMap::new()));
        let arr = LargeBinaryArray::from(vec![Some(encoded.as_slice())]);
        let value = arrow_to_json_value(&arr, 0);
        assert_eq!(value, Value::Map(HashMap::new()));
    }
}