1use crate::query::df_graph::common::compute_plan_properties;
27use arrow::compute::take;
28use arrow_array::builder::{
29 BooleanBuilder, Float64Builder, Int64Builder, LargeBinaryBuilder, StringBuilder,
30};
31use arrow_array::{Array, ArrayRef, RecordBatch, UInt64Array};
32use arrow_schema::{DataType, Field, Schema, SchemaRef};
33use datafusion::common::Result as DFResult;
34use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
35use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
36use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
37use futures::{Stream, StreamExt};
38use std::any::Any;
39use std::collections::HashMap;
40use std::fmt;
41use std::pin::Pin;
42use std::sync::Arc;
43use std::task::{Context, Poll};
44use uni_common::Value;
45use uni_cypher::ast::{CypherLiteral, Expr};
46
/// Result of inferring the Arrow type of the UNWIND output column.
struct ElementTypeInfo {
    // Arrow data type chosen for the unwound column.
    data_type: DataType,
    // True when values are stored as cypher-value-codec encoded bytes,
    // i.e. no single native Arrow type fits all of the list's elements.
    is_cv_encoded: bool,
}
54
/// Physical operator implementing Cypher's `UNWIND`: for each input row it
/// evaluates `expr`, expands the resulting list into one output row per
/// element, and exposes the element under `variable` as a new column.
pub struct GraphUnwindExec {
    // Child plan producing the rows to expand.
    input: Arc<dyn ExecutionPlan>,

    // Expression that yields the list to unwind, evaluated once per input row.
    expr: Expr,

    // Name of the output column that holds each unwound element.
    variable: String,

    // Query parameters referenced by `expr` via `Expr::Parameter`.
    params: HashMap<String, Value>,

    // Output schema: the input schema plus the `variable` column appended last.
    schema: SchemaRef,

    // Cached plan properties derived from `schema`.
    properties: PlanProperties,

