1use arrow_array::{ArrayRef, RecordBatch};
10use arrow_schema::{DataType, Field, Schema, SchemaRef};
11use datafusion::arrow::array::Array;
12use datafusion::common::Result as DFResult;
13use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
14use datafusion::physical_plan::PlanProperties;
15use datafusion::prelude::SessionContext;
16use futures::TryStreamExt;
17use parking_lot::RwLock;
18use std::collections::HashMap;
19use std::sync::Arc;
20use uni_common::Value;
21use uni_common::core::schema::Schema as UniSchema;
22use uni_cypher::ast::{BinaryOp, CypherLiteral, Expr};
23use uni_store::storage::manager::StorageManager;
24
25use super::GraphExecutionContext;
26use super::procedure_call::map_yield_to_canonical;
27use super::unwind::arrow_to_json_value;
28use crate::query::df_graph::MutationContext;
29use crate::query::df_planner::HybridPhysicalPlanner;
30use crate::query::planner::LogicalPlan;
31
32pub fn arrow_err(e: arrow::error::ArrowError) -> datafusion::error::DataFusionError {
38 datafusion::error::DataFusionError::ArrowError(Box::new(e), None)
39}
40
41pub fn compute_plan_properties(schema: SchemaRef) -> Arc<PlanProperties> {
48 Arc::new(PlanProperties::new(
49 EquivalenceProperties::new(schema),
50 Partitioning::UnknownPartitioning(1),
51 datafusion::physical_plan::execution_plan::EmissionType::Incremental,
52 datafusion::physical_plan::execution_plan::Boundedness::Bounded,
53 ))
54}
55
56pub fn labels_data_type() -> DataType {
62 DataType::List(Arc::new(Field::new("item", DataType::Utf8, true)))
63}
64
65pub fn column_as_vid_array(
76 col: &dyn arrow_array::Array,
77) -> datafusion::error::Result<std::borrow::Cow<'_, arrow_array::UInt64Array>> {
78 use arrow_array::{Int64Array, StructArray, UInt64Array};
79 use arrow_schema::DataType;
80
81 if let Some(arr) = col.as_any().downcast_ref::<UInt64Array>() {
82 return Ok(std::borrow::Cow::Borrowed(arr));
83 }
84
85 if let Some(arr) = col.as_any().downcast_ref::<Int64Array>() {
86 let cast: UInt64Array = arr.iter().map(|v| v.map(|i| i as u64)).collect();
87 return Ok(std::borrow::Cow::Owned(cast));
88 }
89
90 if let Some(arr) = col.as_any().downcast_ref::<StructArray>()
93 && let DataType::Struct(fields) = arr.data_type()
94 && let Some((vid_idx, _)) = fields.find("_vid")
95 {
96 return column_as_vid_array(arr.column(vid_idx).as_ref());
97 }
98
99 if let Some(arr) = col.as_any().downcast_ref::<arrow_array::LargeBinaryArray>() {
103 let vids = vids_from_large_binary(arr);
104 return Ok(std::borrow::Cow::Owned(vids));
105 }
106
107 if *col.data_type() == DataType::Null {
109 let vids: UInt64Array = (0..col.len()).map(|_| None::<u64>).collect();
110 return Ok(std::borrow::Cow::Owned(vids));
111 }
112
113 Err(datafusion::error::DataFusionError::Execution(format!(
114 "VID column has type {:?}, expected UInt64 or Int64",
115 col.data_type()
116 )))
117}
118
119fn extract_vid_from_value(val: &Value) -> Option<u64> {
124 match val {
125 Value::Node(node) => Some(node.vid.as_u64()),
126 Value::Map(map) => {
127 if let Some(Value::Int(vid)) = map.get("_vid") {
135 return Some(*vid as u64);
136 }
137 if let Some(Value::String(id_str)) = map.get("_id") {
139 return id_str
140 .strip_prefix("Vid(")
141 .and_then(|s| s.strip_suffix(')'))
142 .unwrap_or(id_str)
143 .parse::<u64>()
144 .ok();
145 }
146 if let Some(Value::Int(id)) = map.get("_id") {
147 return Some(*id as u64);
148 }
149 None
150 }
151 _ => None,
152 }
153}
154
155fn vids_from_large_binary(arr: &arrow_array::LargeBinaryArray) -> arrow_array::UInt64Array {
160 use uni_common::cypher_value_codec;
161
162 (0..arr.len())
163 .map(|i| {
164 if arr.is_null(i) {
165 return None;
166 }
167 cypher_value_codec::decode(arr.value(i))
168 .ok()
169 .as_ref()
170 .and_then(extract_vid_from_value)
171 })
172 .collect()
173}
174
175pub fn extract_vids_from_cypher_value_column(col: &dyn Array) -> DFResult<arrow_array::ArrayRef> {
181 let binary_col = col
182 .as_any()
183 .downcast_ref::<arrow_array::LargeBinaryArray>()
184 .ok_or_else(|| {
185 datafusion::error::DataFusionError::Execution(
186 "extract_vids_from_cypher_value_column: expected LargeBinary column".to_string(),
187 )
188 })?;
189 Ok(Arc::new(vids_from_large_binary(binary_col)) as arrow_array::ArrayRef)
190}
191
192pub(crate) fn extract_column_value<T: arrow_array::Array + 'static, R>(
198 batch: &RecordBatch,
199 col_name: &str,
200 row_idx: usize,
201 extract_fn: impl FnOnce(&T, usize) -> R,
202) -> Option<R> {
203 let (idx, _) = batch.schema().column_with_name(col_name)?;
204 let col = batch.column(idx);
205 let arr = col.as_any().downcast_ref::<T>()?;
206 if arr.is_valid(row_idx) {
207 Some(extract_fn(arr, row_idx))
208 } else {
209 None
210 }
211}
212
213pub fn node_struct_fields() -> arrow_schema::Fields {
218 arrow_schema::Fields::from(vec![
219 Field::new("_vid", DataType::UInt64, false),
220 Field::new("_labels", labels_data_type(), true),
221 Field::new("properties", DataType::LargeBinary, true),
222 ])
223}
224
225pub fn edge_struct_fields() -> arrow_schema::Fields {
230 arrow_schema::Fields::from(vec![
231 Field::new("_eid", DataType::UInt64, false),
232 Field::new("_type_name", DataType::Utf8, false),
233 Field::new("_src", DataType::UInt64, false),
234 Field::new("_dst", DataType::UInt64, false),
235 Field::new("properties", DataType::LargeBinary, true),
236 ])
237}
238
239pub fn encode_props_to_cv(props: &std::collections::HashMap<String, uni_common::Value>) -> Vec<u8> {
245 let val = uni_common::Value::Map(props.clone());
246 uni_common::cypher_value_codec::encode(&val)
247}
248
249pub fn build_edge_list_field(step_var: &str) -> Field {
254 let edge_item = Field::new("item", DataType::Struct(edge_struct_fields()), true);
255 Field::new(step_var, DataType::List(Arc::new(edge_item)), true)
257}
258
259pub fn build_path_struct_field(path_var: &str) -> Field {
263 let node_item = Field::new("item", DataType::Struct(node_struct_fields()), true);
264 let nodes_field = Field::new("nodes", DataType::List(Arc::new(node_item)), true);
265
266 let edge_item = Field::new("item", DataType::Struct(edge_struct_fields()), true);
267 let relationships_field =
268 Field::new("relationships", DataType::List(Arc::new(edge_item)), true);
269
270 Field::new(
271 path_var,
272 DataType::Struct(arrow_schema::Fields::from(vec![
273 nodes_field,
274 relationships_field,
275 ])),
276 true,
277 )
278}
279
280pub fn extend_schema_with_path(input_schema: SchemaRef, path_variable: &str) -> SchemaRef {
285 let mut fields: Vec<Arc<Field>> = input_schema.fields().to_vec();
286 fields.push(Arc::new(build_path_struct_field(path_variable)));
287 Arc::new(Schema::new(fields))
288}
289
290pub fn build_path_struct_array(
296 nodes_array: ArrayRef,
297 rels_array: ArrayRef,
298 path_validity: Vec<bool>,
299) -> DFResult<arrow_array::StructArray> {
300 Ok(arrow_array::StructArray::try_new(
301 arrow_schema::Fields::from(vec![
302 Arc::new(Field::new("nodes", nodes_array.data_type().clone(), true)),
303 Arc::new(Field::new(
304 "relationships",
305 rels_array.data_type().clone(),
306 true,
307 )),
308 ]),
309 vec![nodes_array, rels_array],
310 Some(arrow::buffer::NullBuffer::from(path_validity)),
311 )?)
312}
313
314pub fn new_edge_list_builder()
320-> arrow_array::builder::ListBuilder<arrow_array::builder::StructBuilder> {
321 use arrow_array::builder::{LargeBinaryBuilder, StringBuilder, StructBuilder, UInt64Builder};
322 arrow_array::builder::ListBuilder::new(StructBuilder::new(
323 edge_struct_fields(),
324 vec![
325 Box::new(UInt64Builder::new()),
326 Box::new(StringBuilder::new()),
327 Box::new(UInt64Builder::new()),
328 Box::new(UInt64Builder::new()),
329 Box::new(LargeBinaryBuilder::new()),
330 ],
331 ))
332}
333
334pub fn new_node_list_builder()
339-> arrow_array::builder::ListBuilder<arrow_array::builder::StructBuilder> {
340 use arrow_array::builder::{
341 LargeBinaryBuilder, ListBuilder, StringBuilder, StructBuilder, UInt64Builder,
342 };
343 arrow_array::builder::ListBuilder::new(StructBuilder::new(
344 node_struct_fields(),
345 vec![
346 Box::new(UInt64Builder::new()),
347 Box::new(ListBuilder::new(StringBuilder::new())),
348 Box::new(LargeBinaryBuilder::new()),
349 ],
350 ))
351}
352
353pub fn append_edge_to_struct(
359 struct_builder: &mut arrow_array::builder::StructBuilder,
360 eid: uni_common::core::id::Eid,
361 type_name: &str,
362 src_vid: u64,
363 dst_vid: u64,
364 query_ctx: &uni_store::runtime::context::QueryContext,
365) {
366 use arrow_array::builder::{LargeBinaryBuilder, StringBuilder, UInt64Builder};
367 use uni_store::runtime::l0_visibility;
368
369 struct_builder
370 .field_builder::<UInt64Builder>(0)
371 .unwrap()
372 .append_value(eid.as_u64());
373 struct_builder
374 .field_builder::<StringBuilder>(1)
375 .unwrap()
376 .append_value(type_name);
377 struct_builder
378 .field_builder::<UInt64Builder>(2)
379 .unwrap()
380 .append_value(src_vid);
381 struct_builder
382 .field_builder::<UInt64Builder>(3)
383 .unwrap()
384 .append_value(dst_vid);
385 let props_builder = struct_builder
386 .field_builder::<LargeBinaryBuilder>(4)
387 .unwrap();
388 if let Some(props) = l0_visibility::get_edge_properties(eid, query_ctx) {
389 let cv_bytes = encode_props_to_cv(&props);
390 props_builder.append_value(&cv_bytes);
391 } else {
392 props_builder.append_null();
393 }
394 struct_builder.append(true);
395}
396
397fn append_null_edge_struct(struct_builder: &mut arrow_array::builder::StructBuilder) {
402 use arrow_array::builder::{LargeBinaryBuilder, StringBuilder, UInt64Builder};
403
404 struct_builder
405 .field_builder::<UInt64Builder>(0)
406 .unwrap()
407 .append_value(0);
408 struct_builder
409 .field_builder::<StringBuilder>(1)
410 .unwrap()
411 .append_value("");
412 struct_builder
413 .field_builder::<UInt64Builder>(2)
414 .unwrap()
415 .append_value(0);
416 struct_builder
417 .field_builder::<UInt64Builder>(3)
418 .unwrap()
419 .append_value(0);
420 struct_builder
421 .field_builder::<LargeBinaryBuilder>(4)
422 .unwrap()
423 .append_null();
424 struct_builder.append(false);
425}
426
427pub fn append_edge_to_struct_optional(
433 struct_builder: &mut arrow_array::builder::StructBuilder,
434 eid: Option<uni_common::core::id::Eid>,
435 src_vid: u64,
436 dst_vid: u64,
437 batch_type_name: Option<String>,
438 query_ctx: &uni_store::runtime::context::QueryContext,
439) {
440 match eid {
441 Some(e) => {
442 use uni_store::runtime::l0_visibility;
443 let type_name = batch_type_name
444 .or_else(|| l0_visibility::get_edge_type(e, query_ctx))
445 .unwrap_or_default();
446 append_edge_to_struct(struct_builder, e, &type_name, src_vid, dst_vid, query_ctx);
447 }
448 None => append_null_edge_struct(struct_builder),
449 }
450}
451
452pub fn append_node_to_struct(
458 struct_builder: &mut arrow_array::builder::StructBuilder,
459 vid: uni_common::core::id::Vid,
460 query_ctx: &uni_store::runtime::context::QueryContext,
461) {
462 use arrow_array::builder::{LargeBinaryBuilder, ListBuilder, StringBuilder, UInt64Builder};
463 use uni_store::runtime::l0_visibility;
464
465 struct_builder
466 .field_builder::<UInt64Builder>(0)
467 .unwrap()
468 .append_value(vid.as_u64());
469 let labels = l0_visibility::get_vertex_labels(vid, query_ctx);
470 let labels_builder = struct_builder
471 .field_builder::<ListBuilder<StringBuilder>>(1)
472 .unwrap();
473 let values = labels_builder.values();
474 for lbl in &labels {
475 values.append_value(lbl);
476 }
477 labels_builder.append(true);
478 let props_builder = struct_builder
479 .field_builder::<LargeBinaryBuilder>(2)
480 .unwrap();
481 if let Some(props) = l0_visibility::get_vertex_properties(vid, query_ctx) {
482 let cv_bytes = encode_props_to_cv(&props);
483 props_builder.append_value(&cv_bytes);
484 } else {
485 props_builder.append_null();
486 }
487 struct_builder.append(true);
488}
489
490fn append_null_node_struct(struct_builder: &mut arrow_array::builder::StructBuilder) {
495 use arrow_array::builder::{LargeBinaryBuilder, ListBuilder, StringBuilder, UInt64Builder};
496
497 struct_builder
498 .field_builder::<UInt64Builder>(0)
499 .unwrap()
500 .append_value(0);
501 struct_builder
502 .field_builder::<ListBuilder<StringBuilder>>(1)
503 .unwrap()
504 .append(true);
505 struct_builder
506 .field_builder::<LargeBinaryBuilder>(2)
507 .unwrap()
508 .append_null();
509 struct_builder.append(false);
510}
511
512pub fn append_node_to_struct_optional(
517 struct_builder: &mut arrow_array::builder::StructBuilder,
518 vid: Option<uni_common::core::id::Vid>,
519 query_ctx: &uni_store::runtime::context::QueryContext,
520) {
521 match vid {
522 Some(v) => append_node_to_struct(struct_builder, v, query_ctx),
523 None => append_null_node_struct(struct_builder),
524 }
525}
526
527pub fn large_list_of_cv_to_cv_array(
541 list: &datafusion::arrow::array::LargeListArray,
542) -> datafusion::error::Result<Arc<dyn datafusion::arrow::array::Array>> {
543 use datafusion::arrow::array::{LargeBinaryArray, LargeBinaryBuilder};
544
545 let values = list.values();
546 let binary_values = values
547 .as_any()
548 .downcast_ref::<LargeBinaryArray>()
549 .ok_or_else(|| {
550 datafusion::error::DataFusionError::Execution(
551 "large_list_of_cv_to_cv_array: inner values must be LargeBinaryArray".to_string(),
552 )
553 })?;
554
555 let mut builder = LargeBinaryBuilder::new();
556
557 for row_idx in 0..list.len() {
558 if list.is_null(row_idx) {
559 builder.append_null();
560 continue;
561 }
562
563 let start = list.offsets()[row_idx] as usize;
564 let end = list.offsets()[row_idx + 1] as usize;
565
566 let mut json_elements = Vec::with_capacity(end - start);
567 for elem_idx in start..end {
568 if binary_values.is_null(elem_idx) {
569 json_elements.push(serde_json::Value::Null);
570 } else {
571 let blob = binary_values.value(elem_idx);
572 match uni_common::cypher_value_codec::decode(blob) {
573 Ok(uni_val) => {
574 let json_val: serde_json::Value = uni_val.into();
575 json_elements.push(json_val);
576 }
577 Err(_) => json_elements.push(serde_json::Value::Null),
578 }
579 }
580 }
581
582 let uni_val: uni_common::Value = serde_json::Value::Array(json_elements).into();
583 let bytes = uni_common::cypher_value_codec::encode(&uni_val);
584 builder.append_value(&bytes);
585 }
586
587 Ok(Arc::new(builder.finish()))
588}
589
590fn arrow_element_to_json(
595 col: &dyn datafusion::arrow::array::Array,
596 idx: usize,
597) -> serde_json::Value {
598 use datafusion::arrow::array::{
599 BooleanArray, Float64Array, Int64Array, StringArray, UInt64Array,
600 };
601
602 if col.is_null(idx) {
603 return serde_json::Value::Null;
604 }
605
606 if let Some(arr) = col.as_any().downcast_ref::<UInt64Array>() {
607 serde_json::Value::Number(serde_json::Number::from(arr.value(idx)))
608 } else if let Some(arr) = col.as_any().downcast_ref::<Int64Array>() {
609 serde_json::Value::Number(serde_json::Number::from(arr.value(idx)))
610 } else if let Some(arr) = col.as_any().downcast_ref::<Float64Array>() {
611 serde_json::Number::from_f64(arr.value(idx))
612 .map(serde_json::Value::Number)
613 .unwrap_or(serde_json::Value::Null)
614 } else if let Some(arr) = col.as_any().downcast_ref::<StringArray>() {
615 serde_json::Value::String(arr.value(idx).to_string())
616 } else if let Some(arr) = col.as_any().downcast_ref::<BooleanArray>() {
617 serde_json::Value::Bool(arr.value(idx))
618 } else if let Some(arr) = col.as_any().downcast_ref::<arrow_array::LargeBinaryArray>() {
619 uni_common::cypher_value_codec::decode(arr.value(idx))
620 .map(|v| v.into())
621 .unwrap_or(serde_json::Value::Null)
622 } else {
623 serde_json::Value::Null
624 }
625}
626
627pub fn typed_large_list_to_cv_array(
642 list: &datafusion::arrow::array::LargeListArray,
643) -> datafusion::error::Result<Arc<dyn datafusion::arrow::array::Array>> {
644 use datafusion::arrow::array::{LargeBinaryBuilder, StructArray};
645
646 let values = list.values();
647
648 if values.data_type() == &DataType::LargeBinary {
650 return large_list_of_cv_to_cv_array(list);
651 }
652
653 let elem_to_json: Box<dyn Fn(usize) -> serde_json::Value> = match values.data_type() {
656 DataType::UInt64
657 | DataType::Int64
658 | DataType::Float64
659 | DataType::Utf8
660 | DataType::Boolean => {
661 let values = values.clone();
662 Box::new(move |idx| arrow_element_to_json(values.as_ref(), idx))
663 }
664 DataType::Struct(_) => {
665 let typed = values
666 .as_any()
667 .downcast_ref::<StructArray>()
668 .ok_or_else(|| {
669 datafusion::error::DataFusionError::Execution(
670 "Expected StructArray".to_string(),
671 )
672 })?;
673 let fields: Vec<_> = typed.fields().iter().cloned().collect();
674 let columns: Vec<_> = (0..typed.num_columns())
675 .map(|i| typed.column(i).clone())
676 .collect();
677 let nulls = typed.nulls().cloned();
678 Box::new(move |idx| {
679 if nulls.as_ref().is_some_and(|n| n.is_null(idx)) {
680 return serde_json::Value::Null;
681 }
682 let mut map = serde_json::Map::new();
683 for (field_idx, field) in fields.iter().enumerate() {
684 let value = arrow_element_to_json(columns[field_idx].as_ref(), idx);
685 map.insert(field.name().clone(), value);
686 }
687 serde_json::Value::Object(map)
688 })
689 }
690 other => {
691 return Err(datafusion::error::DataFusionError::Execution(format!(
692 "Unsupported element type for typed_large_list_to_cv_array: {:?}",
693 other
694 )));
695 }
696 };
697
698 let mut builder = LargeBinaryBuilder::new();
699
700 for row_idx in 0..list.len() {
701 if list.is_null(row_idx) {
702 builder.append_null();
703 continue;
704 }
705
706 let start = list.offsets()[row_idx] as usize;
707 let end = list.offsets()[row_idx + 1] as usize;
708 let json_elements: Vec<serde_json::Value> = (start..end).map(&elem_to_json).collect();
709
710 let uni_val: uni_common::Value = serde_json::Value::Array(json_elements).into();
711 let bytes = uni_common::cypher_value_codec::encode(&uni_val);
712 builder.append_value(&bytes);
713 }
714
715 Ok(Arc::new(builder.finish()))
716}
717
718pub fn cv_array_to_large_list(
726 array: &dyn datafusion::arrow::array::Array,
727 element_type: &DataType,
728) -> datafusion::error::Result<Arc<dyn datafusion::arrow::array::Array>> {
729 use datafusion::arrow::array::LargeBinaryArray;
730 use datafusion::arrow::buffer::{OffsetBuffer, ScalarBuffer};
731
732 let binary_arr = array
733 .as_any()
734 .downcast_ref::<LargeBinaryArray>()
735 .ok_or_else(|| {
736 datafusion::error::DataFusionError::Execution(
737 "cv_array_to_large_list: expected LargeBinaryArray".to_string(),
738 )
739 })?;
740
741 let num_rows = binary_arr.len();
743 let mut all_elements: Vec<Vec<serde_json::Value>> = Vec::with_capacity(num_rows);
744 let mut nulls = Vec::with_capacity(num_rows);
745
746 for i in 0..num_rows {
747 if binary_arr.is_null(i) {
748 all_elements.push(Vec::new());
749 nulls.push(false);
750 continue;
751 }
752
753 let blob = binary_arr.value(i);
754 let uni_val = match uni_common::cypher_value_codec::decode(blob) {
755 Ok(v) => v,
756 Err(_) => {
757 all_elements.push(Vec::new());
758 nulls.push(false);
759 continue;
760 }
761 };
762 let json_val_decoded: serde_json::Value = uni_val.into();
763
764 match json_val_decoded {
765 serde_json::Value::Array(elements) => {
766 all_elements.push(elements);
767 nulls.push(true);
768 }
769 _ => {
770 all_elements.push(Vec::new());
771 nulls.push(true);
772 }
773 }
774 }
775
776 let mut offsets: Vec<i64> = Vec::with_capacity(num_rows + 1);
778 offsets.push(0);
779
780 let values_array: Arc<dyn datafusion::arrow::array::Array> = match element_type {
781 DataType::Int64 => {
782 let mut builder = datafusion::arrow::array::builder::Int64Builder::new();
783 for elems in &all_elements {
784 for elem in elems {
785 if let serde_json::Value::Number(n) = elem {
786 if let Some(i) = n.as_i64() {
787 builder.append_value(i);
788 } else if let Some(f) = n.as_f64() {
789 builder.append_value(f as i64);
790 } else {
791 builder.append_null();
792 }
793 } else {
794 builder.append_null();
795 }
796 }
797 offsets.push(offsets.last().unwrap() + elems.len() as i64);
798 }
799 Arc::new(builder.finish())
800 }
801 DataType::Float64 => {
802 let mut builder = datafusion::arrow::array::builder::Float64Builder::new();
803 for elems in &all_elements {
804 for elem in elems {
805 if let serde_json::Value::Number(n) = elem
806 && let Some(f) = n.as_f64()
807 {
808 builder.append_value(f);
809 } else {
810 builder.append_null();
811 }
812 }
813 offsets.push(offsets.last().unwrap() + elems.len() as i64);
814 }
815 Arc::new(builder.finish())
816 }
817 DataType::Utf8 | DataType::LargeUtf8 => {
818 let mut builder = datafusion::arrow::array::builder::StringBuilder::new();
819 for elems in &all_elements {
820 for elem in elems {
821 match elem {
822 serde_json::Value::String(s) => builder.append_value(s),
823 serde_json::Value::Null => builder.append_null(),
824 other => builder.append_value(other.to_string()),
825 }
826 }
827 offsets.push(offsets.last().unwrap() + elems.len() as i64);
828 }
829 Arc::new(builder.finish())
830 }
831 DataType::Boolean => {
832 let mut builder = datafusion::arrow::array::builder::BooleanBuilder::new();
833 for elems in &all_elements {
834 for elem in elems {
835 if let serde_json::Value::Bool(b) = elem {
836 builder.append_value(*b);
837 } else {
838 builder.append_null();
839 }
840 }
841 offsets.push(offsets.last().unwrap() + elems.len() as i64);
842 }
843 Arc::new(builder.finish())
844 }
845 _ => {
847 let mut builder = datafusion::arrow::array::builder::LargeBinaryBuilder::new();
848 for elems in &all_elements {
849 for elem in elems {
850 let uni_val: uni_common::Value = elem.clone().into();
851 let bytes = uni_common::cypher_value_codec::encode(&uni_val);
852 builder.append_value(&bytes);
853 }
854 offsets.push(offsets.last().unwrap() + elems.len() as i64);
855 }
856 Arc::new(builder.finish())
857 }
858 };
859
860 let field = Arc::new(Field::new("item", element_type.clone(), true));
861 let offset_buffer = OffsetBuffer::new(ScalarBuffer::from(offsets));
862 let null_buffer = datafusion::arrow::buffer::NullBuffer::from(nulls);
863
864 let large_list = datafusion::arrow::array::LargeListArray::new(
865 field,
866 offset_buffer,
867 values_array,
868 Some(null_buffer),
869 );
870
871 Ok(Arc::new(large_list))
872}
873
874pub async fn collect_all_partitions(
879 plan: &Arc<dyn datafusion::physical_plan::ExecutionPlan>,
880 task_ctx: Arc<datafusion::execution::TaskContext>,
881) -> DFResult<Vec<RecordBatch>> {
882 let partition_count = plan.properties().output_partitioning().partition_count();
883
884 let mut all_batches = Vec::new();
885 for partition in 0..partition_count {
886 let stream = plan.execute(partition, task_ctx.clone())?;
887 let batches: Vec<RecordBatch> = stream.try_collect().await?;
888 all_batches.extend(batches);
889 }
890 Ok(all_batches)
891}
892
893#[expect(
900 clippy::too_many_arguments,
901 reason = "Threading mutation_ctx for CALL subquery writes"
902)]
903pub async fn execute_subplan(
904 plan: &LogicalPlan,
905 params: &HashMap<String, Value>,
906 outer_values: &HashMap<String, Value>,
907 graph_ctx: &Arc<GraphExecutionContext>,
908 session_ctx: &Arc<RwLock<SessionContext>>,
909 storage: &Arc<StorageManager>,
910 schema_info: &Arc<UniSchema>,
911 mutation_ctx: Option<&Arc<MutationContext>>,
912) -> DFResult<Vec<RecordBatch>> {
913 execute_subplan_with_outer_vars(
914 plan,
915 params,
916 outer_values,
917 graph_ctx,
918 session_ctx,
919 storage,
920 schema_info,
921 &std::collections::HashSet::new(),
922 mutation_ctx,
923 )
924 .await
925}
926
927#[expect(
928 clippy::too_many_arguments,
929 reason = "Threading outer_entity_vars for nested EXISTS and mutation_ctx for CALL writes"
930)]
931pub async fn execute_subplan_with_outer_vars(
937 plan: &LogicalPlan,
938 params: &HashMap<String, Value>,
939 outer_values: &HashMap<String, Value>,
940 graph_ctx: &Arc<GraphExecutionContext>,
941 session_ctx: &Arc<RwLock<SessionContext>>,
942 storage: &Arc<StorageManager>,
943 schema_info: &Arc<UniSchema>,
944 outer_entity_vars: &std::collections::HashSet<String>,
945 mutation_ctx: Option<&Arc<MutationContext>>,
946) -> DFResult<Vec<RecordBatch>> {
947 let mut planner = HybridPhysicalPlanner::with_l0_context(
948 session_ctx.clone(),
949 storage.clone(),
950 graph_ctx.l0_context().clone(),
951 graph_ctx.property_manager().clone(),
952 schema_info.clone(),
953 params.clone(),
954 outer_values.clone(),
955 );
956 planner.set_outer_entity_vars(outer_entity_vars.clone());
957
958 if let Some(registry) = graph_ctx.algo_registry() {
961 planner = planner.with_algo_registry(registry.clone());
962 }
963 if let Some(registry) = graph_ctx.procedure_registry() {
964 planner = planner.with_procedure_registry(registry.clone());
965 }
966 if let Some(runtime) = graph_ctx.xervo_runtime() {
967 planner = planner.with_xervo_runtime(runtime.clone());
968 }
969
970 if let Some(mc) = mutation_ctx {
974 planner = planner.with_mutation_context(mc.clone());
975 }
976
977 let execution_plan = planner.plan(plan).map_err(|e| {
978 datafusion::error::DataFusionError::Execution(format!("Sub-plan error: {e}"))
979 })?;
980
981 let task_ctx = session_ctx.read().task_ctx();
982 let all_batches = collect_all_partitions(&execution_plan, task_ctx).await?;
983
984 Ok(all_batches)
985}
986
987pub fn extract_row_params(batch: &RecordBatch, row_idx: usize) -> HashMap<String, Value> {
991 let schema = batch.schema();
992 (0..batch.num_columns())
993 .map(|col_idx| {
994 let col_name = schema.field(col_idx).name().clone();
995 let val = arrow_to_json_value(batch.column(col_idx).as_ref(), row_idx);
996 (col_name, val)
997 })
998 .collect()
999}
1000
1001fn infer_procedure_call_schema(
1008 procedure_name: &str,
1009 yield_items: &[(String, Option<String>)],
1010 _schema_info: &UniSchema,
1011) -> SchemaRef {
1012 let infer_type = |name: &str| -> DataType {
1013 match procedure_name {
1014 "uni.schema.labels" => match name {
1015 "propertyCount" | "nodeCount" | "indexCount" => DataType::Int64,
1016 _ => DataType::Utf8,
1017 },
1018 "uni.schema.edgeTypes" | "uni.schema.relationshipTypes" => match name {
1019 "propertyCount" => DataType::Int64,
1020 _ => DataType::Utf8,
1021 },
1022 "uni.schema.constraints" => match name {
1023 "enabled" => DataType::Boolean,
1024 _ => DataType::Utf8,
1025 },
1026 "uni.schema.labelInfo" => match name {
1027 "nullable" | "indexed" | "unique" => DataType::Boolean,
1028 _ => DataType::Utf8,
1029 },
1030 n if super::procedure_call::is_node_yield_procedure_static(n) => {
1035 super::procedure_call::canonical_search_type(map_yield_to_canonical(name))
1036 }
1037 _ => DataType::Utf8,
1039 }
1040 };
1041
1042 let fields: Vec<Field> = yield_items
1043 .iter()
1044 .map(|(name, alias)| {
1045 let col_name = alias.as_ref().unwrap_or(name);
1046 Field::new(col_name, infer_type(name), true)
1047 })
1048 .collect();
1049
1050 Arc::new(Schema::new(fields))
1051}
1052
1053pub fn infer_logical_plan_schema(plan: &LogicalPlan, schema_info: &UniSchema) -> SchemaRef {
1060 if let LogicalPlan::Project { projections, .. } = plan {
1062 let fields: Vec<Field> = projections
1063 .iter()
1064 .map(|(expr, alias)| {
1065 let name = alias.clone().unwrap_or_else(|| expr.to_string_repr());
1066 let dt = infer_expr_type(expr, schema_info);
1067 Field::new(name, dt, true)
1068 })
1069 .collect();
1070 return Arc::new(Schema::new(fields));
1071 }
1072
1073 match plan {
1075 LogicalPlan::Sort { input, .. }
1076 | LogicalPlan::Limit { input, .. }
1077 | LogicalPlan::Filter { input, .. }
1078 | LogicalPlan::Distinct { input } => infer_logical_plan_schema(input, schema_info),
1079
1080 LogicalPlan::ProcedureCall {
1081 procedure_name,
1082 yield_items,
1083 ..
1084 } => infer_procedure_call_schema(procedure_name, yield_items, schema_info),
1085
1086 _ => {
1087 Arc::new(Schema::empty())
1089 }
1090 }
1091}
1092
1093fn infer_expr_type(expr: &Expr, schema_info: &UniSchema) -> DataType {
1095 match expr {
1096 Expr::Property(base, key) => {
1097 if let Expr::Variable(_) = base.as_ref() {
1098 for props in schema_info.properties.values() {
1100 if let Some(meta) = props.get(key.as_str()) {
1101 return meta.r#type.to_arrow();
1102 }
1103 }
1104 DataType::LargeBinary
1105 } else {
1106 DataType::LargeBinary
1107 }
1108 }
1109 Expr::BinaryOp { left, op, right } => match op {
1110 BinaryOp::Add | BinaryOp::Sub | BinaryOp::Mul | BinaryOp::Div | BinaryOp::Mod => {
1111 let lt = infer_expr_type(left, schema_info);
1112 let rt = infer_expr_type(right, schema_info);
1113 numeric_promotion(<, &rt)
1114 }
1115 BinaryOp::Eq
1116 | BinaryOp::NotEq
1117 | BinaryOp::Lt
1118 | BinaryOp::LtEq
1119 | BinaryOp::Gt
1120 | BinaryOp::GtEq
1121 | BinaryOp::And
1122 | BinaryOp::Or => DataType::Boolean,
1123 _ => DataType::LargeBinary,
1124 },
1125 Expr::Literal(lit) => match lit {
1126 CypherLiteral::Integer(_) => DataType::Int64,
1127 CypherLiteral::Float(_) => DataType::Float64,
1128 CypherLiteral::String(_) => DataType::Utf8,
1129 CypherLiteral::Bool(_) => DataType::Boolean,
1130 CypherLiteral::Null => DataType::Null,
1131 CypherLiteral::Bytes(_) => DataType::LargeBinary,
1132 },
1133 Expr::Variable(_) => DataType::LargeBinary,
1134 Expr::FunctionCall { name, args, .. } => match name.to_lowercase().as_str() {
1135 "count" => DataType::Int64,
1136 "sum" | "avg" => {
1137 if let Some(arg) = args.first() {
1138 let arg_type = infer_expr_type(arg, schema_info);
1139 if matches!(arg_type, DataType::Float32 | DataType::Float64) {
1140 DataType::Float64
1141 } else {
1142 DataType::Int64
1143 }
1144 } else {
1145 DataType::Int64
1146 }
1147 }
1148 "min" | "max" => {
1149 if let Some(arg) = args.first() {
1150 infer_expr_type(arg, schema_info)
1151 } else {
1152 DataType::LargeBinary
1153 }
1154 }
1155 "tostring" | "trim" | "ltrim" | "rtrim" | "tolower" | "toupper" | "left" | "right"
1156 | "substring" | "replace" | "reverse" | "type" => DataType::Utf8,
1157 "tointeger" | "toint" | "size" | "length" | "id" => DataType::Int64,
1158 "tofloat" => DataType::Float64,
1159 "toboolean" => DataType::Boolean,
1160 _ => DataType::LargeBinary,
1161 },
1162 _ => DataType::LargeBinary,
1163 }
1164}
1165
1166fn numeric_promotion(left: &DataType, right: &DataType) -> DataType {
1168 match (left, right) {
1169 (DataType::Float64, _) | (_, DataType::Float64) => DataType::Float64,
1170 (DataType::Float32, _) | (_, DataType::Float32) => DataType::Float64,
1171 (DataType::Int64, _) | (_, DataType::Int64) => DataType::Int64,
1172 (DataType::Int32, _) | (_, DataType::Int32) => DataType::Int64,
1173 _ => DataType::Int64,
1174 }
1175}
1176
1177pub(crate) fn evaluate_simple_expr(
1186 expr: &Expr,
1187 params: &HashMap<String, Value>,
1188 outer_values: &HashMap<String, Value>,
1189) -> DFResult<Value> {
1190 match expr {
1191 Expr::Literal(lit) => Ok(lit.to_value()),
1192
1193 Expr::Parameter(name) => params.get(name).cloned().ok_or_else(|| {
1194 datafusion::error::DataFusionError::Execution(format!("Parameter '{}' not found", name))
1195 }),
1196
1197 Expr::Variable(name) => {
1198 let vid_key = format!("{}._vid", name);
1200 if let Some(val) = outer_values.get(&vid_key) {
1201 return Ok(val.clone());
1202 }
1203 outer_values.get(name).cloned().ok_or_else(|| {
1205 datafusion::error::DataFusionError::Execution(format!(
1206 "Variable '{}' not found in scope (looked for '{}' and '{}')",
1207 name, vid_key, name
1208 ))
1209 })
1210 }
1211
1212 Expr::List(items) => {
1213 let values: Vec<Value> = items
1214 .iter()
1215 .map(|item| evaluate_simple_expr(item, params, outer_values))
1216 .collect::<DFResult<_>>()?;
1217 Ok(Value::List(values))
1218 }
1219
1220 Expr::Map(entries) => {
1221 let map: HashMap<String, Value> = entries
1222 .iter()
1223 .map(|(k, v)| {
1224 evaluate_simple_expr(v, params, outer_values).map(|val| (k.clone(), val))
1225 })
1226 .collect::<DFResult<_>>()?;
1227 Ok(Value::Map(map))
1228 }
1229
1230 _ => Err(datafusion::error::DataFusionError::Execution(format!(
1231 "Unsupported expression type for procedure argument: {:?}",
1232 expr
1233 ))),
1234 }
1235}
1236
1237pub fn merged_edge_schema_props(
1245 uni_schema: &UniSchema,
1246 edge_type_ids: &[u32],
1247) -> HashMap<String, uni_common::core::schema::PropertyMeta> {
1248 let mut merged: HashMap<String, uni_common::core::schema::PropertyMeta> = HashMap::new();
1249 let mut sorted_ids = edge_type_ids.to_vec();
1250 sorted_ids.sort_unstable();
1251
1252 for edge_type_id in sorted_ids {
1253 if let Some(edge_type_name) = uni_schema.edge_type_name_by_id_unified(edge_type_id)
1254 && let Some(props) = uni_schema.properties.get(edge_type_name.as_str())
1255 {
1256 for (prop_name, meta) in props {
1257 match merged.get_mut(prop_name) {
1258 Some(existing) => {
1259 if existing.r#type != meta.r#type {
1260 existing.r#type = uni_common::core::schema::DataType::CypherValue;
1261 }
1262 existing.nullable |= meta.nullable;
1263 }
1264 None => {
1265 merged.insert(prop_name.clone(), meta.clone());
1266 }
1267 }
1268 }
1269 }
1270 }
1271
1272 merged
1273}
1274
/// Owned, hashable representation of a single scalar cell.
///
/// `extract_scalar_key` builds one `ScalarKey` per key column. Because the
/// enum derives `Eq` and `Hash`, the resulting `Vec<ScalarKey>` can serve
/// directly as a hash-map / hash-set key.
///
/// NOTE(review): `extract_scalar_key` stores `Float64` values in the `Int64`
/// variant (as raw IEEE-754 bits). Keys derived from differently-typed
/// columns could therefore compare equal. Confirm that callers only compare
/// keys from columns of the same type.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub(crate) enum ScalarKey {
    /// A null cell, whatever the column's data type.
    Null,
    /// A `Boolean` cell.
    Bool(bool),
    /// An `Int64` cell, or the bit pattern of a `Float64` cell.
    Int64(i64),
    /// A `Utf8`/`LargeUtf8` cell, or the display-formatted text of a cell of
    /// any other unhandled type.
    Utf8(String),
    /// A `LargeBinary` cell's bytes.
    Binary(Vec<u8>),
}
1290
1291pub(crate) fn extract_scalar_key(
1297 batch: &RecordBatch,
1298 key_indices: &[usize],
1299 row_idx: usize,
1300) -> Vec<ScalarKey> {
1301 use arrow::array::Array;
1302 key_indices
1303 .iter()
1304 .map(|&col_idx| {
1305 let col = batch.column(col_idx);
1306 if col.is_null(row_idx) {
1307 return ScalarKey::Null;
1308 }
1309 match col.data_type() {
1310 arrow_schema::DataType::Boolean => {
1311 let arr = col
1312 .as_any()
1313 .downcast_ref::<arrow_array::BooleanArray>()
1314 .unwrap();
1315 ScalarKey::Bool(arr.value(row_idx))
1316 }
1317 arrow_schema::DataType::Int64 => {
1318 let arr = col
1319 .as_any()
1320 .downcast_ref::<arrow_array::Int64Array>()
1321 .unwrap();
1322 ScalarKey::Int64(arr.value(row_idx))
1323 }
1324 arrow_schema::DataType::Utf8 => {
1325 let arr = col
1326 .as_any()
1327 .downcast_ref::<arrow_array::StringArray>()
1328 .unwrap();
1329 ScalarKey::Utf8(arr.value(row_idx).to_string())
1330 }
1331 arrow_schema::DataType::LargeBinary => {
1332 let arr = col
1333 .as_any()
1334 .downcast_ref::<arrow_array::LargeBinaryArray>()
1335 .unwrap();
1336 ScalarKey::Binary(arr.value(row_idx).to_vec())
1337 }
1338 arrow_schema::DataType::Float64 => {
1339 let arr = col
1341 .as_any()
1342 .downcast_ref::<arrow_array::Float64Array>()
1343 .unwrap();
1344 ScalarKey::Int64(arr.value(row_idx).to_bits() as i64)
1345 }
1346 arrow_schema::DataType::LargeUtf8 => {
1347 let arr = col
1348 .as_any()
1349 .downcast_ref::<arrow_array::LargeStringArray>()
1350 .unwrap();
1351 ScalarKey::Utf8(arr.value(row_idx).to_string())
1352 }
1353 _ => {
1354 let formatter = arrow::util::display::ArrayFormatter::try_new(
1356 col.as_ref(),
1357 &arrow::util::display::FormatOptions::default(),
1358 );
1359 match formatter {
1360 Ok(f) => ScalarKey::Utf8(f.value(row_idx).to_string()),
1361 Err(_) => ScalarKey::Utf8(format!("opaque@{row_idx}")),
1362 }
1363 }
1364 }
1365 })
1366 .collect()
1367}
1368
1369pub use uni_query_functions::similar_to::calculate_score;
1373
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{LargeBinaryArray, UInt64Array};
    use arrow_schema::Schema;

    // Builds a batch with a single nullable column `name` of type `data_type`
    // holding `array`.
    fn single_column_batch(name: &str, data_type: DataType, array: ArrayRef) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![Field::new(name, data_type, true)]));
        RecordBatch::try_new(schema, vec![array]).expect("batch should be valid")
    }

    #[test]
    fn test_extract_row_params_loses_uint64_to_int() {
        // A UInt64 `_vid` column is surfaced as a signed `Value::Int`.
        let batch = single_column_batch(
            "n._vid",
            DataType::UInt64,
            Arc::new(UInt64Array::from(vec![Some(7)])),
        );

        let params = extract_row_params(&batch, 0);
        assert_eq!(params.get("n._vid"), Some(&Value::Int(7)));
    }

    #[test]
    fn test_extract_row_params_decodes_largebinary_to_map() {
        // LargeBinary cells hold codec-encoded values and should decode back.
        let encoded = uni_common::cypher_value_codec::encode(&Value::Map(HashMap::new()));
        let batch = single_column_batch(
            "m._all_props",
            DataType::LargeBinary,
            Arc::new(LargeBinaryArray::from(vec![Some(encoded.as_slice())])),
        );

        let params = extract_row_params(&batch, 0);
        assert_eq!(
            params.get("m._all_props"),
            Some(&Value::Map(HashMap::new()))
        );
    }
}