1use crate::query::df_graph::common::{arrow_err, compute_plan_properties};
27use arrow::compute::take;
28use arrow_array::builder::{
29 BooleanBuilder, Float64Builder, Int64Builder, LargeBinaryBuilder, StringBuilder,
30};
31use arrow_array::{Array, ArrayRef, RecordBatch, UInt64Array};
32use arrow_schema::{DataType, Field, Schema, SchemaRef};
33use datafusion::common::Result as DFResult;
34use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
35use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
36use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
37use futures::{Stream, StreamExt};
38use std::any::Any;
39use std::collections::HashMap;
40use std::fmt;
41use std::pin::Pin;
42use std::sync::Arc;
43use std::task::{Context, Poll};
44use uni_common::Value;
45use uni_cypher::ast::{CypherLiteral, Expr};
46
/// Result of statically inferring the Arrow type of the elements produced by
/// an UNWIND expression (see `GraphUnwindExec::infer_element_type`).
struct ElementTypeInfo {
    /// Arrow data type chosen for the unwound output column.
    data_type: DataType,
    /// True when elements are serialized with `uni_common::cypher_value_codec`
    /// into a `LargeBinary` column — the fallback for mixed or non-literal
    /// element types.
    is_cv_encoded: bool,
}
54
/// Physical operator implementing Cypher's `UNWIND <expr> AS <variable>`:
/// for each input row the expression is evaluated, and every element of the
/// resulting list becomes one output row (input columns are duplicated).
pub struct GraphUnwindExec {
    /// Child plan providing the rows to expand.
    input: Arc<dyn ExecutionPlan>,

    /// Expression yielding the list to unwind, evaluated per input row.
    expr: Expr,

    /// Name of the new output column that holds the unwound elements.
    variable: String,

    /// Query parameters referenced by `expr` via `Expr::Parameter`.
    params: HashMap<String, Value>,

    /// Output schema: the input schema plus one trailing field for `variable`.
    schema: SchemaRef,

    /// Plan properties derived from `schema`, cached at construction.
    properties: PlanProperties,