    // Execution metrics (output row counts etc.), exposed via `metrics()`.
    metrics: ExecutionPlanMetricsSet,
}
83
84impl fmt::Debug for GraphUnwindExec {
85 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
86 f.debug_struct("GraphUnwindExec")
87 .field("expr", &self.expr)
88 .field("variable", &self.variable)
89 .finish()
90 }
91}
92
93impl GraphUnwindExec {
94 pub fn new(
103 input: Arc<dyn ExecutionPlan>,
104 expr: Expr,
105 variable: impl Into<String>,
106 params: HashMap<String, Value>,
107 ) -> Self {
108 let variable = variable.into();
109
110 let schema = Self::build_schema(input.schema(), &variable, &expr);
112 let properties = compute_plan_properties(schema.clone());
113
114 Self {
115 input,
116 expr,
117 variable,
118 params,
119 schema,
120 properties,
121 metrics: ExecutionPlanMetricsSet::new(),
122 }
123 }
124
125 fn infer_element_type(expr: &Expr) -> ElementTypeInfo {
131 let json_fallback = || ElementTypeInfo {
132 data_type: DataType::LargeBinary,
133 is_cv_encoded: true,
134 };
135
136 let Expr::List(items) = expr else {
137 return json_fallback();
138 };
139
140 let first_type = items.iter().find_map(|item| match item {
142 Expr::Literal(CypherLiteral::Null) => None,
143 Expr::Literal(CypherLiteral::Bool(_)) => Some(DataType::Boolean),
144 Expr::Literal(CypherLiteral::Integer(_)) => Some(DataType::Int64),
145 Expr::Literal(CypherLiteral::Float(_)) => Some(DataType::Float64),
146 Expr::Literal(CypherLiteral::String(_)) => Some(DataType::Utf8),
147 _ => Some(DataType::Utf8), });
149
150 let Some(expected) = first_type else {
151 return json_fallback(); };
153
154 let all_match = items.iter().all(|item| match item {
156 Expr::Literal(CypherLiteral::Null) => true,
157 Expr::Literal(CypherLiteral::Bool(_)) => expected == DataType::Boolean,
158 Expr::Literal(CypherLiteral::Integer(_)) => expected == DataType::Int64,
159 Expr::Literal(CypherLiteral::Float(_)) => expected == DataType::Float64,
160 Expr::Literal(CypherLiteral::String(_)) => expected == DataType::Utf8,
161 _ => false, });
163
164 if all_match {
165 ElementTypeInfo {
166 data_type: expected,
167 is_cv_encoded: false,
168 }
169 } else {
170 json_fallback()
171 }
172 }
173
174 fn build_schema(input_schema: SchemaRef, variable: &str, expr: &Expr) -> SchemaRef {
180 let mut fields: Vec<Arc<Field>> = input_schema.fields().to_vec();
181
182 let type_info = Self::infer_element_type(expr);
183
184 let mut field = Field::new(variable, type_info.data_type, true);
185 if type_info.is_cv_encoded {
186 field = field.with_metadata(HashMap::from([("cv_encoded".into(), "true".into())]));
187 }
188 fields.push(Arc::new(field));
189
190 Arc::new(Schema::new(fields))
191 }
192}
193
194impl DisplayAs for GraphUnwindExec {
195 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
196 write!(
197 f,
198 "GraphUnwindExec: {} AS {}",
199 self.expr.to_string_repr(),
200 self.variable
201 )
202 }
203}
204
205impl ExecutionPlan for GraphUnwindExec {
206 fn name(&self) -> &str {
207 "GraphUnwindExec"
208 }
209
210 fn as_any(&self) -> &dyn Any {
211 self
212 }
213
214 fn schema(&self) -> SchemaRef {
215 Arc::clone(&self.schema)
216 }
217
218 fn properties(&self) -> &PlanProperties {
219 &self.properties
220 }
221
222 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
223 vec![&self.input]
224 }
225
226 fn with_new_children(
227 self: Arc<Self>,
228 children: Vec<Arc<dyn ExecutionPlan>>,
229 ) -> DFResult<Arc<dyn ExecutionPlan>> {
230 if children.len() != 1 {
231 return Err(datafusion::error::DataFusionError::Plan(
232 "GraphUnwindExec requires exactly one child".to_string(),
233 ));
234 }
235
236 Ok(Arc::new(Self::new(
237 Arc::clone(&children[0]),
238 self.expr.clone(),
239 self.variable.clone(),
240 self.params.clone(),
241 )))
242 }
243
244 fn execute(
245 &self,
246 partition: usize,
247 context: Arc<TaskContext>,
248 ) -> DFResult<SendableRecordBatchStream> {
249 let input_stream = self.input.execute(partition, context)?;
250 let metrics = BaselineMetrics::new(&self.metrics, partition);
251
252 Ok(Box::pin(GraphUnwindStream {
253 input: input_stream,
254 expr: self.expr.clone(),
255 params: self.params.clone(),
256 schema: Arc::clone(&self.schema),
257 metrics,
258 }))
259 }
260
261 fn metrics(&self) -> Option<MetricsSet> {
262 Some(self.metrics.clone_inner())
263 }
264}
265
/// Streaming state for `GraphUnwindExec`: wraps the child stream and expands
/// each incoming batch row by row.
struct GraphUnwindStream {
    // Upstream record batches to expand.
    input: SendableRecordBatchStream,

    // List-producing expression, evaluated once per input row.
    expr: Expr,

    // Query parameters for `Expr::Parameter` lookups.
    params: HashMap<String, Value>,

    // Output schema (input columns + unwound column, in that order).
    schema: SchemaRef,

    // Per-partition metrics; output row counts are recorded here.
    metrics: BaselineMetrics,
}
283
impl GraphUnwindStream {
    /// Expands one input batch: evaluates `self.expr` for every row and emits
    /// one output row per list element (see `build_output_batch`).
    fn process_batch(&self, batch: RecordBatch) -> DFResult<RecordBatch> {
        // (input row index, element value) pairs, in output order.
        let mut expansions: Vec<(usize, Value)> = Vec::new();
        for row_idx in 0..batch.num_rows() {
            let list_value = self.evaluate_expr_for_row(&batch, row_idx)?;

            match list_value {
                Value::List(items) => {
                    for item in items {
                        expansions.push((row_idx, item));
                    }
                }
                Value::Null => {
                    // UNWIND over NULL contributes no output rows.
                }
                other => {
                    // A scalar behaves like a one-element list.
                    expansions.push((row_idx, other));
                }
            }
        }

        self.build_output_batch(&batch, &expansions)
    }

    /// Evaluates the UNWIND expression against a single input row.
    fn evaluate_expr_for_row(&self, batch: &RecordBatch, row_idx: usize) -> DFResult<Value> {
        self.evaluate_expr_impl(&self.expr, batch, row_idx)
    }

