1use arrow_array::{ArrayRef, RecordBatch};
10use arrow_schema::{DataType, Field, Schema, SchemaRef};
11use datafusion::arrow::array::Array;
12use datafusion::common::Result as DFResult;
13use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
14use datafusion::physical_plan::PlanProperties;
15use datafusion::prelude::SessionContext;
16use futures::TryStreamExt;
17use parking_lot::RwLock;
18use std::collections::HashMap;
19use std::sync::Arc;
20use uni_common::Value;
21use uni_common::core::schema::Schema as UniSchema;
22use uni_cypher::ast::{BinaryOp, CypherLiteral, Expr};
23use uni_store::storage::manager::StorageManager;
24
25use super::GraphExecutionContext;
26use super::procedure_call::map_yield_to_canonical;
27use super::unwind::arrow_to_json_value;
28use crate::query::df_planner::HybridPhysicalPlanner;
29use crate::query::planner::LogicalPlan;
30
31pub fn compute_plan_properties(schema: SchemaRef) -> PlanProperties {
38 PlanProperties::new(
39 EquivalenceProperties::new(schema),
40 Partitioning::UnknownPartitioning(1),
41 datafusion::physical_plan::execution_plan::EmissionType::Incremental,
42 datafusion::physical_plan::execution_plan::Boundedness::Bounded,
43 )
44}
45
46pub fn labels_data_type() -> DataType {
52 DataType::List(Arc::new(Field::new("item", DataType::Utf8, true)))
53}
54
/// Coerces an arbitrary Arrow column into a `UInt64Array` of vertex IDs.
///
/// Accepted encodings:
/// - `UInt64`: borrowed as-is (zero-copy).
/// - `Int64`: element-wise cast to `u64`.
/// - `Struct` with a `_vid` child field: recurses on that child column.
/// - `LargeBinary`: each blob is decoded as a CypherValue and the VID extracted.
/// - `Null`: produces an all-null `UInt64` array of the same length.
///
/// Any other type is an execution error.
pub fn column_as_vid_array(
    col: &dyn arrow_array::Array,
) -> datafusion::error::Result<std::borrow::Cow<'_, arrow_array::UInt64Array>> {
    use arrow_array::{Int64Array, StructArray, UInt64Array};
    use arrow_schema::DataType;

    // Fast path: already UInt64 — no copy needed.
    if let Some(arr) = col.as_any().downcast_ref::<UInt64Array>() {
        return Ok(std::borrow::Cow::Borrowed(arr));
    }

    if let Some(arr) = col.as_any().downcast_ref::<Int64Array>() {
        // NOTE(review): negative values wrap via `as u64`; presumably VIDs are
        // always non-negative upstream — confirm.
        let cast: UInt64Array = arr.iter().map(|v| v.map(|i| i as u64)).collect();
        return Ok(std::borrow::Cow::Owned(cast));
    }

    // Node structs carry the VID in their `_vid` child column.
    if let Some(arr) = col.as_any().downcast_ref::<StructArray>()
        && let DataType::Struct(fields) = arr.data_type()
        && let Some((vid_idx, _)) = fields.find("_vid")
    {
        return column_as_vid_array(arr.column(vid_idx).as_ref());
    }

    // CypherValue-encoded blobs (e.g. a serialized Node or Map).
    if let Some(arr) = col.as_any().downcast_ref::<arrow_array::LargeBinaryArray>() {
        let vids = vids_from_large_binary(arr);
        return Ok(std::borrow::Cow::Owned(vids));
    }

    // An all-null column maps to an all-null VID array of matching length.
    if *col.data_type() == DataType::Null {
        let vids: UInt64Array = (0..col.len()).map(|_| None::<u64>).collect();
        return Ok(std::borrow::Cow::Owned(vids));
    }

    Err(datafusion::error::DataFusionError::Execution(format!(
        "VID column has type {:?}, expected UInt64 or Int64",
        col.data_type()
    )))
}
108
109fn extract_vid_from_value(val: &Value) -> Option<u64> {
114 match val {
115 Value::Node(node) => Some(node.vid.as_u64()),
116 Value::Map(map) => {
117 if let Some(Value::Int(vid)) = map.get("_vid") {
125 return Some(*vid as u64);
126 }
127 if let Some(Value::String(id_str)) = map.get("_id") {
129 return id_str
130 .strip_prefix("Vid(")
131 .and_then(|s| s.strip_suffix(')'))
132 .unwrap_or(id_str)
133 .parse::<u64>()
134 .ok();
135 }
136 if let Some(Value::Int(id)) = map.get("_id") {
137 return Some(*id as u64);
138 }
139 None
140 }
141 _ => None,
142 }
143}
144
145fn vids_from_large_binary(arr: &arrow_array::LargeBinaryArray) -> arrow_array::UInt64Array {
150 use uni_common::cypher_value_codec;
151
152 (0..arr.len())
153 .map(|i| {
154 if arr.is_null(i) {
155 return None;
156 }
157 cypher_value_codec::decode(arr.value(i))
158 .ok()
159 .as_ref()
160 .and_then(extract_vid_from_value)
161 })
162 .collect()
163}
164
165pub fn extract_vids_from_cypher_value_column(col: &dyn Array) -> DFResult<arrow_array::ArrayRef> {
171 let binary_col = col
172 .as_any()
173 .downcast_ref::<arrow_array::LargeBinaryArray>()
174 .ok_or_else(|| {
175 datafusion::error::DataFusionError::Execution(
176 "extract_vids_from_cypher_value_column: expected LargeBinary column".to_string(),
177 )
178 })?;
179 Ok(Arc::new(vids_from_large_binary(binary_col)) as arrow_array::ArrayRef)
180}
181
182pub(crate) fn extract_column_value<T: arrow_array::Array + 'static, R>(
188 batch: &RecordBatch,
189 col_name: &str,
190 row_idx: usize,
191 extract_fn: impl FnOnce(&T, usize) -> R,
192) -> Option<R> {
193 let (idx, _) = batch.schema().column_with_name(col_name)?;
194 let col = batch.column(idx);
195 let arr = col.as_any().downcast_ref::<T>()?;
196 if arr.is_valid(row_idx) {
197 Some(extract_fn(arr, row_idx))
198 } else {
199 None
200 }
201}
202
203pub fn node_struct_fields() -> arrow_schema::Fields {
208 arrow_schema::Fields::from(vec![
209 Field::new("_vid", DataType::UInt64, false),
210 Field::new("_labels", labels_data_type(), true),
211 Field::new("properties", DataType::LargeBinary, true),
212 ])
213}
214
215pub fn edge_struct_fields() -> arrow_schema::Fields {
220 arrow_schema::Fields::from(vec![
221 Field::new("_eid", DataType::UInt64, false),
222 Field::new("_type_name", DataType::Utf8, false),
223 Field::new("_src", DataType::UInt64, false),
224 Field::new("_dst", DataType::UInt64, false),
225 Field::new("properties", DataType::LargeBinary, true),
226 ])
227}
228
229pub fn encode_props_to_cv(props: &std::collections::HashMap<String, uni_common::Value>) -> Vec<u8> {
235 let val = uni_common::Value::Map(props.clone());
236 uni_common::cypher_value_codec::encode(&val)
237}
238
239pub fn build_edge_list_field(step_var: &str) -> Field {
244 let edge_item = Field::new("item", DataType::Struct(edge_struct_fields()), true);
245 Field::new(step_var, DataType::List(Arc::new(edge_item)), true)
247}
248
249pub fn build_path_struct_field(path_var: &str) -> Field {
253 let node_item = Field::new("item", DataType::Struct(node_struct_fields()), true);
254 let nodes_field = Field::new("nodes", DataType::List(Arc::new(node_item)), true);
255
256 let edge_item = Field::new("item", DataType::Struct(edge_struct_fields()), true);
257 let relationships_field =
258 Field::new("relationships", DataType::List(Arc::new(edge_item)), true);
259
260 Field::new(
261 path_var,
262 DataType::Struct(arrow_schema::Fields::from(vec![
263 nodes_field,
264 relationships_field,
265 ])),
266 true,
267 )
268}
269
270pub fn extend_schema_with_path(input_schema: SchemaRef, path_variable: &str) -> SchemaRef {
275 let mut fields: Vec<Arc<Field>> = input_schema.fields().to_vec();
276 fields.push(Arc::new(build_path_struct_field(path_variable)));
277 Arc::new(Schema::new(fields))
278}
279
280pub fn build_path_struct_array(
286 nodes_array: ArrayRef,
287 rels_array: ArrayRef,
288 path_validity: Vec<bool>,
289) -> DFResult<arrow_array::StructArray> {
290 Ok(arrow_array::StructArray::try_new(
291 arrow_schema::Fields::from(vec![
292 Arc::new(Field::new("nodes", nodes_array.data_type().clone(), true)),
293 Arc::new(Field::new(
294 "relationships",
295 rels_array.data_type().clone(),
296 true,
297 )),
298 ]),
299 vec![nodes_array, rels_array],
300 Some(arrow::buffer::NullBuffer::from(path_validity)),
301 )?)
302}
303
/// Creates a `ListBuilder<StructBuilder>` for edge lists.
///
/// The child builders are positional and MUST match `edge_struct_fields()`
/// order: `_eid`, `_type_name`, `_src`, `_dst`, `properties`.
pub fn new_edge_list_builder()
-> arrow_array::builder::ListBuilder<arrow_array::builder::StructBuilder> {
    use arrow_array::builder::{LargeBinaryBuilder, StringBuilder, StructBuilder, UInt64Builder};
    arrow_array::builder::ListBuilder::new(StructBuilder::new(
        edge_struct_fields(),
        vec![
            Box::new(UInt64Builder::new()),      // _eid
            Box::new(StringBuilder::new()),      // _type_name
            Box::new(UInt64Builder::new()),      // _src
            Box::new(UInt64Builder::new()),      // _dst
            Box::new(LargeBinaryBuilder::new()), // properties (CypherValue blob)
        ],
    ))
}
323
/// Creates a `ListBuilder<StructBuilder>` for node lists.
///
/// The child builders are positional and MUST match `node_struct_fields()`
/// order: `_vid`, `_labels`, `properties`.
pub fn new_node_list_builder()
-> arrow_array::builder::ListBuilder<arrow_array::builder::StructBuilder> {
    use arrow_array::builder::{
        LargeBinaryBuilder, ListBuilder, StringBuilder, StructBuilder, UInt64Builder,
    };
    arrow_array::builder::ListBuilder::new(StructBuilder::new(
        node_struct_fields(),
        vec![
            Box::new(UInt64Builder::new()),                  // _vid
            Box::new(ListBuilder::new(StringBuilder::new())), // _labels
            Box::new(LargeBinaryBuilder::new()),             // properties (CypherValue blob)
        ],
    ))
}
342
/// Appends one populated edge row to `struct_builder`, whose field layout
/// must match `edge_struct_fields()`:
/// 0 = `_eid`, 1 = `_type_name`, 2 = `_src`, 3 = `_dst`, 4 = `properties`.
///
/// Properties are fetched through L0 visibility and CypherValue-encoded;
/// an edge with no visible properties gets a null blob.
pub fn append_edge_to_struct(
    struct_builder: &mut arrow_array::builder::StructBuilder,
    eid: uni_common::core::id::Eid,
    type_name: &str,
    src_vid: u64,
    dst_vid: u64,
    query_ctx: &uni_store::runtime::context::QueryContext,
) {
    use arrow_array::builder::{LargeBinaryBuilder, StringBuilder, UInt64Builder};
    use uni_store::runtime::l0_visibility;

    struct_builder
        .field_builder::<UInt64Builder>(0)
        .unwrap()
        .append_value(eid.as_u64());
    struct_builder
        .field_builder::<StringBuilder>(1)
        .unwrap()
        .append_value(type_name);
    struct_builder
        .field_builder::<UInt64Builder>(2)
        .unwrap()
        .append_value(src_vid);
    struct_builder
        .field_builder::<UInt64Builder>(3)
        .unwrap()
        .append_value(dst_vid);
    let props_builder = struct_builder
        .field_builder::<LargeBinaryBuilder>(4)
        .unwrap();
    if let Some(props) = l0_visibility::get_edge_properties(eid, query_ctx) {
        let cv_bytes = encode_props_to_cv(&props);
        props_builder.append_value(&cv_bytes);
    } else {
        props_builder.append_null();
    }
    // Mark the struct row itself as valid.
    struct_builder.append(true);
}
386
/// Appends a placeholder edge row marked invalid via `append(false)`.
///
/// Every child builder still needs one entry to keep column lengths aligned,
/// so zeros / empty string / null are written; they are masked by the struct
/// row's null bit.
fn append_null_edge_struct(struct_builder: &mut arrow_array::builder::StructBuilder) {
    use arrow_array::builder::{LargeBinaryBuilder, StringBuilder, UInt64Builder};

    struct_builder
        .field_builder::<UInt64Builder>(0)
        .unwrap()
        .append_value(0);
    struct_builder
        .field_builder::<StringBuilder>(1)
        .unwrap()
        .append_value("");
    struct_builder
        .field_builder::<UInt64Builder>(2)
        .unwrap()
        .append_value(0);
    struct_builder
        .field_builder::<UInt64Builder>(3)
        .unwrap()
        .append_value(0);
    struct_builder
        .field_builder::<LargeBinaryBuilder>(4)
        .unwrap()
        .append_null();
    // false = this struct row is null.
    struct_builder.append(false);
}
416
417pub fn append_edge_to_struct_optional(
423 struct_builder: &mut arrow_array::builder::StructBuilder,
424 eid: Option<uni_common::core::id::Eid>,
425 src_vid: u64,
426 dst_vid: u64,
427 batch_type_name: Option<String>,
428 query_ctx: &uni_store::runtime::context::QueryContext,
429) {
430 match eid {
431 Some(e) => {
432 use uni_store::runtime::l0_visibility;
433 let type_name = batch_type_name
434 .or_else(|| l0_visibility::get_edge_type(e, query_ctx))
435 .unwrap_or_default();
436 append_edge_to_struct(struct_builder, e, &type_name, src_vid, dst_vid, query_ctx);
437 }
438 None => append_null_edge_struct(struct_builder),
439 }
440}
441
/// Appends one populated node row to `struct_builder`, whose field layout
/// must match `node_struct_fields()`: 0 = `_vid`, 1 = `_labels`,
/// 2 = `properties`.
///
/// Labels and properties are resolved through L0 visibility; a node with no
/// visible properties gets a null blob.
pub fn append_node_to_struct(
    struct_builder: &mut arrow_array::builder::StructBuilder,
    vid: uni_common::core::id::Vid,
    query_ctx: &uni_store::runtime::context::QueryContext,
) {
    use arrow_array::builder::{LargeBinaryBuilder, ListBuilder, StringBuilder, UInt64Builder};
    use uni_store::runtime::l0_visibility;

    struct_builder
        .field_builder::<UInt64Builder>(0)
        .unwrap()
        .append_value(vid.as_u64());
    let labels = l0_visibility::get_vertex_labels(vid, query_ctx);
    let labels_builder = struct_builder
        .field_builder::<ListBuilder<StringBuilder>>(1)
        .unwrap();
    let values = labels_builder.values();
    for lbl in &labels {
        values.append_value(lbl);
    }
    // Close the label list for this row (valid even when empty).
    labels_builder.append(true);
    let props_builder = struct_builder
        .field_builder::<LargeBinaryBuilder>(2)
        .unwrap();
    if let Some(props) = l0_visibility::get_vertex_properties(vid, query_ctx) {
        let cv_bytes = encode_props_to_cv(&props);
        props_builder.append_value(&cv_bytes);
    } else {
        props_builder.append_null();
    }
    // Mark the struct row itself as valid.
    struct_builder.append(true);
}
479
/// Appends a placeholder node row marked invalid via `append(false)`.
///
/// Child builders are padded (zero vid, empty label list, null blob) to keep
/// column lengths aligned; the struct row's null bit masks them.
fn append_null_node_struct(struct_builder: &mut arrow_array::builder::StructBuilder) {
    use arrow_array::builder::{LargeBinaryBuilder, ListBuilder, StringBuilder, UInt64Builder};

    struct_builder
        .field_builder::<UInt64Builder>(0)
        .unwrap()
        .append_value(0);
    struct_builder
        .field_builder::<ListBuilder<StringBuilder>>(1)
        .unwrap()
        .append(true);
    struct_builder
        .field_builder::<LargeBinaryBuilder>(2)
        .unwrap()
        .append_null();
    // false = this struct row is null.
    struct_builder.append(false);
}
501
502pub fn append_node_to_struct_optional(
507 struct_builder: &mut arrow_array::builder::StructBuilder,
508 vid: Option<uni_common::core::id::Vid>,
509 query_ctx: &uni_store::runtime::context::QueryContext,
510) {
511 match vid {
512 Some(v) => append_node_to_struct(struct_builder, v, query_ctx),
513 None => append_null_node_struct(struct_builder),
514 }
515}
516
/// Collapses a `LargeList<LargeBinary>` column — where each binary element is
/// an encoded CypherValue — into a single `LargeBinary` column where each row
/// is one encoded CypherValue *list*.
///
/// Null rows stay null; null or undecodable elements become JSON null inside
/// the resulting list. Errors if the list's inner values are not LargeBinary.
pub fn large_list_of_cv_to_cv_array(
    list: &datafusion::arrow::array::LargeListArray,
) -> datafusion::error::Result<Arc<dyn datafusion::arrow::array::Array>> {
    use datafusion::arrow::array::{LargeBinaryArray, LargeBinaryBuilder};

    let values = list.values();
    let binary_values = values
        .as_any()
        .downcast_ref::<LargeBinaryArray>()
        .ok_or_else(|| {
            datafusion::error::DataFusionError::Execution(
                "large_list_of_cv_to_cv_array: inner values must be LargeBinaryArray".to_string(),
            )
        })?;

    let mut builder = LargeBinaryBuilder::new();

    for row_idx in 0..list.len() {
        if list.is_null(row_idx) {
            builder.append_null();
            continue;
        }

        // Element range of this row within the flattened child array.
        let start = list.offsets()[row_idx] as usize;
        let end = list.offsets()[row_idx + 1] as usize;

        let mut json_elements = Vec::with_capacity(end - start);
        for elem_idx in start..end {
            if binary_values.is_null(elem_idx) {
                json_elements.push(serde_json::Value::Null);
            } else {
                let blob = binary_values.value(elem_idx);
                // Decode failures are tolerated and mapped to JSON null.
                match uni_common::cypher_value_codec::decode(blob) {
                    Ok(uni_val) => {
                        let json_val: serde_json::Value = uni_val.into();
                        json_elements.push(json_val);
                    }
                    Err(_) => json_elements.push(serde_json::Value::Null),
                }
            }
        }

        // Re-encode the whole row as a single CypherValue list blob.
        let uni_val: uni_common::Value = serde_json::Value::Array(json_elements).into();
        let bytes = uni_common::cypher_value_codec::encode(&uni_val);
        builder.append_value(&bytes);
    }

    Ok(Arc::new(builder.finish()))
}
579
580fn arrow_element_to_json(
585 col: &dyn datafusion::arrow::array::Array,
586 idx: usize,
587) -> serde_json::Value {
588 use datafusion::arrow::array::{
589 BooleanArray, Float64Array, Int64Array, StringArray, UInt64Array,
590 };
591
592 if col.is_null(idx) {
593 return serde_json::Value::Null;
594 }
595
596 if let Some(arr) = col.as_any().downcast_ref::<UInt64Array>() {
597 serde_json::Value::Number(serde_json::Number::from(arr.value(idx)))
598 } else if let Some(arr) = col.as_any().downcast_ref::<Int64Array>() {
599 serde_json::Value::Number(serde_json::Number::from(arr.value(idx)))
600 } else if let Some(arr) = col.as_any().downcast_ref::<Float64Array>() {
601 serde_json::Number::from_f64(arr.value(idx))
602 .map(serde_json::Value::Number)
603 .unwrap_or(serde_json::Value::Null)
604 } else if let Some(arr) = col.as_any().downcast_ref::<StringArray>() {
605 serde_json::Value::String(arr.value(idx).to_string())
606 } else if let Some(arr) = col.as_any().downcast_ref::<BooleanArray>() {
607 serde_json::Value::Bool(arr.value(idx))
608 } else if let Some(arr) = col.as_any().downcast_ref::<arrow_array::LargeBinaryArray>() {
609 uni_common::cypher_value_codec::decode(arr.value(idx))
610 .map(|v| v.into())
611 .unwrap_or(serde_json::Value::Null)
612 } else {
613 serde_json::Value::Null
614 }
615}
616
/// Converts a `LargeList` column with typed elements into a `LargeBinary`
/// column of encoded CypherValue lists.
///
/// `LargeBinary` elements delegate to `large_list_of_cv_to_cv_array`.
/// Primitive elements (UInt64/Int64/Float64/Utf8/Boolean) and struct elements
/// are converted per element via `arrow_element_to_json` (structs become JSON
/// objects keyed by field name). Any other element type is an execution error.
pub fn typed_large_list_to_cv_array(
    list: &datafusion::arrow::array::LargeListArray,
) -> datafusion::error::Result<Arc<dyn datafusion::arrow::array::Array>> {
    use datafusion::arrow::array::{LargeBinaryBuilder, StructArray};

    let values = list.values();

    // Already CypherValue-encoded elements: reuse the dedicated path.
    if values.data_type() == &DataType::LargeBinary {
        return large_list_of_cv_to_cv_array(list);
    }

    // Build a per-element JSON conversion closure up front so the row loop
    // below stays type-agnostic.
    let elem_to_json: Box<dyn Fn(usize) -> serde_json::Value> = match values.data_type() {
        DataType::UInt64
        | DataType::Int64
        | DataType::Float64
        | DataType::Utf8
        | DataType::Boolean => {
            let values = values.clone();
            Box::new(move |idx| arrow_element_to_json(values.as_ref(), idx))
        }
        DataType::Struct(_) => {
            let typed = values
                .as_any()
                .downcast_ref::<StructArray>()
                .ok_or_else(|| {
                    datafusion::error::DataFusionError::Execution(
                        "Expected StructArray".to_string(),
                    )
                })?;
            // Capture owned copies so the closure is self-contained.
            let fields: Vec<_> = typed.fields().iter().cloned().collect();
            let columns: Vec<_> = (0..typed.num_columns())
                .map(|i| typed.column(i).clone())
                .collect();
            let nulls = typed.nulls().cloned();
            Box::new(move |idx| {
                if nulls.as_ref().is_some_and(|n| n.is_null(idx)) {
                    return serde_json::Value::Null;
                }
                let mut map = serde_json::Map::new();
                for (field_idx, field) in fields.iter().enumerate() {
                    let value = arrow_element_to_json(columns[field_idx].as_ref(), idx);
                    map.insert(field.name().clone(), value);
                }
                serde_json::Value::Object(map)
            })
        }
        other => {
            return Err(datafusion::error::DataFusionError::Execution(format!(
                "Unsupported element type for typed_large_list_to_cv_array: {:?}",
                other
            )));
        }
    };

    let mut builder = LargeBinaryBuilder::new();

    for row_idx in 0..list.len() {
        if list.is_null(row_idx) {
            builder.append_null();
            continue;
        }

        // Element range of this row within the flattened child array.
        let start = list.offsets()[row_idx] as usize;
        let end = list.offsets()[row_idx + 1] as usize;
        let json_elements: Vec<serde_json::Value> = (start..end).map(&elem_to_json).collect();

        let uni_val: uni_common::Value = serde_json::Value::Array(json_elements).into();
        let bytes = uni_common::cypher_value_codec::encode(&uni_val);
        builder.append_value(&bytes);
    }

    Ok(Arc::new(builder.finish()))
}
707
/// Inverse of `typed_large_list_to_cv_array`: expands a `LargeBinary` column
/// of encoded CypherValue lists into a `LargeList<element_type>` column.
///
/// Rows that are null or fail to decode become null list rows. Decoded
/// non-array values become empty-but-valid lists. Elements that do not fit
/// `element_type` become null elements; for any unsupported element type the
/// fallback child encoding is `LargeBinary` (re-encoded CypherValue).
pub fn cv_array_to_large_list(
    array: &dyn datafusion::arrow::array::Array,
    element_type: &DataType,
) -> datafusion::error::Result<Arc<dyn datafusion::arrow::array::Array>> {
    use datafusion::arrow::array::LargeBinaryArray;
    use datafusion::arrow::buffer::{OffsetBuffer, ScalarBuffer};

    let binary_arr = array
        .as_any()
        .downcast_ref::<LargeBinaryArray>()
        .ok_or_else(|| {
            datafusion::error::DataFusionError::Execution(
                "cv_array_to_large_list: expected LargeBinaryArray".to_string(),
            )
        })?;

    // Pass 1: decode every row to its JSON elements and record row validity.
    let num_rows = binary_arr.len();
    let mut all_elements: Vec<Vec<serde_json::Value>> = Vec::with_capacity(num_rows);
    let mut nulls = Vec::with_capacity(num_rows);

    for i in 0..num_rows {
        if binary_arr.is_null(i) {
            all_elements.push(Vec::new());
            nulls.push(false);
            continue;
        }

        let blob = binary_arr.value(i);
        let uni_val = match uni_common::cypher_value_codec::decode(blob) {
            Ok(v) => v,
            Err(_) => {
                // Undecodable blob: treat the row as a null list.
                all_elements.push(Vec::new());
                nulls.push(false);
                continue;
            }
        };
        let json_val_decoded: serde_json::Value = uni_val.into();

        match json_val_decoded {
            serde_json::Value::Array(elements) => {
                all_elements.push(elements);
                nulls.push(true);
            }
            _ => {
                // Non-array value: valid row with an empty list.
                all_elements.push(Vec::new());
                nulls.push(true);
            }
        }
    }

    // Pass 2: build the typed child array plus the list offsets.
    let mut offsets: Vec<i64> = Vec::with_capacity(num_rows + 1);
    offsets.push(0);

    let values_array: Arc<dyn datafusion::arrow::array::Array> = match element_type {
        DataType::Int64 => {
            let mut builder = datafusion::arrow::array::builder::Int64Builder::new();
            for elems in &all_elements {
                for elem in elems {
                    if let serde_json::Value::Number(n) = elem {
                        if let Some(i) = n.as_i64() {
                            builder.append_value(i);
                        } else if let Some(f) = n.as_f64() {
                            // Lossy float → int truncation for numeric lists.
                            builder.append_value(f as i64);
                        } else {
                            builder.append_null();
                        }
                    } else {
                        builder.append_null();
                    }
                }
                offsets.push(offsets.last().unwrap() + elems.len() as i64);
            }
            Arc::new(builder.finish())
        }
        DataType::Float64 => {
            let mut builder = datafusion::arrow::array::builder::Float64Builder::new();
            for elems in &all_elements {
                for elem in elems {
                    if let serde_json::Value::Number(n) = elem
                        && let Some(f) = n.as_f64()
                    {
                        builder.append_value(f);
                    } else {
                        builder.append_null();
                    }
                }
                offsets.push(offsets.last().unwrap() + elems.len() as i64);
            }
            Arc::new(builder.finish())
        }
        DataType::Utf8 | DataType::LargeUtf8 => {
            let mut builder = datafusion::arrow::array::builder::StringBuilder::new();
            for elems in &all_elements {
                for elem in elems {
                    match elem {
                        serde_json::Value::String(s) => builder.append_value(s),
                        serde_json::Value::Null => builder.append_null(),
                        // Non-string elements are stringified rather than dropped.
                        other => builder.append_value(other.to_string()),
                    }
                }
                offsets.push(offsets.last().unwrap() + elems.len() as i64);
            }
            Arc::new(builder.finish())
        }
        DataType::Boolean => {
            let mut builder = datafusion::arrow::array::builder::BooleanBuilder::new();
            for elems in &all_elements {
                for elem in elems {
                    if let serde_json::Value::Bool(b) = elem {
                        builder.append_value(*b);
                    } else {
                        builder.append_null();
                    }
                }
                offsets.push(offsets.last().unwrap() + elems.len() as i64);
            }
            Arc::new(builder.finish())
        }
        _ => {
            // Fallback: keep each element as a re-encoded CypherValue blob.
            let mut builder = datafusion::arrow::array::builder::LargeBinaryBuilder::new();
            for elems in &all_elements {
                for elem in elems {
                    let uni_val: uni_common::Value = elem.clone().into();
                    let bytes = uni_common::cypher_value_codec::encode(&uni_val);
                    builder.append_value(&bytes);
                }
                offsets.push(offsets.last().unwrap() + elems.len() as i64);
            }
            Arc::new(builder.finish())
        }
    };

    let field = Arc::new(Field::new("item", element_type.clone(), true));
    let offset_buffer = OffsetBuffer::new(ScalarBuffer::from(offsets));
    let null_buffer = datafusion::arrow::buffer::NullBuffer::from(nulls);

    let large_list = datafusion::arrow::array::LargeListArray::new(
        field,
        offset_buffer,
        values_array,
        Some(null_buffer),
    );

    Ok(Arc::new(large_list))
}
863
864pub async fn collect_all_partitions(
869 plan: &Arc<dyn datafusion::physical_plan::ExecutionPlan>,
870 task_ctx: Arc<datafusion::execution::TaskContext>,
871) -> DFResult<Vec<RecordBatch>> {
872 let partition_count = plan.properties().output_partitioning().partition_count();
873
874 let mut all_batches = Vec::new();
875 for partition in 0..partition_count {
876 let stream = plan.execute(partition, task_ctx.clone())?;
877 let batches: Vec<RecordBatch> = stream.try_collect().await?;
878 all_batches.extend(batches);
879 }
880 Ok(all_batches)
881}
882
883pub async fn execute_subplan(
887 plan: &LogicalPlan,
888 params: &HashMap<String, Value>,
889 outer_values: &HashMap<String, Value>,
890 graph_ctx: &Arc<GraphExecutionContext>,
891 session_ctx: &Arc<RwLock<SessionContext>>,
892 storage: &Arc<StorageManager>,
893 schema_info: &Arc<UniSchema>,
894) -> DFResult<Vec<RecordBatch>> {
895 let planner_construction_start = std::time::Instant::now();
896 let l0_context = graph_ctx.l0_context().clone();
897 let prop_manager = graph_ctx.property_manager().clone();
898
899 let planner = HybridPhysicalPlanner::with_l0_context(
900 session_ctx.clone(),
901 storage.clone(),
902 l0_context,
903 prop_manager,
904 schema_info.clone(),
905 params.clone(),
906 outer_values.clone(),
907 );
908 let planner_construction_elapsed = planner_construction_start.elapsed();
909 tracing::debug!(
910 "execute_subplan: planner construction took {:?}",
911 planner_construction_elapsed
912 );
913
914 let planning_start = std::time::Instant::now();
915 let execution_plan = planner.plan(plan).map_err(|e| {
916 datafusion::error::DataFusionError::Execution(format!("Sub-plan error: {}", e))
917 })?;
918 let planning_elapsed = planning_start.elapsed();
919 tracing::debug!("execute_subplan: planning took {:?}", planning_elapsed);
920
921 let execution_start = std::time::Instant::now();
922 let task_ctx = session_ctx.read().task_ctx();
923 let all_batches = collect_all_partitions(&execution_plan, task_ctx).await?;
924 let execution_elapsed = execution_start.elapsed();
925 tracing::debug!("execute_subplan: execution took {:?}", execution_elapsed);
926
927 Ok(all_batches)
928}
929
930pub fn extract_row_params(batch: &RecordBatch, row_idx: usize) -> HashMap<String, Value> {
934 let schema = batch.schema();
935 let mut row = HashMap::new();
936 for col_idx in 0..batch.num_columns() {
937 let col_name = schema.field(col_idx).name().clone();
938 let val = arrow_to_json_value(batch.column(col_idx).as_ref(), row_idx);
939 row.insert(col_name, val);
940 }
941 row
942}
943
944fn infer_procedure_call_schema(
951 procedure_name: &str,
952 yield_items: &[(String, Option<String>)],
953 _schema_info: &UniSchema,
954) -> SchemaRef {
955 let infer_type = |name: &str| -> DataType {
956 match procedure_name {
957 "uni.schema.labels" => match name {
958 "propertyCount" | "nodeCount" | "indexCount" => DataType::Int64,
959 _ => DataType::Utf8,
960 },
961 "uni.schema.edgeTypes" | "uni.schema.relationshipTypes" => match name {
962 "propertyCount" => DataType::Int64,
963 _ => DataType::Utf8,
964 },
965 "uni.schema.constraints" => match name {
966 "enabled" => DataType::Boolean,
967 _ => DataType::Utf8,
968 },
969 "uni.schema.labelInfo" => match name {
970 "nullable" | "indexed" | "unique" => DataType::Boolean,
971 _ => DataType::Utf8,
972 },
973 "uni.vector.query" | "uni.fts.query" | "uni.search" => {
974 match map_yield_to_canonical(name).as_str() {
977 "distance" => DataType::Float64,
978 "score" | "vector_score" | "fts_score" | "raw_score" => DataType::Float32,
979 "vid" => DataType::Int64,
980 _ => DataType::Utf8,
981 }
982 }
983 _ => DataType::Utf8,
985 }
986 };
987
988 let fields: Vec<Field> = yield_items
989 .iter()
990 .map(|(name, alias)| {
991 let col_name = alias.as_ref().unwrap_or(name);
992 Field::new(col_name, infer_type(name), true)
993 })
994 .collect();
995
996 Arc::new(Schema::new(fields))
997}
998
999pub fn infer_logical_plan_schema(plan: &LogicalPlan, schema_info: &UniSchema) -> SchemaRef {
1006 if let LogicalPlan::Project { projections, .. } = plan {
1008 let fields: Vec<Field> = projections
1009 .iter()
1010 .map(|(expr, alias)| {
1011 let name = alias.clone().unwrap_or_else(|| expr.to_string_repr());
1012 let dt = infer_expr_type(expr, schema_info);
1013 Field::new(name, dt, true)
1014 })
1015 .collect();
1016 return Arc::new(Schema::new(fields));
1017 }
1018
1019 match plan {
1021 LogicalPlan::Sort { input, .. }
1022 | LogicalPlan::Limit { input, .. }
1023 | LogicalPlan::Filter { input, .. }
1024 | LogicalPlan::Distinct { input } => infer_logical_plan_schema(input, schema_info),
1025
1026 LogicalPlan::ProcedureCall {
1027 procedure_name,
1028 yield_items,
1029 ..
1030 } => infer_procedure_call_schema(procedure_name, yield_items, schema_info),
1031
1032 _ => {
1033 Arc::new(Schema::empty())
1035 }
1036 }
1037}
1038
1039fn infer_expr_type(expr: &Expr, schema_info: &UniSchema) -> DataType {
1041 match expr {
1042 Expr::Property(base, key) => {
1043 if let Expr::Variable(_) = base.as_ref() {
1044 for props in schema_info.properties.values() {
1046 if let Some(meta) = props.get(key.as_str()) {
1047 return meta.r#type.to_arrow();
1048 }
1049 }
1050 DataType::LargeBinary
1051 } else {
1052 DataType::LargeBinary
1053 }
1054 }
1055 Expr::BinaryOp { left, op, right } => match op {
1056 BinaryOp::Add | BinaryOp::Sub | BinaryOp::Mul | BinaryOp::Div | BinaryOp::Mod => {
1057 let lt = infer_expr_type(left, schema_info);
1058 let rt = infer_expr_type(right, schema_info);
1059 numeric_promotion(<, &rt)
1060 }
1061 BinaryOp::Eq
1062 | BinaryOp::NotEq
1063 | BinaryOp::Lt
1064 | BinaryOp::LtEq
1065 | BinaryOp::Gt
1066 | BinaryOp::GtEq
1067 | BinaryOp::And
1068 | BinaryOp::Or => DataType::Boolean,
1069 _ => DataType::LargeBinary,
1070 },
1071 Expr::Literal(lit) => match lit {
1072 CypherLiteral::Integer(_) => DataType::Int64,
1073 CypherLiteral::Float(_) => DataType::Float64,
1074 CypherLiteral::String(_) => DataType::Utf8,
1075 CypherLiteral::Bool(_) => DataType::Boolean,
1076 CypherLiteral::Null => DataType::Null,
1077 CypherLiteral::Bytes(_) => DataType::LargeBinary,
1078 },
1079 Expr::Variable(_) => DataType::LargeBinary,
1080 Expr::FunctionCall { name, args, .. } => match name.to_lowercase().as_str() {
1081 "count" => DataType::Int64,
1082 "sum" | "avg" => {
1083 if let Some(arg) = args.first() {
1084 let arg_type = infer_expr_type(arg, schema_info);
1085 if matches!(arg_type, DataType::Float32 | DataType::Float64) {
1086 DataType::Float64
1087 } else {
1088 DataType::Int64
1089 }
1090 } else {
1091 DataType::Int64
1092 }
1093 }
1094 "min" | "max" => {
1095 if let Some(arg) = args.first() {
1096 infer_expr_type(arg, schema_info)
1097 } else {
1098 DataType::LargeBinary
1099 }
1100 }
1101 "tostring" | "trim" | "ltrim" | "rtrim" | "tolower" | "toupper" | "left" | "right"
1102 | "substring" | "replace" | "reverse" | "type" => DataType::Utf8,
1103 "tointeger" | "toint" | "size" | "length" | "id" => DataType::Int64,
1104 "tofloat" => DataType::Float64,
1105 "toboolean" => DataType::Boolean,
1106 _ => DataType::LargeBinary,
1107 },
1108 _ => DataType::LargeBinary,
1109 }
1110}
1111
1112fn numeric_promotion(left: &DataType, right: &DataType) -> DataType {
1114 match (left, right) {
1115 (DataType::Float64, _) | (_, DataType::Float64) => DataType::Float64,
1116 (DataType::Float32, _) | (_, DataType::Float32) => DataType::Float64,
1117 (DataType::Int64, _) | (_, DataType::Int64) => DataType::Int64,
1118 (DataType::Int32, _) | (_, DataType::Int32) => DataType::Int64,
1119 _ => DataType::Int64,
1120 }
1121}
1122
1123pub(crate) fn evaluate_simple_expr(
1130 expr: &Expr,
1131 params: &HashMap<String, Value>,
1132) -> DFResult<Value> {
1133 match expr {
1134 Expr::Literal(lit) => Ok(lit.to_value()),
1135
1136 Expr::Parameter(name) => params.get(name).cloned().ok_or_else(|| {
1137 datafusion::error::DataFusionError::Execution(format!("Parameter '{}' not found", name))
1138 }),
1139
1140 Expr::List(items) => {
1141 let values: Vec<Value> = items
1142 .iter()
1143 .map(|item| evaluate_simple_expr(item, params))
1144 .collect::<DFResult<_>>()?;
1145 Ok(Value::List(values))
1146 }
1147
1148 _ => Err(datafusion::error::DataFusionError::Execution(format!(
1149 "Unsupported expression type for procedure argument: {:?}",
1150 expr
1151 ))),
1152 }
1153}
1154
1155pub fn merged_edge_schema_props(
1163 uni_schema: &UniSchema,
1164 edge_type_ids: &[u32],
1165) -> HashMap<String, uni_common::core::schema::PropertyMeta> {
1166 let mut merged: HashMap<String, uni_common::core::schema::PropertyMeta> = HashMap::new();
1167 let mut sorted_ids = edge_type_ids.to_vec();
1168 sorted_ids.sort_unstable();
1169
1170 for edge_type_id in sorted_ids {
1171 if let Some(edge_type_name) = uni_schema.edge_type_name_by_id_unified(edge_type_id)
1172 && let Some(props) = uni_schema.properties.get(edge_type_name.as_str())
1173 {
1174 for (prop_name, meta) in props {
1175 match merged.get_mut(prop_name) {
1176 Some(existing) => {
1177 if existing.r#type != meta.r#type {
1178 existing.r#type = uni_common::core::schema::DataType::CypherValue;
1179 }
1180 existing.nullable |= meta.nullable;
1181 }
1182 None => {
1183 merged.insert(prop_name.clone(), meta.clone());
1184 }
1185 }
1186 }
1187 }
1188 }
1189
1190 merged
1191}
1192
/// Hashable key representation of one scalar cell, used to group / dedup
/// rows. Float64 values are keyed by their raw bit pattern (see
/// `extract_scalar_key`), so `0.0` and `-0.0` are distinct keys and NaNs
/// compare bitwise.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub(crate) enum ScalarKey {
    /// Null cell of any type.
    Null,
    Bool(bool),
    /// Int64 cells, and Float64 cells via `f64::to_bits`.
    Int64(i64),
    /// Utf8 / LargeUtf8 cells, plus stringified struct / fallback renderings.
    Utf8(String),
    /// LargeBinary cells (e.g. CypherValue blobs).
    Binary(Vec<u8>),
}
1208
/// Builds the grouping key for one row: one `ScalarKey` per column in
/// `key_indices`, in order.
///
/// Typed fast paths cover Boolean / Int64 / Utf8 / LargeUtf8 / LargeBinary /
/// Float64; structs and any other type fall back to Arrow's display
/// formatter (so equality becomes string equality of the rendered value).
pub(crate) fn extract_scalar_key(
    batch: &RecordBatch,
    key_indices: &[usize],
    row_idx: usize,
) -> Vec<ScalarKey> {
    use arrow::array::Array;
    key_indices
        .iter()
        .map(|&col_idx| {
            let col = batch.column(col_idx);
            if col.is_null(row_idx) {
                return ScalarKey::Null;
            }
            match col.data_type() {
                arrow_schema::DataType::Boolean => {
                    let arr = col
                        .as_any()
                        .downcast_ref::<arrow_array::BooleanArray>()
                        .unwrap();
                    ScalarKey::Bool(arr.value(row_idx))
                }
                arrow_schema::DataType::Int64 => {
                    let arr = col
                        .as_any()
                        .downcast_ref::<arrow_array::Int64Array>()
                        .unwrap();
                    ScalarKey::Int64(arr.value(row_idx))
                }
                arrow_schema::DataType::Utf8 => {
                    let arr = col
                        .as_any()
                        .downcast_ref::<arrow_array::StringArray>()
                        .unwrap();
                    ScalarKey::Utf8(arr.value(row_idx).to_string())
                }
                arrow_schema::DataType::LargeBinary => {
                    let arr = col
                        .as_any()
                        .downcast_ref::<arrow_array::LargeBinaryArray>()
                        .unwrap();
                    ScalarKey::Binary(arr.value(row_idx).to_vec())
                }
                arrow_schema::DataType::Float64 => {
                    // Key floats by bit pattern so the key is Eq + Hash;
                    // NaNs and -0.0/0.0 are therefore distinguished bitwise.
                    let arr = col
                        .as_any()
                        .downcast_ref::<arrow_array::Float64Array>()
                        .unwrap();
                    ScalarKey::Int64(arr.value(row_idx).to_bits() as i64)
                }
                arrow_schema::DataType::LargeUtf8 => {
                    let arr = col
                        .as_any()
                        .downcast_ref::<arrow_array::LargeStringArray>()
                        .unwrap();
                    ScalarKey::Utf8(arr.value(row_idx).to_string())
                }
                arrow_schema::DataType::Struct(_) => {
                    // Structs: key by their rendered display form.
                    let formatter = arrow::util::display::ArrayFormatter::try_new(
                        col.as_ref(),
                        &arrow::util::display::FormatOptions::default(),
                    );
                    match formatter {
                        Ok(f) => ScalarKey::Utf8(f.value(row_idx).to_string()),
                        Err(_) => ScalarKey::Utf8(format!("struct@{row_idx}")),
                    }
                }
                _ => {
                    // Last resort: display-format whatever this is.
                    let formatter = arrow::util::display::ArrayFormatter::try_new(
                        col.as_ref(),
                        &arrow::util::display::FormatOptions::default(),
                    );
                    match formatter {
                        Ok(f) => ScalarKey::Utf8(f.value(row_idx).to_string()),
                        Err(_) => ScalarKey::Utf8(format!("unknown@{row_idx}")),
                    }
                }
            }
        })
        .collect()
}
1297
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{LargeBinaryArray, UInt64Array};
    use arrow_schema::Schema;

    // Documents that `extract_row_params` funnels UInt64 columns through the
    // JSON conversion, which collapses u64 VIDs into `Value::Int` (i64).
    #[test]
    fn test_extract_row_params_loses_uint64_to_int() {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "n._vid",
            DataType::UInt64,
            true,
        )]));
        let batch = RecordBatch::try_new(schema, vec![Arc::new(UInt64Array::from(vec![Some(7)]))])
            .expect("batch should be valid");

        let params = extract_row_params(&batch, 0);
        assert_eq!(params.get("n._vid"), Some(&Value::Int(7)));
    }

    // LargeBinary columns holding encoded CypherValues are decoded back into
    // their structured form (here, an empty map) rather than kept as bytes.
    #[test]
    fn test_extract_row_params_decodes_largebinary_to_map() {
        let encoded = uni_common::cypher_value_codec::encode(&Value::Map(HashMap::new()));
        let schema = Arc::new(Schema::new(vec![Field::new(
            "m._all_props",
            DataType::LargeBinary,
            true,
        )]));
        let batch = RecordBatch::try_new(
            schema,
            vec![Arc::new(LargeBinaryArray::from(vec![Some(
                encoded.as_slice(),
            )]))],
        )
        .expect("batch should be valid");

        let params = extract_row_params(&batch, 0);
        assert_eq!(
            params.get("m._all_props"),
            Some(&Value::Map(HashMap::new()))
        );
    }
}