    /// Execution metrics (output row counts) reported via `ExecutionPlan::metrics`.
    metrics: ExecutionPlanMetricsSet,
}
83
impl fmt::Debug for GraphUnwindExec {
    // Deliberately terse: schema/properties are derivable from `expr` and the
    // input plan prints itself elsewhere in the plan tree.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("GraphUnwindExec")
            .field("expr", &self.expr)
            .field("variable", &self.variable)
            .finish()
    }
}
92
93impl GraphUnwindExec {
94 pub fn new(
103 input: Arc<dyn ExecutionPlan>,
104 expr: Expr,
105 variable: impl Into<String>,
106 params: HashMap<String, Value>,
107 ) -> Self {
108 let variable = variable.into();
109
110 let schema = Self::build_schema(input.schema(), &variable, &expr);
112 let properties = compute_plan_properties(schema.clone());
113
114 Self {
115 input,
116 expr,
117 variable,
118 params,
119 schema,
120 properties,
121 metrics: ExecutionPlanMetricsSet::new(),
122 }
123 }
124
125 fn infer_element_type(expr: &Expr) -> ElementTypeInfo {
131 let json_fallback = || ElementTypeInfo {
132 data_type: DataType::LargeBinary,
133 is_cv_encoded: true,
134 };
135
136 let Expr::List(items) = expr else {
137 return json_fallback();
138 };
139
140 let first_type = items.iter().find_map(|item| match item {
142 Expr::Literal(CypherLiteral::Null) => None,
143 Expr::Literal(CypherLiteral::Bool(_)) => Some(DataType::Boolean),
144 Expr::Literal(CypherLiteral::Integer(_)) => Some(DataType::Int64),
145 Expr::Literal(CypherLiteral::Float(_)) => Some(DataType::Float64),
146 Expr::Literal(CypherLiteral::String(_)) => Some(DataType::Utf8),
147 _ => Some(DataType::Utf8), });
149
150 let Some(expected) = first_type else {
151 return json_fallback(); };
153
154 let all_match = items.iter().all(|item| match item {
156 Expr::Literal(CypherLiteral::Null) => true,
157 Expr::Literal(CypherLiteral::Bool(_)) => expected == DataType::Boolean,
158 Expr::Literal(CypherLiteral::Integer(_)) => expected == DataType::Int64,
159 Expr::Literal(CypherLiteral::Float(_)) => expected == DataType::Float64,
160 Expr::Literal(CypherLiteral::String(_)) => expected == DataType::Utf8,
161 _ => false, });
163
164 if all_match {
165 ElementTypeInfo {
166 data_type: expected,
167 is_cv_encoded: false,
168 }
169 } else {
170 json_fallback()
171 }
172 }
173
174 fn build_schema(input_schema: SchemaRef, variable: &str, expr: &Expr) -> SchemaRef {
180 let mut fields: Vec<Arc<Field>> = input_schema.fields().to_vec();
181
182 let type_info = Self::infer_element_type(expr);
183
184 let mut field = Field::new(variable, type_info.data_type, true);
185 if type_info.is_cv_encoded {
186 field = field.with_metadata(HashMap::from([("cv_encoded".into(), "true".into())]));
187 }
188 fields.push(Arc::new(field));
189
190 Arc::new(Schema::new(fields))
191 }
192}
193
194impl DisplayAs for GraphUnwindExec {
195 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
196 write!(
197 f,
198 "GraphUnwindExec: {} AS {}",
199 self.expr.to_string_repr(),
200 self.variable
201 )
202 }
203}
204
205impl ExecutionPlan for GraphUnwindExec {
206 fn name(&self) -> &str {
207 "GraphUnwindExec"
208 }
209
210 fn as_any(&self) -> &dyn Any {
211 self
212 }
213
214 fn schema(&self) -> SchemaRef {
215 Arc::clone(&self.schema)
216 }
217
218 fn properties(&self) -> &PlanProperties {
219 &self.properties
220 }
221
222 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
223 vec![&self.input]
224 }
225
226 fn with_new_children(
227 self: Arc<Self>,
228 children: Vec<Arc<dyn ExecutionPlan>>,
229 ) -> DFResult<Arc<dyn ExecutionPlan>> {
230 if children.len() != 1 {
231 return Err(datafusion::error::DataFusionError::Plan(
232 "GraphUnwindExec requires exactly one child".to_string(),
233 ));
234 }
235
236 Ok(Arc::new(Self::new(
237 Arc::clone(&children[0]),
238 self.expr.clone(),
239 self.variable.clone(),
240 self.params.clone(),
241 )))
242 }
243
244 fn execute(
245 &self,
246 partition: usize,
247 context: Arc<TaskContext>,
248 ) -> DFResult<SendableRecordBatchStream> {
249 let input_stream = self.input.execute(partition, context)?;
250 let metrics = BaselineMetrics::new(&self.metrics, partition);
251
252 Ok(Box::pin(GraphUnwindStream {
253 input: input_stream,
254 expr: self.expr.clone(),
255 params: self.params.clone(),
256 schema: Arc::clone(&self.schema),
257 metrics,
258 }))
259 }
260
261 fn metrics(&self) -> Option<MetricsSet> {
262 Some(self.metrics.clone_inner())
263 }
264}
265
/// Row-expanding stream: wraps the child stream and, for every batch it
/// yields, emits a batch in which each input row appears once per element of
/// the evaluated UNWIND expression.
struct GraphUnwindStream {
    /// Upstream batches from the child plan.
    input: SendableRecordBatchStream,

    /// The UNWIND expression, re-evaluated for every input row.
    expr: Expr,

    /// Query parameters available to `Expr::Parameter` lookups.
    params: HashMap<String, Value>,

    /// Output schema (input columns + trailing unwind column).
    schema: SchemaRef,

    /// Records output row counts for this partition.
    metrics: BaselineMetrics,
}
283
impl GraphUnwindStream {
    /// Expands one input batch: evaluates the UNWIND expression for each row
    /// and pairs every produced element with its source row index, then
    /// assembles the output batch.
    fn process_batch(&self, batch: RecordBatch) -> DFResult<RecordBatch> {
        // (source row index, element value) pairs; one output row each.
        let mut expansions: Vec<(usize, Value)> = Vec::new();
        for row_idx in 0..batch.num_rows() {
            let list_value = self.evaluate_expr_for_row(&batch, row_idx)?;

            match list_value {
                Value::List(items) => {
                    for item in items {
                        expansions.push((row_idx, item));
                    }
                }
                // UNWIND NULL produces no rows for this input row.
                Value::Null => {
                }
                // A non-list scalar unwinds as a one-element list.
                other => {
                    expansions.push((row_idx, other));
                }
            }
        }

        self.build_output_batch(&batch, &expansions)
    }

