1use arrow_array::{ArrayRef, RecordBatch};
10use arrow_schema::{DataType, Field, Schema, SchemaRef};
11use datafusion::arrow::array::Array;
12use datafusion::common::Result as DFResult;
13use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
14use datafusion::physical_plan::PlanProperties;
15use datafusion::prelude::SessionContext;
16use futures::TryStreamExt;
17use parking_lot::RwLock;
18use std::collections::HashMap;
19use std::sync::Arc;
20use uni_common::Value;
21use uni_common::core::schema::Schema as UniSchema;
22use uni_cypher::ast::{BinaryOp, CypherLiteral, Expr};
23use uni_store::storage::manager::StorageManager;
24
25use super::GraphExecutionContext;
26use super::procedure_call::map_yield_to_canonical;
27use super::unwind::arrow_to_json_value;
28use crate::query::df_graph::MutationContext;
29use crate::query::df_planner::HybridPhysicalPlanner;
30use crate::query::executor::core::{OperatorStats, collect_plan_metrics};
31use crate::query::planner::LogicalPlan;
32
33pub fn arrow_err(e: arrow::error::ArrowError) -> datafusion::error::DataFusionError {
39 datafusion::error::DataFusionError::ArrowError(Box::new(e), None)
40}
41
42pub fn compute_plan_properties(schema: SchemaRef) -> Arc<PlanProperties> {
49 Arc::new(PlanProperties::new(
50 EquivalenceProperties::new(schema),
51 Partitioning::UnknownPartitioning(1),
52 datafusion::physical_plan::execution_plan::EmissionType::Incremental,
53 datafusion::physical_plan::execution_plan::Boundedness::Bounded,
54 ))
55}
56
57pub fn labels_data_type() -> DataType {
63 DataType::List(Arc::new(Field::new("item", DataType::Utf8, true)))
64}
65
66pub fn column_as_vid_array(
77 col: &dyn arrow_array::Array,
78) -> datafusion::error::Result<std::borrow::Cow<'_, arrow_array::UInt64Array>> {
79 use arrow_array::{Int64Array, StructArray, UInt64Array};
80 use arrow_schema::DataType;
81
82 if let Some(arr) = col.as_any().downcast_ref::<UInt64Array>() {
83 return Ok(std::borrow::Cow::Borrowed(arr));
84 }
85
86 if let Some(arr) = col.as_any().downcast_ref::<Int64Array>() {
87 let cast: UInt64Array = arr.iter().map(|v| v.map(|i| i as u64)).collect();
88 return Ok(std::borrow::Cow::Owned(cast));
89 }
90
91 if let Some(arr) = col.as_any().downcast_ref::<StructArray>()
94 && let DataType::Struct(fields) = arr.data_type()
95 && let Some((vid_idx, _)) = fields.find("_vid")
96 {
97 return column_as_vid_array(arr.column(vid_idx).as_ref());
98 }
99
100 if let Some(arr) = col.as_any().downcast_ref::<arrow_array::LargeBinaryArray>() {
104 let vids = vids_from_large_binary(arr);
105 return Ok(std::borrow::Cow::Owned(vids));
106 }
107
108 if *col.data_type() == DataType::Null {
110 let vids: UInt64Array = (0..col.len()).map(|_| None::<u64>).collect();
111 return Ok(std::borrow::Cow::Owned(vids));
112 }
113
114 Err(datafusion::error::DataFusionError::Execution(format!(
115 "VID column has type {:?}, expected UInt64 or Int64",
116 col.data_type()
117 )))
118}
119
120fn extract_vid_from_value(val: &Value) -> Option<u64> {
125 match val {
126 Value::Node(node) => Some(node.vid.as_u64()),
127 Value::Map(map) => {
128 if let Some(Value::Int(vid)) = map.get("_vid") {
136 return Some(*vid as u64);
137 }
138 if let Some(Value::String(id_str)) = map.get("_id") {
140 return id_str
141 .strip_prefix("Vid(")
142 .and_then(|s| s.strip_suffix(')'))
143 .unwrap_or(id_str)
144 .parse::<u64>()
145 .ok();
146 }
147 if let Some(Value::Int(id)) = map.get("_id") {
148 return Some(*id as u64);
149 }
150 None
151 }
152 _ => None,
153 }
154}
155
156fn vids_from_large_binary(arr: &arrow_array::LargeBinaryArray) -> arrow_array::UInt64Array {
161 use uni_common::cypher_value_codec;
162
163 (0..arr.len())
164 .map(|i| {
165 if arr.is_null(i) {
166 return None;
167 }
168 cypher_value_codec::decode(arr.value(i))
169 .ok()
170 .as_ref()
171 .and_then(extract_vid_from_value)
172 })
173 .collect()
174}
175
176pub fn extract_vids_from_cypher_value_column(col: &dyn Array) -> DFResult<arrow_array::ArrayRef> {
182 let binary_col = col
183 .as_any()
184 .downcast_ref::<arrow_array::LargeBinaryArray>()
185 .ok_or_else(|| {
186 datafusion::error::DataFusionError::Execution(
187 "extract_vids_from_cypher_value_column: expected LargeBinary column".to_string(),
188 )
189 })?;
190 Ok(Arc::new(vids_from_large_binary(binary_col)) as arrow_array::ArrayRef)
191}
192
193pub(crate) fn extract_column_value<T: arrow_array::Array + 'static, R>(
199 batch: &RecordBatch,
200 col_name: &str,
201 row_idx: usize,
202 extract_fn: impl FnOnce(&T, usize) -> R,
203) -> Option<R> {
204 let (idx, _) = batch.schema().column_with_name(col_name)?;
205 let col = batch.column(idx);
206 let arr = col.as_any().downcast_ref::<T>()?;
207 if arr.is_valid(row_idx) {
208 Some(extract_fn(arr, row_idx))
209 } else {
210 None
211 }
212}
213
214pub fn node_struct_fields() -> arrow_schema::Fields {
219 arrow_schema::Fields::from(vec![
220 Field::new("_vid", DataType::UInt64, false),
221 Field::new("_labels", labels_data_type(), true),
222 Field::new("properties", DataType::LargeBinary, true),
223 ])
224}
225
226pub fn edge_struct_fields() -> arrow_schema::Fields {
231 arrow_schema::Fields::from(vec![
232 Field::new("_eid", DataType::UInt64, false),
233 Field::new("_type_name", DataType::Utf8, false),
234 Field::new("_src", DataType::UInt64, false),
235 Field::new("_dst", DataType::UInt64, false),
236 Field::new("properties", DataType::LargeBinary, true),
237 ])
238}
239
240pub fn encode_props_to_cv(props: &std::collections::HashMap<String, uni_common::Value>) -> Vec<u8> {
246 let val = uni_common::Value::Map(props.clone());
247 uni_common::cypher_value_codec::encode(&val)
248}
249
250pub fn build_edge_list_field(step_var: &str) -> Field {
255 let edge_item = Field::new("item", DataType::Struct(edge_struct_fields()), true);
256 Field::new(step_var, DataType::List(Arc::new(edge_item)), true)
258}
259
260pub fn build_path_struct_field(path_var: &str) -> Field {
264 let node_item = Field::new("item", DataType::Struct(node_struct_fields()), true);
265 let nodes_field = Field::new("nodes", DataType::List(Arc::new(node_item)), true);
266
267 let edge_item = Field::new("item", DataType::Struct(edge_struct_fields()), true);
268 let relationships_field =
269 Field::new("relationships", DataType::List(Arc::new(edge_item)), true);
270
271 Field::new(
272 path_var,
273 DataType::Struct(arrow_schema::Fields::from(vec![
274 nodes_field,
275 relationships_field,
276 ])),
277 true,
278 )
279}
280
281pub fn extend_schema_with_path(input_schema: SchemaRef, path_variable: &str) -> SchemaRef {
286 let mut fields: Vec<Arc<Field>> = input_schema.fields().to_vec();
287 fields.push(Arc::new(build_path_struct_field(path_variable)));
288 Arc::new(Schema::new(fields))
289}
290
291pub fn build_path_struct_array(
297 nodes_array: ArrayRef,
298 rels_array: ArrayRef,
299 path_validity: Vec<bool>,
300) -> DFResult<arrow_array::StructArray> {
301 Ok(arrow_array::StructArray::try_new(
302 arrow_schema::Fields::from(vec![
303 Arc::new(Field::new("nodes", nodes_array.data_type().clone(), true)),
304 Arc::new(Field::new(
305 "relationships",
306 rels_array.data_type().clone(),
307 true,
308 )),
309 ]),
310 vec![nodes_array, rels_array],
311 Some(arrow::buffer::NullBuffer::from(path_validity)),
312 )?)
313}
314
315pub fn new_edge_list_builder()
321-> arrow_array::builder::ListBuilder<arrow_array::builder::StructBuilder> {
322 use arrow_array::builder::{LargeBinaryBuilder, StringBuilder, StructBuilder, UInt64Builder};
323 arrow_array::builder::ListBuilder::new(StructBuilder::new(
324 edge_struct_fields(),
325 vec![
326 Box::new(UInt64Builder::new()),
327 Box::new(StringBuilder::new()),
328 Box::new(UInt64Builder::new()),
329 Box::new(UInt64Builder::new()),
330 Box::new(LargeBinaryBuilder::new()),
331 ],
332 ))
333}
334
335pub fn new_node_list_builder()
340-> arrow_array::builder::ListBuilder<arrow_array::builder::StructBuilder> {
341 use arrow_array::builder::{
342 LargeBinaryBuilder, ListBuilder, StringBuilder, StructBuilder, UInt64Builder,
343 };
344 arrow_array::builder::ListBuilder::new(StructBuilder::new(
345 node_struct_fields(),
346 vec![
347 Box::new(UInt64Builder::new()),
348 Box::new(ListBuilder::new(StringBuilder::new())),
349 Box::new(LargeBinaryBuilder::new()),
350 ],
351 ))
352}
353
354pub fn append_edge_to_struct(
360 struct_builder: &mut arrow_array::builder::StructBuilder,
361 eid: uni_common::core::id::Eid,
362 type_name: &str,
363 src_vid: u64,
364 dst_vid: u64,
365 query_ctx: &uni_store::runtime::context::QueryContext,
366) {
367 use arrow_array::builder::{LargeBinaryBuilder, StringBuilder, UInt64Builder};
368 use uni_store::runtime::l0_visibility;
369
370 struct_builder
371 .field_builder::<UInt64Builder>(0)
372 .unwrap()
373 .append_value(eid.as_u64());
374 struct_builder
375 .field_builder::<StringBuilder>(1)
376 .unwrap()
377 .append_value(type_name);
378 struct_builder
379 .field_builder::<UInt64Builder>(2)
380 .unwrap()
381 .append_value(src_vid);
382 struct_builder
383 .field_builder::<UInt64Builder>(3)
384 .unwrap()
385 .append_value(dst_vid);
386 let props_builder = struct_builder
387 .field_builder::<LargeBinaryBuilder>(4)
388 .unwrap();
389 if let Some(props) = l0_visibility::get_edge_properties(eid, query_ctx) {
390 let cv_bytes = encode_props_to_cv(&props);
391 props_builder.append_value(&cv_bytes);
392 } else {
393 props_builder.append_null();
394 }
395 struct_builder.append(true);
396}
397
398fn append_null_edge_struct(struct_builder: &mut arrow_array::builder::StructBuilder) {
403 use arrow_array::builder::{LargeBinaryBuilder, StringBuilder, UInt64Builder};
404
405 struct_builder
406 .field_builder::<UInt64Builder>(0)
407 .unwrap()
408 .append_value(0);
409 struct_builder
410 .field_builder::<StringBuilder>(1)
411 .unwrap()
412 .append_value("");
413 struct_builder
414 .field_builder::<UInt64Builder>(2)
415 .unwrap()
416 .append_value(0);
417 struct_builder
418 .field_builder::<UInt64Builder>(3)
419 .unwrap()
420 .append_value(0);
421 struct_builder
422 .field_builder::<LargeBinaryBuilder>(4)
423 .unwrap()
424 .append_null();
425 struct_builder.append(false);
426}
427
428pub fn append_edge_to_struct_optional(
434 struct_builder: &mut arrow_array::builder::StructBuilder,
435 eid: Option<uni_common::core::id::Eid>,
436 src_vid: u64,
437 dst_vid: u64,
438 batch_type_name: Option<String>,
439 query_ctx: &uni_store::runtime::context::QueryContext,
440) {
441 match eid {
442 Some(e) => {
443 use uni_store::runtime::l0_visibility;
444 let type_name = batch_type_name
445 .or_else(|| l0_visibility::get_edge_type(e, query_ctx))
446 .unwrap_or_default();
447 append_edge_to_struct(struct_builder, e, &type_name, src_vid, dst_vid, query_ctx);
448 }
449 None => append_null_edge_struct(struct_builder),
450 }
451}
452
453pub fn append_node_to_struct(
459 struct_builder: &mut arrow_array::builder::StructBuilder,
460 vid: uni_common::core::id::Vid,
461 query_ctx: &uni_store::runtime::context::QueryContext,
462) {
463 use arrow_array::builder::{LargeBinaryBuilder, ListBuilder, StringBuilder, UInt64Builder};
464 use uni_store::runtime::l0_visibility;
465
466 struct_builder
467 .field_builder::<UInt64Builder>(0)
468 .unwrap()
469 .append_value(vid.as_u64());
470 let labels = l0_visibility::get_vertex_labels(vid, query_ctx);
471 let labels_builder = struct_builder
472 .field_builder::<ListBuilder<StringBuilder>>(1)
473 .unwrap();
474 let values = labels_builder.values();
475 for lbl in &labels {
476 values.append_value(lbl);
477 }
478 labels_builder.append(true);
479 let props_builder = struct_builder
480 .field_builder::<LargeBinaryBuilder>(2)
481 .unwrap();
482 if let Some(props) = l0_visibility::get_vertex_properties(vid, query_ctx) {
483 let cv_bytes = encode_props_to_cv(&props);
484 props_builder.append_value(&cv_bytes);
485 } else {
486 props_builder.append_null();
487 }
488 struct_builder.append(true);
489}
490
491fn append_null_node_struct(struct_builder: &mut arrow_array::builder::StructBuilder) {
496 use arrow_array::builder::{LargeBinaryBuilder, ListBuilder, StringBuilder, UInt64Builder};
497
498 struct_builder
499 .field_builder::<UInt64Builder>(0)
500 .unwrap()
501 .append_value(0);
502 struct_builder
503 .field_builder::<ListBuilder<StringBuilder>>(1)
504 .unwrap()
505 .append(true);
506 struct_builder
507 .field_builder::<LargeBinaryBuilder>(2)
508 .unwrap()
509 .append_null();
510 struct_builder.append(false);
511}
512
513pub fn append_node_to_struct_optional(
518 struct_builder: &mut arrow_array::builder::StructBuilder,
519 vid: Option<uni_common::core::id::Vid>,
520 query_ctx: &uni_store::runtime::context::QueryContext,
521) {
522 match vid {
523 Some(v) => append_node_to_struct(struct_builder, v, query_ctx),
524 None => append_null_node_struct(struct_builder),
525 }
526}
527
528pub fn large_list_of_cv_to_cv_array(
542 list: &datafusion::arrow::array::LargeListArray,
543) -> datafusion::error::Result<Arc<dyn datafusion::arrow::array::Array>> {
544 use datafusion::arrow::array::{LargeBinaryArray, LargeBinaryBuilder};
545
546 let values = list.values();
547 let binary_values = values
548 .as_any()
549 .downcast_ref::<LargeBinaryArray>()
550 .ok_or_else(|| {
551 datafusion::error::DataFusionError::Execution(
552 "large_list_of_cv_to_cv_array: inner values must be LargeBinaryArray".to_string(),
553 )
554 })?;
555
556 let mut builder = LargeBinaryBuilder::new();
557
558 for row_idx in 0..list.len() {
559 if list.is_null(row_idx) {
560 builder.append_null();
561 continue;
562 }
563
564 let start = list.offsets()[row_idx] as usize;
565 let end = list.offsets()[row_idx + 1] as usize;
566
567 let mut json_elements = Vec::with_capacity(end - start);
568 for elem_idx in start..end {
569 if binary_values.is_null(elem_idx) {
570 json_elements.push(serde_json::Value::Null);
571 } else {
572 let blob = binary_values.value(elem_idx);
573 match uni_common::cypher_value_codec::decode(blob) {
574 Ok(uni_val) => {
575 let json_val: serde_json::Value = uni_val.into();
576 json_elements.push(json_val);
577 }
578 Err(_) => json_elements.push(serde_json::Value::Null),
579 }
580 }
581 }
582
583 let uni_val: uni_common::Value = serde_json::Value::Array(json_elements).into();
584 let bytes = uni_common::cypher_value_codec::encode(&uni_val);
585 builder.append_value(&bytes);
586 }
587
588 Ok(Arc::new(builder.finish()))
589}
590
591fn arrow_element_to_json(
596 col: &dyn datafusion::arrow::array::Array,
597 idx: usize,
598) -> serde_json::Value {
599 use datafusion::arrow::array::{
600 BooleanArray, Float64Array, Int64Array, StringArray, UInt64Array,
601 };
602
603 if col.is_null(idx) {
604 return serde_json::Value::Null;
605 }
606
607 if let Some(arr) = col.as_any().downcast_ref::<UInt64Array>() {
608 serde_json::Value::Number(serde_json::Number::from(arr.value(idx)))
609 } else if let Some(arr) = col.as_any().downcast_ref::<Int64Array>() {
610 serde_json::Value::Number(serde_json::Number::from(arr.value(idx)))
611 } else if let Some(arr) = col.as_any().downcast_ref::<Float64Array>() {
612 serde_json::Number::from_f64(arr.value(idx))
613 .map(serde_json::Value::Number)
614 .unwrap_or(serde_json::Value::Null)
615 } else if let Some(arr) = col.as_any().downcast_ref::<StringArray>() {
616 serde_json::Value::String(arr.value(idx).to_string())
617 } else if let Some(arr) = col.as_any().downcast_ref::<BooleanArray>() {
618 serde_json::Value::Bool(arr.value(idx))
619 } else if let Some(arr) = col.as_any().downcast_ref::<arrow_array::LargeBinaryArray>() {
620 uni_common::cypher_value_codec::decode(arr.value(idx))
621 .map(|v| v.into())
622 .unwrap_or(serde_json::Value::Null)
623 } else {
624 serde_json::Value::Null
625 }
626}
627
628pub fn typed_large_list_to_cv_array(
643 list: &datafusion::arrow::array::LargeListArray,
644) -> datafusion::error::Result<Arc<dyn datafusion::arrow::array::Array>> {
645 use datafusion::arrow::array::{LargeBinaryBuilder, StructArray};
646
647 let values = list.values();
648
649 if values.data_type() == &DataType::LargeBinary {
651 return large_list_of_cv_to_cv_array(list);
652 }
653
654 let elem_to_json: Box<dyn Fn(usize) -> serde_json::Value> = match values.data_type() {
657 DataType::UInt64
658 | DataType::Int64
659 | DataType::Float64
660 | DataType::Utf8
661 | DataType::Boolean => {
662 let values = values.clone();
663 Box::new(move |idx| arrow_element_to_json(values.as_ref(), idx))
664 }
665 DataType::Struct(_) => {
666 let typed = values
667 .as_any()
668 .downcast_ref::<StructArray>()
669 .ok_or_else(|| {
670 datafusion::error::DataFusionError::Execution(
671 "Expected StructArray".to_string(),
672 )
673 })?;
674 let fields: Vec<_> = typed.fields().iter().cloned().collect();
675 let columns: Vec<_> = (0..typed.num_columns())
676 .map(|i| typed.column(i).clone())
677 .collect();
678 let nulls = typed.nulls().cloned();
679 Box::new(move |idx| {
680 if nulls.as_ref().is_some_and(|n| n.is_null(idx)) {
681 return serde_json::Value::Null;
682 }
683 let mut map = serde_json::Map::new();
684 for (field_idx, field) in fields.iter().enumerate() {
685 let value = arrow_element_to_json(columns[field_idx].as_ref(), idx);
686 map.insert(field.name().clone(), value);
687 }
688 serde_json::Value::Object(map)
689 })
690 }
691 other => {
692 return Err(datafusion::error::DataFusionError::Execution(format!(
693 "Unsupported element type for typed_large_list_to_cv_array: {:?}",
694 other
695 )));
696 }
697 };
698
699 let mut builder = LargeBinaryBuilder::new();
700
701 for row_idx in 0..list.len() {
702 if list.is_null(row_idx) {
703 builder.append_null();
704 continue;
705 }
706
707 let start = list.offsets()[row_idx] as usize;
708 let end = list.offsets()[row_idx + 1] as usize;
709 let json_elements: Vec<serde_json::Value> = (start..end).map(&elem_to_json).collect();
710
711 let uni_val: uni_common::Value = serde_json::Value::Array(json_elements).into();
712 let bytes = uni_common::cypher_value_codec::encode(&uni_val);
713 builder.append_value(&bytes);
714 }
715
716 Ok(Arc::new(builder.finish()))
717}
718
719pub fn cv_array_to_large_list(
727 array: &dyn datafusion::arrow::array::Array,
728 element_type: &DataType,
729) -> datafusion::error::Result<Arc<dyn datafusion::arrow::array::Array>> {
730 use datafusion::arrow::array::LargeBinaryArray;
731 use datafusion::arrow::buffer::{OffsetBuffer, ScalarBuffer};
732
733 let binary_arr = array
734 .as_any()
735 .downcast_ref::<LargeBinaryArray>()
736 .ok_or_else(|| {
737 datafusion::error::DataFusionError::Execution(
738 "cv_array_to_large_list: expected LargeBinaryArray".to_string(),
739 )
740 })?;
741
742 let num_rows = binary_arr.len();
744 let mut all_elements: Vec<Vec<serde_json::Value>> = Vec::with_capacity(num_rows);
745 let mut nulls = Vec::with_capacity(num_rows);
746
747 for i in 0..num_rows {
748 if binary_arr.is_null(i) {
749 all_elements.push(Vec::new());
750 nulls.push(false);
751 continue;
752 }
753
754 let blob = binary_arr.value(i);
755 let uni_val = match uni_common::cypher_value_codec::decode(blob) {
756 Ok(v) => v,
757 Err(_) => {
758 all_elements.push(Vec::new());
759 nulls.push(false);
760 continue;
761 }
762 };
763 let json_val_decoded: serde_json::Value = uni_val.into();
764
765 match json_val_decoded {
766 serde_json::Value::Array(elements) => {
767 all_elements.push(elements);
768 nulls.push(true);
769 }
770 _ => {
771 all_elements.push(Vec::new());
772 nulls.push(true);
773 }
774 }
775 }
776
777 let mut offsets: Vec<i64> = Vec::with_capacity(num_rows + 1);
779 offsets.push(0);
780
781 let values_array: Arc<dyn datafusion::arrow::array::Array> = match element_type {
782 DataType::Int64 => {
783 let mut builder = datafusion::arrow::array::builder::Int64Builder::new();
784 for elems in &all_elements {
785 for elem in elems {
786 if let serde_json::Value::Number(n) = elem {
787 if let Some(i) = n.as_i64() {
788 builder.append_value(i);
789 } else if let Some(f) = n.as_f64() {
790 builder.append_value(f as i64);
791 } else {
792 builder.append_null();
793 }
794 } else {
795 builder.append_null();
796 }
797 }
798 offsets.push(offsets.last().unwrap() + elems.len() as i64);
799 }
800 Arc::new(builder.finish())
801 }
802 DataType::Float64 => {
803 let mut builder = datafusion::arrow::array::builder::Float64Builder::new();
804 for elems in &all_elements {
805 for elem in elems {
806 if let serde_json::Value::Number(n) = elem
807 && let Some(f) = n.as_f64()
808 {
809 builder.append_value(f);
810 } else {
811 builder.append_null();
812 }
813 }
814 offsets.push(offsets.last().unwrap() + elems.len() as i64);
815 }
816 Arc::new(builder.finish())
817 }
818 DataType::Utf8 | DataType::LargeUtf8 => {
819 let mut builder = datafusion::arrow::array::builder::StringBuilder::new();
820 for elems in &all_elements {
821 for elem in elems {
822 match elem {
823 serde_json::Value::String(s) => builder.append_value(s),
824 serde_json::Value::Null => builder.append_null(),
825 other => builder.append_value(other.to_string()),
826 }
827 }
828 offsets.push(offsets.last().unwrap() + elems.len() as i64);
829 }
830 Arc::new(builder.finish())
831 }
832 DataType::Boolean => {
833 let mut builder = datafusion::arrow::array::builder::BooleanBuilder::new();
834 for elems in &all_elements {
835 for elem in elems {
836 if let serde_json::Value::Bool(b) = elem {
837 builder.append_value(*b);
838 } else {
839 builder.append_null();
840 }
841 }
842 offsets.push(offsets.last().unwrap() + elems.len() as i64);
843 }
844 Arc::new(builder.finish())
845 }
846 _ => {
848 let mut builder = datafusion::arrow::array::builder::LargeBinaryBuilder::new();
849 for elems in &all_elements {
850 for elem in elems {
851 let uni_val: uni_common::Value = elem.clone().into();
852 let bytes = uni_common::cypher_value_codec::encode(&uni_val);
853 builder.append_value(&bytes);
854 }
855 offsets.push(offsets.last().unwrap() + elems.len() as i64);
856 }
857 Arc::new(builder.finish())
858 }
859 };
860
861 let field = Arc::new(Field::new("item", element_type.clone(), true));
862 let offset_buffer = OffsetBuffer::new(ScalarBuffer::from(offsets));
863 let null_buffer = datafusion::arrow::buffer::NullBuffer::from(nulls);
864
865 let large_list = datafusion::arrow::array::LargeListArray::new(
866 field,
867 offset_buffer,
868 values_array,
869 Some(null_buffer),
870 );
871
872 Ok(Arc::new(large_list))
873}
874
875pub async fn collect_all_partitions(
880 plan: &Arc<dyn datafusion::physical_plan::ExecutionPlan>,
881 task_ctx: Arc<datafusion::execution::TaskContext>,
882) -> DFResult<Vec<RecordBatch>> {
883 let partition_count = plan.properties().output_partitioning().partition_count();
884
885 let mut all_batches = Vec::new();
886 for partition in 0..partition_count {
887 let stream = plan.execute(partition, task_ctx.clone())?;
888 let batches: Vec<RecordBatch> = stream.try_collect().await?;
889 all_batches.extend(batches);
890 }
891 Ok(all_batches)
892}
893
894#[expect(
901 clippy::too_many_arguments,
902 reason = "Threading mutation_ctx for CALL subquery writes"
903)]
904pub async fn execute_subplan(
905 plan: &LogicalPlan,
906 params: &HashMap<String, Value>,
907 outer_values: &HashMap<String, Value>,
908 graph_ctx: &Arc<GraphExecutionContext>,
909 session_ctx: &Arc<RwLock<SessionContext>>,
910 storage: &Arc<StorageManager>,
911 schema_info: &Arc<UniSchema>,
912 mutation_ctx: Option<&Arc<MutationContext>>,
913) -> DFResult<Vec<RecordBatch>> {
914 let (batches, _plan) = execute_subplan_with_outer_vars(
915 plan,
916 params,
917 outer_values,
918 graph_ctx,
919 session_ctx,
920 storage,
921 schema_info,
922 &std::collections::HashSet::new(),
923 mutation_ctx,
924 )
925 .await?;
926 Ok(batches)
927}
928
929#[expect(
939 clippy::too_many_arguments,
940 reason = "Mirrors execute_subplan's parameter list; profiling adds only the return value"
941)]
942pub async fn execute_subplan_collecting(
943 plan: &LogicalPlan,
944 params: &HashMap<String, Value>,
945 outer_values: &HashMap<String, Value>,
946 graph_ctx: &Arc<GraphExecutionContext>,
947 session_ctx: &Arc<RwLock<SessionContext>>,
948 storage: &Arc<StorageManager>,
949 schema_info: &Arc<UniSchema>,
950 mutation_ctx: Option<&Arc<MutationContext>>,
951) -> DFResult<(Vec<RecordBatch>, Vec<OperatorStats>)> {
952 let (batches, plan) = execute_subplan_with_outer_vars(
953 plan,
954 params,
955 outer_values,
956 graph_ctx,
957 session_ctx,
958 storage,
959 schema_info,
960 &std::collections::HashSet::new(),
961 mutation_ctx,
962 )
963 .await?;
964 let operators = collect_plan_metrics(&plan);
965 Ok((batches, operators))
966}
967
968#[expect(
969 clippy::too_many_arguments,
970 reason = "Threading outer_entity_vars for nested EXISTS and mutation_ctx for CALL writes"
971)]
972pub async fn execute_subplan_with_outer_vars(
978 plan: &LogicalPlan,
979 params: &HashMap<String, Value>,
980 outer_values: &HashMap<String, Value>,
981 graph_ctx: &Arc<GraphExecutionContext>,
982 session_ctx: &Arc<RwLock<SessionContext>>,
983 storage: &Arc<StorageManager>,
984 schema_info: &Arc<UniSchema>,
985 outer_entity_vars: &std::collections::HashSet<String>,
986 mutation_ctx: Option<&Arc<MutationContext>>,
987) -> DFResult<(
988 Vec<RecordBatch>,
989 Arc<dyn datafusion::physical_plan::ExecutionPlan>,
990)> {
991 let mut planner = HybridPhysicalPlanner::with_l0_context(
992 session_ctx.clone(),
993 storage.clone(),
994 graph_ctx.l0_context().clone(),
995 graph_ctx.property_manager().clone(),
996 schema_info.clone(),
997 params.clone(),
998 outer_values.clone(),
999 );
1000 planner.set_outer_entity_vars(outer_entity_vars.clone());
1001
1002 if let Some(registry) = graph_ctx.algo_registry() {
1005 planner = planner.with_algo_registry(registry.clone());
1006 }
1007 if let Some(registry) = graph_ctx.procedure_registry() {
1008 planner = planner.with_procedure_registry(registry.clone());
1009 }
1010 if let Some(runtime) = graph_ctx.xervo_runtime() {
1011 planner = planner.with_xervo_runtime(runtime.clone());
1012 }
1013
1014 if let Some(mc) = mutation_ctx {
1018 planner = planner.with_mutation_context(mc.clone());
1019 }
1020
1021 let execution_plan = planner.plan(plan).map_err(|e| {
1022 datafusion::error::DataFusionError::Execution(format!("Sub-plan error: {e}"))
1023 })?;
1024
1025 let task_ctx = session_ctx.read().task_ctx();
1026 let all_batches = collect_all_partitions(&execution_plan, task_ctx).await?;
1027
1028 Ok((all_batches, execution_plan))
1029}
1030
1031pub fn extract_row_params(batch: &RecordBatch, row_idx: usize) -> HashMap<String, Value> {
1035 let schema = batch.schema();
1036 (0..batch.num_columns())
1037 .map(|col_idx| {
1038 let col_name = schema.field(col_idx).name().clone();
1039 let val = arrow_to_json_value(batch.column(col_idx).as_ref(), row_idx);
1040 (col_name, val)
1041 })
1042 .collect()
1043}
1044
1045fn infer_procedure_call_schema(
1052 procedure_name: &str,
1053 yield_items: &[(String, Option<String>)],
1054 _schema_info: &UniSchema,
1055) -> SchemaRef {
1056 let infer_type = |name: &str| -> DataType {
1057 match procedure_name {
1058 "uni.schema.labels" => match name {
1059 "propertyCount" | "nodeCount" | "indexCount" => DataType::Int64,
1060 _ => DataType::Utf8,
1061 },
1062 "uni.schema.edgeTypes" | "uni.schema.relationshipTypes" => match name {
1063 "propertyCount" => DataType::Int64,
1064 _ => DataType::Utf8,
1065 },
1066 "uni.schema.constraints" => match name {
1067 "enabled" => DataType::Boolean,
1068 _ => DataType::Utf8,
1069 },
1070 "uni.schema.labelInfo" => match name {
1071 "nullable" | "indexed" | "unique" => DataType::Boolean,
1072 _ => DataType::Utf8,
1073 },
1074 n if super::procedure_call::is_node_yield_procedure_static(n) => {
1079 super::procedure_call::canonical_search_type(map_yield_to_canonical(name))
1080 }
1081 _ => DataType::Utf8,
1083 }
1084 };
1085
1086 let fields: Vec<Field> = yield_items
1087 .iter()
1088 .map(|(name, alias)| {
1089 let col_name = alias.as_ref().unwrap_or(name);
1090 Field::new(col_name, infer_type(name), true)
1091 })
1092 .collect();
1093
1094 Arc::new(Schema::new(fields))
1095}
1096
1097pub fn infer_logical_plan_schema(plan: &LogicalPlan, schema_info: &UniSchema) -> SchemaRef {
1104 if let LogicalPlan::Project { projections, .. } = plan {
1106 let fields: Vec<Field> = projections
1107 .iter()
1108 .map(|(expr, alias)| {
1109 let name = alias.clone().unwrap_or_else(|| expr.to_string_repr());
1110 let dt = infer_expr_type(expr, schema_info);
1111 Field::new(name, dt, true)
1112 })
1113 .collect();
1114 return Arc::new(Schema::new(fields));
1115 }
1116
1117 match plan {
1119 LogicalPlan::Sort { input, .. }
1120 | LogicalPlan::Limit { input, .. }
1121 | LogicalPlan::Filter { input, .. }
1122 | LogicalPlan::Distinct { input } => infer_logical_plan_schema(input, schema_info),
1123
1124 LogicalPlan::ProcedureCall {
1125 procedure_name,
1126 yield_items,
1127 ..
1128 } => infer_procedure_call_schema(procedure_name, yield_items, schema_info),
1129
1130 _ => {
1131 Arc::new(Schema::empty())
1133 }
1134 }
1135}
1136
1137fn infer_expr_type(expr: &Expr, schema_info: &UniSchema) -> DataType {
1139 match expr {
1140 Expr::Property(base, key) => {
1141 if let Expr::Variable(_) = base.as_ref() {
1142 for props in schema_info.properties.values() {
1144 if let Some(meta) = props.get(key.as_str()) {
1145 return meta.r#type.to_arrow();
1146 }
1147 }
1148 DataType::LargeBinary
1149 } else {
1150 DataType::LargeBinary
1151 }
1152 }
1153 Expr::BinaryOp { left, op, right } => match op {
1154 BinaryOp::Add | BinaryOp::Sub | BinaryOp::Mul | BinaryOp::Div | BinaryOp::Mod => {
1155 let lt = infer_expr_type(left, schema_info);
1156 let rt = infer_expr_type(right, schema_info);
1157 numeric_promotion(<, &rt)
1158 }
1159 BinaryOp::Eq
1160 | BinaryOp::NotEq
1161 | BinaryOp::Lt
1162 | BinaryOp::LtEq
1163 | BinaryOp::Gt
1164 | BinaryOp::GtEq
1165 | BinaryOp::And
1166 | BinaryOp::Or => DataType::Boolean,
1167 _ => DataType::LargeBinary,
1168 },
1169 Expr::Literal(lit) => match lit {
1170 CypherLiteral::Integer(_) => DataType::Int64,
1171 CypherLiteral::Float(_) => DataType::Float64,
1172 CypherLiteral::String(_) => DataType::Utf8,
1173 CypherLiteral::Bool(_) => DataType::Boolean,
1174 CypherLiteral::Null => DataType::Null,
1175 CypherLiteral::Bytes(_) => DataType::LargeBinary,
1176 },
1177 Expr::Variable(_) => DataType::LargeBinary,
1178 Expr::FunctionCall { name, args, .. } => match name.to_lowercase().as_str() {
1179 "count" => DataType::Int64,
1180 "sum" | "avg" => {
1181 if let Some(arg) = args.first() {
1182 let arg_type = infer_expr_type(arg, schema_info);
1183 if matches!(arg_type, DataType::Float32 | DataType::Float64) {
1184 DataType::Float64
1185 } else {
1186 DataType::Int64
1187 }
1188 } else {
1189 DataType::Int64
1190 }
1191 }
1192 "min" | "max" => {
1193 if let Some(arg) = args.first() {
1194 infer_expr_type(arg, schema_info)
1195 } else {
1196 DataType::LargeBinary
1197 }
1198 }
1199 "tostring" | "trim" | "ltrim" | "rtrim" | "tolower" | "toupper" | "left" | "right"
1200 | "substring" | "replace" | "reverse" | "type" => DataType::Utf8,
1201 "tointeger" | "toint" | "size" | "length" | "id" => DataType::Int64,
1202 "tofloat" => DataType::Float64,
1203 "stdev" | "stddev" | "stdevp" | "stddevp" | "variance" | "variancep" => {
1205 DataType::Float64
1206 }
1207 "toboolean" => DataType::Boolean,
1208 _ => DataType::LargeBinary,
1209 },
1210 _ => DataType::LargeBinary,
1211 }
1212}
1213
1214fn numeric_promotion(left: &DataType, right: &DataType) -> DataType {
1216 match (left, right) {
1217 (DataType::Float64, _) | (_, DataType::Float64) => DataType::Float64,
1218 (DataType::Float32, _) | (_, DataType::Float32) => DataType::Float64,
1219 (DataType::Int64, _) | (_, DataType::Int64) => DataType::Int64,
1220 (DataType::Int32, _) | (_, DataType::Int32) => DataType::Int64,
1221 _ => DataType::Int64,
1222 }
1223}
1224
1225pub(crate) fn evaluate_simple_expr(
1234 expr: &Expr,
1235 params: &HashMap<String, Value>,
1236 outer_values: &HashMap<String, Value>,
1237) -> DFResult<Value> {
1238 match expr {
1239 Expr::Literal(lit) => Ok(lit.to_value()),
1240
1241 Expr::Parameter(name) => params.get(name).cloned().ok_or_else(|| {
1242 datafusion::error::DataFusionError::Execution(format!("Parameter '{}' not found", name))
1243 }),
1244
1245 Expr::Variable(name) => {
1246 let vid_key = format!("{}._vid", name);
1248 if let Some(val) = outer_values.get(&vid_key) {
1249 return Ok(val.clone());
1250 }
1251 outer_values.get(name).cloned().ok_or_else(|| {
1253 datafusion::error::DataFusionError::Execution(format!(
1254 "Variable '{}' not found in scope (looked for '{}' and '{}')",
1255 name, vid_key, name
1256 ))
1257 })
1258 }
1259
1260 Expr::List(items) => {
1261 let values: Vec<Value> = items
1262 .iter()
1263 .map(|item| evaluate_simple_expr(item, params, outer_values))
1264 .collect::<DFResult<_>>()?;
1265 Ok(Value::List(values))
1266 }
1267
1268 Expr::Map(entries) => {
1269 let map: HashMap<String, Value> = entries
1270 .iter()
1271 .map(|(k, v)| {
1272 evaluate_simple_expr(v, params, outer_values).map(|val| (k.clone(), val))
1273 })
1274 .collect::<DFResult<_>>()?;
1275 Ok(Value::Map(map))
1276 }
1277
1278 _ => Err(datafusion::error::DataFusionError::Execution(format!(
1279 "Unsupported expression type for procedure argument: {:?}",
1280 expr
1281 ))),
1282 }
1283}
1284
1285pub fn merged_edge_schema_props(
1293 uni_schema: &UniSchema,
1294 edge_type_ids: &[u32],
1295) -> HashMap<String, uni_common::core::schema::PropertyMeta> {
1296 let mut merged: HashMap<String, uni_common::core::schema::PropertyMeta> = HashMap::new();
1297 let mut sorted_ids = edge_type_ids.to_vec();
1298 sorted_ids.sort_unstable();
1299
1300 for edge_type_id in sorted_ids {
1301 if let Some(edge_type_name) = uni_schema.edge_type_name_by_id_unified(edge_type_id)
1302 && let Some(props) = uni_schema.properties.get(edge_type_name.as_str())
1303 {
1304 for (prop_name, meta) in props {
1305 match merged.get_mut(prop_name) {
1306 Some(existing) => {
1307 if existing.r#type != meta.r#type {
1308 existing.r#type = uni_common::core::schema::DataType::CypherValue;
1309 }
1310 existing.nullable |= meta.nullable;
1311 }
1312 None => {
1313 merged.insert(prop_name.clone(), meta.clone());
1314 }
1315 }
1316 }
1317 }
1318 }
1319
1320 merged
1321}
1322
/// Owned, hashable value of a single key column at a single row.
///
/// A row's full key is the `Vec<ScalarKey>` returned by `extract_scalar_key`,
/// with one entry per key column, so equality and hashing are positional.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub(crate) enum ScalarKey {
    /// The key cell is null.
    Null,
    /// Value of a `Boolean` column.
    Bool(bool),
    /// Value of an `Int64` column. `extract_scalar_key` also stores `Float64`
    /// values here as their IEEE-754 bit pattern.
    Int64(i64),
    /// Value of a `Utf8`/`LargeUtf8` column, or the display-formatted text that
    /// `extract_scalar_key` produces for any other column type.
    Utf8(String),
    /// Value of a `LargeBinary` column.
    Binary(Vec<u8>),
}
1338
1339pub(crate) fn extract_scalar_key(
1345 batch: &RecordBatch,
1346 key_indices: &[usize],
1347 row_idx: usize,
1348) -> Vec<ScalarKey> {
1349 use arrow::array::Array;
1350 key_indices
1351 .iter()
1352 .map(|&col_idx| {
1353 let col = batch.column(col_idx);
1354 if col.is_null(row_idx) {
1355 return ScalarKey::Null;
1356 }
1357 match col.data_type() {
1358 arrow_schema::DataType::Boolean => {
1359 let arr = col
1360 .as_any()
1361 .downcast_ref::<arrow_array::BooleanArray>()
1362 .unwrap();
1363 ScalarKey::Bool(arr.value(row_idx))
1364 }
1365 arrow_schema::DataType::Int64 => {
1366 let arr = col
1367 .as_any()
1368 .downcast_ref::<arrow_array::Int64Array>()
1369 .unwrap();
1370 ScalarKey::Int64(arr.value(row_idx))
1371 }
1372 arrow_schema::DataType::Utf8 => {
1373 let arr = col
1374 .as_any()
1375 .downcast_ref::<arrow_array::StringArray>()
1376 .unwrap();
1377 ScalarKey::Utf8(arr.value(row_idx).to_string())
1378 }
1379 arrow_schema::DataType::LargeBinary => {
1380 let arr = col
1381 .as_any()
1382 .downcast_ref::<arrow_array::LargeBinaryArray>()
1383 .unwrap();
1384 ScalarKey::Binary(arr.value(row_idx).to_vec())
1385 }
1386 arrow_schema::DataType::Float64 => {
1387 let arr = col
1389 .as_any()
1390 .downcast_ref::<arrow_array::Float64Array>()
1391 .unwrap();
1392 ScalarKey::Int64(arr.value(row_idx).to_bits() as i64)
1393 }
1394 arrow_schema::DataType::LargeUtf8 => {
1395 let arr = col
1396 .as_any()
1397 .downcast_ref::<arrow_array::LargeStringArray>()
1398 .unwrap();
1399 ScalarKey::Utf8(arr.value(row_idx).to_string())
1400 }
1401 _ => {
1402 let formatter = arrow::util::display::ArrayFormatter::try_new(
1404 col.as_ref(),
1405 &arrow::util::display::FormatOptions::default(),
1406 );
1407 match formatter {
1408 Ok(f) => ScalarKey::Utf8(f.value(row_idx).to_string()),
1409 Err(_) => ScalarKey::Utf8(format!("opaque@{row_idx}")),
1410 }
1411 }
1412 }
1413 })
1414 .collect()
1415}
1416
1417pub use uni_query_functions::similar_to::calculate_score;
1421
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{LargeBinaryArray, UInt64Array};
    use arrow_schema::Schema;

    /// Builds a batch holding exactly `column`, under a single nullable field
    /// named `name` with type `data_type`.
    fn one_column_batch(name: &str, data_type: DataType, column: ArrayRef) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![Field::new(name, data_type, true)]));
        RecordBatch::try_new(schema, vec![column]).expect("batch should be valid")
    }

    #[test]
    fn test_extract_row_params_loses_uint64_to_int() {
        // A UInt64 column value is surfaced as a signed `Value::Int`.
        let vids: ArrayRef = Arc::new(UInt64Array::from(vec![Some(7)]));
        let batch = one_column_batch("n._vid", DataType::UInt64, vids);

        let row = extract_row_params(&batch, 0);
        assert_eq!(row.get("n._vid"), Some(&Value::Int(7)));
    }

    #[test]
    fn test_extract_row_params_decodes_largebinary_to_map() {
        // A LargeBinary cell holding an encoded CypherValue is decoded back.
        let encoded = uni_common::cypher_value_codec::encode(&Value::Map(HashMap::new()));
        let blobs: ArrayRef = Arc::new(LargeBinaryArray::from(vec![Some(encoded.as_slice())]));
        let batch = one_column_batch("m._all_props", DataType::LargeBinary, blobs);

        let row = extract_row_params(&batch, 0);
        assert_eq!(
            row.get("m._all_props"),
            Some(&Value::Map(HashMap::new()))
        );
    }
}