    /// Recursive interpreter for the subset of Cypher expressions UNWIND
    /// supports. Unsupported expression forms evaluate to `Value::Null`
    /// (or an empty list for unknown function calls) rather than erroring.
    fn evaluate_expr_impl(
        &self,
        expr: &Expr,
        batch: &RecordBatch,
        row_idx: usize,
    ) -> DFResult<Value> {
        match expr {
            // List literal: evaluate each element in order.
            Expr::List(items) => {
                let mut values = Vec::with_capacity(items.len());
                for item in items {
                    values.push(self.evaluate_expr_impl(item, batch, row_idx)?);
                }
                Ok(Value::List(values))
            }

            Expr::Literal(lit) => Ok(lit.to_value()),

            // Query parameter ($name); a missing parameter is an execution error.
            Expr::Parameter(name) => self.params.get(name).cloned().ok_or_else(|| {
                datafusion::error::DataFusionError::Execution(format!(
                    "Parameter '{}' not found",
                    name
                ))
            }),

            // Bare variable: read the value from the matching batch column.
            Expr::Variable(var_name) => self.get_column_value(batch, var_name, row_idx),

            Expr::Property(base_expr, prop_name) => {
                // Fast path: `var.prop` may already exist as a flattened
                // "var.prop" column in the input batch.
                if let Expr::Variable(var_name) = base_expr.as_ref() {
                    let col_name = format!("{}.{}", var_name, prop_name);
                    if batch.schema().column_with_name(&col_name).is_some() {
                        return self.get_column_value(batch, &col_name, row_idx);
                    }
                }

                // Slow path: evaluate the base and look the property up in the
                // resulting map; non-map bases yield NULL.
                let base_value = self.evaluate_expr_impl(base_expr, batch, row_idx)?;
                if let Value::Map(map) = base_value {
                    Ok(map.get(prop_name).cloned().unwrap_or(Value::Null))
                } else {
                    Ok(Value::Null)
                }
            }

            Expr::FunctionCall { name, args, .. } => {
                let name_lower = name.to_lowercase();
                match name_lower.as_str() {
                    // range(start, end[, step]) — inclusive of `end`.
                    "range" => {
                        if args.len() >= 2 {
                            let start = self.evaluate_expr_impl(&args[0], batch, row_idx)?;
                            let end = self.evaluate_expr_impl(&args[1], batch, row_idx)?;
                            let step = if args.len() >= 3 {
                                self.evaluate_expr_impl(&args[2], batch, row_idx)?
                            } else {
                                Value::Int(1)
                            };

                            if let (Some(s), Some(e), Some(st)) =
                                (start.as_i64(), end.as_i64(), step.as_i64())
                            {
                                let mut result = Vec::new();
                                let mut i = s;
                                // A zero step never enters the loop, so it
                                // yields an empty list instead of hanging.
                                while (st > 0 && i <= e) || (st < 0 && i >= e) {
                                    result.push(Value::Int(i));
                                    i += st;
                                }
                                return Ok(Value::List(result));
                            }
                        }
                        // Wrong arity or non-integer arguments: empty list.
                        Ok(Value::List(vec![]))
                    }
                    // keys(map) — sorted, excluding nulls and internal keys.
                    "keys" => {
                        if args.len() == 1 {
                            let val = self.evaluate_expr_impl(&args[0], batch, row_idx)?;
                            if let Value::Map(map) = val {
                                // Entity maps may carry the full property set
                                // under "_all_props"; prefer it when present.
                                let source = match map.get("_all_props") {
                                    Some(Value::Map(all)) => all,
                                    _ => &map,
                                };
                                let mut key_strings: Vec<String> = source
                                    .iter()
                                    .filter(|(k, v)| !v.is_null() && !k.starts_with('_'))
                                    .map(|(k, _)| k.clone())
                                    .collect();
                                key_strings.sort();
                                let keys: Vec<Value> =
                                    key_strings.into_iter().map(Value::String).collect();
                                return Ok(Value::List(keys));
                            }
                            if val.is_null() {
                                return Ok(Value::Null);
                            }
                        }
                        Ok(Value::List(vec![]))
                    }
                    "size" | "length" => {
                        if args.len() == 1 {
                            let val = self.evaluate_expr_impl(&args[0], batch, row_idx)?;
                            let sz = match &val {
                                Value::List(arr) => arr.len() as i64,
                                // NOTE(review): this is the UTF-8 byte length;
                                // Cypher size(string) usually counts characters
                                // — confirm the intended semantics.
                                Value::String(s) => s.len() as i64,
                                Value::Map(m) => m.len() as i64,
                                _ => 0,
                            };
                            return Ok(Value::Int(sz));
                        }
                        Ok(Value::Null)
                    }
                    // Temporal constructors delegate to the shared datetime evaluator.
                    "date" | "time" | "localtime" | "datetime" | "localdatetime" | "duration" => {
                        let mut eval_args = Vec::with_capacity(args.len());
                        for arg in args {
                            eval_args.push(self.evaluate_expr_impl(arg, batch, row_idx)?);
                        }
                        crate::query::datetime::eval_datetime_function(
                            &name.to_uppercase(),
                            &eval_args,
                        )
                        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))
                    }
                    "split" => {
                        let mut eval_args = Vec::with_capacity(args.len());
                        for arg in args {
                            eval_args.push(self.evaluate_expr_impl(arg, batch, row_idx)?);
                        }
                        crate::query::expr_eval::eval_split(&eval_args).map_err(|e| {
                            datafusion::error::DataFusionError::Execution(e.to_string())
                        })
                    }
                    _ => {
                        // Unknown functions unwind to nothing instead of failing.
                        Ok(Value::List(vec![]))
                    }
                }
            }