    /// Evaluates `self.expr` against a single row of `batch`.
    fn evaluate_expr_for_row(&self, batch: &RecordBatch, row_idx: usize) -> DFResult<Value> {
        self.evaluate_expr_impl(&self.expr, batch, row_idx)
    }

    /// Recursive interpreter for the subset of Cypher expressions supported
    /// inside UNWIND. Unsupported forms evaluate to NULL (or an empty list
    /// for unknown function calls) instead of erroring; only a missing
    /// parameter or missing column is a hard error.
    fn evaluate_expr_impl(
        &self,
        expr: &Expr,
        batch: &RecordBatch,
        row_idx: usize,
    ) -> DFResult<Value> {
        match expr {
            Expr::List(items) => {
                let mut values = Vec::with_capacity(items.len());
                for item in items {
                    values.push(self.evaluate_expr_impl(item, batch, row_idx)?);
                }
                Ok(Value::List(values))
            }

            Expr::Literal(lit) => Ok(lit.to_value()),

            // Parameters must be bound before execution.
            Expr::Parameter(name) => self.params.get(name).cloned().ok_or_else(|| {
                datafusion::error::DataFusionError::Execution(format!(
                    "Parameter '{}' not found",
                    name
                ))
            }),

            Expr::Variable(var_name) => self.get_column_value(batch, var_name, row_idx),

            Expr::Property(base_expr, prop_name) => {
                // Fast path: `n.prop` may already exist as a flattened column
                // literally named "n.prop" in the input batch.
                if let Expr::Variable(var_name) = base_expr.as_ref() {
                    let col_name = format!("{}.{}", var_name, prop_name);
                    if batch.schema().column_with_name(&col_name).is_some() {
                        return self.get_column_value(batch, &col_name, row_idx);
                    }
                }

                // Slow path: evaluate the base and index into it if it is a
                // map; any other base yields NULL.
                let base_value = self.evaluate_expr_impl(base_expr, batch, row_idx)?;
                if let Value::Map(map) = base_value {
                    Ok(map.get(prop_name).cloned().unwrap_or(Value::Null))
                } else {
                    Ok(Value::Null)
                }
            }

            Expr::FunctionCall { name, args, .. } => {
                let name_lower = name.to_lowercase();
                match name_lower.as_str() {
                    // range(start, end[, step]) — inclusive of `end`.
                    "range" => {
                        if args.len() >= 2 {
                            let start = self.evaluate_expr_impl(&args[0], batch, row_idx)?;
                            let end = self.evaluate_expr_impl(&args[1], batch, row_idx)?;
                            let step = if args.len() >= 3 {
                                self.evaluate_expr_impl(&args[2], batch, row_idx)?
                            } else {
                                Value::Int(1)
                            };

                            if let (Some(s), Some(e), Some(st)) =
                                (start.as_i64(), end.as_i64(), step.as_i64())
                            {
                                // A step of 0 yields an empty list (the loop
                                // condition is false for st == 0), avoiding an
                                // infinite loop.
                                let mut result = Vec::new();
                                let mut i = s;
                                while (st > 0 && i <= e) || (st < 0 && i >= e) {
                                    result.push(Value::Int(i));
                                    i += st;
                                }
                                return Ok(Value::List(result));
                            }
                        }
                        Ok(Value::List(vec![]))
                    }
                    // keys(map): sorted names of non-null, non-internal
                    // (not '_'-prefixed) properties.
                    "keys" => {
                        if args.len() == 1 {
                            let val = self.evaluate_expr_impl(&args[0], batch, row_idx)?;
                            if let Value::Map(map) = val {
                                // Entity maps may nest their full property set
                                // under "_all_props"; prefer that when present.
                                let source = match map.get("_all_props") {
                                    Some(Value::Map(all)) => all,
                                    _ => &map,
                                };
                                let mut key_strings: Vec<String> = source
                                    .iter()
                                    .filter(|(k, v)| !v.is_null() && !k.starts_with('_'))
                                    .map(|(k, _)| k.clone())
                                    .collect();
                                key_strings.sort();
                                let keys: Vec<Value> =
                                    key_strings.into_iter().map(Value::String).collect();
                                return Ok(Value::List(keys));
                            }
                            if val.is_null() {
                                return Ok(Value::Null);
                            }
                        }
                        Ok(Value::List(vec![]))
                    }
                    "size" | "length" => {
                        if args.len() == 1 {
                            let val = self.evaluate_expr_impl(&args[0], batch, row_idx)?;
                            let sz = match &val {
                                Value::List(arr) => arr.len() as i64,
                                // NOTE(review): `s.len()` is the UTF-8 byte
                                // length; Cypher's size() on strings counts
                                // characters — confirm intended semantics.
                                Value::String(s) => s.len() as i64,
                                Value::Map(m) => m.len() as i64,
                                _ => 0,
                            };
                            return Ok(Value::Int(sz));
                        }
                        Ok(Value::Null)
                    }
                    // Temporal constructors delegate to the shared datetime
                    // evaluator, which expects upper-cased function names.
                    "date" | "time" | "localtime" | "datetime" | "localdatetime" | "duration" => {
                        let mut eval_args = Vec::with_capacity(args.len());
                        for arg in args {
                            eval_args.push(self.evaluate_expr_impl(arg, batch, row_idx)?);
                        }
                        crate::query::datetime::eval_datetime_function(
                            &name.to_uppercase(),
                            &eval_args,
                        )
                        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))
                    }
                    "split" => {
                        let mut eval_args = Vec::with_capacity(args.len());
                        for arg in args {
                            eval_args.push(self.evaluate_expr_impl(arg, batch, row_idx)?);
                        }
                        crate::query::expr_eval::eval_split(&eval_args).map_err(|e| {
                            datafusion::error::DataFusionError::Execution(e.to_string())
                        })
                    }
                    // Unknown functions unwind to nothing rather than failing.
                    _ => {
                        Ok(Value::List(vec![]))
                    }
                }
            }

            Expr::BinaryOp { left, op, right } => {
                let l = self.evaluate_expr_impl(left, batch, row_idx)?;
                let r = self.evaluate_expr_impl(right, batch, row_idx)?;
                crate::query::expr_eval::eval_binary_op(&l, op, &r)
                    .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))
            }

            Expr::Map(entries) => {
                let mut map = HashMap::new();
                for (key, val_expr) in entries {
                    let val = self.evaluate_expr_impl(val_expr, batch, row_idx)?;
                    map.insert(key.clone(), val);
                }
                Ok(Value::Map(map))
            }

            // list[i] with Cypher semantics: negative indices count from the
            // end, and any out-of-range access yields NULL.
            Expr::ArrayIndex { array, index } => {
                let arr_val = self.evaluate_expr_impl(array, batch, row_idx)?;
                let idx_val = self.evaluate_expr_impl(index, batch, row_idx)?;
                match (&arr_val, idx_val.as_i64()) {
                    (Value::List(list), Some(i)) => {
                        let len = list.len() as i64;
                        let resolved = if i < 0 { len + i } else { i };
                        if resolved >= 0 && (resolved as usize) < list.len() {
                            Ok(list[resolved as usize].clone())
                        } else {
                            Ok(Value::Null)
                        }
                    }
                    _ => Ok(Value::Null),
                }
            }

            // Any other expression form is unsupported inside UNWIND: NULL.
            _ => Ok(Value::Null),
        }
    }

    /// Reads column `col_name` at `row_idx`, converted to a `Value` via
    /// `arrow_to_json_value`. Errors if the column does not exist.
    fn get_column_value(
        &self,
        batch: &RecordBatch,
        col_name: &str,
        row_idx: usize,
    ) -> DFResult<Value> {
        let col = batch.column_by_name(col_name).ok_or_else(|| {
            datafusion::error::DataFusionError::Execution(format!(
                "Column '{}' not found for UNWIND",
                col_name
            ))
        })?;

        Ok(arrow_to_json_value(col.as_ref(), row_idx))
    }

    /// Assembles the output batch: input columns are replicated with the
    /// `take` kernel using the source row indices, then the freshly built
    /// unwind column is appended as the last column.
    fn build_output_batch(
        &self,
        input: &RecordBatch,
        expansions: &[(usize, Value)],
    ) -> DFResult<RecordBatch> {
        if expansions.is_empty() {
            return Ok(RecordBatch::new_empty(Arc::clone(&self.schema)));
        }

        let num_rows = expansions.len();

        // Gather indices: output row k copies input row expansions[k].0.
        let indices: Vec<u64> = expansions.iter().map(|(idx, _)| *idx as u64).collect();
        let indices_array = UInt64Array::from(indices);

        let mut columns: Vec<ArrayRef> = Vec::new();
        for col in input.columns() {
            let expanded = take(col.as_ref(), &indices_array, None)?;
            columns.push(expanded);
        }

        // The unwind column is always the last field of the output schema
        // (see GraphUnwindExec::build_schema).
        let unwind_field = self.schema.field(self.schema.fields().len() - 1);
        let is_cv_encoded = unwind_field
            .metadata()
            .get("cv_encoded")
            .is_some_and(|v| v == "true");

        // For native Arrow types, values of any other runtime type become
        // NULL; the LargeBinary and JSON fallbacks accept arbitrary values.
        let unwind_col: ArrayRef = match (unwind_field.data_type(), is_cv_encoded) {
            (DataType::Boolean, false) => {
                let mut builder = BooleanBuilder::with_capacity(num_rows);
                for (_, value) in expansions {
                    if let Value::Bool(b) = value {
                        builder.append_value(*b);
                    } else {
                        builder.append_null();
                    }
                }
                Arc::new(builder.finish())
            }
            (DataType::Int64, false) => {
                let mut builder = Int64Builder::with_capacity(num_rows);
                for (_, value) in expansions {
                    if let Value::Int(i) = value {
                        builder.append_value(*i);
                    } else {
                        builder.append_null();
                    }
                }
                Arc::new(builder.finish())
            }
            (DataType::Float64, false) => {
                let mut builder = Float64Builder::with_capacity(num_rows);
                for (_, value) in expansions {
                    if let Value::Float(f) = value {
                        builder.append_value(*f);
                    } else {
                        builder.append_null();
                    }
                }
                Arc::new(builder.finish())
            }
            (DataType::Utf8, false) => {
                let mut builder = StringBuilder::new();
                for (_, value) in expansions {
                    if let Value::String(s) = value {
                        builder.append_value(s);
                    } else {
                        builder.append_null();
                    }
                }
                Arc::new(builder.finish())
            }
            // Cypher-value-encoded binary: any non-null value is serialized.
            (DataType::LargeBinary, _) => {
                let mut builder = LargeBinaryBuilder::with_capacity(num_rows, num_rows * 16);
                for (_, value) in expansions {
                    if value.is_null() {
                        builder.append_null();
                    } else {
                        let encoded = uni_common::cypher_value_codec::encode(value);
                        builder.append_value(&encoded);
                    }
                }
                Arc::new(builder.finish())
            }
            // Last-resort fallback: JSON-serialize into a Utf8 column.
            _ => {
                let mut builder = StringBuilder::new();
                for (_, value) in expansions {
                    if value.is_null() {
                        builder.append_null();
                    } else {
                        let json_val: serde_json::Value = value.clone().into();
                        let json_str =
                            serde_json::to_string(&json_val).unwrap_or_else(|_| "null".to_string());
                        builder.append_value(&json_str);
                    }
                }
                Arc::new(builder.finish())
            }
        };
        columns.push(unwind_col);

        self.metrics.record_output(num_rows);

        RecordBatch::try_new(Arc::clone(&self.schema), columns).map_err(arrow_err)
    }
}
633
634impl Stream for GraphUnwindStream {
635 type Item = DFResult<RecordBatch>;
636
637 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
638 match self.input.poll_next_unpin(cx) {
639 Poll::Ready(Some(Ok(batch))) => {
640 let result = self.process_batch(batch);
641 Poll::Ready(Some(result))
642 }
643 other => other,
644 }
645 }
646}
647
648impl RecordBatchStream for GraphUnwindStream {
649 fn schema(&self) -> SchemaRef {
650 Arc::clone(&self.schema)
651 }
652}
653
/// Converts a single cell of an Arrow array into a `Value`.
///
/// Handles the scalar types this operator reads (strings, the full signed and
/// unsigned integer family coerced to `Int`, floats, booleans), plus lists,
/// cypher-value-encoded `LargeBinary` (with a JSON fallback), and structs
/// (converted to maps). Anything else — and any NULL cell — becomes
/// `Value::Null`.
///
/// NOTE(review): the name says "json_value" but the return type is
/// `uni_common::Value`; kept for compatibility with other crate-internal
/// callers.
pub(crate) fn arrow_to_json_value(array: &dyn Array, row: usize) -> Value {
    use arrow_array::{
        BooleanArray, Float32Array, Float64Array, Int8Array, Int16Array, Int32Array, Int64Array,
        LargeStringArray, ListArray, StringArray, UInt8Array, UInt16Array, UInt32Array,
        UInt64Array,
    };

    if array.is_null(row) {
        return Value::Null;
    }

    let any = array.as_any();

    if let Some(arr) = any.downcast_ref::<StringArray>() {
        return Value::String(arr.value(row).to_string());
    }
    if let Some(arr) = any.downcast_ref::<LargeStringArray>() {
        return Value::String(arr.value(row).to_string());
    }

    // All integer widths (signed and unsigned) are widened to Int(i64).
    // Note: u64 values above i64::MAX wrap via `as` — acceptable for the ids
    // handled here, but worth confirming for general data.
    macro_rules! try_int {
        ($arr_type:ty) => {
            if let Some(arr) = any.downcast_ref::<$arr_type>() {
                return Value::Int(arr.value(row) as i64);
            }
        };
    }
    try_int!(Int64Array);
    try_int!(Int32Array);
    try_int!(Int16Array);
    try_int!(Int8Array);
    try_int!(UInt64Array);
    try_int!(UInt32Array);
    try_int!(UInt16Array);
    try_int!(UInt8Array);

    if let Some(arr) = any.downcast_ref::<Float64Array>() {
        return Value::Float(arr.value(row));
    }
    if let Some(arr) = any.downcast_ref::<Float32Array>() {
        return Value::Float(arr.value(row) as f64);
    }

    if let Some(arr) = any.downcast_ref::<BooleanArray>() {
        return Value::Bool(arr.value(row));
    }

    // Lists convert element-wise (recursively).
    if let Some(arr) = any.downcast_ref::<ListArray>() {
        let values = arr.value(row);
        let result: Vec<Value> = (0..values.len())
            .map(|i| arrow_to_json_value(values.as_ref(), i))
            .collect();
        return Value::List(result);
    }

    // LargeBinary: try the cypher-value codec first, then raw JSON bytes.
    if let Some(arr) = any.downcast_ref::<arrow_array::LargeBinaryArray>() {
        let bytes = arr.value(row);
        if let Ok(uni_val) = uni_common::cypher_value_codec::decode(bytes) {
            return uni_val;
        }
        if let Ok(parsed) = serde_json::from_slice::<serde_json::Value>(bytes) {
            return Value::from(parsed);
        }
        return Value::Null;
    }

    // Structs become maps keyed by field name (recursively).
    if let Some(s) = any.downcast_ref::<arrow_array::StructArray>() {
        let mut map = HashMap::new();
        for (field, child) in s.fields().iter().zip(s.columns()) {
            map.insert(
                field.name().clone(),
                arrow_to_json_value(child.as_ref(), row),
            );
        }
        return Value::Map(map);
    }

    // Unsupported array type (timestamps, dictionaries, ...): NULL.
    Value::Null
}
743
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{LargeBinaryArray, UInt64Array};
    use uni_cypher::ast::CypherLiteral;

    // Non-list expression: schema falls back to cv-encoded LargeBinary.
    #[test]
    fn test_build_schema() {
        let input_schema = Arc::new(Schema::new(vec![
            Field::new("n._vid", DataType::UInt64, false),
            Field::new("n.name", DataType::Utf8, true),
        ]));

        let expr = Expr::Variable("some_list".to_string());
        let output_schema = GraphUnwindExec::build_schema(input_schema, "item", &expr);

        assert_eq!(output_schema.fields().len(), 3);
        assert_eq!(output_schema.field(0).name(), "n._vid");
        assert_eq!(output_schema.field(1).name(), "n.name");
        assert_eq!(output_schema.field(2).name(), "item");
        assert_eq!(output_schema.field(2).data_type(), &DataType::LargeBinary);
        assert_eq!(
            output_schema
                .field(2)
                .metadata()
                .get("cv_encoded")
                .map(String::as_str),
            Some("true")
        );
    }

    // Homogeneous bool literals (with a NULL) infer a native Boolean column.
    #[test]
    fn test_build_schema_boolean_list() {
        let input_schema = Arc::new(Schema::new(vec![Field::new(
            "n._vid",
            DataType::UInt64,
            false,
        )]));

        let expr = Expr::List(vec![
            Expr::Literal(CypherLiteral::Bool(true)),
            Expr::Literal(CypherLiteral::Bool(false)),
            Expr::Literal(CypherLiteral::Null),
        ]);
        let output_schema = GraphUnwindExec::build_schema(input_schema, "a", &expr);

        let field = output_schema.field(1);
        assert_eq!(field.name(), "a");
        assert_eq!(field.data_type(), &DataType::Boolean);
        assert!(field.metadata().is_empty());
    }

    // Homogeneous integer literals infer Int64.
    #[test]
    fn test_build_schema_integer_list() {
        let input_schema = Arc::new(Schema::new(vec![Field::new(
            "n._vid",
            DataType::UInt64,
            false,
        )]));

        let expr = Expr::List(vec![
            Expr::Literal(CypherLiteral::Integer(1)),
            Expr::Literal(CypherLiteral::Integer(2)),
            Expr::Literal(CypherLiteral::Integer(3)),
        ]);
        let output_schema = GraphUnwindExec::build_schema(input_schema, "x", &expr);

        let field = output_schema.field(1);
        assert_eq!(field.name(), "x");
        assert_eq!(field.data_type(), &DataType::Int64);
        assert!(field.metadata().is_empty());
    }

    // Homogeneous float literals infer Float64.
    #[test]
    fn test_build_schema_float_list() {
        let input_schema = Arc::new(Schema::new(vec![Field::new(
            "n._vid",
            DataType::UInt64,
            false,
        )]));

        let expr = Expr::List(vec![
            Expr::Literal(CypherLiteral::Float(1.5)),
            Expr::Literal(CypherLiteral::Float(2.5)),
        ]);
        let output_schema = GraphUnwindExec::build_schema(input_schema, "x", &expr);

        let field = output_schema.field(1);
        assert_eq!(field.name(), "x");
        assert_eq!(field.data_type(), &DataType::Float64);
        assert!(field.metadata().is_empty());
    }

    // Homogeneous string literals infer Utf8.
    #[test]
    fn test_build_schema_string_list() {
        let input_schema = Arc::new(Schema::new(vec![Field::new(
            "n._vid",
            DataType::UInt64,
            false,
        )]));

        let expr = Expr::List(vec![
            Expr::Literal(CypherLiteral::String("hello".to_string())),
            Expr::Literal(CypherLiteral::String("world".to_string())),
        ]);
        let output_schema = GraphUnwindExec::build_schema(input_schema, "x", &expr);

        let field = output_schema.field(1);
        assert_eq!(field.name(), "x");
        assert_eq!(field.data_type(), &DataType::Utf8);
        assert!(field.metadata().is_empty());
    }

    // Mixed element types fall back to cv-encoded LargeBinary with metadata.
    #[test]
    fn test_build_schema_mixed_list() {
        let input_schema = Arc::new(Schema::new(vec![Field::new(
            "n._vid",
            DataType::UInt64,
            false,
        )]));

        let expr = Expr::List(vec![
            Expr::Literal(CypherLiteral::Integer(1)),
            Expr::Literal(CypherLiteral::String("hello".to_string())),
        ]);
        let output_schema = GraphUnwindExec::build_schema(input_schema, "x", &expr);

        let field = output_schema.field(1);
        assert_eq!(field.name(), "x");
        assert_eq!(field.data_type(), &DataType::LargeBinary);
        assert_eq!(
            field.metadata().get("cv_encoded").map(String::as_str),
            Some("true")
        );
    }

    // A literal list expression evaluates element-wise for a row.
    #[test]
    fn test_evaluate_literal_list() {
        use arrow_array::builder::UInt64Builder;
        use datafusion::physical_plan::stream::RecordBatchStreamAdapter;

        let mut vid_builder = UInt64Builder::new();
        vid_builder.append_value(1);

        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(vec![Field::new(
                "n._vid",
                DataType::UInt64,
                false,
            )])),
            vec![Arc::new(vid_builder.finish())],
        )
        .unwrap();

        let input_schema = Arc::new(Schema::new(vec![Field::new(
            "n._vid",
            DataType::UInt64,
            false,
        )]));

        // The stream's input is never polled here; an empty adapter suffices.
        let empty_stream = RecordBatchStreamAdapter::new(input_schema, futures::stream::empty());

        let stream = GraphUnwindStream {
            input: Box::pin(empty_stream),
            expr: Expr::List(vec![
                Expr::Literal(CypherLiteral::Integer(1)),
                Expr::Literal(CypherLiteral::Integer(2)),
                Expr::Literal(CypherLiteral::Integer(3)),
            ]),
            params: HashMap::new(),
            schema: Arc::new(Schema::new(vec![
                Field::new("n._vid", DataType::UInt64, false),
                Field::new("x", DataType::Utf8, true),
            ])),
            metrics: BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0),
        };

        let result = stream.evaluate_expr_for_row(&batch, 0).unwrap();
        match result {
            Value::List(items) => {
                assert_eq!(items.len(), 3);
                assert_eq!(items[0], Value::Int(1));
                assert_eq!(items[1], Value::Int(2));
                assert_eq!(items[2], Value::Int(3));
            }
            _ => panic!("Expected list"),
        }
    }

    // A map literal evaluates to Value::Map with evaluated entries.
    #[test]
    fn test_evaluate_map_literal() {
        use arrow_array::builder::UInt64Builder;
        use datafusion::physical_plan::stream::RecordBatchStreamAdapter;

        let mut vid_builder = UInt64Builder::new();
        vid_builder.append_value(1);

        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(vec![Field::new(
                "n._vid",
                DataType::UInt64,
                false,
            )])),
            vec![Arc::new(vid_builder.finish())],
        )
        .unwrap();

        let input_schema = Arc::new(Schema::new(vec![Field::new(
            "n._vid",
            DataType::UInt64,
            false,
        )]));

        let empty_stream = RecordBatchStreamAdapter::new(input_schema, futures::stream::empty());

        let stream = GraphUnwindStream {
            input: Box::pin(empty_stream),
            expr: Expr::Map(vec![
                ("a".to_string(), Expr::Literal(CypherLiteral::Integer(1))),
                (
                    "b".to_string(),
                    Expr::Literal(CypherLiteral::String("hello".to_string())),
                ),
            ]),
            params: HashMap::new(),
            schema: Arc::new(Schema::new(vec![
                Field::new("n._vid", DataType::UInt64, false),
                Field::new("x", DataType::LargeBinary, true),
            ])),
            metrics: BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0),
        };

        let result = stream.evaluate_expr_for_row(&batch, 0).unwrap();
        match result {
            Value::Map(map) => {
                assert_eq!(map.get("a"), Some(&Value::Int(1)));
                assert_eq!(map.get("b"), Some(&Value::String("hello".to_string())));
            }
            _ => panic!("Expected Map, got {:?}", result),
        }
    }

    // Property access on an evaluated map value (slow path of Expr::Property).
    #[test]
    fn test_evaluate_map_property_access() {
        use arrow_array::builder::UInt64Builder;
        use datafusion::physical_plan::stream::RecordBatchStreamAdapter;

        let mut vid_builder = UInt64Builder::new();
        vid_builder.append_value(1);

        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(vec![Field::new(
                "n._vid",
                DataType::UInt64,
                false,
            )])),
            vec![Arc::new(vid_builder.finish())],
        )
        .unwrap();

        let input_schema = Arc::new(Schema::new(vec![Field::new(
            "n._vid",
            DataType::UInt64,
            false,
        )]));

        let empty_stream = RecordBatchStreamAdapter::new(input_schema, futures::stream::empty());

        let map_expr = Expr::Map(vec![
            ("a".to_string(), Expr::Literal(CypherLiteral::Integer(1))),
            (
                "b".to_string(),
                Expr::Literal(CypherLiteral::String("x".to_string())),
            ),
        ]);
        let prop_expr = Expr::Property(Box::new(map_expr), "a".to_string());

        let stream = GraphUnwindStream {
            input: Box::pin(empty_stream),
            expr: prop_expr.clone(),
            params: HashMap::new(),
            schema: Arc::new(Schema::new(vec![
                Field::new("n._vid", DataType::UInt64, false),
                Field::new("x", DataType::LargeBinary, true),
            ])),
            metrics: BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0),
        };

        let result = stream.evaluate_expr_impl(&prop_expr, &batch, 0).unwrap();
        assert_eq!(result, Value::Int(1));
    }

    // Unsigned integers are widened to Value::Int.
    #[test]
    fn test_arrow_to_json_value_uint64_is_coerced_to_int() {
        let arr = UInt64Array::from(vec![Some(42u64)]);
        let value = arrow_to_json_value(&arr, 0);
        assert_eq!(value, Value::Int(42));
    }

    // LargeBinary cells round-trip through the cypher-value codec.
    #[test]
    fn test_arrow_to_json_value_largebinary_decodes_cypher_map() {
        let encoded = uni_common::cypher_value_codec::encode(&Value::Map(HashMap::new()));
        let arr = LargeBinaryArray::from(vec![Some(encoded.as_slice())]);
        let value = arrow_to_json_value(&arr, 0);
        assert_eq!(value, Value::Map(HashMap::new()));
    }
}