            Expr::BinaryOp { left, op, right } => {
                let l = self.evaluate_expr_impl(left, batch, row_idx)?;
                let r = self.evaluate_expr_impl(right, batch, row_idx)?;
                crate::query::expr_eval::eval_binary_op(&l, op, &r)
                    .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))
            }

            // Map literal: evaluate each entry's value expression.
            Expr::Map(entries) => {
                let mut map = HashMap::new();
                for (key, val_expr) in entries {
                    let val = self.evaluate_expr_impl(val_expr, batch, row_idx)?;
                    map.insert(key.clone(), val);
                }
                Ok(Value::Map(map))
            }

            // list[index]; negative indices count from the end,
            // out-of-range access yields NULL.
            Expr::ArrayIndex { array, index } => {
                let arr_val = self.evaluate_expr_impl(array, batch, row_idx)?;
                let idx_val = self.evaluate_expr_impl(index, batch, row_idx)?;
                match (&arr_val, idx_val.as_i64()) {
                    (Value::List(list), Some(i)) => {
                        let len = list.len() as i64;
                        let resolved = if i < 0 { len + i } else { i };
                        if resolved >= 0 && (resolved as usize) < list.len() {
                            Ok(list[resolved as usize].clone())
                        } else {
                            Ok(Value::Null)
                        }
                    }
                    _ => Ok(Value::Null),
                }
            }

            // Any other expression form is unsupported in UNWIND position.
            _ => Ok(Value::Null),
        }
    }

    /// Reads one cell from the batch by column name, converted to `Value`.
    fn get_column_value(
        &self,
        batch: &RecordBatch,
        col_name: &str,
        row_idx: usize,
    ) -> DFResult<Value> {
        let col = batch.column_by_name(col_name).ok_or_else(|| {
            datafusion::error::DataFusionError::Execution(format!(
                "Column '{}' not found for UNWIND",
                col_name
            ))
        })?;

        Ok(arrow_to_json_value(col.as_ref(), row_idx))
    }

    /// Assembles the output batch: pass-through columns are replicated per
    /// expansion via the `take` kernel, and the unwound values become the
    /// final column, built with whichever Arrow type the schema inferred.
    fn build_output_batch(
        &self,
        input: &RecordBatch,
        expansions: &[(usize, Value)],
    ) -> DFResult<RecordBatch> {
        if expansions.is_empty() {
            return Ok(RecordBatch::new_empty(Arc::clone(&self.schema)));
        }

        let num_rows = expansions.len();

        // Row indices into `input`, one per output row, used to replicate
        // the pass-through columns.
        let indices: Vec<u64> = expansions.iter().map(|(idx, _)| *idx as u64).collect();
        let indices_array = UInt64Array::from(indices);

        let mut columns: Vec<ArrayRef> = Vec::new();
        for col in input.columns() {
            let expanded = take(col.as_ref(), &indices_array, None)?;
            columns.push(expanded);
        }

        // The unwound column is always the last field of the output schema
        // (see GraphUnwindExec::build_schema).
        let unwind_field = self.schema.field(self.schema.fields().len() - 1);
        let is_cv_encoded = unwind_field
            .metadata()
            .get("cv_encoded")
            .is_some_and(|v| v == "true");

        // In the typed branches below, values that don't match the inferred
        // column type become nulls.
        let unwind_col: ArrayRef = match (unwind_field.data_type(), is_cv_encoded) {
            (DataType::Boolean, false) => {
                let mut builder = BooleanBuilder::with_capacity(num_rows);
                for (_, value) in expansions {
                    if let Value::Bool(b) = value {
                        builder.append_value(*b);
                    } else {
                        builder.append_null();
                    }
                }
                Arc::new(builder.finish())
            }
            (DataType::Int64, false) => {
                let mut builder = Int64Builder::with_capacity(num_rows);
                for (_, value) in expansions {
                    if let Value::Int(i) = value {
                        builder.append_value(*i);
                    } else {
                        builder.append_null();
                    }
                }
                Arc::new(builder.finish())
            }
            (DataType::Float64, false) => {
                let mut builder = Float64Builder::with_capacity(num_rows);
                for (_, value) in expansions {
                    if let Value::Float(f) = value {
                        builder.append_value(*f);
                    } else {
                        builder.append_null();
                    }
                }
                Arc::new(builder.finish())
            }
            (DataType::Utf8, false) => {
                let mut builder = StringBuilder::new();
                for (_, value) in expansions {
                    if let Value::String(s) = value {
                        builder.append_value(s);
                    } else {
                        builder.append_null();
                    }
                }
                Arc::new(builder.finish())
            }
            // Heterogeneous lists: store cypher-value-codec encoded bytes.
            (DataType::LargeBinary, _) => {
                let mut builder = LargeBinaryBuilder::with_capacity(num_rows, num_rows * 16);
                for (_, value) in expansions {
                    if value.is_null() {
                        builder.append_null();
                    } else {
                        let encoded = uni_common::cypher_value_codec::encode(value);
                        builder.append_value(&encoded);
                    }
                }
                Arc::new(builder.finish())
            }
            // Fallback: serialize anything else as a JSON string.
            _ => {
                let mut builder = StringBuilder::new();
                for (_, value) in expansions {
                    if value.is_null() {
                        builder.append_null();
                    } else {
                        let json_val: serde_json::Value = value.clone().into();
                        let json_str =
                            serde_json::to_string(&json_val).unwrap_or_else(|_| "null".to_string());
                        builder.append_value(&json_str);
                    }
                }
                Arc::new(builder.finish())
            }
        };
        columns.push(unwind_col);

        self.metrics.record_output(num_rows);

        RecordBatch::try_new(Arc::clone(&self.schema), columns)
            .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))
    }
}
634
635impl Stream for GraphUnwindStream {
636 type Item = DFResult<RecordBatch>;
637
638 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
639 match self.input.poll_next_unpin(cx) {
640 Poll::Ready(Some(Ok(batch))) => {
641 let result = self.process_batch(batch);
642 Poll::Ready(Some(result))
643 }
644 other => other,
645 }
646 }
647}
648
649impl RecordBatchStream for GraphUnwindStream {
650 fn schema(&self) -> SchemaRef {
651 Arc::clone(&self.schema)
652 }
653}
654
/// Converts a single cell of an Arrow array into a dynamically-typed `Value`.
///
/// Handles Utf8/LargeUtf8 strings, all primitive integer widths (coerced to
/// `Value::Int`), Float32/Float64, Boolean, List (recursively), LargeBinary
/// (decoded with the cypher-value codec, falling back to JSON) and Struct
/// (recursively converted to a map). Nulls and any unsupported array type
/// yield `Value::Null`.
pub(crate) fn arrow_to_json_value(array: &dyn Array, row: usize) -> Value {
    use arrow_array::{
        BooleanArray, Float32Array, Float64Array, Int8Array, Int16Array, Int32Array, Int64Array,
        LargeStringArray, ListArray, StringArray, UInt8Array, UInt16Array, UInt32Array,
        UInt64Array,
    };

    if array.is_null(row) {
        return Value::Null;
    }

    let any = array.as_any();

    if let Some(arr) = any.downcast_ref::<StringArray>() {
        return Value::String(arr.value(row).to_string());
    }
    if let Some(arr) = any.downcast_ref::<LargeStringArray>() {
        return Value::String(arr.value(row).to_string());
    }

    // Every integer width is widened/cast to i64.
    // NOTE(review): u64 values above i64::MAX wrap to negative through `as` —
    // confirm ids in that range cannot occur, or handle them explicitly.
    macro_rules! try_int {
        ($arr_type:ty) => {
            if let Some(arr) = any.downcast_ref::<$arr_type>() {
                return Value::Int(arr.value(row) as i64);
            }
        };
    }
    try_int!(Int64Array);
    try_int!(Int32Array);
    try_int!(Int16Array);
    try_int!(Int8Array);
    try_int!(UInt64Array);
    try_int!(UInt32Array);
    try_int!(UInt16Array);
    try_int!(UInt8Array);

    if let Some(arr) = any.downcast_ref::<Float64Array>() {
        return Value::Float(arr.value(row));
    }
    if let Some(arr) = any.downcast_ref::<Float32Array>() {
        return Value::Float(arr.value(row) as f64);
    }

    if let Some(arr) = any.downcast_ref::<BooleanArray>() {
        return Value::Bool(arr.value(row));
    }

    // Lists convert element-by-element via recursion.
    if let Some(arr) = any.downcast_ref::<ListArray>() {
        let values = arr.value(row);
        let result: Vec<Value> = (0..values.len())
            .map(|i| arrow_to_json_value(values.as_ref(), i))
            .collect();
        return Value::List(result);
    }

    // LargeBinary: prefer the cypher-value codec; fall back to JSON for
    // data encoded by older paths. Undecodable bytes become Null.
    if let Some(arr) = any.downcast_ref::<arrow_array::LargeBinaryArray>() {
        let bytes = arr.value(row);
        if let Ok(uni_val) = uni_common::cypher_value_codec::decode(bytes) {
            return uni_val;
        }
        if let Ok(parsed) = serde_json::from_slice::<serde_json::Value>(bytes) {
            return Value::from(parsed);
        }
        return Value::Null;
    }

    // Structs become maps keyed by field name, converting each child cell.
    if let Some(s) = any.downcast_ref::<arrow_array::StructArray>() {
        let mut map = HashMap::new();
        for (field, child) in s.fields().iter().zip(s.columns()) {
            map.insert(
                field.name().clone(),
                arrow_to_json_value(child.as_ref(), row),
            );
        }
        return Value::Map(map);
    }

    // Unsupported array types (e.g. temporal, dictionary) map to Null.
    Value::Null
}
744
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{LargeBinaryArray, UInt64Array};
    use uni_cypher::ast::CypherLiteral;

    // A non-list expression can't be statically typed, so the unwound column
    // falls back to cv-encoded LargeBinary with the "cv_encoded" marker.
    #[test]
    fn test_build_schema() {
        let input_schema = Arc::new(Schema::new(vec![
            Field::new("n._vid", DataType::UInt64, false),
            Field::new("n.name", DataType::Utf8, true),
        ]));

        let expr = Expr::Variable("some_list".to_string());
        let output_schema = GraphUnwindExec::build_schema(input_schema, "item", &expr);

        assert_eq!(output_schema.fields().len(), 3);
        assert_eq!(output_schema.field(0).name(), "n._vid");
        assert_eq!(output_schema.field(1).name(), "n.name");
        assert_eq!(output_schema.field(2).name(), "item");
        assert_eq!(output_schema.field(2).data_type(), &DataType::LargeBinary);
        assert_eq!(
            output_schema
                .field(2)
                .metadata()
                .get("cv_encoded")
                .map(String::as_str),
            Some("true")
        );
    }

    // Boolean literals (nulls allowed) infer a native Boolean column.
    #[test]
    fn test_build_schema_boolean_list() {
        let input_schema = Arc::new(Schema::new(vec![Field::new(
            "n._vid",
            DataType::UInt64,
            false,
        )]));

        let expr = Expr::List(vec![
            Expr::Literal(CypherLiteral::Bool(true)),
            Expr::Literal(CypherLiteral::Bool(false)),
            Expr::Literal(CypherLiteral::Null),
        ]);
        let output_schema = GraphUnwindExec::build_schema(input_schema, "a", &expr);

        let field = output_schema.field(1);
        assert_eq!(field.name(), "a");
        assert_eq!(field.data_type(), &DataType::Boolean);
        assert!(field.metadata().is_empty());
    }

    // Homogeneous integer literals infer a native Int64 column.
    #[test]
    fn test_build_schema_integer_list() {
        let input_schema = Arc::new(Schema::new(vec![Field::new(
            "n._vid",
            DataType::UInt64,
            false,
        )]));

        let expr = Expr::List(vec![
            Expr::Literal(CypherLiteral::Integer(1)),
            Expr::Literal(CypherLiteral::Integer(2)),
            Expr::Literal(CypherLiteral::Integer(3)),
        ]);
        let output_schema = GraphUnwindExec::build_schema(input_schema, "x", &expr);

        let field = output_schema.field(1);
        assert_eq!(field.name(), "x");
        assert_eq!(field.data_type(), &DataType::Int64);
        assert!(field.metadata().is_empty());
    }

    // Homogeneous float literals infer a native Float64 column.
    #[test]
    fn test_build_schema_float_list() {
        let input_schema = Arc::new(Schema::new(vec![Field::new(
            "n._vid",
            DataType::UInt64,
            false,
        )]));

        let expr = Expr::List(vec![
            Expr::Literal(CypherLiteral::Float(1.5)),
            Expr::Literal(CypherLiteral::Float(2.5)),
        ]);
        let output_schema = GraphUnwindExec::build_schema(input_schema, "x", &expr);

        let field = output_schema.field(1);
        assert_eq!(field.name(), "x");
        assert_eq!(field.data_type(), &DataType::Float64);
        assert!(field.metadata().is_empty());
    }

    // Homogeneous string literals infer a native Utf8 column.
    #[test]
    fn test_build_schema_string_list() {
        let input_schema = Arc::new(Schema::new(vec![Field::new(
            "n._vid",
            DataType::UInt64,
            false,
        )]));

        let expr = Expr::List(vec![
            Expr::Literal(CypherLiteral::String("hello".to_string())),
            Expr::Literal(CypherLiteral::String("world".to_string())),
        ]);
        let output_schema = GraphUnwindExec::build_schema(input_schema, "x", &expr);

        let field = output_schema.field(1);
        assert_eq!(field.name(), "x");
        assert_eq!(field.data_type(), &DataType::Utf8);
        assert!(field.metadata().is_empty());
    }

    // Mixed-type literals force the cv-encoded LargeBinary fallback.
    #[test]
    fn test_build_schema_mixed_list() {
        let input_schema = Arc::new(Schema::new(vec![Field::new(
            "n._vid",
            DataType::UInt64,
            false,
        )]));

        let expr = Expr::List(vec![
            Expr::Literal(CypherLiteral::Integer(1)),
            Expr::Literal(CypherLiteral::String("hello".to_string())),
        ]);
        let output_schema = GraphUnwindExec::build_schema(input_schema, "x", &expr);

        let field = output_schema.field(1);
        assert_eq!(field.name(), "x");
        assert_eq!(field.data_type(), &DataType::LargeBinary);
        assert_eq!(
            field.metadata().get("cv_encoded").map(String::as_str),
            Some("true")
        );
    }

    // A literal list evaluates element-wise to a Value::List.
    #[test]
    fn test_evaluate_literal_list() {
        use arrow_array::builder::UInt64Builder;
        use datafusion::physical_plan::stream::RecordBatchStreamAdapter;

        let mut vid_builder = UInt64Builder::new();
        vid_builder.append_value(1);

        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(vec![Field::new(
                "n._vid",
                DataType::UInt64,
                false,
            )])),
            vec![Arc::new(vid_builder.finish())],
        )
        .unwrap();

        let input_schema = Arc::new(Schema::new(vec![Field::new(
            "n._vid",
            DataType::UInt64,
            false,
        )]));

        // The stream's input is never polled here; an empty adapter suffices.
        let empty_stream = RecordBatchStreamAdapter::new(input_schema, futures::stream::empty());

        let stream = GraphUnwindStream {
            input: Box::pin(empty_stream),
            expr: Expr::List(vec![
                Expr::Literal(CypherLiteral::Integer(1)),
                Expr::Literal(CypherLiteral::Integer(2)),
                Expr::Literal(CypherLiteral::Integer(3)),
            ]),
            params: HashMap::new(),
            schema: Arc::new(Schema::new(vec![
                Field::new("n._vid", DataType::UInt64, false),
                Field::new("x", DataType::Utf8, true),
            ])),
            metrics: BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0),
        };

        let result = stream.evaluate_expr_for_row(&batch, 0).unwrap();
        match result {
            Value::List(items) => {
                assert_eq!(items.len(), 3);
                assert_eq!(items[0], Value::Int(1));
                assert_eq!(items[1], Value::Int(2));
                assert_eq!(items[2], Value::Int(3));
            }
            _ => panic!("Expected list"),
        }
    }

    // A map literal evaluates each entry's value expression.
    #[test]
    fn test_evaluate_map_literal() {
        use arrow_array::builder::UInt64Builder;
        use datafusion::physical_plan::stream::RecordBatchStreamAdapter;

        let mut vid_builder = UInt64Builder::new();
        vid_builder.append_value(1);

        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(vec![Field::new(
                "n._vid",
                DataType::UInt64,
                false,
            )])),
            vec![Arc::new(vid_builder.finish())],
        )
        .unwrap();

        let input_schema = Arc::new(Schema::new(vec![Field::new(
            "n._vid",
            DataType::UInt64,
            false,
        )]));

        let empty_stream = RecordBatchStreamAdapter::new(input_schema, futures::stream::empty());

        let stream = GraphUnwindStream {
            input: Box::pin(empty_stream),
            expr: Expr::Map(vec![
                ("a".to_string(), Expr::Literal(CypherLiteral::Integer(1))),
                (
                    "b".to_string(),
                    Expr::Literal(CypherLiteral::String("hello".to_string())),
                ),
            ]),
            params: HashMap::new(),
            schema: Arc::new(Schema::new(vec![
                Field::new("n._vid", DataType::UInt64, false),
                Field::new("x", DataType::LargeBinary, true),
            ])),
            metrics: BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0),
        };

        let result = stream.evaluate_expr_for_row(&batch, 0).unwrap();
        match result {
            Value::Map(map) => {
                assert_eq!(map.get("a"), Some(&Value::Int(1)));
                assert_eq!(map.get("b"), Some(&Value::String("hello".to_string())));
            }
            _ => panic!("Expected Map, got {:?}", result),
        }
    }

    // Property access on an evaluated map takes the slow path
    // (no flattened "var.prop" column exists).
    #[test]
    fn test_evaluate_map_property_access() {
        use arrow_array::builder::UInt64Builder;
        use datafusion::physical_plan::stream::RecordBatchStreamAdapter;

        let mut vid_builder = UInt64Builder::new();
        vid_builder.append_value(1);

        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(vec![Field::new(
                "n._vid",
                DataType::UInt64,
                false,
            )])),
            vec![Arc::new(vid_builder.finish())],
        )
        .unwrap();

        let input_schema = Arc::new(Schema::new(vec![Field::new(
            "n._vid",
            DataType::UInt64,
            false,
        )]));

        let empty_stream = RecordBatchStreamAdapter::new(input_schema, futures::stream::empty());

        let map_expr = Expr::Map(vec![
            ("a".to_string(), Expr::Literal(CypherLiteral::Integer(1))),
            (
                "b".to_string(),
                Expr::Literal(CypherLiteral::String("x".to_string())),
            ),
        ]);
        let prop_expr = Expr::Property(Box::new(map_expr), "a".to_string());

        let stream = GraphUnwindStream {
            input: Box::pin(empty_stream),
            expr: prop_expr.clone(),
            params: HashMap::new(),
            schema: Arc::new(Schema::new(vec![
                Field::new("n._vid", DataType::UInt64, false),
                Field::new("x", DataType::LargeBinary, true),
            ])),
            metrics: BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0),
        };

        let result = stream.evaluate_expr_impl(&prop_expr, &batch, 0).unwrap();
        assert_eq!(result, Value::Int(1));
    }

    // Unsigned integers are coerced into Value::Int.
    #[test]
    fn test_arrow_to_json_value_uint64_is_coerced_to_int() {
        let arr = UInt64Array::from(vec![Some(42u64)]);
        let value = arrow_to_json_value(&arr, 0);
        assert_eq!(value, Value::Int(42));
    }

    // LargeBinary cells round-trip through the cypher-value codec.
    #[test]
    fn test_arrow_to_json_value_largebinary_decodes_cypher_map() {
        let encoded = uni_common::cypher_value_codec::encode(&Value::Map(HashMap::new()));
        let arr = LargeBinaryArray::from(vec![Some(encoded.as_slice())]);
        let value = arrow_to_json_value(&arr, 0);
        assert_eq!(value, Value::Map(HashMap::new()));
    }
}