1use arrow::array::ArrayRef;
27use arrow::datatypes::DataType;
28use arrow_array::{
29 Array, BooleanArray, FixedSizeBinaryArray, Float32Array, Float64Array, Int32Array, Int64Array,
30 LargeBinaryArray, LargeStringArray, StringArray, UInt64Array,
31};
32use chrono::Offset;
33use datafusion::error::Result as DFResult;
34use datafusion::logical_expr::{
35 ColumnarValue, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, TypeSignature,
36 Volatility,
37};
38use datafusion::prelude::SessionContext;
39use datafusion::scalar::ScalarValue;
40use std::any::Any;
41use std::hash::{Hash, Hasher};
42use std::sync::Arc;
43use uni_common::Value;
44use uni_cypher::ast::BinaryOp;
45use uni_store::storage::arrow_convert::values_to_array;
46
47use super::expr_eval::cypher_eq;
48
/// Implements `PartialEq`, `Eq` and `Hash` for a UDF implementation type.
///
/// Two instances are equal only when both their names and their signatures
/// match. Hashing uses the name alone, which stays consistent with equality:
/// equal values always share a name and therefore always hash identically.
///
/// The target type must provide `name(&self) -> &str` (e.g. through
/// `ScalarUDFImpl`) and a `signature` field implementing `PartialEq`.
macro_rules! impl_udf_eq_hash {
    ($type:ty) => {
        impl PartialEq for $type {
            fn eq(&self, other: &Self) -> bool {
                // The name must take part in equality: several UDFs share an
                // identical signature (e.g. `created_at` / `updated_at`, or
                // every variadic custom function), and treating them as equal
                // could let expression comparison conflate distinct calls.
                self.name() == other.name() && self.signature == other.signature
            }
        }

        impl Eq for $type {}

        impl Hash for $type {
            fn hash<H: Hasher>(&self, state: &mut H) {
                self.name().hash(state);
            }
        }
    };
}
69
70pub fn register_cypher_udfs(ctx: &SessionContext) -> DFResult<()> {
80 ctx.register_udf(create_id_udf());
81 ctx.register_udf(create_created_at_udf());
82 ctx.register_udf(create_updated_at_udf());
83 ctx.register_udf(create_type_udf());
84 ctx.register_udf(create_keys_udf());
85 ctx.register_udf(create_properties_udf());
86 ctx.register_udf(create_labels_udf());
87 ctx.register_udf(create_nodes_udf());
88 ctx.register_udf(create_relationships_udf());
89 ctx.register_udf(create_range_udf());
90 ctx.register_udf(create_index_udf());
91 ctx.register_udf(create_startnode_udf());
92 ctx.register_udf(create_endnode_udf());
93
94 ctx.register_udf(create_to_integer_udf());
96 ctx.register_udf(create_to_float_udf());
97 ctx.register_udf(create_to_boolean_udf());
98
99 ctx.register_udf(create_bitwise_or_udf());
101 ctx.register_udf(create_bitwise_and_udf());
102 ctx.register_udf(create_bitwise_xor_udf());
103 ctx.register_udf(create_bitwise_not_udf());
104 ctx.register_udf(create_shift_left_udf());
105 ctx.register_udf(create_shift_right_udf());
106
107 for name in &[
109 "date",
111 "time",
112 "localtime",
113 "localdatetime",
114 "datetime",
115 "duration",
116 "btic",
117 "duration.between",
119 "duration.inmonths",
120 "duration.indays",
121 "duration.inseconds",
122 "datetime.fromepoch",
123 "datetime.fromepochmillis",
124 "date.truncate",
126 "time.truncate",
127 "datetime.truncate",
128 "localdatetime.truncate",
129 "localtime.truncate",
130 "datetime.transaction",
132 "datetime.statement",
133 "datetime.realtime",
134 "date.transaction",
135 "date.statement",
136 "date.realtime",
137 "time.transaction",
138 "time.statement",
139 "time.realtime",
140 "localtime.transaction",
141 "localtime.statement",
142 "localtime.realtime",
143 "localdatetime.transaction",
144 "localdatetime.statement",
145 "localdatetime.realtime",
146 ] {
147 ctx.register_udf(create_temporal_udf(name));
148 }
149
150 ctx.register_udf(create_duration_property_udf());
152 ctx.register_udf(create_temporal_property_udf());
153 ctx.register_udf(create_tostring_udf());
154 ctx.register_udf(create_cypher_sort_key_udf());
155 ctx.register_udf(create_has_null_udf());
156 ctx.register_udf(create_cypher_size_udf());
157
158 ctx.register_udf(create_cypher_starts_with_udf());
160 ctx.register_udf(create_cypher_ends_with_udf());
161 ctx.register_udf(create_cypher_contains_udf());
162
163 ctx.register_udf(create_cypher_list_compare_udf());
165
166 ctx.register_udf(create_cypher_xor_udf());
168
169 ctx.register_udf(create_cypher_equal_udf());
171 ctx.register_udf(create_cypher_not_equal_udf());
172 ctx.register_udf(create_cypher_gt_udf());
173 ctx.register_udf(create_cypher_gt_eq_udf());
174 ctx.register_udf(create_cypher_lt_udf());
175 ctx.register_udf(create_cypher_lt_eq_udf());
176
177 ctx.register_udf(create_cv_to_bool_udf());
179
180 ctx.register_udf(create_cypher_add_udf());
182 ctx.register_udf(create_cypher_sub_udf());
183 ctx.register_udf(create_cypher_mul_udf());
184 ctx.register_udf(create_cypher_div_udf());
185 ctx.register_udf(create_cypher_mod_udf());
186
187 ctx.register_udf(create_map_project_udf());
189
190 ctx.register_udf(create_make_cypher_list_udf());
192
193 ctx.register_udf(create_cypher_in_udf());
195
196 ctx.register_udf(create_cypher_list_concat_udf());
198 ctx.register_udf(create_cypher_list_append_udf());
199 ctx.register_udf(create_cypher_list_slice_udf());
200 ctx.register_udf(create_cypher_tail_udf());
201 ctx.register_udf(create_cypher_head_udf());
202 ctx.register_udf(create_cypher_last_udf());
203 ctx.register_udf(create_cypher_reverse_udf());
204 ctx.register_udf(create_cypher_substring_udf());
205 ctx.register_udf(create_cypher_split_udf());
206 ctx.register_udf(create_cypher_list_to_cv_udf());
207 ctx.register_udf(create_cypher_scalar_to_cv_udf());
208
209 for name in &["year", "month", "day", "hour", "minute", "second"] {
211 ctx.register_udf(create_temporal_udf(name));
212 }
213
214 ctx.register_udf(create_cypher_to_float64_udf());
216
217 ctx.register_udf(create_similar_to_udf());
219 ctx.register_udf(create_vector_similarity_udf());
220
221 ctx.register_udaf(create_cypher_min_udaf());
223 ctx.register_udaf(create_cypher_max_udaf());
224 ctx.register_udaf(create_cypher_sum_udaf());
225 ctx.register_udaf(create_cypher_collect_udaf());
226
227 ctx.register_udaf(create_cypher_percentile_disc_udaf());
229 ctx.register_udaf(create_cypher_percentile_cont_udaf());
230
231 register_btic_scalar_udfs(ctx)?;
233
234 ctx.register_udaf(create_btic_min_udaf());
236 ctx.register_udaf(create_btic_max_udaf());
237 ctx.register_udaf(create_btic_span_agg_udaf());
238 ctx.register_udaf(create_btic_count_at_udaf());
239
240 Ok(())
241}
242
243pub fn register_custom_udfs(
251 ctx: &SessionContext,
252 registry: &crate::custom_functions::CustomFunctionRegistry,
253) -> DFResult<()> {
254 for (name, func) in registry.iter() {
255 let lower = name.to_lowercase();
258 ctx.register_udf(ScalarUDF::new_from_impl(CustomScalarUdf::new(
259 lower,
260 func.clone(),
261 )));
262 ctx.register_udf(ScalarUDF::new_from_impl(CustomScalarUdf::new(
264 name.to_string(),
265 func.clone(),
266 )));
267 }
268 Ok(())
269}
270
271struct CustomScalarUdf {
276 name: String,
277 func: crate::custom_functions::CustomScalarFn,
278 signature: Signature,
279}
280
281impl CustomScalarUdf {
282 fn new(name: String, func: crate::custom_functions::CustomScalarFn) -> Self {
283 Self {
284 signature: Signature::new(TypeSignature::VariadicAny, Volatility::Volatile),
285 name,
286 func,
287 }
288 }
289}
290
291impl std::fmt::Debug for CustomScalarUdf {
292 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
293 f.debug_struct("CustomScalarUdf")
294 .field("name", &self.name)
295 .finish()
296 }
297}
298
299impl_udf_eq_hash!(CustomScalarUdf);
300
301impl ScalarUDFImpl for CustomScalarUdf {
302 fn as_any(&self) -> &dyn Any {
303 self
304 }
305
306 fn name(&self) -> &str {
307 &self.name
308 }
309
310 fn signature(&self) -> &Signature {
311 &self.signature
312 }
313
314 fn return_type(&self, _arg_types: &[DataType]) -> DFResult<DataType> {
315 Ok(DataType::LargeBinary)
316 }
317
318 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DFResult<ColumnarValue> {
319 let func = &self.func;
320 invoke_cypher_udf(args, &DataType::LargeBinary, |vals| {
321 func(vals).map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))
322 })
323 }
324}
325
326pub fn create_id_udf() -> ScalarUDF {
334 ScalarUDF::new_from_impl(IdUdf::new())
335}
336
337#[derive(Debug)]
338struct IdUdf {
339 signature: Signature,
340}
341
342impl IdUdf {
343 fn new() -> Self {
344 Self {
345 signature: Signature::new(
346 TypeSignature::Exact(vec![DataType::UInt64]),
347 Volatility::Immutable,
348 ),
349 }
350 }
351}
352
353impl_udf_eq_hash!(IdUdf);
354
355impl ScalarUDFImpl for IdUdf {
356 fn as_any(&self) -> &dyn Any {
357 self
358 }
359
360 fn name(&self) -> &str {
361 "id"
362 }
363
364 fn signature(&self) -> &Signature {
365 &self.signature
366 }
367
368 fn return_type(&self, _arg_types: &[DataType]) -> DFResult<DataType> {
369 Ok(DataType::UInt64)
370 }
371
372 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DFResult<ColumnarValue> {
373 if args.args.is_empty() {
375 return Err(datafusion::error::DataFusionError::Execution(
376 "id(): requires 1 argument".to_string(),
377 ));
378 }
379 Ok(args.args[0].clone())
380 }
381}
382
383pub fn create_created_at_udf() -> ScalarUDF {
396 ScalarUDF::new_from_impl(SystemTimestampUdf::new("created_at"))
397}
398
399pub fn create_updated_at_udf() -> ScalarUDF {
406 ScalarUDF::new_from_impl(SystemTimestampUdf::new("updated_at"))
407}
408
409#[derive(Debug)]
410struct SystemTimestampUdf {
411 name: &'static str,
412 signature: Signature,
413}
414
415impl SystemTimestampUdf {
416 fn new(name: &'static str) -> Self {
417 Self {
418 name,
419 signature: Signature::new(
420 TypeSignature::Exact(vec![DataType::Timestamp(
421 arrow_schema::TimeUnit::Nanosecond,
422 Some("UTC".into()),
423 )]),
424 Volatility::Immutable,
425 ),
426 }
427 }
428}
429
430impl_udf_eq_hash!(SystemTimestampUdf);
431
432impl ScalarUDFImpl for SystemTimestampUdf {
433 fn as_any(&self) -> &dyn Any {
434 self
435 }
436
437 fn name(&self) -> &str {
438 self.name
439 }
440
441 fn signature(&self) -> &Signature {
442 &self.signature
443 }
444
445 fn return_type(&self, _arg_types: &[DataType]) -> DFResult<DataType> {
446 Ok(DataType::Timestamp(
447 arrow_schema::TimeUnit::Nanosecond,
448 Some("UTC".into()),
449 ))
450 }
451
452 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DFResult<ColumnarValue> {
453 if args.args.is_empty() {
458 return Err(datafusion::error::DataFusionError::Execution(format!(
459 "{}(): requires 1 argument",
460 self.name
461 )));
462 }
463 Ok(args.args[0].clone())
464 }
465}
466
467pub fn create_type_udf() -> ScalarUDF {
475 ScalarUDF::new_from_impl(TypeUdf::new())
476}
477
478#[derive(Debug)]
479struct TypeUdf {
480 signature: Signature,
481}
482
483impl TypeUdf {
484 fn new() -> Self {
485 Self {
486 signature: Signature::new(TypeSignature::Any(1), Volatility::Immutable),
490 }
491 }
492}
493
494impl_udf_eq_hash!(TypeUdf);
495
496impl ScalarUDFImpl for TypeUdf {
497 fn as_any(&self) -> &dyn Any {
498 self
499 }
500
501 fn name(&self) -> &str {
502 "type"
503 }
504
505 fn signature(&self) -> &Signature {
506 &self.signature
507 }
508
509 fn return_type(&self, _arg_types: &[DataType]) -> DFResult<DataType> {
510 Ok(DataType::Utf8)
511 }
512
513 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DFResult<ColumnarValue> {
514 if args.args.is_empty() {
515 return Err(datafusion::error::DataFusionError::Execution(
516 "type(): requires 1 argument".to_string(),
517 ));
518 }
519 let output_type = DataType::Utf8;
520 invoke_cypher_udf(args, &output_type, |val_args| {
521 if val_args.is_empty() {
522 return Err(datafusion::error::DataFusionError::Execution(
523 "type(): requires 1 argument".to_string(),
524 ));
525 }
526 let val = &val_args[0];
527 match val {
528 Value::Map(map) => {
530 if let Some(Value::String(t)) = map.get("_type") {
531 Ok(Value::String(t.clone()))
532 } else {
533 Err(datafusion::error::DataFusionError::Execution(
535 "TypeError: InvalidArgumentValue - type() requires a relationship argument".to_string(),
536 ))
537 }
538 }
539 Value::Null => Ok(Value::Null),
540 _ => Err(datafusion::error::DataFusionError::Execution(
541 "TypeError: InvalidArgumentValue - type() requires a relationship argument"
542 .to_string(),
543 )),
544 }
545 })
546 }
547}
548
549pub fn create_keys_udf() -> ScalarUDF {
557 ScalarUDF::new_from_impl(KeysUdf::new())
558}
559
560#[derive(Debug)]
561struct KeysUdf {
562 signature: Signature,
563}
564
565impl KeysUdf {
566 fn new() -> Self {
567 Self {
568 signature: Signature::new(TypeSignature::Any(1), Volatility::Immutable),
569 }
570 }
571}
572
573impl_udf_eq_hash!(KeysUdf);
574
575impl ScalarUDFImpl for KeysUdf {
576 fn as_any(&self) -> &dyn Any {
577 self
578 }
579
580 fn name(&self) -> &str {
581 "keys"
582 }
583
584 fn signature(&self) -> &Signature {
585 &self.signature
586 }
587
588 fn return_type(&self, _arg_types: &[DataType]) -> DFResult<DataType> {
589 Ok(DataType::List(Arc::new(
590 arrow::datatypes::Field::new_list_field(DataType::Utf8, true),
591 )))
592 }
593
594 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DFResult<ColumnarValue> {
595 let output_type = self.return_type(&[])?;
596 invoke_cypher_udf(args, &output_type, |val_args| {
597 if val_args.is_empty() {
598 return Err(datafusion::error::DataFusionError::Execution(
599 "keys(): requires 1 argument".to_string(),
600 ));
601 }
602
603 let arg = &val_args[0];
604 let keys = match arg {
605 Value::Map(map) => {
606 let (source, is_entity) = match map.get("_all_props") {
616 Some(Value::Map(all)) => (all, true),
617 _ => (map, false),
618 };
619 let mut key_strings: Vec<String> = source
620 .iter()
621 .filter(|(k, v)| !k.starts_with('_') && (!is_entity || !v.is_null()))
622 .map(|(k, _)| k.clone())
623 .collect();
624 key_strings.sort();
625 key_strings
626 .into_iter()
627 .map(Value::String)
628 .collect::<Vec<_>>()
629 }
630 Value::Null => {
631 return Ok(Value::Null);
632 }
633 _ => {
634 vec![]
637 }
638 };
639
640 Ok(Value::List(keys))
641 })
642 }
643}
644
645pub fn create_properties_udf() -> ScalarUDF {
650 ScalarUDF::new_from_impl(PropertiesUdf::new())
651}
652
653#[derive(Debug)]
654struct PropertiesUdf {
655 signature: Signature,
656}
657
658impl PropertiesUdf {
659 fn new() -> Self {
660 Self {
661 signature: Signature::new(TypeSignature::Any(1), Volatility::Immutable),
662 }
663 }
664}
665
666impl_udf_eq_hash!(PropertiesUdf);
667
668impl ScalarUDFImpl for PropertiesUdf {
669 fn as_any(&self) -> &dyn Any {
670 self
671 }
672
673 fn name(&self) -> &str {
674 "properties"
675 }
676
677 fn signature(&self) -> &Signature {
678 &self.signature
679 }
680
681 fn return_type(&self, _arg_types: &[DataType]) -> DFResult<DataType> {
682 Ok(DataType::LargeBinary)
684 }
685
686 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DFResult<ColumnarValue> {
687 let output_type = self.return_type(&[])?;
688 invoke_cypher_udf(args, &output_type, |val_args| {
689 if val_args.is_empty() {
690 return Err(datafusion::error::DataFusionError::Execution(
691 "properties(): requires 1 argument".to_string(),
692 ));
693 }
694
695 let arg = &val_args[0];
696 match arg {
697 Value::Map(map) => {
698 let identity_null = map
708 .get("_vid")
709 .map(|v| v.is_null())
710 .or_else(|| map.get("_eid").map(|v| v.is_null()))
711 .unwrap_or(false);
712 if identity_null {
713 return Ok(Value::Null);
714 }
715
716 let source = match map.get("_all_props") {
718 Some(Value::Map(all)) => all,
719 _ => map,
720 };
721 let filtered: std::collections::HashMap<String, Value> = source
723 .iter()
724 .filter(|(k, _)| !k.starts_with('_'))
725 .map(|(k, v)| (k.clone(), v.clone()))
726 .collect();
727 Ok(Value::Map(filtered))
728 }
729 _ => Ok(Value::Null),
730 }
731 })
732 }
733}
734
735pub fn create_index_udf() -> ScalarUDF {
740 ScalarUDF::new_from_impl(IndexUdf::new())
741}
742
743#[derive(Debug)]
744struct IndexUdf {
745 signature: Signature,
746}
747
748impl IndexUdf {
749 fn new() -> Self {
750 Self {
751 signature: Signature::any(2, Volatility::Immutable),
752 }
753 }
754}
755
756impl_udf_eq_hash!(IndexUdf);
757
758impl ScalarUDFImpl for IndexUdf {
759 fn as_any(&self) -> &dyn Any {
760 self
761 }
762
763 fn name(&self) -> &str {
764 "index"
765 }
766
767 fn signature(&self) -> &Signature {
768 &self.signature
769 }
770
771 fn return_type(&self, _arg_types: &[DataType]) -> DFResult<DataType> {
772 Ok(DataType::LargeBinary)
774 }
775
776 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DFResult<ColumnarValue> {
777 let output_type = self.return_type(&[])?;
778 invoke_cypher_udf(args, &output_type, |val_args| {
779 if val_args.len() != 2 {
780 return Err(datafusion::error::DataFusionError::Execution(
781 "index(): requires 2 arguments".to_string(),
782 ));
783 }
784
785 let container = &val_args[0];
786 let index = &val_args[1];
787
788 let index_as_int = index.as_i64();
792
793 let result = match container {
794 Value::List(arr) => {
795 if let Some(i) = index_as_int {
796 let idx = if i < 0 {
797 let pos = arr.len() as i64 + i;
798 if pos < 0 { -1 } else { pos }
799 } else {
800 i
801 };
802 if idx >= 0 && (idx as usize) < arr.len() {
803 arr[idx as usize].clone()
804 } else {
805 Value::Null
806 }
807 } else if index.is_null() {
808 Value::Null
809 } else {
810 return Err(datafusion::error::DataFusionError::Execution(format!(
811 "TypeError: InvalidArgumentType - list index must be an integer, got: {:?}",
812 index
813 )));
814 }
815 }
816 Value::Map(map) => {
817 if let Some(key) = index.as_str() {
818 if let Some(val) = map.get(key) {
820 val.clone()
821 } else if let Some(Value::Map(all_props)) = map.get("_all_props") {
822 all_props.get(key).cloned().unwrap_or(Value::Null)
824 } else if let Some(Value::Map(props)) = map.get("properties") {
825 props.get(key).cloned().unwrap_or(Value::Null)
827 } else {
828 Value::Null
829 }
830 } else if !index.is_null() {
831 return Err(datafusion::error::DataFusionError::Execution(
832 "index(): map index must be a string".to_string(),
833 ));
834 } else {
835 Value::Null
836 }
837 }
838 Value::Node(node) => {
839 if let Some(key) = index.as_str() {
840 node.properties.get(key).cloned().unwrap_or(Value::Null)
841 } else if !index.is_null() {
842 return Err(datafusion::error::DataFusionError::Execution(
843 "index(): node index must be a string".to_string(),
844 ));
845 } else {
846 Value::Null
847 }
848 }
849 Value::Edge(edge) => {
850 if let Some(key) = index.as_str() {
851 edge.properties.get(key).cloned().unwrap_or(Value::Null)
852 } else if !index.is_null() {
853 return Err(datafusion::error::DataFusionError::Execution(
854 "index(): edge index must be a string".to_string(),
855 ));
856 } else {
857 Value::Null
858 }
859 }
860 Value::Null => Value::Null,
861 _ => {
862 return Err(datafusion::error::DataFusionError::Execution(format!(
863 "TypeError: InvalidArgumentType - cannot index into {:?}",
864 container
865 )));
866 }
867 };
868
869 Ok(result)
870 })
871 }
872}
873
874pub fn create_labels_udf() -> ScalarUDF {
879 ScalarUDF::new_from_impl(LabelsUdf::new())
880}
881
882#[derive(Debug)]
883struct LabelsUdf {
884 signature: Signature,
885}
886
887impl LabelsUdf {
888 fn new() -> Self {
889 Self {
890 signature: Signature::any(1, Volatility::Immutable),
891 }
892 }
893}
894
895impl_udf_eq_hash!(LabelsUdf);
896
897impl ScalarUDFImpl for LabelsUdf {
898 fn as_any(&self) -> &dyn Any {
899 self
900 }
901
902 fn name(&self) -> &str {
903 "labels"
904 }
905
906 fn signature(&self) -> &Signature {
907 &self.signature
908 }
909
910 fn return_type(&self, _arg_types: &[DataType]) -> DFResult<DataType> {
911 Ok(DataType::List(Arc::new(
912 arrow::datatypes::Field::new_list_field(DataType::Utf8, true),
913 )))
914 }
915
916 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DFResult<ColumnarValue> {
917 let output_type = self.return_type(&[])?;
918 invoke_cypher_udf(args, &output_type, |val_args| {
919 if val_args.is_empty() {
920 return Err(datafusion::error::DataFusionError::Execution(
921 "labels(): requires 1 argument".to_string(),
922 ));
923 }
924
925 let node = &val_args[0];
926 match node {
927 Value::Map(map) => {
928 if let Some(Value::List(arr)) = map.get("_labels") {
929 Ok(Value::List(arr.clone()))
930 } else {
931 Err(datafusion::error::DataFusionError::Execution(
933 "TypeError: InvalidArgumentValue - labels() requires a node argument"
934 .to_string(),
935 ))
936 }
937 }
938 Value::Null => Ok(Value::Null),
939 _ => Err(datafusion::error::DataFusionError::Execution(
940 "TypeError: InvalidArgumentValue - labels() requires a node argument"
941 .to_string(),
942 )),
943 }
944 })
945 }
946}
947
948pub fn create_nodes_udf() -> ScalarUDF {
953 ScalarUDF::new_from_impl(NodesUdf::new())
954}
955
956#[derive(Debug)]
957struct NodesUdf {
958 signature: Signature,
959}
960
961impl NodesUdf {
962 fn new() -> Self {
963 Self {
964 signature: Signature::any(1, Volatility::Immutable),
965 }
966 }
967}
968
969impl_udf_eq_hash!(NodesUdf);
970
971impl ScalarUDFImpl for NodesUdf {
972 fn as_any(&self) -> &dyn Any {
973 self
974 }
975
976 fn name(&self) -> &str {
977 "nodes"
978 }
979
980 fn signature(&self) -> &Signature {
981 &self.signature
982 }
983
984 fn return_type(&self, _arg_types: &[DataType]) -> DFResult<DataType> {
985 Ok(DataType::LargeBinary)
986 }
987
988 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DFResult<ColumnarValue> {
989 let output_type = self.return_type(&[])?;
990 invoke_cypher_udf(args, &output_type, |val_args| {
991 if val_args.is_empty() {
992 return Err(datafusion::error::DataFusionError::Execution(
993 "nodes(): requires 1 argument".to_string(),
994 ));
995 }
996
997 let path = &val_args[0];
998 let nodes = match path {
999 Value::Map(map) => map.get("nodes").cloned().unwrap_or(Value::Null),
1000 _ => Value::Null,
1001 };
1002
1003 Ok(nodes)
1004 })
1005 }
1006}
1007
1008pub fn create_relationships_udf() -> ScalarUDF {
1013 ScalarUDF::new_from_impl(RelationshipsUdf::new())
1014}
1015
1016#[derive(Debug)]
1017struct RelationshipsUdf {
1018 signature: Signature,
1019}
1020
1021impl RelationshipsUdf {
1022 fn new() -> Self {
1023 Self {
1024 signature: Signature::any(1, Volatility::Immutable),
1025 }
1026 }
1027}
1028
1029impl_udf_eq_hash!(RelationshipsUdf);
1030
1031impl ScalarUDFImpl for RelationshipsUdf {
1032 fn as_any(&self) -> &dyn Any {
1033 self
1034 }
1035
1036 fn name(&self) -> &str {
1037 "relationships"
1038 }
1039
1040 fn signature(&self) -> &Signature {
1041 &self.signature
1042 }
1043
1044 fn return_type(&self, _arg_types: &[DataType]) -> DFResult<DataType> {
1045 Ok(DataType::LargeBinary)
1046 }
1047
1048 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DFResult<ColumnarValue> {
1049 let output_type = self.return_type(&[])?;
1050 invoke_cypher_udf(args, &output_type, |val_args| {
1051 if val_args.is_empty() {
1052 return Err(datafusion::error::DataFusionError::Execution(
1053 "relationships(): requires 1 argument".to_string(),
1054 ));
1055 }
1056
1057 let path = &val_args[0];
1058 let rels = match path {
1059 Value::Map(map) => map.get("relationships").cloned().unwrap_or(Value::Null),
1060 _ => Value::Null,
1061 };
1062
1063 Ok(rels)
1064 })
1065 }
1066}
1067
1068pub fn create_startnode_udf() -> ScalarUDF {
1077 ScalarUDF::new_from_impl(StartNodeUdf::new())
1078}
1079
1080#[derive(Debug)]
1081struct StartNodeUdf {
1082 signature: Signature,
1083}
1084
1085impl StartNodeUdf {
1086 fn new() -> Self {
1087 Self {
1088 signature: Signature::new(TypeSignature::VariadicAny, Volatility::Immutable),
1089 }
1090 }
1091}
1092
1093impl_udf_eq_hash!(StartNodeUdf);
1094
1095impl ScalarUDFImpl for StartNodeUdf {
1096 fn as_any(&self) -> &dyn Any {
1097 self
1098 }
1099
1100 fn name(&self) -> &str {
1101 "startnode"
1102 }
1103
1104 fn signature(&self) -> &Signature {
1105 &self.signature
1106 }
1107
1108 fn return_type(&self, _arg_types: &[DataType]) -> DFResult<DataType> {
1109 Ok(DataType::LargeBinary)
1110 }
1111
1112 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DFResult<ColumnarValue> {
1113 let output_type = DataType::LargeBinary;
1114 invoke_cypher_udf(args, &output_type, |val_args| {
1115 startnode_endnode_impl(val_args, true)
1116 })
1117 }
1118}
1119
1120pub fn create_endnode_udf() -> ScalarUDF {
1126 ScalarUDF::new_from_impl(EndNodeUdf::new())
1127}
1128
1129#[derive(Debug)]
1130struct EndNodeUdf {
1131 signature: Signature,
1132}
1133
1134impl EndNodeUdf {
1135 fn new() -> Self {
1136 Self {
1137 signature: Signature::new(TypeSignature::VariadicAny, Volatility::Immutable),
1138 }
1139 }
1140}
1141
1142impl_udf_eq_hash!(EndNodeUdf);
1143
1144impl ScalarUDFImpl for EndNodeUdf {
1145 fn as_any(&self) -> &dyn Any {
1146 self
1147 }
1148
1149 fn name(&self) -> &str {
1150 "endnode"
1151 }
1152
1153 fn signature(&self) -> &Signature {
1154 &self.signature
1155 }
1156
1157 fn return_type(&self, _arg_types: &[DataType]) -> DFResult<DataType> {
1158 Ok(DataType::LargeBinary)
1159 }
1160
1161 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DFResult<ColumnarValue> {
1162 let output_type = DataType::LargeBinary;
1163 invoke_cypher_udf(args, &output_type, |val_args| {
1164 startnode_endnode_impl(val_args, false)
1165 })
1166 }
1167}
1168
1169fn startnode_endnode_impl(val_args: &[Value], is_start: bool) -> DFResult<Value> {
1174 if val_args.is_empty() {
1175 let fn_name = if is_start { "startNode" } else { "endNode" };
1176 return Err(datafusion::error::DataFusionError::Execution(format!(
1177 "{fn_name}(): requires at least 1 argument"
1178 )));
1179 }
1180
1181 let edge_val = &val_args[0];
1182 let target_vid = extract_endpoint_vid(edge_val, is_start);
1183
1184 let target_vid = match target_vid {
1185 Some(vid) => vid,
1186 None => return Ok(Value::Null),
1187 };
1188
1189 for node_val in val_args.iter().skip(1) {
1191 if let Some(vid) = extract_vid(node_val)
1192 && vid == target_vid
1193 {
1194 return Ok(node_val.clone());
1195 }
1196 }
1197
1198 let mut map = std::collections::HashMap::new();
1200 map.insert("_vid".to_string(), Value::Int(target_vid as i64));
1201 Ok(Value::Map(map))
1202}
1203
1204fn extract_endpoint_vid(val: &Value, is_start: bool) -> Option<u64> {
1206 match val {
1207 Value::Edge(edge) => {
1208 let vid = if is_start { edge.src } else { edge.dst };
1209 Some(vid.as_u64())
1210 }
1211 Value::Map(map) => {
1212 let key = if is_start { "_src_vid" } else { "_dst_vid" };
1214 if let Some(v) = map.get(key) {
1215 return v.as_u64();
1216 }
1217 let key2 = if is_start { "_src" } else { "_dst" };
1219 if let Some(v) = map.get(key2) {
1220 return v.as_u64();
1221 }
1222 let node_key = if is_start { "_startNode" } else { "_endNode" };
1224 if let Some(node_val) = map.get(node_key) {
1225 return extract_vid(node_val);
1226 }
1227 None
1228 }
1229 _ => None,
1230 }
1231}
1232
1233fn extract_vid(val: &Value) -> Option<u64> {
1235 match val {
1236 Value::Map(map) => map.get("_vid").and_then(|v| v.as_u64()),
1237 _ => None,
1238 }
1239}
1240
1241fn extract_i64_range_arg(arg: &ColumnarValue, row_idx: usize, name: &str) -> DFResult<i64> {
1248 match arg {
1249 ColumnarValue::Scalar(sv) => match sv {
1250 ScalarValue::Int8(Some(v)) => Ok(*v as i64),
1251 ScalarValue::Int16(Some(v)) => Ok(*v as i64),
1252 ScalarValue::Int32(Some(v)) => Ok(*v as i64),
1253 ScalarValue::Int64(Some(v)) => Ok(*v),
1254 ScalarValue::UInt8(Some(v)) => Ok(*v as i64),
1255 ScalarValue::UInt16(Some(v)) => Ok(*v as i64),
1256 ScalarValue::UInt32(Some(v)) => Ok(*v as i64),
1257 ScalarValue::UInt64(Some(v)) => Ok(*v as i64),
1258 ScalarValue::LargeBinary(Some(bytes)) => {
1259 scalar_binary_to_value(bytes).as_i64().ok_or_else(|| {
1260 datafusion::error::DataFusionError::Execution(format!(
1261 "ArgumentError: InvalidArgumentType - range() {} must be an integer",
1262 name
1263 ))
1264 })
1265 }
1266 _ => Err(datafusion::error::DataFusionError::Execution(format!(
1267 "ArgumentError: InvalidArgumentType - range() {} must be an integer",
1268 name
1269 ))),
1270 },
1271 ColumnarValue::Array(arr) => {
1272 if row_idx >= arr.len() || arr.is_null(row_idx) {
1273 return Err(datafusion::error::DataFusionError::Execution(format!(
1274 "ArgumentError: InvalidArgumentType - range() {} must be an integer",
1275 name
1276 )));
1277 }
1278 if !arr.is_empty() {
1280 use datafusion::arrow::array::{
1281 Int8Array, Int16Array, Int32Array, Int64Array, UInt8Array, UInt16Array,
1282 UInt32Array, UInt64Array,
1283 };
1284 match arr.data_type() {
1285 DataType::Int8 => Ok(arr
1286 .as_any()
1287 .downcast_ref::<Int8Array>()
1288 .unwrap()
1289 .value(row_idx) as i64),
1290 DataType::Int16 => Ok(arr
1291 .as_any()
1292 .downcast_ref::<Int16Array>()
1293 .unwrap()
1294 .value(row_idx) as i64),
1295 DataType::Int32 => Ok(arr
1296 .as_any()
1297 .downcast_ref::<Int32Array>()
1298 .unwrap()
1299 .value(row_idx) as i64),
1300 DataType::Int64 => Ok(arr
1301 .as_any()
1302 .downcast_ref::<Int64Array>()
1303 .unwrap()
1304 .value(row_idx)),
1305 DataType::UInt8 => Ok(arr
1306 .as_any()
1307 .downcast_ref::<UInt8Array>()
1308 .unwrap()
1309 .value(row_idx) as i64),
1310 DataType::UInt16 => Ok(arr
1311 .as_any()
1312 .downcast_ref::<UInt16Array>()
1313 .unwrap()
1314 .value(row_idx) as i64),
1315 DataType::UInt32 => Ok(arr
1316 .as_any()
1317 .downcast_ref::<UInt32Array>()
1318 .unwrap()
1319 .value(row_idx) as i64),
1320 DataType::UInt64 => Ok(arr
1321 .as_any()
1322 .downcast_ref::<UInt64Array>()
1323 .unwrap()
1324 .value(row_idx) as i64),
1325 DataType::LargeBinary => {
1326 let bytes = arr
1327 .as_any()
1328 .downcast_ref::<LargeBinaryArray>()
1329 .unwrap()
1330 .value(row_idx);
1331 scalar_binary_to_value(bytes).as_i64().ok_or_else(|| {
1332 datafusion::error::DataFusionError::Execution(format!(
1333 "ArgumentError: InvalidArgumentType - range() {} must be an integer",
1334 name
1335 ))
1336 })
1337 }
1338 _ => Err(datafusion::error::DataFusionError::Execution(format!(
1339 "ArgumentError: InvalidArgumentType - range() {} must be an integer",
1340 name
1341 ))),
1342 }
1343 } else {
1344 Err(datafusion::error::DataFusionError::Execution(format!(
1345 "ArgumentError: InvalidArgumentType - range() {} must be an integer",
1346 name
1347 )))
1348 }
1349 }
1350 }
1351}
1352
1353pub fn create_range_udf() -> ScalarUDF {
1355 ScalarUDF::new_from_impl(RangeUdf::new())
1356}
1357
1358#[derive(Debug)]
1359struct RangeUdf {
1360 signature: Signature,
1361}
1362
1363impl RangeUdf {
1364 fn new() -> Self {
1365 Self {
1366 signature: Signature::one_of(
1367 vec![TypeSignature::Any(2), TypeSignature::Any(3)],
1368 Volatility::Immutable,
1369 ),
1370 }
1371 }
1372}
1373
1374impl_udf_eq_hash!(RangeUdf);
1375
1376impl ScalarUDFImpl for RangeUdf {
1377 fn as_any(&self) -> &dyn Any {
1378 self
1379 }
1380
1381 fn name(&self) -> &str {
1382 "range"
1383 }
1384
1385 fn signature(&self) -> &Signature {
1386 &self.signature
1387 }
1388
1389 fn return_type(&self, _arg_types: &[DataType]) -> DFResult<DataType> {
1390 Ok(DataType::List(Arc::new(
1391 arrow::datatypes::Field::new_list_field(DataType::Int64, true),
1392 )))
1393 }
1394
1395 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DFResult<ColumnarValue> {
1396 if args.args.len() < 2 || args.args.len() > 3 {
1397 return Err(datafusion::error::DataFusionError::Execution(
1398 "range(): requires 2 or 3 arguments".to_string(),
1399 ));
1400 }
1401
1402 let len = args
1403 .args
1404 .iter()
1405 .find_map(|arg| match arg {
1406 ColumnarValue::Array(arr) => Some(arr.len()),
1407 _ => None,
1408 })
1409 .unwrap_or(1);
1410
1411 let mut list_builder =
1412 arrow_array::builder::ListBuilder::new(arrow_array::builder::Int64Builder::new());
1413
1414 for row_idx in 0..len {
1415 let start = extract_i64_range_arg(&args.args[0], row_idx, "start")?;
1416 let end = extract_i64_range_arg(&args.args[1], row_idx, "end")?;
1417 let step = if args.args.len() == 3 {
1418 extract_i64_range_arg(&args.args[2], row_idx, "step")?
1419 } else {
1420 1
1421 };
1422
1423 if step == 0 {
1424 return Err(datafusion::error::DataFusionError::Execution(
1425 "range(): step cannot be zero".to_string(),
1426 ));
1427 }
1428
1429 if step > 0 && start <= end {
1430 let mut current = start;
1431 while current <= end {
1432 list_builder.values().append_value(current);
1433 current += step;
1434 }
1435 } else if step < 0 && start >= end {
1436 let mut current = start;
1437 while current >= end {
1438 list_builder.values().append_value(current);
1439 current += step;
1440 }
1441 }
1442 list_builder.append(true);
1444 }
1445
1446 let list_arr = Arc::new(list_builder.finish()) as ArrayRef;
1447 if len == 1
1448 && args
1449 .args
1450 .iter()
1451 .all(|arg| matches!(arg, ColumnarValue::Scalar(_)))
1452 {
1453 Ok(ColumnarValue::Scalar(ScalarValue::try_from_array(
1454 &list_arr, 0,
1455 )?))
1456 } else {
1457 Ok(ColumnarValue::Array(list_arr))
1458 }
1459 }
1460}
1461
1462fn invoke_binary_bitwise_op<F>(
1470 args: &ScalarFunctionArgs,
1471 name: &str,
1472 op: F,
1473) -> DFResult<ColumnarValue>
1474where
1475 F: Fn(i64, i64) -> i64,
1476{
1477 use arrow_array::Int64Array;
1478 use datafusion::common::ScalarValue;
1479 use datafusion::error::DataFusionError;
1480
1481 if args.args.len() != 2 {
1482 return Err(DataFusionError::Execution(format!(
1483 "{}(): requires exactly 2 arguments",
1484 name
1485 )));
1486 }
1487
1488 let left = &args.args[0];
1489 let right = &args.args[1];
1490
1491 match (left, right) {
1492 (
1493 ColumnarValue::Scalar(ScalarValue::Int64(Some(l))),
1494 ColumnarValue::Scalar(ScalarValue::Int64(Some(r))),
1495 ) => Ok(ColumnarValue::Scalar(ScalarValue::Int64(Some(op(*l, *r))))),
1496 (ColumnarValue::Array(l_arr), ColumnarValue::Array(r_arr)) => {
1497 let l_arr = l_arr.as_any().downcast_ref::<Int64Array>().ok_or_else(|| {
1498 DataFusionError::Execution(format!("{}(): left array must be Int64", name))
1499 })?;
1500 let r_arr = r_arr.as_any().downcast_ref::<Int64Array>().ok_or_else(|| {
1501 DataFusionError::Execution(format!("{}(): right array must be Int64", name))
1502 })?;
1503
1504 let result: Int64Array = l_arr
1505 .iter()
1506 .zip(r_arr.iter())
1507 .map(|(l, r)| match (l, r) {
1508 (Some(l), Some(r)) => Some(op(l, r)),
1509 _ => None,
1510 })
1511 .collect();
1512
1513 Ok(ColumnarValue::Array(Arc::new(result)))
1514 }
1515 _ => Err(DataFusionError::Execution(format!(
1516 "{}(): mixed scalar/array not supported",
1517 name
1518 ))),
1519 }
1520}
1521
1522fn invoke_unary_bitwise_op<F>(
1526 args: &ScalarFunctionArgs,
1527 name: &str,
1528 op: F,
1529) -> DFResult<ColumnarValue>
1530where
1531 F: Fn(i64) -> i64,
1532{
1533 use arrow_array::Int64Array;
1534 use datafusion::common::ScalarValue;
1535 use datafusion::error::DataFusionError;
1536
1537 if args.args.len() != 1 {
1538 return Err(DataFusionError::Execution(format!(
1539 "{}(): requires exactly 1 argument",
1540 name
1541 )));
1542 }
1543
1544 let operand = &args.args[0];
1545
1546 match operand {
1547 ColumnarValue::Scalar(ScalarValue::Int64(Some(v))) => {
1548 Ok(ColumnarValue::Scalar(ScalarValue::Int64(Some(op(*v)))))
1549 }
1550 ColumnarValue::Array(arr) => {
1551 let arr = arr.as_any().downcast_ref::<Int64Array>().ok_or_else(|| {
1552 DataFusionError::Execution(format!("{}(): array must be Int64", name))
1553 })?;
1554
1555 let result: Int64Array = arr.iter().map(|v| v.map(&op)).collect();
1556
1557 Ok(ColumnarValue::Array(Arc::new(result)))
1558 }
1559 _ => Err(DataFusionError::Execution(format!(
1560 "{}(): invalid argument type",
1561 name
1562 ))),
1563 }
1564}
1565
/// Generates a UDF struct and its `ScalarUDFImpl` for a two-operand bitwise
/// function with signature `(Int64, Int64) -> Int64`.
///
/// * `$struct_name` - identifier of the generated struct.
/// * `$udf_name` - function name reported to DataFusion.
/// * `$op` - per-row operation handed to `invoke_binary_bitwise_op`.
macro_rules! define_binary_bitwise_udf {
    ($struct_name:ident, $udf_name:literal, $op:expr) => {
        #[derive(Debug)]
        struct $struct_name {
            signature: Signature,
        }

        impl $struct_name {
            fn new() -> Self {
                let operand_types = vec![DataType::Int64, DataType::Int64];
                let signature = Signature::exact(operand_types, Volatility::Immutable);
                Self { signature }
            }
        }

        impl_udf_eq_hash!($struct_name);

        impl ScalarUDFImpl for $struct_name {
            fn name(&self) -> &str {
                $udf_name
            }

            fn as_any(&self) -> &dyn Any {
                self
            }

            fn signature(&self) -> &Signature {
                &self.signature
            }

            fn return_type(&self, _arg_types: &[DataType]) -> DFResult<DataType> {
                Ok(DataType::Int64)
            }

            fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DFResult<ColumnarValue> {
                invoke_binary_bitwise_op(&args, $udf_name, $op)
            }
        }
    };
}
1612
/// Generates a UDF struct and its `ScalarUDFImpl` for a one-operand bitwise
/// function with signature `(Int64) -> Int64`.
///
/// * `$struct_name` - identifier of the generated struct.
/// * `$udf_name` - function name reported to DataFusion.
/// * `$op` - per-row operation handed to `invoke_unary_bitwise_op`.
macro_rules! define_unary_bitwise_udf {
    ($struct_name:ident, $udf_name:literal, $op:expr) => {
        #[derive(Debug)]
        struct $struct_name {
            signature: Signature,
        }

        impl $struct_name {
            fn new() -> Self {
                let signature = Signature::exact(vec![DataType::Int64], Volatility::Immutable);
                Self { signature }
            }
        }

        impl_udf_eq_hash!($struct_name);

        impl ScalarUDFImpl for $struct_name {
            fn name(&self) -> &str {
                $udf_name
            }

            fn as_any(&self) -> &dyn Any {
                self
            }

            fn signature(&self) -> &Signature {
                &self.signature
            }

            fn return_type(&self, _arg_types: &[DataType]) -> DFResult<DataType> {
                Ok(DataType::Int64)
            }

            fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DFResult<ColumnarValue> {
                invoke_unary_bitwise_op(&args, $udf_name, $op)
            }
        }
    };
}
1656
1657define_binary_bitwise_udf!(BitwiseOrUdf, "uni.bitwise.or", |l, r| l | r);
1659define_binary_bitwise_udf!(BitwiseAndUdf, "uni.bitwise.and", |l, r| l & r);
1660define_binary_bitwise_udf!(BitwiseXorUdf, "uni.bitwise.xor", |l, r| l ^ r);
1661define_binary_bitwise_udf!(ShiftLeftUdf, "uni.bitwise.shiftLeft", |l, r| l << r);
1662define_binary_bitwise_udf!(ShiftRightUdf, "uni.bitwise.shiftRight", |l, r| l >> r);
1663
1664define_unary_bitwise_udf!(BitwiseNotUdf, "uni.bitwise.not", |v| !v);
1666
/// Builds the `uni.bitwise.or` UDF.
pub fn create_bitwise_or_udf() -> ScalarUDF {
    ScalarUDF::new_from_impl(BitwiseOrUdf::new())
}

/// Builds the `uni.bitwise.and` UDF.
pub fn create_bitwise_and_udf() -> ScalarUDF {
    ScalarUDF::new_from_impl(BitwiseAndUdf::new())
}

/// Builds the `uni.bitwise.xor` UDF.
pub fn create_bitwise_xor_udf() -> ScalarUDF {
    ScalarUDF::new_from_impl(BitwiseXorUdf::new())
}

/// Builds the `uni.bitwise.not` UDF.
pub fn create_bitwise_not_udf() -> ScalarUDF {
    ScalarUDF::new_from_impl(BitwiseNotUdf::new())
}

/// Builds the `uni.bitwise.shiftLeft` UDF.
pub fn create_shift_left_udf() -> ScalarUDF {
    ScalarUDF::new_from_impl(ShiftLeftUdf::new())
}

/// Builds the `uni.bitwise.shiftRight` UDF.
pub fn create_shift_right_udf() -> ScalarUDF {
    ScalarUDF::new_from_impl(ShiftRightUdf::new())
}
1696
/// Builds a `TemporalUdf` registered under `name`.
///
/// The name selects both the declared return type (matched case-insensitively)
/// and the datetime function evaluated at runtime.
fn create_temporal_udf(name: &str) -> ScalarUDF {
    ScalarUDF::new_from_impl(TemporalUdf::new(name.to_string()))
}
1710
1711#[derive(Debug)]
1712struct TemporalUdf {
1713 name: String,
1714 signature: Signature,
1715}
1716
1717impl TemporalUdf {
1718 fn new(name: String) -> Self {
1719 Self {
1720 name,
1721 signature: Signature::new(
1724 TypeSignature::OneOf(vec![
1725 TypeSignature::Exact(vec![]),
1726 TypeSignature::VariadicAny,
1727 ]),
1728 Volatility::Immutable,
1729 ),
1730 }
1731 }
1732}
1733
1734impl_udf_eq_hash!(TemporalUdf);
1735
1736impl ScalarUDFImpl for TemporalUdf {
1737 fn as_any(&self) -> &dyn Any {
1738 self
1739 }
1740
1741 fn name(&self) -> &str {
1742 &self.name
1743 }
1744
1745 fn signature(&self) -> &Signature {
1746 &self.signature
1747 }
1748
1749 fn return_type(&self, _arg_types: &[DataType]) -> DFResult<DataType> {
1750 let name = self.name.to_lowercase();
1751 match name.as_str() {
1752 "year" | "month" | "day" | "hour" | "minute" | "second" => Ok(DataType::Int64),
1754 "datetime"
1760 | "localdatetime"
1761 | "date"
1762 | "time"
1763 | "localtime"
1764 | "duration"
1765 | "date.truncate"
1766 | "time.truncate"
1767 | "datetime.truncate"
1768 | "localdatetime.truncate"
1769 | "localtime.truncate"
1770 | "duration.between"
1771 | "duration.inmonths"
1772 | "duration.indays"
1773 | "duration.inseconds"
1774 | "datetime.fromepoch"
1775 | "datetime.fromepochmillis"
1776 | "datetime.transaction"
1777 | "datetime.statement"
1778 | "datetime.realtime"
1779 | "date.transaction"
1780 | "date.statement"
1781 | "date.realtime"
1782 | "time.transaction"
1783 | "time.statement"
1784 | "time.realtime"
1785 | "localtime.transaction"
1786 | "localtime.statement"
1787 | "localtime.realtime"
1788 | "localdatetime.transaction"
1789 | "localdatetime.statement"
1790 | "localdatetime.realtime"
1791 | "btic" => Ok(DataType::LargeBinary),
1792 _ => Ok(DataType::Utf8),
1793 }
1794 }
1795
1796 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DFResult<ColumnarValue> {
1797 let func_name = self.name.to_uppercase();
1798 let output_type = self.return_type(&[])?;
1799 invoke_cypher_udf(args, &output_type, |val_args| {
1800 crate::datetime::eval_datetime_function(&func_name, val_args).map_err(|e| {
1801 datafusion::error::DataFusionError::Execution(format!("{}(): {}", self.name, e))
1802 })
1803 })
1804 }
1805}
1806
/// Builds the internal `_duration_property` UDF (duration component accessor).
fn create_duration_property_udf() -> ScalarUDF {
    ScalarUDF::new_from_impl(DurationPropertyUdf::new())
}
1814
1815#[derive(Debug)]
1816struct DurationPropertyUdf {
1817 signature: Signature,
1818}
1819
1820impl DurationPropertyUdf {
1821 fn new() -> Self {
1822 Self {
1823 signature: Signature::new(
1824 TypeSignature::Exact(vec![DataType::Utf8, DataType::Utf8]),
1825 Volatility::Immutable,
1826 ),
1827 }
1828 }
1829}
1830
1831impl_udf_eq_hash!(DurationPropertyUdf);
1832
1833impl ScalarUDFImpl for DurationPropertyUdf {
1834 fn as_any(&self) -> &dyn Any {
1835 self
1836 }
1837
1838 fn name(&self) -> &str {
1839 "_duration_property"
1840 }
1841
1842 fn signature(&self) -> &Signature {
1843 &self.signature
1844 }
1845
1846 fn return_type(&self, _arg_types: &[DataType]) -> DFResult<DataType> {
1847 Ok(DataType::Int64)
1848 }
1849
1850 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DFResult<ColumnarValue> {
1851 let output_type = self.return_type(&[])?;
1852 invoke_cypher_udf(args, &output_type, |val_args| {
1853 if val_args.len() != 2 {
1854 return Err(datafusion::error::DataFusionError::Execution(
1855 "_duration_property(): requires 2 arguments (duration_string, component)"
1856 .to_string(),
1857 ));
1858 }
1859
1860 let dur_string_owned;
1861 let dur_str = match &val_args[0] {
1862 Value::String(s) => s.as_str(),
1863 Value::Temporal(uni_common::TemporalValue::Duration { .. }) => {
1864 dur_string_owned = val_args[0].to_string();
1865 &dur_string_owned
1866 }
1867 Value::Null => return Ok(Value::Null),
1868 _ => {
1869 return Err(datafusion::error::DataFusionError::Execution(
1870 "_duration_property(): duration must be a string or temporal duration"
1871 .to_string(),
1872 ));
1873 }
1874 };
1875 let component = match &val_args[1] {
1876 Value::String(s) => s,
1877 _ => {
1878 return Err(datafusion::error::DataFusionError::Execution(
1879 "_duration_property(): component must be a string".to_string(),
1880 ));
1881 }
1882 };
1883
1884 crate::datetime::eval_duration_accessor(dur_str, component).map_err(|e| {
1885 datafusion::error::DataFusionError::Execution(format!(
1886 "_duration_property(): {}",
1887 e
1888 ))
1889 })
1890 })
1891 }
1892}
1893
/// Builds the `tostring` UDF (Cypher `toString()`).
fn create_tostring_udf() -> ScalarUDF {
    ScalarUDF::new_from_impl(ToStringUdf::new())
}
1901
1902#[derive(Debug)]
1903struct ToStringUdf {
1904 signature: Signature,
1905}
1906
1907impl ToStringUdf {
1908 fn new() -> Self {
1909 Self {
1910 signature: Signature::variadic_any(Volatility::Immutable),
1911 }
1912 }
1913}
1914
1915impl_udf_eq_hash!(ToStringUdf);
1916
1917impl ScalarUDFImpl for ToStringUdf {
1918 fn as_any(&self) -> &dyn Any {
1919 self
1920 }
1921
1922 fn name(&self) -> &str {
1923 "tostring"
1924 }
1925
1926 fn signature(&self) -> &Signature {
1927 &self.signature
1928 }
1929
1930 fn return_type(&self, _arg_types: &[DataType]) -> DFResult<DataType> {
1931 Ok(DataType::Utf8)
1932 }
1933
1934 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DFResult<ColumnarValue> {
1935 let output_type = self.return_type(&[])?;
1936 invoke_cypher_udf(args, &output_type, |val_args| {
1937 if val_args.is_empty() {
1938 return Err(datafusion::error::DataFusionError::Execution(
1939 "toString(): requires 1 argument".to_string(),
1940 ));
1941 }
1942 match &val_args[0] {
1943 Value::Null => Ok(Value::Null),
1944 Value::String(s) => Ok(Value::String(s.clone())),
1945 Value::Int(i) => Ok(Value::String(i.to_string())),
1946 Value::Float(f) => Ok(Value::String(f.to_string())),
1947 Value::Bool(b) => Ok(Value::String(b.to_string())),
1948 Value::Temporal(t) => Ok(Value::String(t.to_string())),
1949 other => {
1950 let type_name = match other {
1951 Value::List(_) => "List",
1952 Value::Map(_) => "Map",
1953 Value::Node { .. } => "Node",
1954 Value::Edge { .. } => "Relationship",
1955 Value::Path { .. } => "Path",
1956 _ => "Unknown",
1957 };
1958 Err(datafusion::error::DataFusionError::Execution(format!(
1959 "TypeError: InvalidArgumentValue - toString() does not accept {} values",
1960 type_name
1961 )))
1962 }
1963 }
1964 })
1965 }
1966}
1967
/// Builds the internal `_temporal_property` UDF (temporal component accessor).
fn create_temporal_property_udf() -> ScalarUDF {
    ScalarUDF::new_from_impl(TemporalPropertyUdf::new())
}
1975
1976#[derive(Debug)]
1977struct TemporalPropertyUdf {
1978 signature: Signature,
1979}
1980
1981impl TemporalPropertyUdf {
1982 fn new() -> Self {
1983 Self {
1984 signature: Signature::variadic_any(Volatility::Immutable),
1985 }
1986 }
1987}
1988
1989impl_udf_eq_hash!(TemporalPropertyUdf);
1990
1991impl ScalarUDFImpl for TemporalPropertyUdf {
1992 fn as_any(&self) -> &dyn Any {
1993 self
1994 }
1995
1996 fn name(&self) -> &str {
1997 "_temporal_property"
1998 }
1999
2000 fn signature(&self) -> &Signature {
2001 &self.signature
2002 }
2003
2004 fn return_type(&self, _arg_types: &[DataType]) -> DFResult<DataType> {
2005 Ok(DataType::LargeBinary)
2006 }
2007
2008 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DFResult<ColumnarValue> {
2009 let output_type = self.return_type(&[])?;
2010 invoke_cypher_udf(args, &output_type, |val_args| {
2011 if val_args.len() != 2 {
2012 return Err(datafusion::error::DataFusionError::Execution(
2013 "_temporal_property(): requires 2 arguments (temporal_value, component)"
2014 .to_string(),
2015 ));
2016 }
2017
2018 let component = match &val_args[1] {
2019 Value::String(s) => s.clone(),
2020 _ => {
2021 return Err(datafusion::error::DataFusionError::Execution(
2022 "_temporal_property(): component must be a string".to_string(),
2023 ));
2024 }
2025 };
2026
2027 crate::datetime::eval_temporal_accessor_value(&val_args[0], &component).map_err(|e| {
2028 datafusion::error::DataFusionError::Execution(format!(
2029 "_temporal_property(): {}",
2030 e
2031 ))
2032 })
2033 })
2034 }
2035}
2036
/// Downcasts `$arr` to the concrete Arrow array type `$array_type`.
///
/// On mismatch it returns early from the enclosing function via `?` with a
/// `DataFusionError::Execution` naming the expected type. It can therefore only
/// be used in functions whose error type accepts a `DataFusionError`.
macro_rules! downcast_arr {
    ($arr:expr, $array_type:ty) => {
        $arr.as_any().downcast_ref::<$array_type>().ok_or_else(|| {
            datafusion::error::DataFusionError::Execution(format!(
                "Failed to downcast to {}",
                stringify!($array_type)
            ))
        })?
    };
}
2049
2050fn cypher_type_name(val: &Value) -> &'static str {
2052 match val {
2053 Value::Null => "Null",
2054 Value::Bool(_) => "Boolean",
2055 Value::Int(_) => "Integer",
2056 Value::Float(_) => "Float",
2057 Value::String(_) => "String",
2058 Value::Bytes(_) => "Bytes",
2059 Value::List(_) => "List",
2060 Value::Map(_) => "Map",
2061 Value::Node(_) => "Node",
2062 Value::Edge(_) => "Relationship",
2063 Value::Path(_) => "Path",
2064 Value::Vector(_) => "Vector",
2065 Value::Temporal(_) => "Temporal",
2066 _ => "Unknown",
2067 }
2068}
2069
2070fn string_to_value(s: &str) -> Value {
2072 if (s.starts_with('{') || s.starts_with('[') || s.starts_with('"'))
2073 && let Ok(obj) = serde_json::from_str::<serde_json::Value>(s)
2074 {
2075 return Value::from(obj);
2076 }
2077 Value::String(s.to_string())
2078}
2079
/// Reads row `row` of `arr` as a Cypher [`Value`].
///
/// * Null slots yield `Value::Null`.
/// * `LargeBinary` is returned as raw `Value::Bytes` when `field` carries the
///   metadata `uni_raw_bytes = "true"`. Otherwise it is decoded with the Cypher
///   value codec, then as JSON, and finally falls back to `Value::Null`.
/// * Primitive numeric, boolean and string types are mapped directly (strings
///   via `string_to_value`). `UInt64` is reinterpreted with `as i64`.
/// * List types go through `arrow_to_value`. Everything else is extracted as a
///   `ScalarValue` and converted with `scalar_to_value`.
fn get_value_from_array(
    arr: &ArrayRef,
    row: usize,
    field: Option<&arrow::datatypes::Field>,
) -> DFResult<Value> {
    if arr.is_null(row) {
        return Ok(Value::Null);
    }

    match arr.data_type() {
        DataType::LargeBinary => {
            let typed = downcast_arr!(arr, LargeBinaryArray);
            let bytes = typed.value(row);
            // Columns flagged `uni_raw_bytes` hold user bytes, not encoded values.
            if field.is_some_and(|f| {
                f.metadata()
                    .get("uni_raw_bytes")
                    .is_some_and(|v| v == "true")
            }) {
                return Ok(Value::Bytes(bytes.to_vec()));
            }
            if let Ok(val) = uni_common::cypher_value_codec::decode(bytes) {
                return Ok(val);
            }
            // Best-effort fallback: legacy JSON payloads, else null.
            Ok(serde_json::from_slice::<serde_json::Value>(bytes)
                .map(Value::from)
                .unwrap_or(Value::Null))
        }
        DataType::Int64 => Ok(Value::Int(downcast_arr!(arr, Int64Array).value(row))),
        DataType::Float64 => Ok(Value::Float(downcast_arr!(arr, Float64Array).value(row))),
        DataType::Utf8 => Ok(string_to_value(downcast_arr!(arr, StringArray).value(row))),
        DataType::LargeUtf8 => Ok(string_to_value(
            downcast_arr!(arr, LargeStringArray).value(row),
        )),
        DataType::Boolean => Ok(Value::Bool(downcast_arr!(arr, BooleanArray).value(row))),
        // NOTE(review): values above i64::MAX wrap to negative here.
        DataType::UInt64 => Ok(Value::Int(downcast_arr!(arr, UInt64Array).value(row) as i64)),
        DataType::Int32 => Ok(Value::Int(downcast_arr!(arr, Int32Array).value(row) as i64)),
        DataType::Float32 => Ok(Value::Float(
            downcast_arr!(arr, Float32Array).value(row) as f64
        )),
        DataType::List(_) | DataType::LargeList(_) => Ok(
            uni_store::storage::arrow_convert::arrow_to_value(arr.as_ref(), row, None),
        ),
        _ => {
            // Generic path: go through ScalarValue for every other Arrow type.
            let scalar = ScalarValue::try_from_array(arr, row).map_err(|e| {
                datafusion::error::DataFusionError::Execution(format!(
                    "Cannot extract scalar from array at row {}: {}",
                    row, e
                ))
            })?;
            scalar_to_value(&scalar)
        }
    }
}
2148
2149fn get_value_args_for_row(
2155 args: &[ColumnarValue],
2156 arg_fields: &[arrow::datatypes::FieldRef],
2157 row: usize,
2158) -> DFResult<Vec<Value>> {
2159 args.iter()
2160 .enumerate()
2161 .map(|(idx, arg)| match arg {
2162 ColumnarValue::Scalar(scalar) => scalar_to_value(scalar),
2163 ColumnarValue::Array(arr) => {
2164 get_value_from_array(arr, row, arg_fields.get(idx).map(|f| f.as_ref()))
2165 }
2166 })
2167 .collect()
2168}
2169
2170fn invoke_cypher_udf<F>(
2172 args: ScalarFunctionArgs,
2173 output_type: &DataType,
2174 f: F,
2175) -> DFResult<ColumnarValue>
2176where
2177 F: Fn(&[Value]) -> DFResult<Value>,
2178{
2179 let len = args
2180 .args
2181 .iter()
2182 .find_map(|arg| match arg {
2183 ColumnarValue::Array(arr) => Some(arr.len()),
2184 _ => None,
2185 })
2186 .unwrap_or(1);
2187
2188 if len == 1
2189 && args
2190 .args
2191 .iter()
2192 .all(|a| matches!(a, ColumnarValue::Scalar(_)))
2193 {
2194 let row_args = get_value_args_for_row(&args.args, &args.arg_fields, 0)?;
2195 let res = f(&row_args)?;
2196 if matches!(output_type, DataType::LargeBinary | DataType::List(_)) {
2197 let arr = values_to_array(&[res], output_type)
2199 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
2200 return Ok(ColumnarValue::Scalar(ScalarValue::try_from_array(&arr, 0)?));
2201 }
2202 if res.is_null() {
2204 let typed_null = ScalarValue::try_from(output_type).unwrap_or(ScalarValue::Utf8(None));
2205 return Ok(ColumnarValue::Scalar(typed_null));
2206 }
2207 return value_to_columnar(&res);
2208 }
2209
2210 let mut results = Vec::with_capacity(len);
2211 for i in 0..len {
2212 let row_args = get_value_args_for_row(&args.args, &args.arg_fields, i)?;
2213 results.push(f(&row_args)?);
2214 }
2215
2216 let arr = values_to_array(&results, output_type)
2217 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
2218 Ok(ColumnarValue::Array(arr))
2219}
2220
2221fn scalar_arr_to_value(arr: &dyn arrow::array::Array) -> DFResult<Value> {
2224 if arr.is_empty() || arr.is_null(0) {
2225 Ok(Value::Null)
2226 } else {
2227 Ok(uni_store::storage::arrow_convert::arrow_to_value(
2229 arr, 0, None,
2230 ))
2231 }
2232}
2233
2234fn resolve_timezone_offset(tz_name: &str, nanos_utc: i64) -> i32 {
2236 if tz_name == "UTC" || tz_name == "Z" {
2237 return 0;
2238 }
2239 if let Ok(tz) = tz_name.parse::<chrono_tz::Tz>() {
2240 let dt = chrono::DateTime::from_timestamp_nanos(nanos_utc).with_timezone(&tz);
2241 dt.offset().fix().local_minus_utc()
2242 } else {
2243 0
2244 }
2245}
2246
2247fn duration_micros_to_value(micros: i64) -> Value {
2249 let dur = crate::datetime::CypherDuration::from_micros(micros);
2250 Value::Temporal(uni_common::TemporalValue::Duration {
2251 months: dur.months,
2252 days: dur.days,
2253 nanos: dur.nanos,
2254 })
2255}
2256
2257fn timestamp_nanos_to_value(nanos: i64, tz: Option<&Arc<str>>) -> DFResult<Value> {
2259 if let Some(tz_str) = tz {
2260 let offset = resolve_timezone_offset(tz_str.as_ref(), nanos);
2261 let tz_name = if tz_str.as_ref() == "UTC" {
2262 None
2263 } else {
2264 Some(tz_str.to_string())
2265 };
2266 Ok(Value::Temporal(uni_common::TemporalValue::DateTime {
2267 nanos_since_epoch: nanos,
2268 offset_seconds: offset,
2269 timezone_name: tz_name,
2270 }))
2271 } else {
2272 Ok(Value::Temporal(uni_common::TemporalValue::LocalDateTime {
2273 nanos_since_epoch: nanos,
2274 }))
2275 }
2276}
2277
2278pub fn scalar_to_value(scalar: &ScalarValue) -> DFResult<Value> {
2280 match scalar {
2281 ScalarValue::Utf8(Some(s)) | ScalarValue::LargeUtf8(Some(s)) => {
2282 if (s.starts_with('{') || s.starts_with('[') || s.starts_with('"'))
2285 && let Ok(obj) = serde_json::from_str::<serde_json::Value>(s)
2286 {
2287 return Ok(Value::from(obj));
2288 }
2289 Ok(Value::String(s.clone()))
2290 }
2291 ScalarValue::LargeBinary(Some(b)) => {
2292 if let Ok(val) = uni_common::cypher_value_codec::decode(b) {
2295 return Ok(val);
2296 }
2297 if let Ok(obj) = serde_json::from_slice::<serde_json::Value>(b) {
2298 Ok(Value::from(obj))
2299 } else {
2300 Ok(Value::Null)
2301 }
2302 }
2303 ScalarValue::Int64(Some(i)) => Ok(Value::Int(*i)),
2304 ScalarValue::Int32(Some(i)) => Ok(Value::Int(*i as i64)),
2305 ScalarValue::Float64(Some(f)) => {
2306 Ok(Value::Float(*f))
2308 }
2309 ScalarValue::Boolean(Some(b)) => Ok(Value::Bool(*b)),
2310 ScalarValue::Struct(arr) => scalar_arr_to_value(arr.as_ref()),
2311 ScalarValue::List(arr) => scalar_arr_to_value(arr.as_ref()),
2312 ScalarValue::LargeList(arr) => scalar_arr_to_value(arr.as_ref()),
2313 ScalarValue::FixedSizeList(arr) => scalar_arr_to_value(arr.as_ref()),
2314 ScalarValue::UInt64(Some(u)) => Ok(Value::Int(*u as i64)),
2316 ScalarValue::UInt32(Some(u)) => Ok(Value::Int(*u as i64)),
2317 ScalarValue::UInt16(Some(u)) => Ok(Value::Int(*u as i64)),
2318 ScalarValue::UInt8(Some(u)) => Ok(Value::Int(*u as i64)),
2319 ScalarValue::Int16(Some(i)) => Ok(Value::Int(*i as i64)),
2320 ScalarValue::Int8(Some(i)) => Ok(Value::Int(*i as i64)),
2321
2322 ScalarValue::Date32(Some(days)) => Ok(Value::Temporal(uni_common::TemporalValue::Date {
2324 days_since_epoch: *days,
2325 })),
2326 ScalarValue::Date64(Some(millis)) => {
2327 let days = (*millis / 86_400_000) as i32;
2328 Ok(Value::Temporal(uni_common::TemporalValue::Date {
2329 days_since_epoch: days,
2330 }))
2331 }
2332 ScalarValue::TimestampNanosecond(Some(nanos), tz) => {
2333 timestamp_nanos_to_value(*nanos, tz.as_ref())
2334 }
2335 ScalarValue::TimestampMicrosecond(Some(micros), tz) => {
2336 timestamp_nanos_to_value(*micros * 1_000, tz.as_ref())
2337 }
2338 ScalarValue::TimestampMillisecond(Some(millis), tz) => {
2339 timestamp_nanos_to_value(*millis * 1_000_000, tz.as_ref())
2340 }
2341 ScalarValue::TimestampSecond(Some(secs), tz) => {
2342 timestamp_nanos_to_value(*secs * 1_000_000_000, tz.as_ref())
2343 }
2344 ScalarValue::Time64Nanosecond(Some(nanos)) => {
2345 Ok(Value::Temporal(uni_common::TemporalValue::LocalTime {
2346 nanos_since_midnight: *nanos,
2347 }))
2348 }
2349 ScalarValue::Time64Microsecond(Some(micros)) => {
2350 Ok(Value::Temporal(uni_common::TemporalValue::LocalTime {
2351 nanos_since_midnight: *micros * 1_000,
2352 }))
2353 }
2354 ScalarValue::IntervalMonthDayNano(Some(v)) => {
2355 Ok(Value::Temporal(uni_common::TemporalValue::Duration {
2356 months: v.months as i64,
2357 days: v.days as i64,
2358 nanos: v.nanoseconds,
2359 }))
2360 }
2361 ScalarValue::DurationMicrosecond(Some(micros)) => Ok(duration_micros_to_value(*micros)),
2362 ScalarValue::DurationMillisecond(Some(millis)) => {
2363 Ok(duration_micros_to_value(*millis * 1_000))
2364 }
2365 ScalarValue::DurationSecond(Some(secs)) => Ok(duration_micros_to_value(*secs * 1_000_000)),
2366 ScalarValue::DurationNanosecond(Some(nanos)) => {
2367 Ok(Value::Temporal(uni_common::TemporalValue::Duration {
2368 months: 0,
2369 days: 0,
2370 nanos: *nanos,
2371 }))
2372 }
2373 ScalarValue::Float32(Some(f)) => Ok(Value::Float(*f as f64)),
2374
2375 ScalarValue::FixedSizeBinary(24, Some(bytes)) => {
2377 match uni_btic::encode::decode_slice(bytes) {
2378 Ok(btic) => Ok(Value::Temporal(uni_common::TemporalValue::Btic {
2379 lo: btic.lo(),
2380 hi: btic.hi(),
2381 meta: btic.meta(),
2382 })),
2383 Err(e) => Err(datafusion::error::DataFusionError::Execution(format!(
2384 "BTIC decode error: {e}"
2385 ))),
2386 }
2387 }
2388
2389 ScalarValue::Null
2391 | ScalarValue::Utf8(None)
2392 | ScalarValue::LargeUtf8(None)
2393 | ScalarValue::LargeBinary(None)
2394 | ScalarValue::Int64(None)
2395 | ScalarValue::Int32(None)
2396 | ScalarValue::Int16(None)
2397 | ScalarValue::Int8(None)
2398 | ScalarValue::UInt64(None)
2399 | ScalarValue::UInt32(None)
2400 | ScalarValue::UInt16(None)
2401 | ScalarValue::UInt8(None)
2402 | ScalarValue::Float64(None)
2403 | ScalarValue::Float32(None)
2404 | ScalarValue::Boolean(None)
2405 | ScalarValue::Date32(None)
2406 | ScalarValue::Date64(None)
2407 | ScalarValue::TimestampMicrosecond(None, _)
2408 | ScalarValue::TimestampMillisecond(None, _)
2409 | ScalarValue::TimestampSecond(None, _)
2410 | ScalarValue::TimestampNanosecond(None, _)
2411 | ScalarValue::Time64Microsecond(None)
2412 | ScalarValue::Time64Nanosecond(None)
2413 | ScalarValue::DurationMicrosecond(None)
2414 | ScalarValue::DurationMillisecond(None)
2415 | ScalarValue::DurationSecond(None)
2416 | ScalarValue::DurationNanosecond(None)
2417 | ScalarValue::IntervalMonthDayNano(None)
2418 | ScalarValue::FixedSizeBinary(_, None) => Ok(Value::Null),
2419 other => Err(datafusion::error::DataFusionError::Execution(format!(
2420 "scalar_to_value(): unsupported scalar type {other:?}"
2421 ))),
2422 }
2423}
2424
2425fn value_to_columnar(val: &Value) -> DFResult<ColumnarValue> {
2427 let scalar = match val {
2428 Value::String(s) => ScalarValue::Utf8(Some(s.clone())),
2429 Value::Int(i) => ScalarValue::Int64(Some(*i)),
2430 Value::Float(f) => ScalarValue::Float64(Some(*f)),
2431 Value::Bool(b) => ScalarValue::Boolean(Some(*b)),
2432 Value::Null => ScalarValue::Utf8(None),
2433 Value::Temporal(tv) => {
2434 use uni_common::TemporalValue;
2435 match tv {
2436 TemporalValue::Date { days_since_epoch } => {
2437 ScalarValue::Date32(Some(*days_since_epoch))
2438 }
2439 TemporalValue::LocalTime {
2440 nanos_since_midnight,
2441 } => ScalarValue::Time64Nanosecond(Some(*nanos_since_midnight)),
2442 TemporalValue::Time {
2443 nanos_since_midnight,
2444 ..
2445 } => ScalarValue::Time64Nanosecond(Some(*nanos_since_midnight)),
2446 TemporalValue::LocalDateTime { nanos_since_epoch } => {
2447 ScalarValue::TimestampNanosecond(Some(*nanos_since_epoch), None)
2448 }
2449 TemporalValue::DateTime {
2450 nanos_since_epoch,
2451 timezone_name,
2452 ..
2453 } => {
2454 let tz = timezone_name.as_deref().unwrap_or("UTC");
2455 ScalarValue::TimestampNanosecond(Some(*nanos_since_epoch), Some(tz.into()))
2456 }
2457 TemporalValue::Duration {
2458 months,
2459 days,
2460 nanos,
2461 } => ScalarValue::IntervalMonthDayNano(Some(
2462 arrow::datatypes::IntervalMonthDayNano {
2463 months: *months as i32,
2464 days: *days as i32,
2465 nanoseconds: *nanos,
2466 },
2467 )),
2468 TemporalValue::Btic { lo, hi, meta } => {
2469 let btic = uni_btic::Btic::new(*lo, *hi, *meta).map_err(|e| {
2470 datafusion::error::DataFusionError::Execution(format!("invalid BTIC: {e}"))
2471 })?;
2472 let packed = uni_btic::encode::encode(&btic);
2473 ScalarValue::FixedSizeBinary(24, Some(packed.to_vec()))
2474 }
2475 }
2476 }
2477 other => {
2478 return Err(datafusion::error::DataFusionError::Execution(format!(
2479 "value_to_columnar(): unsupported type {other:?}"
2480 )));
2481 }
2482 };
2483 Ok(ColumnarValue::Scalar(scalar))
2484}
2485
/// Builds the internal `_has_null` UDF, which reports whether a list contains
/// a null element.
pub fn create_has_null_udf() -> ScalarUDF {
    ScalarUDF::new_from_impl(HasNullUdf::new())
}
2494
2495#[derive(Debug)]
2496struct HasNullUdf {
2497 signature: Signature,
2498}
2499
2500impl HasNullUdf {
2501 fn new() -> Self {
2502 Self {
2503 signature: Signature::any(1, Volatility::Immutable),
2504 }
2505 }
2506}
2507
2508impl_udf_eq_hash!(HasNullUdf);
2509
2510impl ScalarUDFImpl for HasNullUdf {
2511 fn as_any(&self) -> &dyn Any {
2512 self
2513 }
2514
2515 fn name(&self) -> &str {
2516 "_has_null"
2517 }
2518
2519 fn signature(&self) -> &Signature {
2520 &self.signature
2521 }
2522
2523 fn return_type(&self, _arg_types: &[DataType]) -> DFResult<DataType> {
2524 Ok(DataType::Boolean)
2525 }
2526
2527 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DFResult<ColumnarValue> {
2528 if args.args.len() != 1 {
2529 return Err(datafusion::error::DataFusionError::Execution(
2530 "_has_null(): requires 1 argument".to_string(),
2531 ));
2532 }
2533
2534 fn check_list_nulls<T: arrow_array::OffsetSizeTrait>(
2536 arr: &arrow_array::GenericListArray<T>,
2537 idx: usize,
2538 ) -> bool {
2539 if arr.is_null(idx) || arr.is_empty() {
2540 false
2541 } else {
2542 arr.value(idx).null_count() > 0
2543 }
2544 }
2545
2546 match &args.args[0] {
2547 ColumnarValue::Scalar(scalar) => {
2548 let has_null = match scalar {
2549 ScalarValue::List(arr) => arr
2550 .as_any()
2551 .downcast_ref::<arrow::array::ListArray>()
2552 .map(|a| !a.is_empty() && a.value(0).null_count() > 0)
2553 .unwrap_or(arr.null_count() > 0),
2554 ScalarValue::LargeList(arr) => arr.len() > 0 && arr.value(0).null_count() > 0,
2555 ScalarValue::FixedSizeList(arr) => {
2556 arr.len() > 0 && arr.value(0).null_count() > 0
2557 }
2558 _ => false,
2559 };
2560 Ok(ColumnarValue::Scalar(ScalarValue::Boolean(Some(has_null))))
2561 }
2562 ColumnarValue::Array(arr) => {
2563 use arrow_array::{LargeListArray, ListArray};
2564
2565 let results: arrow::array::BooleanArray =
2566 if let Some(list_arr) = arr.as_any().downcast_ref::<ListArray>() {
2567 (0..list_arr.len())
2568 .map(|i| {
2569 if list_arr.is_null(i) {
2570 None
2571 } else {
2572 Some(check_list_nulls(list_arr, i))
2573 }
2574 })
2575 .collect()
2576 } else if let Some(large) = arr.as_any().downcast_ref::<LargeListArray>() {
2577 (0..large.len())
2578 .map(|i| {
2579 if large.is_null(i) {
2580 None
2581 } else {
2582 Some(check_list_nulls(large, i))
2583 }
2584 })
2585 .collect()
2586 } else {
2587 return Err(datafusion::error::DataFusionError::Execution(
2588 "_has_null(): requires list array".to_string(),
2589 ));
2590 };
2591 Ok(ColumnarValue::Array(Arc::new(results)))
2592 }
2593 }
2594 }
2595}
2596
/// Builds the `tointeger` UDF (Cypher `toInteger()`).
pub fn create_to_integer_udf() -> ScalarUDF {
    ScalarUDF::new_from_impl(ToIntegerUdf::new())
}
2604
2605#[derive(Debug)]
2606struct ToIntegerUdf {
2607 signature: Signature,
2608}
2609
2610impl ToIntegerUdf {
2611 fn new() -> Self {
2612 Self {
2613 signature: Signature::any(1, Volatility::Immutable),
2614 }
2615 }
2616}
2617
2618impl_udf_eq_hash!(ToIntegerUdf);
2619
2620impl ScalarUDFImpl for ToIntegerUdf {
2621 fn as_any(&self) -> &dyn Any {
2622 self
2623 }
2624
2625 fn name(&self) -> &str {
2626 "tointeger"
2627 }
2628
2629 fn signature(&self) -> &Signature {
2630 &self.signature
2631 }
2632
2633 fn return_type(&self, _arg_types: &[DataType]) -> DFResult<DataType> {
2634 Ok(DataType::Int64)
2635 }
2636
2637 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DFResult<ColumnarValue> {
2638 let output_type = self.return_type(&[])?;
2639 invoke_cypher_udf(args, &output_type, |val_args| {
2640 if val_args.is_empty() {
2641 return Err(datafusion::error::DataFusionError::Execution(
2642 "tointeger(): requires 1 argument".to_string(),
2643 ));
2644 }
2645
2646 let val = &val_args[0];
2647 match val {
2648 Value::Int(i) => Ok(Value::Int(*i)),
2649 Value::Float(f) => Ok(Value::Int(*f as i64)),
2650 Value::String(s) => {
2651 if let Ok(i) = s.parse::<i64>() {
2652 Ok(Value::Int(i))
2653 } else if let Ok(f) = s.parse::<f64>() {
2654 Ok(Value::Int(f as i64))
2655 } else {
2656 Ok(Value::Null)
2657 }
2658 }
2659 Value::Null => Ok(Value::Null),
2660 other => Err(datafusion::error::DataFusionError::Execution(format!(
2661 "InvalidArgumentValue: tointeger(): cannot convert {} to integer",
2662 cypher_type_name(other)
2663 ))),
2664 }
2665 })
2666 }
2667}
2668
2669pub fn create_to_float_udf() -> ScalarUDF {
2674 ScalarUDF::new_from_impl(ToFloatUdf::new())
2675}
2676
2677#[derive(Debug)]
2678struct ToFloatUdf {
2679 signature: Signature,
2680}
2681
2682impl ToFloatUdf {
2683 fn new() -> Self {
2684 Self {
2685 signature: Signature::any(1, Volatility::Immutable),
2686 }
2687 }
2688}
2689
2690impl_udf_eq_hash!(ToFloatUdf);
2691
2692impl ScalarUDFImpl for ToFloatUdf {
2693 fn as_any(&self) -> &dyn Any {
2694 self
2695 }
2696
2697 fn name(&self) -> &str {
2698 "tofloat"
2699 }
2700
2701 fn signature(&self) -> &Signature {
2702 &self.signature
2703 }
2704
2705 fn return_type(&self, _arg_types: &[DataType]) -> DFResult<DataType> {
2706 Ok(DataType::Float64)
2707 }
2708
2709 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DFResult<ColumnarValue> {
2710 let output_type = self.return_type(&[])?;
2711 invoke_cypher_udf(args, &output_type, |val_args| {
2712 if val_args.is_empty() {
2713 return Err(datafusion::error::DataFusionError::Execution(
2714 "tofloat(): requires 1 argument".to_string(),
2715 ));
2716 }
2717
2718 let val = &val_args[0];
2719 match val {
2720 Value::Int(i) => Ok(Value::Float(*i as f64)),
2721 Value::Float(f) => Ok(Value::Float(*f)),
2722 Value::String(s) => {
2723 if let Ok(f) = s.parse::<f64>() {
2724 Ok(Value::Float(f))
2725 } else {
2726 Ok(Value::Null)
2727 }
2728 }
2729 Value::Null => Ok(Value::Null),
2730 other => Err(datafusion::error::DataFusionError::Execution(format!(
2731 "InvalidArgumentValue: tofloat(): cannot convert {} to float",
2732 cypher_type_name(other)
2733 ))),
2734 }
2735 })
2736 }
2737}
2738
2739pub fn create_to_boolean_udf() -> ScalarUDF {
2744 ScalarUDF::new_from_impl(ToBooleanUdf::new())
2745}
2746
2747#[derive(Debug)]
2748struct ToBooleanUdf {
2749 signature: Signature,
2750}
2751
2752impl ToBooleanUdf {
2753 fn new() -> Self {
2754 Self {
2755 signature: Signature::any(1, Volatility::Immutable),
2756 }
2757 }
2758}
2759
2760impl_udf_eq_hash!(ToBooleanUdf);
2761
2762impl ScalarUDFImpl for ToBooleanUdf {
2763 fn as_any(&self) -> &dyn Any {
2764 self
2765 }
2766
2767 fn name(&self) -> &str {
2768 "toboolean"
2769 }
2770
2771 fn signature(&self) -> &Signature {
2772 &self.signature
2773 }
2774
2775 fn return_type(&self, _arg_types: &[DataType]) -> DFResult<DataType> {
2776 Ok(DataType::Boolean)
2777 }
2778
2779 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DFResult<ColumnarValue> {
2780 let output_type = self.return_type(&[])?;
2781 invoke_cypher_udf(args, &output_type, |val_args| {
2782 if val_args.is_empty() {
2783 return Err(datafusion::error::DataFusionError::Execution(
2784 "toboolean(): requires 1 argument".to_string(),
2785 ));
2786 }
2787
2788 let val = &val_args[0];
2789 match val {
2790 Value::Bool(b) => Ok(Value::Bool(*b)),
2791 Value::String(s) => {
2792 let s_lower = s.to_lowercase();
2793 if s_lower == "true" {
2794 Ok(Value::Bool(true))
2795 } else if s_lower == "false" {
2796 Ok(Value::Bool(false))
2797 } else {
2798 Ok(Value::Null)
2799 }
2800 }
2801 Value::Null => Ok(Value::Null),
2802 Value::Int(i) => Ok(Value::Bool(*i != 0)),
2803 other => Err(datafusion::error::DataFusionError::Execution(format!(
2804 "InvalidArgumentValue: toboolean(): cannot convert {} to boolean",
2805 cypher_type_name(other)
2806 ))),
2807 }
2808 })
2809 }
2810}
2811
2812pub fn create_cypher_sort_key_udf() -> ScalarUDF {
2819 ScalarUDF::new_from_impl(CypherSortKeyUdf::new())
2820}
2821
2822#[derive(Debug)]
2823struct CypherSortKeyUdf {
2824 signature: Signature,
2825}
2826
2827impl CypherSortKeyUdf {
2828 fn new() -> Self {
2829 Self {
2830 signature: Signature::any(1, Volatility::Immutable),
2831 }
2832 }
2833}
2834
2835impl_udf_eq_hash!(CypherSortKeyUdf);
2836
2837impl ScalarUDFImpl for CypherSortKeyUdf {
2838 fn as_any(&self) -> &dyn Any {
2839 self
2840 }
2841
2842 fn name(&self) -> &str {
2843 "_cypher_sort_key"
2844 }
2845
2846 fn signature(&self) -> &Signature {
2847 &self.signature
2848 }
2849
2850 fn return_type(&self, _arg_types: &[DataType]) -> DFResult<DataType> {
2851 Ok(DataType::LargeBinary)
2852 }
2853
2854 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DFResult<ColumnarValue> {
2855 if args.args.len() != 1 {
2856 return Err(datafusion::error::DataFusionError::Execution(
2857 "_cypher_sort_key(): requires 1 argument".to_string(),
2858 ));
2859 }
2860
2861 let arg = &args.args[0];
2862 match arg {
2863 ColumnarValue::Scalar(s) => {
2864 let val = if s.is_null() {
2865 Value::Null
2866 } else {
2867 scalar_to_value(s)?
2868 };
2869 let key = encode_cypher_sort_key(&val);
2870 Ok(ColumnarValue::Scalar(ScalarValue::LargeBinary(Some(key))))
2871 }
2872 ColumnarValue::Array(arr) => {
2873 let field = args.arg_fields.first().map(|f| f.as_ref());
2874 let mut keys: Vec<Option<Vec<u8>>> = Vec::with_capacity(arr.len());
2875 for i in 0..arr.len() {
2876 let val = if arr.is_null(i) {
2877 Value::Null
2878 } else {
2879 get_value_from_array(arr, i, field)?
2880 };
2881 keys.push(Some(encode_cypher_sort_key(&val)));
2882 }
2883 let array = LargeBinaryArray::from(
2884 keys.iter()
2885 .map(|k| k.as_deref())
2886 .collect::<Vec<Option<&[u8]>>>(),
2887 );
2888 Ok(ColumnarValue::Array(Arc::new(array)))
2889 }
2890 }
2891 }
2892}
2893
2894pub fn encode_cypher_sort_key(value: &Value) -> Vec<u8> {
2900 let mut buf = Vec::with_capacity(32);
2901 encode_sort_key_to_buf(value, &mut buf);
2902 buf
2903}
2904
2905fn encode_sort_key_to_buf(value: &Value, buf: &mut Vec<u8>) {
2907 if let Value::Map(map) = value {
2909 if let Some(tv) = sort_key_map_as_temporal(map) {
2910 buf.push(0x07); encode_temporal_payload(&tv, buf);
2912 return;
2913 }
2914 let rank = sort_key_map_rank(map);
2915 if rank != 0 {
2916 buf.push(rank);
2918 match rank {
2919 0x01 => encode_map_as_node_payload(map, buf),
2920 0x02 => encode_map_as_edge_payload(map, buf),
2921 0x04 => encode_map_as_path_payload(map, buf),
2922 _ => {} }
2924 return;
2925 }
2926 }
2927
2928 if let Value::String(s) = value {
2930 if let Some(tv) = sort_key_string_as_temporal(s) {
2931 buf.push(0x07); encode_temporal_payload(&tv, buf);
2933 return;
2934 }
2935 if let Some(temporal_type) = crate::datetime::classify_temporal(s) {
2938 buf.push(0x07); if encode_wide_temporal_sort_key(s, temporal_type, buf) {
2940 return;
2941 }
2942 buf.pop();
2944 }
2945 }
2946
2947 let rank = sort_key_type_rank(value);
2948 buf.push(rank);
2949
2950 match value {
2951 Value::Null => {} Value::Float(f) if f.is_nan() => {} Value::Bool(b) => buf.push(if *b { 0x01 } else { 0x00 }),
2954 Value::Int(i) => {
2955 let f = *i as f64;
2956 buf.extend_from_slice(&encode_order_preserving_f64(f));
2957 }
2958 Value::Float(f) => {
2959 buf.extend_from_slice(&encode_order_preserving_f64(*f));
2960 }
2961 Value::String(s) => {
2962 byte_stuff_terminate(s.as_bytes(), buf);
2963 }
2964 Value::Temporal(tv) => {
2965 encode_temporal_payload(tv, buf);
2966 }
2967 Value::List(items) => {
2968 encode_list_payload(items, buf);
2969 }
2970 Value::Map(map) => {
2971 encode_map_payload(map, buf);
2972 }
2973 Value::Node(node) => {
2974 encode_node_payload(node, buf);
2975 }
2976 Value::Edge(edge) => {
2977 encode_edge_payload(edge, buf);
2978 }
2979 Value::Path(path) => {
2980 encode_path_payload(path, buf);
2981 }
2982 Value::Bytes(b) => {
2984 byte_stuff_terminate(b, buf);
2985 }
2986 Value::Vector(v) => {
2987 for f in v {
2988 buf.extend_from_slice(&encode_order_preserving_f64(*f as f64));
2989 }
2990 }
2991 _ => {} }
2993}
2994
2995fn sort_key_type_rank(v: &Value) -> u8 {
2999 match v {
3000 Value::Map(map) => sort_key_map_rank(map),
3001 Value::Node(_) => 0x01,
3002 Value::Edge(_) => 0x02,
3003 Value::List(_) => 0x03,
3004 Value::Path(_) => 0x04,
3005 Value::String(_) => 0x05,
3006 Value::Bool(_) => 0x06,
3007 Value::Temporal(_) => 0x07,
3008 Value::Int(_) => 0x08,
3009 Value::Float(f) if f.is_nan() => 0x09,
3010 Value::Float(_) => 0x08,
3011 Value::Null => 0x0A,
3012 Value::Bytes(_) | Value::Vector(_) => 0x0B,
3013 _ => 0x0B, }
3015}
3016
3017fn sort_key_map_rank(map: &std::collections::HashMap<String, Value>) -> u8 {
3019 if sort_key_map_as_temporal(map).is_some() {
3020 0x07
3021 } else if map.contains_key("nodes")
3022 && (map.contains_key("relationships") || map.contains_key("edges"))
3023 {
3024 0x04 } else if map.contains_key("_eid")
3026 || map.contains_key("_src")
3027 || map.contains_key("_dst")
3028 || map.contains_key("_type")
3029 || map.contains_key("_type_name")
3030 {
3031 0x02 } else if map.contains_key("_vid") || map.contains_key("_labels") || map.contains_key("_label")
3033 {
3034 0x01 } else {
3036 0x00 }
3038}
3039
3040fn sort_key_map_as_temporal(
3044 map: &std::collections::HashMap<String, Value>,
3045) -> Option<uni_common::TemporalValue> {
3046 super::expr_eval::temporal_from_map_wrapper(map)
3047}
3048
3049fn sort_key_string_as_temporal(s: &str) -> Option<uni_common::TemporalValue> {
3053 super::expr_eval::temporal_from_value(&Value::String(s.to_string()))
3054}
3055
3056fn encode_wide_temporal_sort_key(
3063 s: &str,
3064 temporal_type: uni_common::TemporalType,
3065 buf: &mut Vec<u8>,
3066) -> bool {
3067 match temporal_type {
3068 uni_common::TemporalType::LocalDateTime => {
3069 if let Some(ndt) = parse_naive_datetime(s) {
3070 buf.push(0x03); let wide_nanos = naive_datetime_to_wide_nanos(&ndt);
3072 buf.extend_from_slice(&encode_order_preserving_i128(wide_nanos));
3073 return true;
3074 }
3075 false
3076 }
3077 uni_common::TemporalType::DateTime => {
3078 let base = if let Some(bracket_pos) = s.find('[') {
3080 &s[..bracket_pos]
3081 } else {
3082 s
3083 };
3084 if let Ok(dt) = chrono::DateTime::parse_from_str(base, "%Y-%m-%dT%H:%M:%S%.f%:z") {
3085 buf.push(0x04); let utc = dt.naive_utc();
3087 let wide_nanos = naive_datetime_to_wide_nanos(&utc);
3088 buf.extend_from_slice(&encode_order_preserving_i128(wide_nanos));
3089 return true;
3090 }
3091 if let Ok(dt) = chrono::DateTime::parse_from_str(base, "%Y-%m-%dT%H:%M:%S%:z") {
3092 buf.push(0x04); let utc = dt.naive_utc();
3094 let wide_nanos = naive_datetime_to_wide_nanos(&utc);
3095 buf.extend_from_slice(&encode_order_preserving_i128(wide_nanos));
3096 return true;
3097 }
3098 false
3099 }
3100 uni_common::TemporalType::Date => {
3101 if let Ok(nd) = chrono::NaiveDate::parse_from_str(s, "%Y-%m-%d")
3102 && let Some(epoch) = chrono::NaiveDate::from_ymd_opt(1970, 1, 1)
3103 {
3104 buf.push(0x00); let days = nd.signed_duration_since(epoch).num_days() as i32;
3106 buf.extend_from_slice(&encode_order_preserving_i32(days));
3107 return true;
3108 }
3109 false
3110 }
3111 _ => false,
3112 }
3113}
3114
3115fn parse_naive_datetime(s: &str) -> Option<chrono::NaiveDateTime> {
3117 chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.f")
3118 .ok()
3119 .or_else(|| chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S").ok())
3120}
3121
3122fn naive_datetime_to_wide_nanos(ndt: &chrono::NaiveDateTime) -> i128 {
3125 let secs = ndt.and_utc().timestamp() as i128;
3126 let subsec_nanos = ndt.and_utc().timestamp_subsec_nanos() as i128;
3127 secs * 1_000_000_000 + subsec_nanos
3128}
3129
3130fn encode_map_as_node_payload(map: &std::collections::HashMap<String, Value>, buf: &mut Vec<u8>) {
3132 let mut labels: Vec<String> = Vec::new();
3134 if let Some(Value::List(lbls)) = map.get("_labels") {
3135 for l in lbls {
3136 if let Value::String(s) = l {
3137 labels.push(s.clone());
3138 }
3139 }
3140 } else if let Some(Value::String(lbl)) = map.get("_label") {
3141 labels.push(lbl.clone());
3142 }
3143 labels.sort();
3144
3145 let vid = map.get("_vid").and_then(|v| v.as_i64()).unwrap_or(0) as u64;
3147
3148 let labels_joined = labels.join("\x01");
3150 byte_stuff_terminate(labels_joined.as_bytes(), buf);
3151
3152 buf.extend_from_slice(&vid.to_be_bytes());
3154
3155 let mut props: std::collections::HashMap<String, Value> = std::collections::HashMap::new();
3157 for (k, v) in map {
3158 if !k.starts_with('_') {
3159 props.insert(k.clone(), v.clone());
3160 }
3161 }
3162 encode_map_payload(&props, buf);
3163}
3164
3165fn encode_map_as_edge_payload(map: &std::collections::HashMap<String, Value>, buf: &mut Vec<u8>) {
3167 let edge_type = map
3168 .get("_type")
3169 .or_else(|| map.get("_type_name"))
3170 .and_then(|v| {
3171 if let Value::String(s) = v {
3172 Some(s.as_str())
3173 } else {
3174 None
3175 }
3176 })
3177 .unwrap_or("");
3178
3179 byte_stuff_terminate(edge_type.as_bytes(), buf);
3180
3181 let src = map.get("_src").and_then(|v| v.as_i64()).unwrap_or(0) as u64;
3182 let dst = map.get("_dst").and_then(|v| v.as_i64()).unwrap_or(0) as u64;
3183 let eid = map.get("_eid").and_then(|v| v.as_i64()).unwrap_or(0) as u64;
3184
3185 buf.extend_from_slice(&src.to_be_bytes());
3186 buf.extend_from_slice(&dst.to_be_bytes());
3187 buf.extend_from_slice(&eid.to_be_bytes());
3188
3189 let mut props: std::collections::HashMap<String, Value> = std::collections::HashMap::new();
3191 for (k, v) in map {
3192 if !k.starts_with('_') {
3193 props.insert(k.clone(), v.clone());
3194 }
3195 }
3196 encode_map_payload(&props, buf);
3197}
3198
3199fn encode_map_as_path_payload(map: &std::collections::HashMap<String, Value>, buf: &mut Vec<u8>) {
3201 if let Some(Value::List(nodes)) = map.get("nodes") {
3203 encode_list_payload(nodes, buf);
3204 } else {
3205 buf.push(0x00); }
3207 let edges = map.get("relationships").or_else(|| map.get("edges"));
3209 if let Some(Value::List(edges)) = edges {
3210 encode_list_payload(edges, buf);
3211 } else {
3212 buf.push(0x00); }
3214}
3215
/// Big-endian bytes of `f` whose unsigned lexicographic order matches the
/// numeric order of non-NaN floats (`-0.0` sorts just below `0.0`).
///
/// Negative values have every bit inverted; non-negative values only get
/// their sign bit set.
fn encode_order_preserving_f64(f: f64) -> [u8; 8] {
    const SIGN: u64 = 1 << 63;
    let bits = f.to_bits();
    let flipped = if bits & SIGN != 0 { !bits } else { bits | SIGN };
    flipped.to_be_bytes()
}

/// Big-endian bytes of `i` with the sign bit flipped, so unsigned byte order
/// equals signed numeric order.
fn encode_order_preserving_i64(i: i64) -> [u8; 8] {
    (i as u64 ^ i64::MIN as u64).to_be_bytes()
}

/// 32-bit variant of `encode_order_preserving_i64`.
fn encode_order_preserving_i32(i: i32) -> [u8; 4] {
    (i as u32 ^ i32::MIN as u32).to_be_bytes()
}

/// 128-bit variant of `encode_order_preserving_i64`.
fn encode_order_preserving_i128(i: i128) -> [u8; 16] {
    (i as u128 ^ i128::MIN as u128).to_be_bytes()
}
3249
/// Appends `data` byte-stuffed and terminated with `00 00`.
///
/// Because a literal 0x00 is written as `00 FF`, the terminator sorts below
/// any continuation. A byte string is therefore ordered before every string
/// it is a proper prefix of, and the encoding is self-delimiting.
fn byte_stuff_terminate(data: &[u8], buf: &mut Vec<u8>) {
    byte_stuff(data, buf);
    buf.extend_from_slice(&[0x00, 0x00]);
}

/// Appends `data`, escaping every 0x00 byte as `00 FF`.
fn byte_stuff(data: &[u8], buf: &mut Vec<u8>) {
    for &byte in data {
        if byte == 0x00 {
            buf.extend_from_slice(&[0x00, 0xFF]);
        } else {
            buf.push(byte);
        }
    }
}
3269
3270fn encode_list_payload(items: &[Value], buf: &mut Vec<u8>) {
3275 for item in items {
3276 buf.push(0x01); let elem_key = encode_cypher_sort_key(item);
3278 byte_stuff_terminate(&elem_key, buf);
3279 }
3280 buf.push(0x00); }
3282
3283fn encode_map_payload(map: &std::collections::HashMap<String, Value>, buf: &mut Vec<u8>) {
3285 let mut pairs: Vec<(&String, &Value)> = map.iter().collect();
3286 pairs.sort_by_key(|(k, _)| *k);
3287
3288 for (key, value) in pairs {
3289 buf.push(0x01); byte_stuff_terminate(key.as_bytes(), buf);
3291 let val_key = encode_cypher_sort_key(value);
3292 byte_stuff_terminate(&val_key, buf);
3293 }
3294 buf.push(0x00); }
3296
3297fn encode_node_payload(node: &uni_common::Node, buf: &mut Vec<u8>) {
3301 let mut labels = node.labels.clone();
3302 labels.sort();
3303 let labels_joined = labels.join("\x01");
3304 byte_stuff_terminate(labels_joined.as_bytes(), buf);
3305
3306 buf.extend_from_slice(&node.vid.as_u64().to_be_bytes());
3307
3308 encode_map_payload(&node.properties, buf);
3309}
3310
3311fn encode_edge_payload(edge: &uni_common::Edge, buf: &mut Vec<u8>) {
3315 byte_stuff_terminate(edge.edge_type.as_bytes(), buf);
3316
3317 buf.extend_from_slice(&edge.src.as_u64().to_be_bytes());
3318 buf.extend_from_slice(&edge.dst.as_u64().to_be_bytes());
3319 buf.extend_from_slice(&edge.eid.as_u64().to_be_bytes());
3320
3321 encode_map_payload(&edge.properties, buf);
3322}
3323
3324fn encode_path_payload(path: &uni_common::Path, buf: &mut Vec<u8>) {
3328 for node in &path.nodes {
3330 buf.push(0x01); let mut node_key = Vec::new();
3332 node_key.push(0x01); encode_node_payload(node, &mut node_key);
3334 byte_stuff_terminate(&node_key, buf);
3335 }
3336 buf.push(0x00); for edge in &path.edges {
3340 buf.push(0x01); let mut edge_key = Vec::new();
3342 edge_key.push(0x02); encode_edge_payload(edge, &mut edge_key);
3344 byte_stuff_terminate(&edge_key, buf);
3345 }
3346 buf.push(0x00); }
3348
3349fn encode_temporal_payload(tv: &uni_common::TemporalValue, buf: &mut Vec<u8>) {
3351 match tv {
3352 uni_common::TemporalValue::Date { days_since_epoch } => {
3353 buf.push(0x00); buf.extend_from_slice(&encode_order_preserving_i32(*days_since_epoch));
3355 }
3356 uni_common::TemporalValue::LocalTime {
3357 nanos_since_midnight,
3358 } => {
3359 buf.push(0x01); buf.extend_from_slice(&encode_order_preserving_i64(*nanos_since_midnight));
3361 }
3362 uni_common::TemporalValue::Time {
3363 nanos_since_midnight,
3364 offset_seconds,
3365 } => {
3366 buf.push(0x02); let utc_nanos =
3368 *nanos_since_midnight as i128 - (*offset_seconds as i128) * 1_000_000_000;
3369 buf.extend_from_slice(&encode_order_preserving_i128(utc_nanos));
3370 }
3371 uni_common::TemporalValue::LocalDateTime { nanos_since_epoch } => {
3372 buf.push(0x03); buf.extend_from_slice(&encode_order_preserving_i128(*nanos_since_epoch as i128));
3375 }
3376 uni_common::TemporalValue::DateTime {
3377 nanos_since_epoch, ..
3378 } => {
3379 buf.push(0x04); buf.extend_from_slice(&encode_order_preserving_i128(*nanos_since_epoch as i128));
3382 }
3383 uni_common::TemporalValue::Duration {
3384 months,
3385 days,
3386 nanos,
3387 } => {
3388 buf.push(0x05); buf.extend_from_slice(&encode_order_preserving_i64(*months));
3390 buf.extend_from_slice(&encode_order_preserving_i64(*days));
3391 buf.extend_from_slice(&encode_order_preserving_i64(*nanos));
3392 }
3393 uni_common::TemporalValue::Btic { lo, hi, meta } => {
3394 buf.push(0x06); if let Ok(btic) = uni_btic::Btic::new(*lo, *hi, *meta) {
3397 buf.extend_from_slice(&uni_btic::encode::encode(&btic));
3398 } else {
3399 buf.extend_from_slice(&encode_order_preserving_i64(*lo));
3400 buf.extend_from_slice(&encode_order_preserving_i64(*hi));
3401 }
3402 }
3403 }
3404}
3405
3406#[derive(Debug)]
3415struct BticScalarUdf {
3416 name: String,
3417 signature: Signature,
3418 return_type: DataType,
3419}
3420
3421impl BticScalarUdf {
3422 fn new(name: &str, num_args: usize, return_type: DataType) -> Self {
3423 Self {
3424 name: name.to_string(),
3425 signature: Signature::new(TypeSignature::Any(num_args), Volatility::Immutable),
3426 return_type,
3427 }
3428 }
3429}
3430
3431impl_udf_eq_hash!(BticScalarUdf);
3432
3433impl ScalarUDFImpl for BticScalarUdf {
3434 fn as_any(&self) -> &dyn Any {
3435 self
3436 }
3437 fn name(&self) -> &str {
3438 &self.name
3439 }
3440 fn signature(&self) -> &Signature {
3441 &self.signature
3442 }
3443 fn return_type(&self, _arg_types: &[DataType]) -> DFResult<DataType> {
3444 Ok(self.return_type.clone())
3445 }
3446 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DFResult<ColumnarValue> {
3447 let fname = self.name.to_uppercase();
3448 let rt = self.return_type.clone();
3449 invoke_cypher_udf(args, &rt, |val_args| {
3450 crate::expr_eval::eval_btic_function(&fname, val_args)
3451 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))
3452 })
3453 }
3454}
3455
3456fn register_btic_scalar_udfs(ctx: &SessionContext) -> DFResult<()> {
3458 for name in &["btic_lo", "btic_hi"] {
3460 ctx.register_udf(ScalarUDF::new_from_impl(BticScalarUdf::new(
3461 name,
3462 1,
3463 DataType::LargeBinary,
3464 )));
3465 }
3466 ctx.register_udf(ScalarUDF::new_from_impl(BticScalarUdf::new(
3468 "btic_duration",
3469 1,
3470 DataType::Int64,
3471 )));
3472 for name in &["btic_is_instant", "btic_is_unbounded", "btic_is_finite"] {
3474 ctx.register_udf(ScalarUDF::new_from_impl(BticScalarUdf::new(
3475 name,
3476 1,
3477 DataType::Boolean,
3478 )));
3479 }
3480 for name in &[
3482 "btic_granularity",
3483 "btic_lo_granularity",
3484 "btic_hi_granularity",
3485 "btic_certainty",
3486 "btic_lo_certainty",
3487 "btic_hi_certainty",
3488 ] {
3489 ctx.register_udf(ScalarUDF::new_from_impl(BticScalarUdf::new(
3490 name,
3491 1,
3492 DataType::Utf8,
3493 )));
3494 }
3495 for name in &[
3497 "btic_contains_point",
3498 "btic_overlaps",
3499 "btic_contains",
3500 "btic_before",
3501 "btic_after",
3502 "btic_meets",
3503 "btic_adjacent",
3504 "btic_disjoint",
3505 "btic_equals",
3506 "btic_starts",
3507 "btic_during",
3508 "btic_finishes",
3509 ] {
3510 ctx.register_udf(ScalarUDF::new_from_impl(BticScalarUdf::new(
3511 name,
3512 2,
3513 DataType::Boolean,
3514 )));
3515 }
3516 for name in &["btic_intersection", "btic_span", "btic_gap"] {
3518 ctx.register_udf(ScalarUDF::new_from_impl(BticScalarUdf::new(
3519 name,
3520 2,
3521 DataType::LargeBinary,
3522 )));
3523 }
3524 Ok(())
3525}
3526
3527#[derive(Debug, Clone)]
3533struct BticMinMaxUdaf {
3534 name: String,
3535 signature: Signature,
3536 is_max: bool,
3537}
3538
3539impl BticMinMaxUdaf {
3540 fn new(is_max: bool) -> Self {
3541 Self {
3542 name: (if is_max { "btic_max" } else { "btic_min" }).to_string(),
3543 signature: Signature::new(TypeSignature::Any(1), Volatility::Immutable),
3544 is_max,
3545 }
3546 }
3547}
3548
3549impl_udf_eq_hash!(BticMinMaxUdaf);
3550
3551impl AggregateUDFImpl for BticMinMaxUdaf {
3552 fn as_any(&self) -> &dyn Any {
3553 self
3554 }
3555 fn name(&self) -> &str {
3556 &self.name
3557 }
3558 fn signature(&self) -> &Signature {
3559 &self.signature
3560 }
3561 fn return_type(&self, _args: &[DataType]) -> DFResult<DataType> {
3562 Ok(DataType::LargeBinary)
3563 }
3564 fn accumulator(
3565 &self,
3566 _acc_args: datafusion::logical_expr::function::AccumulatorArgs,
3567 ) -> DFResult<Box<dyn DfAccumulator>> {
3568 Ok(Box::new(BticMinMaxAccumulator {
3569 current: None,
3570 is_max: self.is_max,
3571 }))
3572 }
3573 fn state_fields(
3574 &self,
3575 args: datafusion::logical_expr::function::StateFieldsArgs,
3576 ) -> DFResult<Vec<Arc<arrow::datatypes::Field>>> {
3577 Ok(vec![Arc::new(arrow::datatypes::Field::new(
3578 args.name,
3579 DataType::LargeBinary,
3580 true,
3581 ))])
3582 }
3583}
3584
3585#[derive(Debug)]
3586struct BticMinMaxAccumulator {
3587 current: Option<uni_btic::Btic>,
3588 is_max: bool,
3589}
3590
3591impl DfAccumulator for BticMinMaxAccumulator {
3592 fn update_batch(&mut self, values: &[ArrayRef]) -> DFResult<()> {
3593 let arr = &values[0];
3594 for i in 0..arr.len() {
3595 if arr.is_null(i) {
3596 continue;
3597 }
3598 let Some(btic) = decode_btic_from_array(arr, i)? else {
3599 continue;
3600 };
3601 self.current = Some(match self.current.take() {
3602 None => btic,
3603 Some(cur) => {
3604 if (self.is_max && btic > cur) || (!self.is_max && btic < cur) {
3605 btic
3606 } else {
3607 cur
3608 }
3609 }
3610 });
3611 }
3612 Ok(())
3613 }
3614 fn evaluate(&mut self) -> DFResult<ScalarValue> {
3615 Ok(btic_to_scalar_value(self.current.as_ref()))
3616 }
3617 fn size(&self) -> usize {
3618 std::mem::size_of::<Self>()
3619 }
3620 fn state(&mut self) -> DFResult<Vec<ScalarValue>> {
3621 Ok(vec![self.evaluate()?])
3622 }
3623 fn merge_batch(&mut self, states: &[ArrayRef]) -> DFResult<()> {
3624 self.update_batch(states)
3625 }
3626}
3627
3628#[derive(Debug, Clone)]
3630struct BticSpanAggUdaf {
3631 signature: Signature,
3632}
3633
3634impl BticSpanAggUdaf {
3635 fn new() -> Self {
3636 Self {
3637 signature: Signature::new(TypeSignature::Any(1), Volatility::Immutable),
3638 }
3639 }
3640}
3641
3642impl_udf_eq_hash!(BticSpanAggUdaf);
3643
3644impl AggregateUDFImpl for BticSpanAggUdaf {
3645 fn as_any(&self) -> &dyn Any {
3646 self
3647 }
3648 fn name(&self) -> &str {
3649 "btic_span_agg"
3650 }
3651 fn signature(&self) -> &Signature {
3652 &self.signature
3653 }
3654 fn return_type(&self, _args: &[DataType]) -> DFResult<DataType> {
3655 Ok(DataType::LargeBinary)
3656 }
3657 fn accumulator(
3658 &self,
3659 _acc_args: datafusion::logical_expr::function::AccumulatorArgs,
3660 ) -> DFResult<Box<dyn DfAccumulator>> {
3661 Ok(Box::new(BticSpanAggAccumulator { current: None }))
3662 }
3663 fn state_fields(
3664 &self,
3665 args: datafusion::logical_expr::function::StateFieldsArgs,
3666 ) -> DFResult<Vec<Arc<arrow::datatypes::Field>>> {
3667 Ok(vec![Arc::new(arrow::datatypes::Field::new(
3668 args.name,
3669 DataType::LargeBinary,
3670 true,
3671 ))])
3672 }
3673}
3674
3675#[derive(Debug)]
3676struct BticSpanAggAccumulator {
3677 current: Option<uni_btic::Btic>,
3678}
3679
3680impl DfAccumulator for BticSpanAggAccumulator {
3681 fn update_batch(&mut self, values: &[ArrayRef]) -> DFResult<()> {
3682 let arr = &values[0];
3683 for i in 0..arr.len() {
3684 if arr.is_null(i) {
3685 continue;
3686 }
3687 let Some(btic) = decode_btic_from_array(arr, i)? else {
3688 continue;
3689 };
3690 self.current = Some(match self.current.take() {
3691 None => btic,
3692 Some(cur) => uni_btic::set_ops::span(&cur, &btic),
3693 });
3694 }
3695 Ok(())
3696 }
3697 fn evaluate(&mut self) -> DFResult<ScalarValue> {
3698 Ok(btic_to_scalar_value(self.current.as_ref()))
3699 }
3700 fn size(&self) -> usize {
3701 std::mem::size_of::<Self>()
3702 }
3703 fn state(&mut self) -> DFResult<Vec<ScalarValue>> {
3704 Ok(vec![self.evaluate()?])
3705 }
3706 fn merge_batch(&mut self, states: &[ArrayRef]) -> DFResult<()> {
3707 self.update_batch(states)
3708 }
3709}
3710
3711#[derive(Debug, Clone)]
3713struct BticCountAtUdaf {
3714 signature: Signature,
3715}
3716
3717impl BticCountAtUdaf {
3718 fn new() -> Self {
3719 Self {
3720 signature: Signature::new(TypeSignature::Any(2), Volatility::Immutable),
3721 }
3722 }
3723}
3724
3725impl_udf_eq_hash!(BticCountAtUdaf);
3726
3727impl AggregateUDFImpl for BticCountAtUdaf {
3728 fn as_any(&self) -> &dyn Any {
3729 self
3730 }
3731 fn name(&self) -> &str {
3732 "btic_count_at"
3733 }
3734 fn signature(&self) -> &Signature {
3735 &self.signature
3736 }
3737 fn return_type(&self, _args: &[DataType]) -> DFResult<DataType> {
3738 Ok(DataType::Int64)
3739 }
3740 fn accumulator(
3741 &self,
3742 _acc_args: datafusion::logical_expr::function::AccumulatorArgs,
3743 ) -> DFResult<Box<dyn DfAccumulator>> {
3744 Ok(Box::new(BticCountAtAccumulator { count: 0 }))
3745 }
3746 fn state_fields(
3747 &self,
3748 args: datafusion::logical_expr::function::StateFieldsArgs,
3749 ) -> DFResult<Vec<Arc<arrow::datatypes::Field>>> {
3750 Ok(vec![Arc::new(arrow::datatypes::Field::new(
3751 args.name,
3752 DataType::Int64,
3753 true,
3754 ))])
3755 }
3756}
3757
3758#[derive(Debug)]
3759struct BticCountAtAccumulator {
3760 count: i64,
3761}
3762
3763impl DfAccumulator for BticCountAtAccumulator {
3764 fn update_batch(&mut self, values: &[ArrayRef]) -> DFResult<()> {
3765 if values.len() < 2 {
3766 return Ok(());
3767 }
3768 let btic_arr = &values[0];
3769 let point_arr = &values[1];
3770
3771 for i in 0..btic_arr.len() {
3772 if btic_arr.is_null(i) || point_arr.is_null(i) {
3773 continue;
3774 }
3775 let Some(btic) = decode_btic_from_array(btic_arr, i)? else {
3776 continue;
3777 };
3778
3779 let point_ms = if let Some(int_arr) = point_arr.as_any().downcast_ref::<Int64Array>() {
3781 int_arr.value(i)
3782 } else if let Some(lb) = point_arr.as_any().downcast_ref::<LargeBinaryArray>() {
3783 let val = scalar_binary_to_value(lb.value(i));
3784 match &val {
3785 Value::Int(ms) => *ms,
3786 Value::Temporal(uni_common::TemporalValue::DateTime {
3787 nanos_since_epoch,
3788 ..
3789 }) => nanos_since_epoch / 1_000_000,
3790 _ => continue,
3791 }
3792 } else {
3793 continue;
3794 };
3795
3796 if uni_btic::predicates::contains_point(&btic, point_ms) {
3797 self.count += 1;
3798 }
3799 }
3800 Ok(())
3801 }
3802 fn evaluate(&mut self) -> DFResult<ScalarValue> {
3803 Ok(ScalarValue::Int64(Some(self.count)))
3804 }
3805 fn size(&self) -> usize {
3806 std::mem::size_of::<Self>()
3807 }
3808 fn state(&mut self) -> DFResult<Vec<ScalarValue>> {
3809 Ok(vec![ScalarValue::Int64(Some(self.count))])
3810 }
3811 fn merge_batch(&mut self, states: &[ArrayRef]) -> DFResult<()> {
3812 let arr = &states[0];
3813 if let Some(int_arr) = arr.as_any().downcast_ref::<Int64Array>() {
3814 for i in 0..int_arr.len() {
3815 if !int_arr.is_null(i) {
3816 self.count += int_arr.value(i);
3817 }
3818 }
3819 }
3820 Ok(())
3821 }
3822}
3823
3824fn btic_to_scalar_value(btic: Option<&uni_btic::Btic>) -> ScalarValue {
3826 match btic {
3827 None => ScalarValue::LargeBinary(None),
3828 Some(b) => {
3829 let val = Value::Temporal(uni_common::TemporalValue::Btic {
3830 lo: b.lo(),
3831 hi: b.hi(),
3832 meta: b.meta(),
3833 });
3834 ScalarValue::LargeBinary(Some(uni_common::cypher_value_codec::encode(&val)))
3835 }
3836 }
3837}
3838
3839fn decode_btic_from_array(arr: &ArrayRef, row: usize) -> DFResult<Option<uni_btic::Btic>> {
3841 if let Some(fsb) = arr.as_any().downcast_ref::<FixedSizeBinaryArray>() {
3843 let bytes = fsb.value(row);
3844 return uni_btic::encode::decode_slice(bytes)
3845 .map(Some)
3846 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()));
3847 }
3848 if let Some(lb) = arr.as_any().downcast_ref::<LargeBinaryArray>() {
3850 let val = scalar_binary_to_value(lb.value(row));
3851 if let Value::Temporal(uni_common::TemporalValue::Btic { lo, hi, meta }) = val {
3852 return uni_btic::Btic::new(lo, hi, meta)
3853 .map(Some)
3854 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()));
3855 }
3856 return Ok(None);
3857 }
3858 Ok(None)
3859}
3860
3861pub fn create_btic_min_udaf() -> AggregateUDF {
3862 AggregateUDF::from(BticMinMaxUdaf::new(false))
3863}
3864
3865pub fn create_btic_max_udaf() -> AggregateUDF {
3866 AggregateUDF::from(BticMinMaxUdaf::new(true))
3867}
3868
3869pub fn create_btic_span_agg_udaf() -> AggregateUDF {
3870 AggregateUDF::from(BticSpanAggUdaf::new())
3871}
3872
3873pub fn create_btic_count_at_udaf() -> AggregateUDF {
3874 AggregateUDF::from(BticCountAtUdaf::new())
3875}
3876
3877pub fn invoke_cypher_string_op<F>(
3882 args: &ScalarFunctionArgs,
3883 name: &str,
3884 op: F,
3885) -> DFResult<ColumnarValue>
3886where
3887 F: Fn(&str, &str) -> bool,
3888{
3889 use arrow_array::{BooleanArray, LargeBinaryArray, LargeStringArray, StringArray};
3890 use datafusion::common::ScalarValue;
3891 use datafusion::error::DataFusionError;
3892
3893 if args.args.len() != 2 {
3894 return Err(DataFusionError::Execution(format!(
3895 "{}(): requires exactly 2 arguments",
3896 name
3897 )));
3898 }
3899
3900 let left = &args.args[0];
3901 let right = &args.args[1];
3902
3903 let extract_string = |scalar: &ScalarValue| -> Option<String> {
3905 match scalar {
3906 ScalarValue::Utf8(Some(s)) | ScalarValue::LargeUtf8(Some(s)) => Some(s.clone()),
3907 ScalarValue::LargeBinary(Some(bytes)) => {
3908 match uni_common::cypher_value_codec::decode(bytes) {
3910 Ok(uni_common::Value::String(s)) => Some(s),
3911 _ => None,
3912 }
3913 }
3914 ScalarValue::Utf8(None)
3915 | ScalarValue::LargeUtf8(None)
3916 | ScalarValue::LargeBinary(None)
3917 | ScalarValue::Null => None,
3918 _ => None,
3919 }
3920 };
3921
3922 match (left, right) {
3923 (ColumnarValue::Scalar(l_scalar), ColumnarValue::Scalar(r_scalar)) => {
3924 let l_str = extract_string(l_scalar);
3925 let r_str = extract_string(r_scalar);
3926
3927 match (l_str, r_str) {
3928 (Some(l), Some(r)) => Ok(ColumnarValue::Scalar(ScalarValue::Boolean(Some(op(
3929 &l, &r,
3930 ))))),
3931 _ => Ok(ColumnarValue::Scalar(ScalarValue::Boolean(None))),
3932 }
3933 }
3934 (ColumnarValue::Array(l_arr), ColumnarValue::Scalar(r_scalar)) => {
3935 let r_val = extract_string(r_scalar);
3937
3938 if r_val.is_none() {
3939 let nulls = arrow_array::new_null_array(&DataType::Boolean, l_arr.len());
3941 return Ok(ColumnarValue::Array(nulls));
3942 }
3943 let pattern = r_val.unwrap();
3944
3945 let result_array = if let Some(arr) = l_arr.as_any().downcast_ref::<StringArray>() {
3947 arr.iter()
3948 .map(|opt_s| opt_s.map(|s| op(s, &pattern)))
3949 .collect::<BooleanArray>()
3950 } else if let Some(arr) = l_arr.as_any().downcast_ref::<LargeStringArray>() {
3951 arr.iter()
3952 .map(|opt_s| opt_s.map(|s| op(s, &pattern)))
3953 .collect::<BooleanArray>()
3954 } else if let Some(arr) = l_arr.as_any().downcast_ref::<LargeBinaryArray>() {
3955 arr.iter()
3957 .map(|opt_bytes| {
3958 opt_bytes.and_then(|bytes| {
3959 match uni_common::cypher_value_codec::decode(bytes) {
3960 Ok(uni_common::Value::String(s)) => Some(op(&s, &pattern)),
3961 _ => None,
3962 }
3963 })
3964 })
3965 .collect::<BooleanArray>()
3966 } else {
3967 arrow_array::new_null_array(&DataType::Boolean, l_arr.len())
3969 .as_any()
3970 .downcast_ref::<BooleanArray>()
3971 .unwrap()
3972 .clone()
3973 };
3974
3975 Ok(ColumnarValue::Array(Arc::new(result_array)))
3976 }
3977 (ColumnarValue::Scalar(l_scalar), ColumnarValue::Array(r_arr)) => {
3978 let l_val = extract_string(l_scalar);
3980
3981 if l_val.is_none() {
3982 let nulls = arrow_array::new_null_array(&DataType::Boolean, r_arr.len());
3983 return Ok(ColumnarValue::Array(nulls));
3984 }
3985 let target = l_val.unwrap();
3986
3987 let result_array = if let Some(arr) = r_arr.as_any().downcast_ref::<StringArray>() {
3988 arr.iter()
3989 .map(|opt_s| opt_s.map(|s| op(&target, s)))
3990 .collect::<BooleanArray>()
3991 } else if let Some(arr) = r_arr.as_any().downcast_ref::<LargeStringArray>() {
3992 arr.iter()
3993 .map(|opt_s| opt_s.map(|s| op(&target, s)))
3994 .collect::<BooleanArray>()
3995 } else if let Some(arr) = r_arr.as_any().downcast_ref::<LargeBinaryArray>() {
3996 arr.iter()
3998 .map(|opt_bytes| {
3999 opt_bytes.and_then(|bytes| {
4000 match uni_common::cypher_value_codec::decode(bytes) {
4001 Ok(uni_common::Value::String(s)) => Some(op(&target, &s)),
4002 _ => None,
4003 }
4004 })
4005 })
4006 .collect::<BooleanArray>()
4007 } else {
4008 arrow_array::new_null_array(&DataType::Boolean, r_arr.len())
4010 .as_any()
4011 .downcast_ref::<BooleanArray>()
4012 .unwrap()
4013 .clone()
4014 };
4015
4016 Ok(ColumnarValue::Array(Arc::new(result_array)))
4017 }
4018 (ColumnarValue::Array(l_arr), ColumnarValue::Array(r_arr)) => {
4019 if l_arr.len() != r_arr.len() {
4021 return Err(DataFusionError::Execution(format!(
4022 "{}(): array lengths must match",
4023 name
4024 )));
4025 }
4026
4027 let extract_string_at = |arr: &dyn Array, idx: usize| -> Option<String> {
4029 if let Some(str_arr) = arr.as_any().downcast_ref::<StringArray>() {
4030 str_arr.value(idx).to_string().into()
4031 } else if let Some(str_arr) = arr.as_any().downcast_ref::<LargeStringArray>() {
4032 str_arr.value(idx).to_string().into()
4033 } else if let Some(bin_arr) = arr.as_any().downcast_ref::<LargeBinaryArray>() {
4034 if bin_arr.is_null(idx) {
4035 return None;
4036 }
4037 let bytes = bin_arr.value(idx);
4038 match uni_common::cypher_value_codec::decode(bytes) {
4039 Ok(uni_common::Value::String(s)) => Some(s),
4040 _ => None,
4041 }
4042 } else {
4043 None
4044 }
4045 };
4046
4047 let result: BooleanArray = (0..l_arr.len())
4048 .map(|idx| {
4049 match (
4050 extract_string_at(l_arr.as_ref(), idx),
4051 extract_string_at(r_arr.as_ref(), idx),
4052 ) {
4053 (Some(l_str), Some(r_str)) => Some(op(&l_str, &r_str)),
4054 _ => None,
4055 }
4056 })
4057 .collect();
4058
4059 Ok(ColumnarValue::Array(Arc::new(result)))
4060 }
4061 }
4062}
4063
// Defines a two-argument, boolean-returning string-predicate UDF.
//
// * `$struct_name` – name of the generated `ScalarUDFImpl` type.
// * `$udf_name`    – name reported by `ScalarUDFImpl::name`; also passed to
//                    `invoke_cypher_string_op`, which uses it in its error messages.
// * `$op`          – predicate that `invoke_cypher_string_op` applies as
//                    `$op(left, right)` to each pair of string operands.
macro_rules! define_string_op_udf {
    ($struct_name:ident, $udf_name:literal, $op:expr) => {
        #[derive(Debug)]
        struct $struct_name {
            signature: Signature,
        }

        impl $struct_name {
            fn new() -> Self {
                Self {
                    // Any two argument types are accepted at planning time; operands
                    // that are not strings become NULL inside `invoke_cypher_string_op`.
                    signature: Signature::any(2, Volatility::Immutable),
                }
            }
        }

        impl_udf_eq_hash!($struct_name);

        impl ScalarUDFImpl for $struct_name {
            fn as_any(&self) -> &dyn Any {
                self
            }
            fn name(&self) -> &str {
                $udf_name
            }
            fn signature(&self) -> &Signature {
                &self.signature
            }
            // The predicate result is always a (nullable) boolean.
            fn return_type(&self, _arg_types: &[DataType]) -> DFResult<DataType> {
                Ok(DataType::Boolean)
            }

            fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DFResult<ColumnarValue> {
                invoke_cypher_string_op(&args, $udf_name, $op)
            }
        }
    };
}
4102
4103define_string_op_udf!(CypherStartsWithUdf, "_cypher_starts_with", |s, p| s
4104 .starts_with(p));
4105define_string_op_udf!(CypherEndsWithUdf, "_cypher_ends_with", |s, p| s
4106 .ends_with(p));
4107define_string_op_udf!(CypherContainsUdf, "_cypher_contains", |s, p| s.contains(p));
4108
4109pub fn create_cypher_starts_with_udf() -> ScalarUDF {
4110 ScalarUDF::new_from_impl(CypherStartsWithUdf::new())
4111}
4112pub fn create_cypher_ends_with_udf() -> ScalarUDF {
4113 ScalarUDF::new_from_impl(CypherEndsWithUdf::new())
4114}
4115pub fn create_cypher_contains_udf() -> ScalarUDF {
4116 ScalarUDF::new_from_impl(CypherContainsUdf::new())
4117}
4118
4119pub fn create_cypher_equal_udf() -> ScalarUDF {
4120 ScalarUDF::new_from_impl(CypherCompareUdf::new("_cypher_equal", BinaryOp::Eq))
4121}
4122pub fn create_cypher_not_equal_udf() -> ScalarUDF {
4123 ScalarUDF::new_from_impl(CypherCompareUdf::new("_cypher_not_equal", BinaryOp::NotEq))
4124}
4125pub fn create_cypher_lt_udf() -> ScalarUDF {
4126 ScalarUDF::new_from_impl(CypherCompareUdf::new("_cypher_lt", BinaryOp::Lt))
4127}
4128pub fn create_cypher_lt_eq_udf() -> ScalarUDF {
4129 ScalarUDF::new_from_impl(CypherCompareUdf::new("_cypher_lt_eq", BinaryOp::LtEq))
4130}
4131pub fn create_cypher_gt_udf() -> ScalarUDF {
4132 ScalarUDF::new_from_impl(CypherCompareUdf::new("_cypher_gt", BinaryOp::Gt))
4133}
4134pub fn create_cypher_gt_eq_udf() -> ScalarUDF {
4135 ScalarUDF::new_from_impl(CypherCompareUdf::new("_cypher_gt_eq", BinaryOp::GtEq))
4136}
4137
4138#[expect(clippy::match_like_matches_macro)]
4140fn apply_comparison_op(ord: std::cmp::Ordering, op: &BinaryOp) -> bool {
4141 use std::cmp::Ordering;
4142 match (ord, op) {
4143 (Ordering::Less, BinaryOp::Lt | BinaryOp::LtEq | BinaryOp::NotEq) => true,
4144 (Ordering::Equal, BinaryOp::Eq | BinaryOp::LtEq | BinaryOp::GtEq) => true,
4145 (Ordering::Greater, BinaryOp::Gt | BinaryOp::GtEq | BinaryOp::NotEq) => true,
4146 _ => false,
4147 }
4148}
4149
4150fn compare_f64(lhs: f64, rhs: f64, op: &BinaryOp) -> Option<bool> {
4153 if lhs.is_nan() || rhs.is_nan() {
4154 Some(matches!(op, BinaryOp::NotEq))
4155 } else {
4156 Some(apply_comparison_op(lhs.partial_cmp(&rhs)?, op))
4157 }
4158}
4159
4160fn cv_bytes_as_f64(bytes: &[u8]) -> Option<f64> {
4162 use uni_common::cypher_value_codec::{TAG_FLOAT, TAG_INT, decode_float, decode_int, peek_tag};
4163 match peek_tag(bytes)? {
4164 TAG_INT => decode_int(bytes).map(|i| i as f64),
4165 TAG_FLOAT => decode_float(bytes),
4166 _ => None,
4167 }
4168}
4169
4170fn compare_cv_numeric(bytes: &[u8], rhs: f64, op: &BinaryOp) -> Option<bool> {
4173 use uni_common::cypher_value_codec::{TAG_INT, TAG_NULL, decode_int, peek_tag};
4174 if peek_tag(bytes) == Some(TAG_INT)
4176 && let Some(lhs_int) = decode_int(bytes)
4177 && rhs.fract() == 0.0
4179 && rhs >= i64::MIN as f64
4180 && rhs <= i64::MAX as f64
4181 {
4182 return Some(apply_comparison_op(lhs_int.cmp(&(rhs as i64)), op));
4183 }
4184 if peek_tag(bytes) == Some(TAG_NULL) {
4185 return None;
4186 }
4187 let lhs = cv_bytes_as_f64(bytes)?;
4188 compare_f64(lhs, rhs, op)
4189}
4190
4191fn try_fast_compare(
4195 lhs: &ColumnarValue,
4196 rhs: &ColumnarValue,
4197 op: &BinaryOp,
4198) -> Option<ColumnarValue> {
4199 use arrow_array::builder::BooleanBuilder;
4200 use uni_common::cypher_value_codec::{
4201 TAG_INT, TAG_NULL, TAG_STRING, decode_int, decode_string, peek_tag,
4202 };
4203
4204 let (lhs_arr, rhs_arr) = match (lhs, rhs) {
4205 (ColumnarValue::Array(l), ColumnarValue::Array(r)) => (l, r),
4206 _ => return None,
4207 };
4208
4209 if !matches!(lhs_arr.data_type(), DataType::LargeBinary) {
4211 return None;
4212 }
4213
4214 let lb_arr = lhs_arr.as_any().downcast_ref::<LargeBinaryArray>()?;
4215
4216 match rhs_arr.data_type() {
4217 DataType::Int64 => {
4219 let int_arr = rhs_arr.as_any().downcast_ref::<Int64Array>()?;
4220 let mut builder = BooleanBuilder::with_capacity(lb_arr.len());
4221 for i in 0..lb_arr.len() {
4222 if lb_arr.is_null(i) || int_arr.is_null(i) {
4223 builder.append_null();
4224 } else {
4225 match compare_cv_numeric(lb_arr.value(i), int_arr.value(i) as f64, op) {
4226 Some(result) => builder.append_value(result),
4227 None => builder.append_null(),
4228 }
4229 }
4230 }
4231 Some(ColumnarValue::Array(Arc::new(builder.finish())))
4232 }
4233
4234 DataType::Float64 => {
4236 let float_arr = rhs_arr.as_any().downcast_ref::<Float64Array>()?;
4237 let mut builder = BooleanBuilder::with_capacity(lb_arr.len());
4238 for i in 0..lb_arr.len() {
4239 if lb_arr.is_null(i) || float_arr.is_null(i) {
4240 builder.append_null();
4241 } else {
4242 match compare_cv_numeric(lb_arr.value(i), float_arr.value(i), op) {
4243 Some(result) => builder.append_value(result),
4244 None => builder.append_null(),
4245 }
4246 }
4247 }
4248 Some(ColumnarValue::Array(Arc::new(builder.finish())))
4249 }
4250
4251 DataType::Utf8 | DataType::LargeUtf8 => {
4253 let mut builder = BooleanBuilder::with_capacity(lb_arr.len());
4254 for i in 0..lb_arr.len() {
4255 if lb_arr.is_null(i) || rhs_arr.is_null(i) {
4256 builder.append_null();
4257 } else {
4258 let bytes = lb_arr.value(i);
4259 let rhs_str = if matches!(rhs_arr.data_type(), DataType::Utf8) {
4260 rhs_arr.as_any().downcast_ref::<StringArray>()?.value(i)
4261 } else {
4262 rhs_arr
4263 .as_any()
4264 .downcast_ref::<LargeStringArray>()?
4265 .value(i)
4266 };
4267 match peek_tag(bytes) {
4268 Some(TAG_STRING) => {
4269 if let Some(lhs_str) = decode_string(bytes) {
4270 builder.append_value(apply_comparison_op(
4271 lhs_str.as_str().cmp(rhs_str),
4272 op,
4273 ));
4274 } else {
4275 builder.append_null();
4276 }
4277 }
4278 _ => builder.append_null(),
4279 }
4280 }
4281 }
4282 Some(ColumnarValue::Array(Arc::new(builder.finish())))
4283 }
4284
4285 DataType::LargeBinary => {
4287 let rhs_lb = rhs_arr.as_any().downcast_ref::<LargeBinaryArray>()?;
4288 let mut builder = BooleanBuilder::with_capacity(lb_arr.len());
4289 for i in 0..lb_arr.len() {
4290 if lb_arr.is_null(i) || rhs_lb.is_null(i) {
4291 builder.append_null();
4292 } else {
4293 let lhs_bytes = lb_arr.value(i);
4294 let rhs_bytes = rhs_lb.value(i);
4295 let lhs_tag = peek_tag(lhs_bytes);
4296 let rhs_tag = peek_tag(rhs_bytes);
4297
4298 if lhs_tag == Some(TAG_NULL) || rhs_tag == Some(TAG_NULL) {
4300 builder.append_null();
4301 continue;
4302 }
4303
4304 if lhs_tag == Some(TAG_INT) && rhs_tag == Some(TAG_INT) {
4306 if let (Some(l), Some(r)) = (decode_int(lhs_bytes), decode_int(rhs_bytes)) {
4307 builder.append_value(apply_comparison_op(l.cmp(&r), op));
4308 } else {
4309 builder.append_null();
4310 }
4311 continue;
4312 }
4313
4314 if lhs_tag == Some(TAG_STRING) && rhs_tag == Some(TAG_STRING) {
4316 if let (Some(l), Some(r)) =
4317 (decode_string(lhs_bytes), decode_string(rhs_bytes))
4318 {
4319 builder.append_value(apply_comparison_op(l.cmp(&r), op));
4320 } else {
4321 builder.append_null();
4322 }
4323 continue;
4324 }
4325
4326 if let (Some(l), Some(r)) =
4328 (cv_bytes_as_f64(lhs_bytes), cv_bytes_as_f64(rhs_bytes))
4329 {
4330 match compare_f64(l, r, op) {
4331 Some(result) => builder.append_value(result),
4332 None => builder.append_null(),
4333 }
4334 } else {
4335 return None;
4339 }
4340 }
4341 }
4342 Some(ColumnarValue::Array(Arc::new(builder.finish())))
4343 }
4344
4345 _ => None, }
4347}
4348
4349#[derive(Debug)]
4350struct CypherCompareUdf {
4351 name: String,
4352 op: BinaryOp,
4353 signature: Signature,
4354}
4355
4356impl CypherCompareUdf {
4357 fn new(name: &str, op: BinaryOp) -> Self {
4358 Self {
4359 name: name.to_string(),
4360 op,
4361 signature: Signature::any(2, Volatility::Immutable),
4362 }
4363 }
4364}
4365
4366impl PartialEq for CypherCompareUdf {
4367 fn eq(&self, other: &Self) -> bool {
4368 self.name == other.name
4369 }
4370}
4371
4372impl Eq for CypherCompareUdf {}
4373
4374impl std::hash::Hash for CypherCompareUdf {
4375 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
4376 self.name.hash(state);
4377 }
4378}
4379
4380impl ScalarUDFImpl for CypherCompareUdf {
4381 fn as_any(&self) -> &dyn Any {
4382 self
4383 }
4384 fn name(&self) -> &str {
4385 &self.name
4386 }
4387 fn signature(&self) -> &Signature {
4388 &self.signature
4389 }
4390 fn return_type(&self, _arg_types: &[DataType]) -> DFResult<DataType> {
4391 Ok(DataType::Boolean)
4392 }
4393
4394 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DFResult<ColumnarValue> {
4395 if args.args.len() != 2 {
4396 return Err(datafusion::error::DataFusionError::Execution(format!(
4397 "{}(): requires 2 arguments",
4398 self.name
4399 )));
4400 }
4401
4402 if let Some(result) = try_fast_compare(&args.args[0], &args.args[1], &self.op) {
4404 return Ok(result);
4405 }
4406
4407 let output_type = DataType::Boolean;
4409 invoke_cypher_udf(args, &output_type, |val_args| {
4410 crate::expr_eval::eval_binary_op(&val_args[0], &self.op, &val_args[1])
4411 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))
4412 })
4413 }
4414}
4415
4416pub fn create_cypher_add_udf() -> ScalarUDF {
4422 ScalarUDF::new_from_impl(CypherArithmeticUdf::new("_cypher_add", BinaryOp::Add))
4423}
4424pub fn create_cypher_sub_udf() -> ScalarUDF {
4425 ScalarUDF::new_from_impl(CypherArithmeticUdf::new("_cypher_sub", BinaryOp::Sub))
4426}
4427pub fn create_cypher_mul_udf() -> ScalarUDF {
4428 ScalarUDF::new_from_impl(CypherArithmeticUdf::new("_cypher_mul", BinaryOp::Mul))
4429}
4430pub fn create_cypher_div_udf() -> ScalarUDF {
4431 ScalarUDF::new_from_impl(CypherArithmeticUdf::new("_cypher_div", BinaryOp::Div))
4432}
4433pub fn create_cypher_mod_udf() -> ScalarUDF {
4434 ScalarUDF::new_from_impl(CypherArithmeticUdf::new("_cypher_mod", BinaryOp::Mod))
4435}
4436
4437pub fn create_cypher_abs_udf() -> ScalarUDF {
4439 ScalarUDF::new_from_impl(CypherAbsUdf::new())
4440}
4441
4442pub fn cypher_abs_expr(arg: datafusion::logical_expr::Expr) -> datafusion::logical_expr::Expr {
4444 datafusion::logical_expr::Expr::ScalarFunction(
4445 datafusion::logical_expr::expr::ScalarFunction::new_udf(
4446 Arc::new(create_cypher_abs_udf()),
4447 vec![arg],
4448 ),
4449 )
4450}
4451
4452#[derive(Debug)]
4453struct CypherAbsUdf {
4454 signature: Signature,
4455}
4456
4457impl CypherAbsUdf {
4458 fn new() -> Self {
4459 Self {
4460 signature: Signature::any(1, Volatility::Immutable),
4461 }
4462 }
4463}
4464
4465impl_udf_eq_hash!(CypherAbsUdf);
4466
4467impl ScalarUDFImpl for CypherAbsUdf {
4468 fn as_any(&self) -> &dyn Any {
4469 self
4470 }
4471 fn name(&self) -> &str {
4472 "_cypher_abs"
4473 }
4474 fn signature(&self) -> &Signature {
4475 &self.signature
4476 }
4477 fn return_type(&self, _args: &[DataType]) -> DFResult<DataType> {
4478 Ok(DataType::LargeBinary)
4479 }
4480 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DFResult<ColumnarValue> {
4481 if args.args.len() != 1 {
4482 return Err(datafusion::error::DataFusionError::Execution(
4483 "_cypher_abs requires exactly 1 argument".into(),
4484 ));
4485 }
4486 invoke_cypher_udf(args, &DataType::LargeBinary, |val_args| {
4487 match &val_args[0] {
4488 Value::Int(i) => i.checked_abs().map(Value::Int).ok_or_else(|| {
4489 datafusion::error::DataFusionError::Execution(
4490 "integer overflow in abs()".into(),
4491 )
4492 }),
4493 Value::Float(f) => Ok(Value::Float(f.abs())),
4494 Value::Null => Ok(Value::Null),
4495 other => Err(datafusion::error::DataFusionError::Execution(format!(
4496 "abs() requires a numeric argument, got {other:?}"
4497 ))),
4498 }
4499 })
4500 }
4501}
4502
4503pub(crate) fn checked_int_op(lhs: i64, rhs: i64, op: &BinaryOp) -> Option<i64> {
4514 match op {
4515 BinaryOp::Add => lhs.checked_add(rhs),
4516 BinaryOp::Sub => lhs.checked_sub(rhs),
4517 BinaryOp::Mul => lhs.checked_mul(rhs),
4518 BinaryOp::Div => {
4520 if rhs == 0 {
4521 None
4522 } else {
4523 lhs.checked_div(rhs)
4524 }
4525 }
4526 BinaryOp::Mod => {
4527 if rhs == 0 {
4528 None
4529 } else {
4530 lhs.checked_rem(rhs)
4531 }
4532 }
4533 _ => None,
4534 }
4535}
4536
/// Per-row outcome of CypherValue-vs-primitive arithmetic on the fast path.
enum CvArithOutcome {
    /// Successful result, already encoded as CypherValue bytes.
    Value(Vec<u8>),
    /// The row yields NULL. This happens when the operand is not a decodable
    /// integer/float, or when the operator is not one of `+ - * / %`.
    Null,
    /// A runtime failure (integer overflow or division by zero). The fast path
    /// stops and returns it as the batch result.
    Error(datafusion::error::DataFusionError),
}
4551
4552fn apply_int_arithmetic(lhs: i64, rhs: i64, op: &BinaryOp) -> CvArithOutcome {
4559 use uni_common::cypher_value_codec::encode_int;
4560 if !matches!(
4561 op,
4562 BinaryOp::Add | BinaryOp::Sub | BinaryOp::Mul | BinaryOp::Div | BinaryOp::Mod
4563 ) {
4564 return CvArithOutcome::Null;
4565 }
4566 match checked_int_op(lhs, rhs, op) {
4567 Some(v) => CvArithOutcome::Value(encode_int(v)),
4568 None => {
4569 let msg = if matches!(op, BinaryOp::Div | BinaryOp::Mod) && rhs == 0 {
4570 "division by zero"
4571 } else {
4572 "integer overflow"
4573 };
4574 CvArithOutcome::Error(datafusion::error::DataFusionError::Execution(msg.into()))
4575 }
4576 }
4577}
4578
4579fn apply_float_arithmetic(lhs: f64, rhs: f64, op: &BinaryOp) -> CvArithOutcome {
4585 use uni_common::cypher_value_codec::encode_float;
4586 let result = match op {
4587 BinaryOp::Add => lhs + rhs,
4588 BinaryOp::Sub => lhs - rhs,
4589 BinaryOp::Mul => lhs * rhs,
4590 BinaryOp::Div => lhs / rhs, BinaryOp::Mod => lhs % rhs,
4592 _ => return CvArithOutcome::Null,
4593 };
4594 CvArithOutcome::Value(encode_float(result))
4595}
4596
4597fn cv_arithmetic_int(bytes: &[u8], rhs: i64, op: &BinaryOp) -> CvArithOutcome {
4603 use uni_common::cypher_value_codec::{TAG_FLOAT, TAG_INT, decode_float, decode_int, peek_tag};
4604 match peek_tag(bytes) {
4605 Some(TAG_INT) => match decode_int(bytes) {
4606 Some(lhs) => apply_int_arithmetic(lhs, rhs, op),
4607 None => CvArithOutcome::Null,
4608 },
4609 Some(TAG_FLOAT) => match decode_float(bytes) {
4610 Some(lhs) => apply_float_arithmetic(lhs, rhs as f64, op),
4611 None => CvArithOutcome::Null,
4612 },
4613 _ => CvArithOutcome::Null,
4614 }
4615}
4616
4617fn cv_arithmetic_float(bytes: &[u8], rhs: f64, op: &BinaryOp) -> CvArithOutcome {
4623 match cv_bytes_as_f64(bytes) {
4624 Some(lhs) => apply_float_arithmetic(lhs, rhs, op),
4625 None => CvArithOutcome::Null,
4626 }
4627}
4628
4629pub(crate) fn cypher_arith_return_type(arg_types: &[DataType]) -> DataType {
4644 let any_large_binary = arg_types.iter().any(|t| matches!(t, DataType::LargeBinary));
4645 if any_large_binary {
4646 return DataType::LargeBinary;
4647 }
4648 let any_float = arg_types.iter().any(|t| matches!(t, DataType::Float64));
4649 if any_float {
4650 return DataType::Float64;
4651 }
4652 let all_int_or_null = !arg_types.is_empty()
4656 && arg_types
4657 .iter()
4658 .all(|t| matches!(t, DataType::Int64 | DataType::Null));
4659 let any_int = arg_types.iter().any(|t| matches!(t, DataType::Int64));
4660 if all_int_or_null && any_int {
4661 DataType::Int64
4662 } else {
4663 DataType::LargeBinary
4664 }
4665}
4666
4667fn try_fast_arithmetic(
4677 lhs: &ColumnarValue,
4678 rhs: &ColumnarValue,
4679 op: &BinaryOp,
4680) -> Option<DFResult<ColumnarValue>> {
4681 use arrow_array::builder::LargeBinaryBuilder;
4682
4683 let (lhs_arr, rhs_arr) = match (lhs, rhs) {
4684 (ColumnarValue::Array(l), ColumnarValue::Array(r)) => (l, r),
4685 _ => return None,
4686 };
4687
4688 match (lhs_arr.data_type(), rhs_arr.data_type()) {
4689 (DataType::Int64, DataType::Int64) => Some(int_kernel_arithmetic(lhs_arr, rhs_arr, op)),
4693
4694 (DataType::Int64, DataType::Float64)
4697 | (DataType::Float64, DataType::Int64)
4698 | (DataType::Float64, DataType::Float64) => {
4699 Some(float_kernel_arithmetic(lhs_arr, rhs_arr, op))
4700 }
4701
4702 (DataType::LargeBinary, DataType::Int64) => {
4704 let lb_arr = lhs_arr.as_any().downcast_ref::<LargeBinaryArray>()?;
4705 let int_arr = rhs_arr.as_any().downcast_ref::<Int64Array>()?;
4706 let mut builder = LargeBinaryBuilder::new();
4707 for i in 0..lb_arr.len() {
4708 if lb_arr.is_null(i) || int_arr.is_null(i) {
4709 builder.append_null();
4710 } else {
4711 match cv_arithmetic_int(lb_arr.value(i), int_arr.value(i), op) {
4712 CvArithOutcome::Value(bytes) => builder.append_value(&bytes),
4713 CvArithOutcome::Null => builder.append_null(),
4714 CvArithOutcome::Error(e) => return Some(Err(e)),
4715 }
4716 }
4717 }
4718 Some(Ok(ColumnarValue::Array(Arc::new(builder.finish()))))
4719 }
4720
4721 (DataType::LargeBinary, DataType::Float64) => {
4723 let lb_arr = lhs_arr.as_any().downcast_ref::<LargeBinaryArray>()?;
4724 let float_arr = rhs_arr.as_any().downcast_ref::<Float64Array>()?;
4725 let mut builder = LargeBinaryBuilder::new();
4726 for i in 0..lb_arr.len() {
4727 if lb_arr.is_null(i) || float_arr.is_null(i) {
4728 builder.append_null();
4729 } else {
4730 match cv_arithmetic_float(lb_arr.value(i), float_arr.value(i), op) {
4731 CvArithOutcome::Value(bytes) => builder.append_value(&bytes),
4732 CvArithOutcome::Null => builder.append_null(),
4733 CvArithOutcome::Error(e) => return Some(Err(e)),
4734 }
4735 }
4736 }
4737 Some(Ok(ColumnarValue::Array(Arc::new(builder.finish()))))
4738 }
4739
4740 _ => None, }
4742}
4743
4744fn int_kernel_arithmetic(
4752 lhs_arr: &ArrayRef,
4753 rhs_arr: &ArrayRef,
4754 op: &BinaryOp,
4755) -> DFResult<ColumnarValue> {
4756 use arrow::compute::kernels::numeric::{add, div, mul, rem, sub};
4757
4758 let result = match op {
4759 BinaryOp::Add => add(lhs_arr, rhs_arr),
4760 BinaryOp::Sub => sub(lhs_arr, rhs_arr),
4761 BinaryOp::Mul => mul(lhs_arr, rhs_arr),
4762 BinaryOp::Div => div(lhs_arr, rhs_arr),
4763 BinaryOp::Mod => rem(lhs_arr, rhs_arr),
4764 other => {
4765 return Err(datafusion::error::DataFusionError::Execution(format!(
4766 "unsupported integer arithmetic operator: {other:?}"
4767 )));
4768 }
4769 };
4770 result
4771 .map(ColumnarValue::Array)
4772 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))
4773}
4774
4775fn float_kernel_arithmetic(
4781 lhs_arr: &ArrayRef,
4782 rhs_arr: &ArrayRef,
4783 op: &BinaryOp,
4784) -> DFResult<ColumnarValue> {
4785 use arrow::compute::cast;
4786 use arrow::compute::kernels::numeric::{add, div, mul, rem, sub};
4787
4788 let lhs_f = cast(lhs_arr, &DataType::Float64)
4789 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
4790 let rhs_f = cast(rhs_arr, &DataType::Float64)
4791 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
4792
4793 let result = match op {
4794 BinaryOp::Add => add(&lhs_f, &rhs_f),
4795 BinaryOp::Sub => sub(&lhs_f, &rhs_f),
4796 BinaryOp::Mul => mul(&lhs_f, &rhs_f),
4797 BinaryOp::Div => div(&lhs_f, &rhs_f),
4798 BinaryOp::Mod => rem(&lhs_f, &rhs_f),
4799 other => {
4800 return Err(datafusion::error::DataFusionError::Execution(format!(
4801 "unsupported float arithmetic operator: {other:?}"
4802 )));
4803 }
4804 };
4805 result
4806 .map(ColumnarValue::Array)
4807 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))
4808}
4809
4810#[derive(Debug)]
4811struct CypherArithmeticUdf {
4812 name: String,
4813 op: BinaryOp,
4814 signature: Signature,
4815}
4816
4817impl CypherArithmeticUdf {
4818 fn new(name: &str, op: BinaryOp) -> Self {
4819 Self {
4820 name: name.to_string(),
4821 op,
4822 signature: Signature::any(2, Volatility::Immutable),
4823 }
4824 }
4825}
4826
4827impl PartialEq for CypherArithmeticUdf {
4828 fn eq(&self, other: &Self) -> bool {
4829 self.name == other.name
4830 }
4831}
4832
4833impl Eq for CypherArithmeticUdf {}
4834
4835impl std::hash::Hash for CypherArithmeticUdf {
4836 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
4837 self.name.hash(state);
4838 }
4839}
4840
4841impl ScalarUDFImpl for CypherArithmeticUdf {
4842 fn as_any(&self) -> &dyn Any {
4843 self
4844 }
4845 fn name(&self) -> &str {
4846 &self.name
4847 }
4848 fn signature(&self) -> &Signature {
4849 &self.signature
4850 }
4851 fn return_type(&self, arg_types: &[DataType]) -> DFResult<DataType> {
4852 Ok(cypher_arith_return_type(arg_types))
4856 }
4857
4858 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DFResult<ColumnarValue> {
4859 if args.args.len() != 2 {
4860 return Err(datafusion::error::DataFusionError::Execution(format!(
4861 "{}(): requires 2 arguments",
4862 self.name
4863 )));
4864 }
4865
4866 if let Some(result) = try_fast_arithmetic(&args.args[0], &args.args[1], &self.op) {
4869 return result;
4870 }
4871
4872 let output_type = args.return_field.data_type().clone();
4879 invoke_cypher_udf(args, &output_type, |val_args| {
4880 crate::expr_eval::eval_binary_op(&val_args[0], &self.op, &val_args[1])
4881 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))
4882 })
4883 }
4884}
4885
4886pub fn create_cypher_xor_udf() -> ScalarUDF {
4891 ScalarUDF::new_from_impl(CypherXorUdf::new())
4892}
4893
4894#[derive(Debug)]
4895struct CypherXorUdf {
4896 signature: Signature,
4897}
4898
4899impl CypherXorUdf {
4900 fn new() -> Self {
4901 Self {
4902 signature: Signature::any(2, Volatility::Immutable),
4903 }
4904 }
4905}
4906
4907impl_udf_eq_hash!(CypherXorUdf);
4908
4909impl ScalarUDFImpl for CypherXorUdf {
4910 fn as_any(&self) -> &dyn Any {
4911 self
4912 }
4913 fn name(&self) -> &str {
4914 "_cypher_xor"
4915 }
4916 fn signature(&self) -> &Signature {
4917 &self.signature
4918 }
4919 fn return_type(&self, _arg_types: &[DataType]) -> DFResult<DataType> {
4920 Ok(DataType::Boolean)
4921 }
4922
4923 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DFResult<ColumnarValue> {
4924 let output_type = DataType::Boolean;
4925 invoke_cypher_udf(args, &output_type, |val_args| {
4926 if val_args.len() != 2 {
4927 return Err(datafusion::error::DataFusionError::Execution(
4928 "_cypher_xor(): requires 2 arguments".to_string(),
4929 ));
4930 }
4931 let coerce_bool = |v: &Value| -> Value {
4933 match v {
4934 Value::String(s) if s == "true" => Value::Bool(true),
4935 Value::String(s) if s == "false" => Value::Bool(false),
4936 other => other.clone(),
4937 }
4938 };
4939 let left = coerce_bool(&val_args[0]);
4940 let right = coerce_bool(&val_args[1]);
4941 crate::expr_eval::eval_binary_op(&left, &BinaryOp::Xor, &right)
4942 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))
4943 })
4944 }
4945}
4946
4947pub fn create_cv_to_bool_udf() -> ScalarUDF {
4954 ScalarUDF::new_from_impl(CvToBoolUdf::new())
4955}
4956
4957#[derive(Debug)]
4958struct CvToBoolUdf {
4959 signature: Signature,
4960}
4961
4962impl CvToBoolUdf {
4963 fn new() -> Self {
4964 Self {
4965 signature: Signature::exact(vec![DataType::LargeBinary], Volatility::Immutable),
4966 }
4967 }
4968}
4969
4970impl_udf_eq_hash!(CvToBoolUdf);
4971
4972impl ScalarUDFImpl for CvToBoolUdf {
4973 fn as_any(&self) -> &dyn Any {
4974 self
4975 }
4976 fn name(&self) -> &str {
4977 "_cv_to_bool"
4978 }
4979 fn signature(&self) -> &Signature {
4980 &self.signature
4981 }
4982 fn return_type(&self, _arg_types: &[DataType]) -> DFResult<DataType> {
4983 Ok(DataType::Boolean)
4984 }
4985
4986 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DFResult<ColumnarValue> {
4987 if args.args.len() != 1 {
4988 return Err(datafusion::error::DataFusionError::Execution(
4989 "_cv_to_bool() requires exactly 1 argument".to_string(),
4990 ));
4991 }
4992
4993 match &args.args[0] {
4994 ColumnarValue::Scalar(ScalarValue::LargeBinary(Some(bytes))) => {
4995 use uni_common::cypher_value_codec::{TAG_BOOL, TAG_NULL, decode_bool, peek_tag};
4997 let b = match peek_tag(bytes) {
4998 Some(TAG_BOOL) => decode_bool(bytes).unwrap_or(false),
4999 Some(TAG_NULL) => false,
5000 _ => false, };
5002 Ok(ColumnarValue::Scalar(ScalarValue::Boolean(Some(b))))
5003 }
5004 ColumnarValue::Scalar(_) => Ok(ColumnarValue::Scalar(ScalarValue::Boolean(None))),
5005 ColumnarValue::Array(arr) => {
5006 let lb_arr = arr
5007 .as_any()
5008 .downcast_ref::<arrow_array::LargeBinaryArray>()
5009 .ok_or_else(|| {
5010 datafusion::error::DataFusionError::Execution(format!(
5011 "_cv_to_bool(): expected LargeBinary array, got {:?}",
5012 arr.data_type()
5013 ))
5014 })?;
5015
5016 let mut builder = arrow_array::builder::BooleanBuilder::with_capacity(lb_arr.len());
5017
5018 use uni_common::cypher_value_codec::{TAG_BOOL, TAG_NULL, decode_bool, peek_tag};
5020
5021 for i in 0..lb_arr.len() {
5022 if lb_arr.is_null(i) {
5023 builder.append_null();
5024 } else {
5025 let bytes = lb_arr.value(i);
5026 let b = match peek_tag(bytes) {
5027 Some(TAG_BOOL) => decode_bool(bytes).unwrap_or(false),
5028 Some(TAG_NULL) => false,
5029 _ => false, };
5031 builder.append_value(b);
5032 }
5033 }
5034 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
5035 }
5036 }
5037 }
5038}
5039
5040pub fn create_cypher_size_udf() -> ScalarUDF {
5046 ScalarUDF::new_from_impl(CypherSizeUdf::new())
5047}
5048
5049#[derive(Debug)]
5050struct CypherSizeUdf {
5051 signature: Signature,
5052}
5053
5054impl CypherSizeUdf {
5055 fn new() -> Self {
5056 Self {
5057 signature: Signature::any(1, Volatility::Immutable),
5058 }
5059 }
5060}
5061
5062impl_udf_eq_hash!(CypherSizeUdf);
5063
5064impl ScalarUDFImpl for CypherSizeUdf {
5065 fn as_any(&self) -> &dyn Any {
5066 self
5067 }
5068
5069 fn name(&self) -> &str {
5070 "_cypher_size"
5071 }
5072
5073 fn signature(&self) -> &Signature {
5074 &self.signature
5075 }
5076
5077 fn return_type(&self, _arg_types: &[DataType]) -> DFResult<DataType> {
5078 Ok(DataType::Int64)
5079 }
5080
5081 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DFResult<ColumnarValue> {
5082 if args.args.len() != 1 {
5083 return Err(datafusion::error::DataFusionError::Execution(
5084 "_cypher_size() requires exactly 1 argument".to_string(),
5085 ));
5086 }
5087
5088 match &args.args[0] {
5089 ColumnarValue::Scalar(scalar) => {
5090 let result = cypher_size_scalar(scalar)?;
5091 Ok(ColumnarValue::Scalar(result))
5092 }
5093 ColumnarValue::Array(arr) => {
5094 let mut results: Vec<Option<i64>> = Vec::with_capacity(arr.len());
5095 for i in 0..arr.len() {
5096 if arr.is_null(i) {
5097 results.push(None);
5098 } else {
5099 let scalar = ScalarValue::try_from_array(arr, i)?;
5100 match cypher_size_scalar(&scalar)? {
5101 ScalarValue::Int64(v) => results.push(v),
5102 _ => results.push(None),
5103 }
5104 }
5105 }
5106 let arr: ArrayRef = Arc::new(arrow_array::Int64Array::from(results));
5107 Ok(ColumnarValue::Array(arr))
5108 }
5109 }
5110 }
5111}
5112
/// Computes the Cypher `size()`/`length()` of one scalar as an `Int64` scalar.
///
/// Rules by input:
/// * Strings (`Utf8`/`LargeUtf8`) → number of Unicode scalar values (`chars()`),
///   not bytes.
/// * `List`/`LargeList`/`Map` → number of elements/entries in row 0.
/// * Encoded CypherValue (`LargeBinary`):
///   * nodes and relationships → error;
///   * otherwise the value is converted to JSON, giving array length, string
///     char count or object key count, and NULL for any other JSON value;
///   * an undecodable payload → NULL.
/// * `Struct`:
///   * node-like (`_vid` without `relationships`) → error;
///   * relationship-like (`_eid`, or both `_src` and `_dst`) → error;
///   * path-like (has a `relationships` list column) → that list's length, or 0
///     when the list is null;
///   * any other struct → number of fields.
/// * Null inputs → NULL.
/// * Any other type → error.
///
/// # Errors
/// Returns an execution error for node/relationship values and unsupported types.
fn cypher_size_scalar(scalar: &ScalarValue) -> DFResult<ScalarValue> {
    match scalar {
        ScalarValue::Utf8(Some(s)) | ScalarValue::LargeUtf8(Some(s)) => {
            // Character count, not byte length.
            Ok(ScalarValue::Int64(Some(s.chars().count() as i64)))
        }
        // DataFusion stores list/map/struct scalars as single-row arrays, so row 0
        // is the value itself.
        ScalarValue::List(arr) => {
            if arr.is_empty() || arr.is_null(0) {
                Ok(ScalarValue::Int64(None))
            } else {
                Ok(ScalarValue::Int64(Some(arr.value(0).len() as i64)))
            }
        }
        ScalarValue::LargeList(arr) => {
            if arr.is_empty() || arr.is_null(0) {
                Ok(ScalarValue::Int64(None))
            } else {
                Ok(ScalarValue::Int64(Some(arr.value(0).len() as i64)))
            }
        }
        ScalarValue::LargeBinary(Some(b)) => {
            // Encoded CypherValue: decode, reject graph entities, size the rest.
            if let Ok(uni_val) = uni_common::cypher_value_codec::decode(b) {
                match &uni_val {
                    uni_common::Value::Node(_) => {
                        Err(datafusion::error::DataFusionError::Execution(
                            "TypeError: InvalidArgumentValue - length() is not supported for Node values".to_string(),
                        ))
                    }
                    uni_common::Value::Edge(_) => {
                        Err(datafusion::error::DataFusionError::Execution(
                            "TypeError: InvalidArgumentValue - length() is not supported for Relationship values".to_string(),
                        ))
                    }
                    _ => {
                        // NOTE(review): this converts the whole value to JSON just to
                        // read a length; matching on the `Value` variants directly
                        // would avoid the copy for large lists.
                        let json_val: serde_json::Value = uni_val.into();
                        match json_val {
                            serde_json::Value::Array(arr) => Ok(ScalarValue::Int64(Some(arr.len() as i64))),
                            serde_json::Value::String(s) => {
                                Ok(ScalarValue::Int64(Some(s.chars().count() as i64)))
                            }
                            serde_json::Value::Object(m) => Ok(ScalarValue::Int64(Some(m.len() as i64))),
                            // Numbers, booleans and null have no size.
                            _ => Ok(ScalarValue::Int64(None)),
                        }
                    }
                }
            } else {
                // Undecodable payload: treated as NULL rather than an error.
                Ok(ScalarValue::Int64(None))
            }
        }
        ScalarValue::Map(arr) => {
            // Entries of the single map row.
            if arr.is_empty() || arr.is_null(0) {
                Ok(ScalarValue::Int64(None))
            } else {
                Ok(ScalarValue::Int64(Some(arr.value(0).len() as i64)))
            }
        }
        ScalarValue::Struct(arr) => {
            // Structs may represent graph entities; the field names decide which.
            if arr.is_null(0) {
                Ok(ScalarValue::Int64(None))
            } else {
                let schema = arr.fields();
                let field_names: Vec<&str> = schema.iter().map(|f| f.name().as_str()).collect();
                // Node-shaped struct (has a vertex id, is not a path).
                if field_names.contains(&"_vid") && !field_names.contains(&"relationships") {
                    return Err(datafusion::error::DataFusionError::Execution(
                        "TypeError: InvalidArgumentValue - length() is not supported for Node values".to_string(),
                    ));
                }
                // Relationship-shaped struct (edge id, or both endpoints).
                if field_names.contains(&"_eid")
                    || (field_names.contains(&"_src") && field_names.contains(&"_dst"))
                {
                    return Err(datafusion::error::DataFusionError::Execution(
                        "TypeError: InvalidArgumentValue - length() is not supported for Relationship values".to_string(),
                    ));
                }
                // Path-shaped struct: size is the number of relationships.
                if let Some((rels_idx, _)) = schema
                    .iter()
                    .enumerate()
                    .find(|(_, f)| f.name() == "relationships")
                {
                    let rels_col = arr.column(rels_idx);
                    if let Some(list_arr) =
                        rels_col.as_any().downcast_ref::<arrow_array::ListArray>()
                    {
                        if list_arr.is_null(0) {
                            Ok(ScalarValue::Int64(Some(0)))
                        } else {
                            Ok(ScalarValue::Int64(Some(list_arr.value(0).len() as i64)))
                        }
                    } else {
                        // `relationships` is not a ListArray: fall back to field count.
                        Ok(ScalarValue::Int64(Some(arr.num_columns() as i64)))
                    }
                } else {
                    // Plain struct: number of fields.
                    Ok(ScalarValue::Int64(Some(arr.num_columns() as i64)))
                }
            }
        }
        ScalarValue::Null
        | ScalarValue::Utf8(None)
        | ScalarValue::LargeUtf8(None)
        | ScalarValue::LargeBinary(None) => Ok(ScalarValue::Int64(None)),
        other => Err(datafusion::error::DataFusionError::Execution(format!(
            "_cypher_size(): unsupported type {other:?}"
        ))),
    }
}
5229
5230pub fn create_cypher_list_compare_udf() -> ScalarUDF {
5236 ScalarUDF::new_from_impl(CypherListCompareUdf::new())
5237}
5238
5239#[derive(Debug)]
5240struct CypherListCompareUdf {
5241 signature: Signature,
5242}
5243
5244impl CypherListCompareUdf {
5245 fn new() -> Self {
5246 Self {
5247 signature: Signature::any(3, Volatility::Immutable),
5248 }
5249 }
5250}
5251
5252impl_udf_eq_hash!(CypherListCompareUdf);
5253
5254impl ScalarUDFImpl for CypherListCompareUdf {
5255 fn as_any(&self) -> &dyn Any {
5256 self
5257 }
5258
5259 fn name(&self) -> &str {
5260 "_cypher_list_compare"
5261 }
5262
5263 fn signature(&self) -> &Signature {
5264 &self.signature
5265 }
5266
5267 fn return_type(&self, _arg_types: &[DataType]) -> DFResult<DataType> {
5268 Ok(DataType::Boolean)
5269 }
5270
5271 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DFResult<ColumnarValue> {
5272 let output_type = DataType::Boolean;
5273 invoke_cypher_udf(args, &output_type, |val_args| {
5274 if val_args.len() != 3 {
5275 return Err(datafusion::error::DataFusionError::Execution(
5276 "_cypher_list_compare(): requires 3 arguments (left, right, op)".to_string(),
5277 ));
5278 }
5279
5280 let left = &val_args[0];
5281 let right = &val_args[1];
5282 let op_str = match &val_args[2] {
5283 Value::String(s) => s.as_str(),
5284 _ => {
5285 return Err(datafusion::error::DataFusionError::Execution(
5286 "_cypher_list_compare(): op must be a string".to_string(),
5287 ));
5288 }
5289 };
5290
5291 let (left_items, right_items) = match (left, right) {
5292 (Value::List(l), Value::List(r)) => (l, r),
5293 (Value::Null, _) | (_, Value::Null) => return Ok(Value::Null),
5294 _ => {
5295 return Err(datafusion::error::DataFusionError::Execution(
5296 "_cypher_list_compare(): both arguments must be lists".to_string(),
5297 ));
5298 }
5299 };
5300
5301 let cmp = cypher_list_cmp(left_items, right_items);
5303
5304 let result = match (op_str, cmp) {
5305 (_, None) => Value::Null,
5306 ("lt", Some(ord)) => Value::Bool(ord == std::cmp::Ordering::Less),
5307 ("lteq", Some(ord)) => Value::Bool(ord != std::cmp::Ordering::Greater),
5308 ("gt", Some(ord)) => Value::Bool(ord == std::cmp::Ordering::Greater),
5309 ("gteq", Some(ord)) => Value::Bool(ord != std::cmp::Ordering::Less),
5310 _ => {
5311 return Err(datafusion::error::DataFusionError::Execution(format!(
5312 "_cypher_list_compare(): unknown op '{}'",
5313 op_str
5314 )));
5315 }
5316 };
5317
5318 Ok(result)
5319 })
5320 }
5321}
5322
5323pub fn create_map_project_udf() -> ScalarUDF {
5328 ScalarUDF::new_from_impl(MapProjectUdf::new())
5329}
5330
5331#[derive(Debug)]
5332struct MapProjectUdf {
5333 signature: Signature,
5334}
5335
5336impl MapProjectUdf {
5337 fn new() -> Self {
5338 Self {
5339 signature: Signature::new(TypeSignature::VariadicAny, Volatility::Immutable),
5340 }
5341 }
5342}
5343
5344impl_udf_eq_hash!(MapProjectUdf);
5345
5346impl ScalarUDFImpl for MapProjectUdf {
5347 fn as_any(&self) -> &dyn Any {
5348 self
5349 }
5350
5351 fn name(&self) -> &str {
5352 "_map_project"
5353 }
5354
5355 fn signature(&self) -> &Signature {
5356 &self.signature
5357 }
5358
5359 fn return_type(&self, _arg_types: &[DataType]) -> DFResult<DataType> {
5360 Ok(DataType::LargeBinary)
5361 }
5362
5363 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DFResult<ColumnarValue> {
5364 let output_type = self.return_type(&[])?;
5365 invoke_cypher_udf(args, &output_type, |val_args| {
5366 let mut result_map = std::collections::HashMap::new();
5367 let mut i = 0;
5368 while i + 1 < val_args.len() {
5369 let key = &val_args[i];
5370 let value = &val_args[i + 1];
5371 if let Some(k) = key.as_str() {
5372 if k == "__all__" {
5373 match value {
5375 Value::Map(map) => {
5376 let source = match map.get("_all_props") {
5383 Some(Value::Map(all)) => all,
5384 _ => map,
5385 };
5386 for (mk, mv) in source {
5387 if !mk.starts_with('_') {
5388 result_map.insert(mk.clone(), mv.clone());
5389 }
5390 }
5391 }
5392 Value::Node(node) => {
5393 for (pk, pv) in &node.properties {
5394 result_map.insert(pk.clone(), pv.clone());
5395 }
5396 }
5397 Value::Edge(edge) => {
5398 for (pk, pv) in &edge.properties {
5399 result_map.insert(pk.clone(), pv.clone());
5400 }
5401 }
5402 _ => {}
5403 }
5404 } else {
5405 result_map.insert(k.to_string(), value.clone());
5406 }
5407 }
5408 i += 2;
5409 }
5410 Ok(Value::Map(result_map))
5411 })
5412 }
5413}
5414
5415pub fn create_make_cypher_list_udf() -> ScalarUDF {
5420 ScalarUDF::new_from_impl(MakeCypherListUdf::new())
5421}
5422
5423#[derive(Debug)]
5424struct MakeCypherListUdf {
5425 signature: Signature,
5426}
5427
5428impl MakeCypherListUdf {
5429 fn new() -> Self {
5430 Self {
5431 signature: Signature::new(TypeSignature::VariadicAny, Volatility::Immutable),
5432 }
5433 }
5434}
5435
5436impl_udf_eq_hash!(MakeCypherListUdf);
5437
5438impl ScalarUDFImpl for MakeCypherListUdf {
5439 fn as_any(&self) -> &dyn Any {
5440 self
5441 }
5442
5443 fn name(&self) -> &str {
5444 "_make_cypher_list"
5445 }
5446
5447 fn signature(&self) -> &Signature {
5448 &self.signature
5449 }
5450
5451 fn return_type(&self, _arg_types: &[DataType]) -> DFResult<DataType> {
5452 Ok(DataType::LargeBinary)
5453 }
5454
5455 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DFResult<ColumnarValue> {
5456 let output_type = self.return_type(&[])?;
5457 invoke_cypher_udf(args, &output_type, |val_args| {
5458 Ok(Value::List(val_args.to_vec()))
5459 })
5460 }
5461}
5462
5463pub fn create_cypher_in_udf() -> ScalarUDF {
5480 ScalarUDF::new_from_impl(CypherInUdf::new())
5481}
5482
5483#[derive(Debug)]
5484struct CypherInUdf {
5485 signature: Signature,
5486}
5487
5488impl CypherInUdf {
5489 fn new() -> Self {
5490 Self {
5491 signature: Signature::any(2, Volatility::Immutable),
5492 }
5493 }
5494}
5495
5496impl_udf_eq_hash!(CypherInUdf);
5497
5498impl ScalarUDFImpl for CypherInUdf {
5499 fn as_any(&self) -> &dyn Any {
5500 self
5501 }
5502
5503 fn name(&self) -> &str {
5504 "_cypher_in"
5505 }
5506
5507 fn signature(&self) -> &Signature {
5508 &self.signature
5509 }
5510
5511 fn return_type(&self, _arg_types: &[DataType]) -> DFResult<DataType> {
5512 Ok(DataType::Boolean)
5513 }
5514
5515 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DFResult<ColumnarValue> {
5516 invoke_cypher_udf(args, &DataType::Boolean, |vals| {
5517 if vals.len() != 2 {
5518 return Err(datafusion::error::DataFusionError::Execution(
5519 "_cypher_in(): requires 2 arguments".to_string(),
5520 ));
5521 }
5522 let element = &vals[0];
5523 let list_val = &vals[1];
5524
5525 if list_val.is_null() {
5527 return Ok(Value::Null);
5528 }
5529
5530 let items = match list_val {
5532 Value::List(items) => items.as_slice(),
5533 _ => {
5534 return Err(datafusion::error::DataFusionError::Execution(format!(
5535 "_cypher_in(): second argument must be a list, got {:?}",
5536 list_val
5537 )));
5538 }
5539 };
5540
5541 if element.is_null() {
5543 return if items.is_empty() {
5544 Ok(Value::Bool(false))
5545 } else {
5546 Ok(Value::Null) };
5548 }
5549
5550 let mut has_null = false;
5552 for item in items {
5553 match cypher_eq(element, item) {
5554 Some(true) => return Ok(Value::Bool(true)),
5555 None => has_null = true,
5556 Some(false) => {}
5557 }
5558 }
5559
5560 if has_null {
5561 Ok(Value::Null) } else {
5563 Ok(Value::Bool(false))
5564 }
5565 })
5566 }
5567}
5568
5569pub fn create_cypher_list_concat_udf() -> ScalarUDF {
5575 ScalarUDF::new_from_impl(CypherListConcatUdf::new())
5576}
5577
5578#[derive(Debug)]
5579struct CypherListConcatUdf {
5580 signature: Signature,
5581}
5582
5583impl CypherListConcatUdf {
5584 fn new() -> Self {
5585 Self {
5586 signature: Signature::any(2, Volatility::Immutable),
5587 }
5588 }
5589}
5590
5591impl_udf_eq_hash!(CypherListConcatUdf);
5592
5593impl ScalarUDFImpl for CypherListConcatUdf {
5594 fn as_any(&self) -> &dyn Any {
5595 self
5596 }
5597
5598 fn name(&self) -> &str {
5599 "_cypher_list_concat"
5600 }
5601
5602 fn signature(&self) -> &Signature {
5603 &self.signature
5604 }
5605
5606 fn return_type(&self, _arg_types: &[DataType]) -> DFResult<DataType> {
5607 Ok(DataType::LargeBinary)
5608 }
5609
5610 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DFResult<ColumnarValue> {
5611 invoke_cypher_udf(args, &DataType::LargeBinary, |vals| {
5612 if vals.len() != 2 {
5613 return Err(datafusion::error::DataFusionError::Execution(
5614 "_cypher_list_concat(): requires 2 arguments".to_string(),
5615 ));
5616 }
5617 if vals[0].is_null() || vals[1].is_null() {
5619 return Ok(Value::Null);
5620 }
5621 match (&vals[0], &vals[1]) {
5622 (Value::List(left), Value::List(right)) => {
5623 let mut result = left.clone();
5624 result.extend(right.iter().cloned());
5625 Ok(Value::List(result))
5626 }
5627 (Value::List(list), elem) => {
5630 let mut result = list.clone();
5631 result.push(elem.clone());
5632 Ok(Value::List(result))
5633 }
5634 (elem, Value::List(list)) => {
5635 let mut result = vec![elem.clone()];
5636 result.extend(list.iter().cloned());
5637 Ok(Value::List(result))
5638 }
5639 _ => {
5640 crate::expr_eval::eval_binary_op(
5643 &vals[0],
5644 &uni_cypher::ast::BinaryOp::Add,
5645 &vals[1],
5646 )
5647 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))
5648 }
5649 }
5650 })
5651 }
5652}
5653
5654pub fn create_cypher_list_append_udf() -> ScalarUDF {
5660 ScalarUDF::new_from_impl(CypherListAppendUdf::new())
5661}
5662
5663#[derive(Debug)]
5664struct CypherListAppendUdf {
5665 signature: Signature,
5666}
5667
5668impl CypherListAppendUdf {
5669 fn new() -> Self {
5670 Self {
5671 signature: Signature::any(2, Volatility::Immutable),
5672 }
5673 }
5674}
5675
5676impl_udf_eq_hash!(CypherListAppendUdf);
5677
5678impl ScalarUDFImpl for CypherListAppendUdf {
5679 fn as_any(&self) -> &dyn Any {
5680 self
5681 }
5682
5683 fn name(&self) -> &str {
5684 "_cypher_list_append"
5685 }
5686
5687 fn signature(&self) -> &Signature {
5688 &self.signature
5689 }
5690
5691 fn return_type(&self, _arg_types: &[DataType]) -> DFResult<DataType> {
5692 Ok(DataType::LargeBinary)
5693 }
5694
5695 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DFResult<ColumnarValue> {
5696 invoke_cypher_udf(args, &DataType::LargeBinary, |vals| {
5697 if vals.len() != 2 {
5698 return Err(datafusion::error::DataFusionError::Execution(
5699 "_cypher_list_append(): requires 2 arguments".to_string(),
5700 ));
5701 }
5702 let left = &vals[0];
5703 let right = &vals[1];
5704
5705 if left.is_null() || right.is_null() {
5707 return Ok(Value::Null);
5708 }
5709
5710 match (left, right) {
5711 (Value::List(list), elem) => {
5713 let mut result = list.clone();
5714 result.push(elem.clone());
5715 Ok(Value::List(result))
5716 }
5717 (elem, Value::List(list)) => {
5719 let mut result = vec![elem.clone()];
5720 result.extend(list.iter().cloned());
5721 Ok(Value::List(result))
5722 }
5723 _ => Err(datafusion::error::DataFusionError::Execution(format!(
5724 "_cypher_list_append(): at least one argument must be a list, got {:?} and {:?}",
5725 left, right
5726 ))),
5727 }
5728 })
5729 }
5730}
5731
5732pub fn create_cypher_list_slice_udf() -> ScalarUDF {
5738 ScalarUDF::new_from_impl(CypherListSliceUdf::new())
5739}
5740
5741#[derive(Debug)]
5742struct CypherListSliceUdf {
5743 signature: Signature,
5744}
5745
5746impl CypherListSliceUdf {
5747 fn new() -> Self {
5748 Self {
5749 signature: Signature::any(3, Volatility::Immutable),
5750 }
5751 }
5752}
5753
5754impl_udf_eq_hash!(CypherListSliceUdf);
5755
5756impl ScalarUDFImpl for CypherListSliceUdf {
5757 fn as_any(&self) -> &dyn Any {
5758 self
5759 }
5760
5761 fn name(&self) -> &str {
5762 "_cypher_list_slice"
5763 }
5764
5765 fn signature(&self) -> &Signature {
5766 &self.signature
5767 }
5768
5769 fn return_type(&self, _arg_types: &[DataType]) -> DFResult<DataType> {
5770 Ok(DataType::LargeBinary)
5771 }
5772
5773 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DFResult<ColumnarValue> {
5774 invoke_cypher_udf(args, &DataType::LargeBinary, |vals| {
5775 if vals.len() != 3 {
5776 return Err(datafusion::error::DataFusionError::Execution(
5777 "_cypher_list_slice(): requires 3 arguments (list, start, end)".to_string(),
5778 ));
5779 }
5780 if vals[0].is_null() {
5782 return Ok(Value::Null);
5783 }
5784 let list = match &vals[0] {
5785 Value::List(l) => l,
5786 _ => {
5787 return Err(datafusion::error::DataFusionError::Execution(format!(
5788 "_cypher_list_slice(): first argument must be a list, got {:?}",
5789 vals[0]
5790 )));
5791 }
5792 };
5793 if vals[1].is_null() || vals[2].is_null() {
5795 return Ok(Value::Null);
5796 }
5797
5798 let len = list.len() as i64;
5799 let raw_start = match &vals[1] {
5800 Value::Int(i) => *i,
5801 _ => 0,
5802 };
5803 let raw_end = match &vals[2] {
5804 Value::Int(i) => *i,
5805 _ => len,
5806 };
5807
5808 let start = if raw_start < 0 {
5810 (len + raw_start).max(0) as usize
5811 } else {
5812 (raw_start).min(len) as usize
5813 };
5814 let end = if raw_end == i64::MAX {
5815 len as usize
5816 } else if raw_end < 0 {
5817 (len + raw_end).max(0) as usize
5818 } else {
5819 (raw_end).min(len) as usize
5820 };
5821
5822 if start >= end {
5823 return Ok(Value::List(vec![]));
5824 }
5825 Ok(Value::List(list[start..end.min(list.len())].to_vec()))
5826 })
5827 }
5828}
5829
5830pub fn create_cypher_reverse_udf() -> ScalarUDF {
5841 ScalarUDF::new_from_impl(CypherReverseUdf::new())
5842}
5843
5844#[derive(Debug)]
5845struct CypherReverseUdf {
5846 signature: Signature,
5847}
5848
5849impl CypherReverseUdf {
5850 fn new() -> Self {
5851 Self {
5852 signature: Signature::any(1, Volatility::Immutable),
5853 }
5854 }
5855}
5856
5857impl_udf_eq_hash!(CypherReverseUdf);
5858
5859impl ScalarUDFImpl for CypherReverseUdf {
5860 fn as_any(&self) -> &dyn Any {
5861 self
5862 }
5863
5864 fn name(&self) -> &str {
5865 "_cypher_reverse"
5866 }
5867
5868 fn signature(&self) -> &Signature {
5869 &self.signature
5870 }
5871
5872 fn return_type(&self, _arg_types: &[DataType]) -> DFResult<DataType> {
5873 Ok(DataType::LargeBinary)
5874 }
5875
5876 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DFResult<ColumnarValue> {
5877 invoke_cypher_udf(args, &DataType::LargeBinary, |vals| {
5878 if vals.len() != 1 {
5879 return Err(datafusion::error::DataFusionError::Execution(
5880 "_cypher_reverse(): requires exactly 1 argument".to_string(),
5881 ));
5882 }
5883 match &vals[0] {
5884 Value::Null => Ok(Value::Null),
5885 Value::String(s) => Ok(Value::String(s.chars().rev().collect())),
5886 Value::List(l) => {
5887 let mut reversed = l.clone();
5888 reversed.reverse();
5889 Ok(Value::List(reversed))
5890 }
5891 other => Err(datafusion::error::DataFusionError::Execution(format!(
5892 "_cypher_reverse(): expected string or list, got {:?}",
5893 other
5894 ))),
5895 }
5896 })
5897 }
5898}
5899
5900pub fn create_cypher_substring_udf() -> ScalarUDF {
5911 ScalarUDF::new_from_impl(CypherSubstringUdf::new())
5912}
5913
5914#[derive(Debug)]
5915struct CypherSubstringUdf {
5916 signature: Signature,
5917}
5918
5919impl CypherSubstringUdf {
5920 fn new() -> Self {
5921 Self {
5922 signature: Signature::variadic_any(Volatility::Immutable),
5923 }
5924 }
5925}
5926
5927impl_udf_eq_hash!(CypherSubstringUdf);
5928
5929impl ScalarUDFImpl for CypherSubstringUdf {
5930 fn as_any(&self) -> &dyn Any {
5931 self
5932 }
5933
5934 fn name(&self) -> &str {
5935 "_cypher_substring"
5936 }
5937
5938 fn signature(&self) -> &Signature {
5939 &self.signature
5940 }
5941
5942 fn return_type(&self, _arg_types: &[DataType]) -> DFResult<DataType> {
5943 Ok(DataType::Utf8)
5944 }
5945
5946 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DFResult<ColumnarValue> {
5947 invoke_cypher_udf(args, &DataType::Utf8, |vals| {
5948 if vals.len() < 2 || vals.len() > 3 {
5949 return Err(datafusion::error::DataFusionError::Execution(
5950 "_cypher_substring(): requires 2 or 3 arguments".to_string(),
5951 ));
5952 }
5953 if vals.iter().any(|v| v.is_null()) {
5955 return Ok(Value::Null);
5956 }
5957 let s = match &vals[0] {
5958 Value::String(s) => s.as_str(),
5959 other => {
5960 return Err(datafusion::error::DataFusionError::Execution(format!(
5961 "_cypher_substring(): first argument must be a string, got {:?}",
5962 other
5963 )));
5964 }
5965 };
5966 let start = match &vals[1] {
5967 Value::Int(i) => *i,
5968 other => {
5969 return Err(datafusion::error::DataFusionError::Execution(format!(
5970 "_cypher_substring(): second argument must be an integer, got {:?}",
5971 other
5972 )));
5973 }
5974 };
5975
5976 let chars: Vec<char> = s.chars().collect();
5978 let len = chars.len() as i64;
5979
5980 let start_idx = start.max(0).min(len) as usize;
5982
5983 let end_idx = if vals.len() == 3 {
5984 let length = match &vals[2] {
5985 Value::Int(i) => *i,
5986 other => {
5987 return Err(datafusion::error::DataFusionError::Execution(format!(
5988 "_cypher_substring(): third argument must be an integer, got {:?}",
5989 other
5990 )));
5991 }
5992 };
5993 if length < 0 {
5994 return Err(datafusion::error::DataFusionError::Execution(
5995 "ArgumentError: NegativeIntegerArgument - substring length must be non-negative".to_string(),
5996 ));
5997 }
5998 (start_idx as i64 + length).min(len) as usize
5999 } else {
6000 len as usize
6001 };
6002
6003 Ok(Value::String(chars[start_idx..end_idx].iter().collect()))
6004 })
6005 }
6006}
6007
6008pub fn create_cypher_split_udf() -> ScalarUDF {
6017 ScalarUDF::new_from_impl(CypherSplitUdf::new())
6018}
6019
6020#[derive(Debug)]
6021struct CypherSplitUdf {
6022 signature: Signature,
6023}
6024
6025impl CypherSplitUdf {
6026 fn new() -> Self {
6027 Self {
6028 signature: Signature::any(2, Volatility::Immutable),
6029 }
6030 }
6031}
6032
6033impl_udf_eq_hash!(CypherSplitUdf);
6034
6035impl ScalarUDFImpl for CypherSplitUdf {
6036 fn as_any(&self) -> &dyn Any {
6037 self
6038 }
6039
6040 fn name(&self) -> &str {
6041 "_cypher_split"
6042 }
6043
6044 fn signature(&self) -> &Signature {
6045 &self.signature
6046 }
6047
6048 fn return_type(&self, _arg_types: &[DataType]) -> DFResult<DataType> {
6049 Ok(DataType::LargeBinary)
6050 }
6051
6052 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DFResult<ColumnarValue> {
6053 invoke_cypher_udf(args, &DataType::LargeBinary, |vals| {
6054 if vals.len() != 2 {
6055 return Err(datafusion::error::DataFusionError::Execution(
6056 "_cypher_split(): requires exactly 2 arguments".to_string(),
6057 ));
6058 }
6059 if vals.iter().any(|v| v.is_null()) {
6061 return Ok(Value::Null);
6062 }
6063 let s = match &vals[0] {
6064 Value::String(s) => s.clone(),
6065 other => {
6066 return Err(datafusion::error::DataFusionError::Execution(format!(
6067 "_cypher_split(): first argument must be a string, got {:?}",
6068 other
6069 )));
6070 }
6071 };
6072 let delimiter = match &vals[1] {
6073 Value::String(d) => d.clone(),
6074 other => {
6075 return Err(datafusion::error::DataFusionError::Execution(format!(
6076 "_cypher_split(): second argument must be a string, got {:?}",
6077 other
6078 )));
6079 }
6080 };
6081 let parts: Vec<Value> = s
6082 .split(&delimiter)
6083 .map(|p| Value::String(p.to_string()))
6084 .collect();
6085 Ok(Value::List(parts))
6086 })
6087 }
6088}
6089
6090pub fn create_cypher_list_to_cv_udf() -> ScalarUDF {
6101 ScalarUDF::new_from_impl(CypherListToCvUdf::new())
6102}
6103
6104#[derive(Debug)]
6105struct CypherListToCvUdf {
6106 signature: Signature,
6107}
6108
6109impl CypherListToCvUdf {
6110 fn new() -> Self {
6111 Self {
6112 signature: Signature::any(1, Volatility::Immutable),
6113 }
6114 }
6115}
6116
6117impl_udf_eq_hash!(CypherListToCvUdf);
6118
6119impl ScalarUDFImpl for CypherListToCvUdf {
6120 fn as_any(&self) -> &dyn Any {
6121 self
6122 }
6123
6124 fn name(&self) -> &str {
6125 "_cypher_list_to_cv"
6126 }
6127
6128 fn signature(&self) -> &Signature {
6129 &self.signature
6130 }
6131
6132 fn return_type(&self, _arg_types: &[DataType]) -> DFResult<DataType> {
6133 Ok(DataType::LargeBinary)
6134 }
6135
6136 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DFResult<ColumnarValue> {
6137 invoke_cypher_udf(args, &DataType::LargeBinary, |vals| {
6138 if vals.len() != 1 {
6139 return Err(datafusion::error::DataFusionError::Execution(
6140 "_cypher_list_to_cv(): requires exactly 1 argument".to_string(),
6141 ));
6142 }
6143 Ok(vals[0].clone())
6144 })
6145 }
6146}
6147
6148pub fn create_cypher_scalar_to_cv_udf() -> ScalarUDF {
6159 ScalarUDF::new_from_impl(CypherScalarToCvUdf::new())
6160}
6161
6162#[derive(Debug)]
6163struct CypherScalarToCvUdf {
6164 signature: Signature,
6165}
6166
6167impl CypherScalarToCvUdf {
6168 fn new() -> Self {
6169 Self {
6170 signature: Signature::any(1, Volatility::Immutable),
6171 }
6172 }
6173}
6174
6175impl_udf_eq_hash!(CypherScalarToCvUdf);
6176
6177impl ScalarUDFImpl for CypherScalarToCvUdf {
6178 fn as_any(&self) -> &dyn Any {
6179 self
6180 }
6181
6182 fn name(&self) -> &str {
6183 "_cypher_scalar_to_cv"
6184 }
6185
6186 fn signature(&self) -> &Signature {
6187 &self.signature
6188 }
6189
6190 fn return_type(&self, _arg_types: &[DataType]) -> DFResult<DataType> {
6191 Ok(DataType::LargeBinary)
6192 }
6193
6194 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DFResult<ColumnarValue> {
6195 invoke_cypher_udf(args, &DataType::LargeBinary, |vals| {
6196 if vals.len() != 1 {
6197 return Err(datafusion::error::DataFusionError::Execution(
6198 "_cypher_scalar_to_cv(): requires exactly 1 argument".to_string(),
6199 ));
6200 }
6201 Ok(vals[0].clone())
6202 })
6203 }
6204}
6205
6206pub fn create_cypher_tail_udf() -> ScalarUDF {
6218 ScalarUDF::new_from_impl(CypherTailUdf::new())
6219}
6220
6221#[derive(Debug)]
6222struct CypherTailUdf {
6223 signature: Signature,
6224}
6225
6226impl CypherTailUdf {
6227 fn new() -> Self {
6228 Self {
6229 signature: Signature::any(1, Volatility::Immutable),
6230 }
6231 }
6232}
6233
6234impl_udf_eq_hash!(CypherTailUdf);
6235
6236impl ScalarUDFImpl for CypherTailUdf {
6237 fn as_any(&self) -> &dyn Any {
6238 self
6239 }
6240
6241 fn name(&self) -> &str {
6242 "_cypher_tail"
6243 }
6244
6245 fn signature(&self) -> &Signature {
6246 &self.signature
6247 }
6248
6249 fn return_type(&self, _arg_types: &[DataType]) -> DFResult<DataType> {
6250 Ok(DataType::LargeBinary)
6251 }
6252
6253 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DFResult<ColumnarValue> {
6254 invoke_cypher_udf(args, &DataType::LargeBinary, |vals| {
6255 if vals.len() != 1 {
6256 return Err(datafusion::error::DataFusionError::Execution(
6257 "_cypher_tail(): requires exactly 1 argument".to_string(),
6258 ));
6259 }
6260 match &vals[0] {
6261 Value::Null => Ok(Value::Null),
6262 Value::List(l) => {
6263 if l.is_empty() {
6264 Ok(Value::List(vec![]))
6265 } else {
6266 Ok(Value::List(l[1..].to_vec()))
6267 }
6268 }
6269 other => Err(datafusion::error::DataFusionError::Execution(format!(
6270 "_cypher_tail(): expected list, got {:?}",
6271 other
6272 ))),
6273 }
6274 })
6275 }
6276}
6277
6278pub fn create_cypher_head_udf() -> ScalarUDF {
6289 ScalarUDF::new_from_impl(CypherHeadUdf::new())
6290}
6291
6292#[derive(Debug)]
6293struct CypherHeadUdf {
6294 signature: Signature,
6295}
6296
6297impl CypherHeadUdf {
6298 fn new() -> Self {
6299 Self {
6300 signature: Signature::any(1, Volatility::Immutable),
6301 }
6302 }
6303}
6304
6305impl_udf_eq_hash!(CypherHeadUdf);
6306
6307impl ScalarUDFImpl for CypherHeadUdf {
6308 fn as_any(&self) -> &dyn Any {
6309 self
6310 }
6311
6312 fn name(&self) -> &str {
6313 "head"
6314 }
6315
6316 fn signature(&self) -> &Signature {
6317 &self.signature
6318 }
6319
6320 fn return_type(&self, _arg_types: &[DataType]) -> DFResult<DataType> {
6321 Ok(DataType::LargeBinary)
6322 }
6323
6324 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DFResult<ColumnarValue> {
6325 invoke_cypher_udf(args, &DataType::LargeBinary, |vals| {
6326 if vals.len() != 1 {
6327 return Err(datafusion::error::DataFusionError::Execution(
6328 "head(): requires exactly 1 argument".to_string(),
6329 ));
6330 }
6331 match &vals[0] {
6332 Value::Null => Ok(Value::Null),
6333 Value::List(l) => Ok(l.first().cloned().unwrap_or(Value::Null)),
6334 other => Err(datafusion::error::DataFusionError::Execution(format!(
6335 "head(): expected list, got {:?}",
6336 other
6337 ))),
6338 }
6339 })
6340 }
6341}
6342
6343pub fn create_cypher_last_udf() -> ScalarUDF {
6354 ScalarUDF::new_from_impl(CypherLastUdf::new())
6355}
6356
6357#[derive(Debug)]
6358struct CypherLastUdf {
6359 signature: Signature,
6360}
6361
6362impl CypherLastUdf {
6363 fn new() -> Self {
6364 Self {
6365 signature: Signature::any(1, Volatility::Immutable),
6366 }
6367 }
6368}
6369
6370impl_udf_eq_hash!(CypherLastUdf);
6371
6372impl ScalarUDFImpl for CypherLastUdf {
6373 fn as_any(&self) -> &dyn Any {
6374 self
6375 }
6376
6377 fn name(&self) -> &str {
6378 "last"
6379 }
6380
6381 fn signature(&self) -> &Signature {
6382 &self.signature
6383 }
6384
6385 fn return_type(&self, _arg_types: &[DataType]) -> DFResult<DataType> {
6386 Ok(DataType::LargeBinary)
6387 }
6388
6389 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DFResult<ColumnarValue> {
6390 invoke_cypher_udf(args, &DataType::LargeBinary, |vals| {
6391 if vals.len() != 1 {
6392 return Err(datafusion::error::DataFusionError::Execution(
6393 "last(): requires exactly 1 argument".to_string(),
6394 ));
6395 }
6396 match &vals[0] {
6397 Value::Null => Ok(Value::Null),
6398 Value::List(l) => Ok(l.last().cloned().unwrap_or(Value::Null)),
6399 other => Err(datafusion::error::DataFusionError::Execution(format!(
6400 "last(): expected list, got {:?}",
6401 other
6402 ))),
6403 }
6404 })
6405 }
6406}
6407
6408fn cypher_list_cmp(left: &[Value], right: &[Value]) -> Option<std::cmp::Ordering> {
6411 let min_len = left.len().min(right.len());
6412 for i in 0..min_len {
6413 let cmp = cypher_value_cmp(&left[i], &right[i])?;
6414 if cmp != std::cmp::Ordering::Equal {
6415 return Some(cmp);
6416 }
6417 }
6418 Some(left.len().cmp(&right.len()))
6420}
6421
6422fn cypher_value_cmp(a: &Value, b: &Value) -> Option<std::cmp::Ordering> {
6425 match (a, b) {
6426 (Value::Null, Value::Null) => Some(std::cmp::Ordering::Equal),
6427 (Value::Null, _) | (_, Value::Null) => None,
6428 (Value::Int(l), Value::Int(r)) => Some(l.cmp(r)),
6429 (Value::Float(l), Value::Float(r)) => l.partial_cmp(r),
6430 (Value::Int(l), Value::Float(r)) => (*l as f64).partial_cmp(r),
6431 (Value::Float(l), Value::Int(r)) => l.partial_cmp(&(*r as f64)),
6432 (Value::String(l), Value::String(r)) => Some(l.cmp(r)),
6433 (Value::Bool(l), Value::Bool(r)) => Some(l.cmp(r)),
6434 (Value::List(l), Value::List(r)) => cypher_list_cmp(l, r),
6435 _ => None, }
6437}
6438
6439struct CypherToFloat64Udf {
6447 signature: Signature,
6448}
6449
6450impl CypherToFloat64Udf {
6451 fn new() -> Self {
6452 Self {
6453 signature: Signature::new(TypeSignature::Any(1), Volatility::Immutable),
6454 }
6455 }
6456}
6457
6458impl_udf_eq_hash!(CypherToFloat64Udf);
6459
6460impl std::fmt::Debug for CypherToFloat64Udf {
6461 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
6462 f.debug_struct("CypherToFloat64Udf").finish()
6463 }
6464}
6465
6466impl ScalarUDFImpl for CypherToFloat64Udf {
6467 fn as_any(&self) -> &dyn Any {
6468 self
6469 }
6470 fn name(&self) -> &str {
6471 "_cypher_to_float64"
6472 }
6473 fn signature(&self) -> &Signature {
6474 &self.signature
6475 }
6476 fn return_type(&self, _args: &[DataType]) -> DFResult<DataType> {
6477 Ok(DataType::Float64)
6478 }
6479 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DFResult<ColumnarValue> {
6480 if args.args.len() != 1 {
6481 return Err(datafusion::error::DataFusionError::Execution(
6482 "_cypher_to_float64 requires exactly 1 argument".into(),
6483 ));
6484 }
6485 match &args.args[0] {
6486 ColumnarValue::Scalar(scalar) => {
6487 let f = match scalar {
6488 ScalarValue::LargeBinary(Some(bytes)) => cv_bytes_as_f64(bytes),
6489 ScalarValue::Int64(Some(i)) => Some(*i as f64),
6490 ScalarValue::Int32(Some(i)) => Some(*i as f64),
6491 ScalarValue::Float64(Some(f)) => Some(*f),
6492 ScalarValue::Float32(Some(f)) => Some(*f as f64),
6493 _ => None,
6494 };
6495 Ok(ColumnarValue::Scalar(ScalarValue::Float64(f)))
6496 }
6497 ColumnarValue::Array(arr) => {
6498 let len = arr.len();
6499 let mut builder = arrow::array::Float64Builder::with_capacity(len);
6500 match arr.data_type() {
6501 DataType::LargeBinary => {
6502 let lb = arr.as_any().downcast_ref::<LargeBinaryArray>().unwrap();
6503 for i in 0..len {
6504 if lb.is_null(i) {
6505 builder.append_null();
6506 } else {
6507 match cv_bytes_as_f64(lb.value(i)) {
6508 Some(f) => builder.append_value(f),
6509 None => builder.append_null(),
6510 }
6511 }
6512 }
6513 }
6514 DataType::Int64 => {
6515 let int_arr = arr.as_any().downcast_ref::<Int64Array>().unwrap();
6516 for i in 0..len {
6517 if int_arr.is_null(i) {
6518 builder.append_null();
6519 } else {
6520 builder.append_value(int_arr.value(i) as f64);
6521 }
6522 }
6523 }
6524 DataType::Float64 => {
6525 let f_arr = arr.as_any().downcast_ref::<Float64Array>().unwrap();
6526 for i in 0..len {
6527 if f_arr.is_null(i) {
6528 builder.append_null();
6529 } else {
6530 builder.append_value(f_arr.value(i));
6531 }
6532 }
6533 }
6534 _ => {
6535 for _ in 0..len {
6536 builder.append_null();
6537 }
6538 }
6539 }
6540 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
6541 }
6542 }
6543 }
6544}
6545
6546fn create_cypher_to_float64_udf() -> ScalarUDF {
6547 ScalarUDF::from(CypherToFloat64Udf::new())
6548}
6549
6550pub fn cypher_to_float64_expr(
6552 arg: datafusion::logical_expr::Expr,
6553) -> datafusion::logical_expr::Expr {
6554 datafusion::logical_expr::Expr::ScalarFunction(
6555 datafusion::logical_expr::expr::ScalarFunction::new_udf(
6556 Arc::new(create_cypher_to_float64_udf()),
6557 vec![arg],
6558 ),
6559 )
6560}
6561
6562pub fn cypher_to_float64_udf() -> datafusion::logical_expr::ScalarUDF {
6564 create_cypher_to_float64_udf()
6565}
6566
6567fn cypher_type_rank(val: &Value) -> u8 {
6575 match val {
6576 Value::Null => 0,
6577 Value::List(_) => 1,
6578 Value::String(_) => 2,
6579 Value::Bool(_) => 3,
6580 Value::Int(_) | Value::Float(_) => 4,
6581 _ => 5, }
6583}
6584
6585fn cypher_cross_type_cmp(a: &Value, b: &Value) -> std::cmp::Ordering {
6588 use std::cmp::Ordering;
6589 let ra = cypher_type_rank(a);
6590 let rb = cypher_type_rank(b);
6591 if ra != rb {
6592 return ra.cmp(&rb);
6593 }
6594 match (a, b) {
6596 (Value::Int(l), Value::Int(r)) => l.cmp(r),
6597 (Value::Float(l), Value::Float(r)) => l.partial_cmp(r).unwrap_or(Ordering::Equal),
6598 (Value::Int(l), Value::Float(r)) => (*l as f64).partial_cmp(r).unwrap_or(Ordering::Equal),
6599 (Value::Float(l), Value::Int(r)) => l.partial_cmp(&(*r as f64)).unwrap_or(Ordering::Equal),
6600 (Value::String(l), Value::String(r)) => l.cmp(r),
6601 (Value::Bool(l), Value::Bool(r)) => l.cmp(r),
6602 (Value::List(l), Value::List(r)) => cypher_list_cmp(l, r).unwrap_or(Ordering::Equal),
6603 _ => Ordering::Equal,
6604 }
6605}
6606
6607fn scalar_binary_to_value(bytes: &[u8]) -> Value {
6609 uni_common::cypher_value_codec::decode(bytes).unwrap_or(Value::Null)
6610}
6611
6612use datafusion::logical_expr::{Accumulator as DfAccumulator, AggregateUDF, AggregateUDFImpl};
6613
6614#[derive(Debug, Clone)]
6616struct CypherMinMaxUdaf {
6617 name: String,
6618 signature: Signature,
6619 is_max: bool,
6620}
6621
6622impl CypherMinMaxUdaf {
6623 fn new(is_max: bool) -> Self {
6624 let name = if is_max { "_cypher_max" } else { "_cypher_min" };
6625 Self {
6626 name: name.to_string(),
6627 signature: Signature::new(TypeSignature::Any(1), Volatility::Immutable),
6628 is_max,
6629 }
6630 }
6631}
6632
6633impl PartialEq for CypherMinMaxUdaf {
6634 fn eq(&self, other: &Self) -> bool {
6635 self.name == other.name
6636 }
6637}
6638
6639impl Eq for CypherMinMaxUdaf {}
6640
6641impl Hash for CypherMinMaxUdaf {
6642 fn hash<H: Hasher>(&self, state: &mut H) {
6643 self.name.hash(state);
6644 }
6645}
6646
6647impl AggregateUDFImpl for CypherMinMaxUdaf {
6648 fn as_any(&self) -> &dyn Any {
6649 self
6650 }
6651 fn name(&self) -> &str {
6652 &self.name
6653 }
6654 fn signature(&self) -> &Signature {
6655 &self.signature
6656 }
6657 fn return_type(&self, args: &[DataType]) -> DFResult<DataType> {
6658 Ok(args.first().cloned().unwrap_or(DataType::LargeBinary))
6660 }
6661 fn accumulator(
6662 &self,
6663 acc_args: datafusion::logical_expr::function::AccumulatorArgs,
6664 ) -> DFResult<Box<dyn DfAccumulator>> {
6665 let raw_bytes = acc_args
6671 .expr_fields
6672 .first()
6673 .and_then(|field| field.metadata().get("uni_raw_bytes"))
6674 .is_some_and(|v| v == "true");
6675 Ok(Box::new(CypherMinMaxAccumulator {
6676 current: None,
6677 is_max: self.is_max,
6678 return_type: acc_args.return_field.data_type().clone(),
6679 raw_bytes,
6680 }))
6681 }
6682 fn state_fields(
6683 &self,
6684 args: datafusion::logical_expr::function::StateFieldsArgs,
6685 ) -> DFResult<Vec<Arc<arrow::datatypes::Field>>> {
6686 Ok(vec![Arc::new(arrow::datatypes::Field::new(
6687 args.name,
6688 DataType::LargeBinary,
6689 true,
6690 ))])
6691 }
6692}
6693
6694#[derive(Debug)]
6695struct CypherMinMaxAccumulator {
6696 current: Option<Value>,
6697 is_max: bool,
6698 return_type: DataType,
6699 raw_bytes: bool,
6701}
6702
6703impl CypherMinMaxAccumulator {
6704 fn accumulate(&mut self, val: Value) {
6706 if val.is_null() {
6707 return;
6708 }
6709 self.current = Some(match self.current.take() {
6710 None => val,
6711 Some(cur) => {
6712 let ord = cypher_cross_type_cmp(&val, &cur);
6713 if (self.is_max && ord == std::cmp::Ordering::Greater)
6714 || (!self.is_max && ord == std::cmp::Ordering::Less)
6715 {
6716 val
6717 } else {
6718 cur
6719 }
6720 }
6721 });
6722 }
6723}
6724
6725impl DfAccumulator for CypherMinMaxAccumulator {
6726 fn update_batch(&mut self, values: &[ArrayRef]) -> DFResult<()> {
6727 let arr = &values[0];
6728 match arr.data_type() {
6729 DataType::LargeBinary => {
6730 let lb = arr.as_any().downcast_ref::<LargeBinaryArray>().unwrap();
6731 for i in 0..lb.len() {
6732 if lb.is_null(i) {
6733 continue;
6734 }
6735 let val = if self.raw_bytes {
6738 Value::Bytes(lb.value(i).to_vec())
6739 } else {
6740 scalar_binary_to_value(lb.value(i))
6741 };
6742 self.accumulate(val);
6743 }
6744 }
6745 _ => {
6746 for i in 0..arr.len() {
6748 if arr.is_null(i) {
6749 continue;
6750 }
6751 let sv = ScalarValue::try_from_array(arr, i).map_err(|e| {
6752 datafusion::error::DataFusionError::Execution(e.to_string())
6753 })?;
6754 self.accumulate(scalar_to_value(&sv)?);
6755 }
6756 }
6757 }
6758 Ok(())
6759 }
6760 fn evaluate(&mut self) -> DFResult<ScalarValue> {
6761 match &self.current {
6762 None => {
6763 ScalarValue::try_from(&self.return_type)
6765 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))
6766 }
6767 Some(val) => {
6768 if matches!(self.return_type, DataType::LargeBinary) {
6770 let bytes = uni_common::cypher_value_codec::encode(val);
6771 return Ok(ScalarValue::LargeBinary(Some(bytes)));
6772 }
6773 match val {
6775 Value::Int(i) => match &self.return_type {
6776 DataType::Int64 => Ok(ScalarValue::Int64(Some(*i))),
6777 DataType::UInt64 => Ok(ScalarValue::UInt64(Some(*i as u64))),
6778 _ => {
6779 let bytes = uni_common::cypher_value_codec::encode(val);
6780 Ok(ScalarValue::LargeBinary(Some(bytes)))
6781 }
6782 },
6783 Value::Float(f) => match &self.return_type {
6784 DataType::Float64 => Ok(ScalarValue::Float64(Some(*f))),
6785 _ => {
6786 let bytes = uni_common::cypher_value_codec::encode(val);
6787 Ok(ScalarValue::LargeBinary(Some(bytes)))
6788 }
6789 },
6790 Value::String(s) => match &self.return_type {
6791 DataType::Utf8 => Ok(ScalarValue::Utf8(Some(s.clone()))),
6792 DataType::LargeUtf8 => Ok(ScalarValue::LargeUtf8(Some(s.clone()))),
6793 _ => {
6794 let bytes = uni_common::cypher_value_codec::encode(val);
6795 Ok(ScalarValue::LargeBinary(Some(bytes)))
6796 }
6797 },
6798 Value::Bool(b) => match &self.return_type {
6799 DataType::Boolean => Ok(ScalarValue::Boolean(Some(*b))),
6800 _ => {
6801 let bytes = uni_common::cypher_value_codec::encode(val);
6802 Ok(ScalarValue::LargeBinary(Some(bytes)))
6803 }
6804 },
6805 _ => {
6806 let bytes = uni_common::cypher_value_codec::encode(val);
6808 Ok(ScalarValue::LargeBinary(Some(bytes)))
6809 }
6810 }
6811 }
6812 }
6813 }
6814 fn size(&self) -> usize {
6815 std::mem::size_of_val(self) + self.current.as_ref().map_or(0, |_| 64)
6816 }
6817 fn state(&mut self) -> DFResult<Vec<ScalarValue>> {
6818 Ok(vec![self.evaluate()?])
6819 }
6820 fn merge_batch(&mut self, states: &[ArrayRef]) -> DFResult<()> {
6821 let arr = &states[0];
6825 if let Some(lb) = arr.as_any().downcast_ref::<LargeBinaryArray>() {
6826 for i in 0..lb.len() {
6827 if lb.is_null(i) {
6828 continue;
6829 }
6830 self.accumulate(scalar_binary_to_value(lb.value(i)));
6831 }
6832 }
6833 Ok(())
6834 }
6835}
6836
6837pub fn create_cypher_min_udaf() -> AggregateUDF {
6838 AggregateUDF::from(CypherMinMaxUdaf::new(false))
6839}
6840
6841pub fn create_cypher_max_udaf() -> AggregateUDF {
6842 AggregateUDF::from(CypherMinMaxUdaf::new(true))
6843}
6844
6845#[derive(Debug, Clone)]
6851struct CypherSumUdaf {
6852 signature: Signature,
6853}
6854
6855impl CypherSumUdaf {
6856 fn new() -> Self {
6857 Self {
6858 signature: Signature::new(TypeSignature::Any(1), Volatility::Immutable),
6859 }
6860 }
6861}
6862
6863impl PartialEq for CypherSumUdaf {
6864 fn eq(&self, other: &Self) -> bool {
6865 self.signature == other.signature
6866 }
6867}
6868
6869impl Eq for CypherSumUdaf {}
6870
6871impl Hash for CypherSumUdaf {
6872 fn hash<H: Hasher>(&self, state: &mut H) {
6873 self.name().hash(state);
6874 }
6875}
6876
6877impl AggregateUDFImpl for CypherSumUdaf {
6878 fn as_any(&self) -> &dyn Any {
6879 self
6880 }
6881 fn name(&self) -> &str {
6882 "_cypher_sum"
6883 }
6884 fn signature(&self) -> &Signature {
6885 &self.signature
6886 }
6887 fn return_type(&self, _args: &[DataType]) -> DFResult<DataType> {
6888 Ok(DataType::LargeBinary)
6891 }
6892 fn accumulator(
6893 &self,
6894 _acc_args: datafusion::logical_expr::function::AccumulatorArgs,
6895 ) -> DFResult<Box<dyn DfAccumulator>> {
6896 Ok(Box::new(CypherSumAccumulator {
6897 sum: 0.0,
6898 all_ints: true,
6899 int_sum: 0i64,
6900 has_value: false,
6901 }))
6902 }
6903 fn state_fields(
6904 &self,
6905 args: datafusion::logical_expr::function::StateFieldsArgs,
6906 ) -> DFResult<Vec<Arc<arrow::datatypes::Field>>> {
6907 Ok(vec![
6908 Arc::new(arrow::datatypes::Field::new(
6909 format!("{}_sum", args.name),
6910 DataType::Float64,
6911 true,
6912 )),
6913 Arc::new(arrow::datatypes::Field::new(
6914 format!("{}_int_sum", args.name),
6915 DataType::Int64,
6916 true,
6917 )),
6918 Arc::new(arrow::datatypes::Field::new(
6919 format!("{}_all_ints", args.name),
6920 DataType::Boolean,
6921 true,
6922 )),
6923 Arc::new(arrow::datatypes::Field::new(
6924 format!("{}_has_value", args.name),
6925 DataType::Boolean,
6926 true,
6927 )),
6928 ])
6929 }
6930}
6931
6932#[derive(Debug)]
6933struct CypherSumAccumulator {
6934 sum: f64,
6935 all_ints: bool,
6936 int_sum: i64,
6937 has_value: bool,
6938}
6939
6940impl DfAccumulator for CypherSumAccumulator {
6941 fn update_batch(&mut self, values: &[ArrayRef]) -> DFResult<()> {
6942 let arr = &values[0];
6943 for i in 0..arr.len() {
6944 if arr.is_null(i) {
6945 continue;
6946 }
6947 match arr.data_type() {
6948 DataType::LargeBinary => {
6949 let lb = arr.as_any().downcast_ref::<LargeBinaryArray>().unwrap();
6950 let bytes = lb.value(i);
6951 use uni_common::cypher_value_codec::{
6952 TAG_FLOAT, TAG_INT, decode_float, decode_int, peek_tag,
6953 };
6954 match peek_tag(bytes) {
6955 Some(TAG_INT) => {
6956 if let Some(v) = decode_int(bytes) {
6957 self.sum += v as f64;
6958 self.int_sum = self.int_sum.wrapping_add(v);
6959 self.has_value = true;
6960 }
6961 }
6962 Some(TAG_FLOAT) => {
6963 if let Some(v) = decode_float(bytes) {
6964 self.sum += v;
6965 self.all_ints = false;
6966 self.has_value = true;
6967 }
6968 }
6969 _ => {} }
6971 }
6972 DataType::Int64 => {
6973 let a = arr.as_any().downcast_ref::<Int64Array>().unwrap();
6974 let v = a.value(i);
6975 self.sum += v as f64;
6976 self.int_sum = self.int_sum.wrapping_add(v);
6977 self.has_value = true;
6978 }
6979 DataType::Float64 => {
6980 let a = arr.as_any().downcast_ref::<Float64Array>().unwrap();
6981 self.sum += a.value(i);
6982 self.all_ints = false;
6983 self.has_value = true;
6984 }
6985 _ => {}
6986 }
6987 }
6988 Ok(())
6989 }
6990 fn evaluate(&mut self) -> DFResult<ScalarValue> {
6991 if !self.has_value {
6992 return Ok(ScalarValue::LargeBinary(None));
6993 }
6994 let val = if self.all_ints {
6995 Value::Int(self.int_sum)
6996 } else {
6997 Value::Float(self.sum)
6998 };
6999 let bytes = uni_common::cypher_value_codec::encode(&val);
7000 Ok(ScalarValue::LargeBinary(Some(bytes)))
7001 }
7002 fn size(&self) -> usize {
7003 std::mem::size_of_val(self)
7004 }
7005 fn state(&mut self) -> DFResult<Vec<ScalarValue>> {
7006 Ok(vec![
7007 ScalarValue::Float64(Some(self.sum)),
7008 ScalarValue::Int64(Some(self.int_sum)),
7009 ScalarValue::Boolean(Some(self.all_ints)),
7010 ScalarValue::Boolean(Some(self.has_value)),
7011 ])
7012 }
7013 fn merge_batch(&mut self, states: &[ArrayRef]) -> DFResult<()> {
7014 let sum_arr = states[0].as_any().downcast_ref::<Float64Array>().unwrap();
7015 let int_sum_arr = states[1].as_any().downcast_ref::<Int64Array>().unwrap();
7016 let all_ints_arr = states[2].as_any().downcast_ref::<BooleanArray>().unwrap();
7017 let has_value_arr = states[3].as_any().downcast_ref::<BooleanArray>().unwrap();
7018 for i in 0..sum_arr.len() {
7019 if !has_value_arr.is_null(i) && has_value_arr.value(i) {
7020 self.sum += sum_arr.value(i);
7021 self.int_sum = self.int_sum.wrapping_add(int_sum_arr.value(i));
7022 if !all_ints_arr.value(i) {
7023 self.all_ints = false;
7024 }
7025 self.has_value = true;
7026 }
7027 }
7028 Ok(())
7029 }
7030}
7031
7032pub fn create_cypher_sum_udaf() -> AggregateUDF {
7033 AggregateUDF::from(CypherSumUdaf::new())
7034}
7035
7036#[derive(Debug, Clone)]
7043struct CypherCollectUdaf {
7044 signature: Signature,
7045}
7046
7047impl CypherCollectUdaf {
7048 fn new() -> Self {
7049 Self {
7050 signature: Signature::new(TypeSignature::Any(1), Volatility::Immutable),
7051 }
7052 }
7053}
7054
7055impl PartialEq for CypherCollectUdaf {
7056 fn eq(&self, other: &Self) -> bool {
7057 self.signature == other.signature
7058 }
7059}
7060
7061impl Eq for CypherCollectUdaf {}
7062
7063impl Hash for CypherCollectUdaf {
7064 fn hash<H: Hasher>(&self, state: &mut H) {
7065 self.name().hash(state);
7066 }
7067}
7068
7069impl AggregateUDFImpl for CypherCollectUdaf {
7070 fn as_any(&self) -> &dyn Any {
7071 self
7072 }
7073 fn name(&self) -> &str {
7074 "_cypher_collect"
7075 }
7076 fn signature(&self) -> &Signature {
7077 &self.signature
7078 }
7079 fn return_type(&self, _args: &[DataType]) -> DFResult<DataType> {
7080 Ok(DataType::LargeBinary)
7081 }
7082 fn accumulator(
7083 &self,
7084 acc_args: datafusion::logical_expr::function::AccumulatorArgs,
7085 ) -> DFResult<Box<dyn DfAccumulator>> {
7086 let raw_bytes = acc_args
7093 .expr_fields
7094 .first()
7095 .and_then(|field| field.metadata().get("uni_raw_bytes"))
7096 .is_some_and(|v| v == "true");
7097 Ok(Box::new(CypherCollectAccumulator {
7098 values: Vec::new(),
7099 distinct: acc_args.is_distinct,
7100 raw_bytes,
7101 }))
7102 }
7103 fn state_fields(
7104 &self,
7105 args: datafusion::logical_expr::function::StateFieldsArgs,
7106 ) -> DFResult<Vec<Arc<arrow::datatypes::Field>>> {
7107 Ok(vec![Arc::new(arrow::datatypes::Field::new(
7108 args.name,
7109 DataType::LargeBinary,
7110 true,
7111 ))])
7112 }
7113}
7114
7115#[derive(Debug)]
7116struct CypherCollectAccumulator {
7117 values: Vec<Value>,
7118 distinct: bool,
7119 raw_bytes: bool,
7122}
7123
7124impl CypherCollectAccumulator {
7125 fn push_value(&mut self, val: Value) {
7127 if self.distinct {
7128 let repr = val.to_string();
7130 if self.values.iter().any(|v| v.to_string() == repr) {
7131 return;
7132 }
7133 }
7134 self.values.push(val);
7135 }
7136}
7137
7138impl DfAccumulator for CypherCollectAccumulator {
7139 fn update_batch(&mut self, values: &[ArrayRef]) -> DFResult<()> {
7140 let arr = &values[0];
7141 if self.raw_bytes
7144 && let Some(lb) = arr.as_any().downcast_ref::<LargeBinaryArray>()
7145 {
7146 for i in 0..lb.len() {
7147 if lb.is_null(i) {
7148 continue;
7149 }
7150 self.push_value(Value::Bytes(lb.value(i).to_vec()));
7151 }
7152 return Ok(());
7153 }
7154 for i in 0..arr.len() {
7155 if arr.is_null(i) {
7156 continue;
7157 }
7158 if let Some(struct_arr) = arr.as_any().downcast_ref::<arrow::array::StructArray>()
7162 && struct_arr.num_columns() > 0
7163 && struct_arr.column(0).is_null(i)
7164 {
7165 continue;
7166 }
7167 let sv = ScalarValue::try_from_array(arr, i)
7168 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
7169 let val = scalar_to_value(&sv)?;
7170 if val.is_null() {
7171 continue;
7172 }
7173 self.push_value(val);
7174 }
7175 Ok(())
7176 }
7177 fn evaluate(&mut self) -> DFResult<ScalarValue> {
7178 let val = Value::List(self.values.clone());
7180 let bytes = uni_common::cypher_value_codec::encode(&val);
7181 Ok(ScalarValue::LargeBinary(Some(bytes)))
7182 }
7183 fn size(&self) -> usize {
7184 std::mem::size_of_val(self) + self.values.len() * 64
7185 }
7186 fn state(&mut self) -> DFResult<Vec<ScalarValue>> {
7187 Ok(vec![self.evaluate()?])
7188 }
7189 fn merge_batch(&mut self, states: &[ArrayRef]) -> DFResult<()> {
7190 let arr = &states[0];
7192 if let Some(lb) = arr.as_any().downcast_ref::<LargeBinaryArray>() {
7193 for i in 0..lb.len() {
7194 if lb.is_null(i) {
7195 continue;
7196 }
7197 let val = scalar_binary_to_value(lb.value(i));
7198 if let Value::List(items) = val {
7199 for item in items {
7200 if !item.is_null() {
7201 self.push_value(item);
7202 }
7203 }
7204 }
7205 }
7206 }
7207 Ok(())
7208 }
7209}
7210
7211pub fn create_cypher_collect_udaf() -> AggregateUDF {
7212 AggregateUDF::from(CypherCollectUdaf::new())
7213}
7214
7215pub fn create_cypher_collect_expr(
7217 arg: datafusion::logical_expr::Expr,
7218 distinct: bool,
7219) -> datafusion::logical_expr::Expr {
7220 let udaf = Arc::new(create_cypher_collect_udaf());
7223 if distinct {
7224 datafusion::logical_expr::Expr::AggregateFunction(
7226 datafusion::logical_expr::expr::AggregateFunction::new_udf(
7227 udaf,
7228 vec![arg],
7229 true, None,
7231 vec![],
7232 None,
7233 ),
7234 )
7235 } else {
7236 udaf.call(vec![arg])
7237 }
7238}
7239
7240#[derive(Debug, Clone)]
7246struct CypherPercentileDiscUdaf {
7247 signature: Signature,
7248}
7249
7250impl CypherPercentileDiscUdaf {
7251 fn new() -> Self {
7252 Self {
7253 signature: Signature::new(TypeSignature::Any(2), Volatility::Immutable),
7254 }
7255 }
7256}
7257
7258impl PartialEq for CypherPercentileDiscUdaf {
7259 fn eq(&self, other: &Self) -> bool {
7260 self.signature == other.signature
7261 }
7262}
7263
7264impl Eq for CypherPercentileDiscUdaf {}
7265
7266impl Hash for CypherPercentileDiscUdaf {
7267 fn hash<H: Hasher>(&self, state: &mut H) {
7268 self.name().hash(state);
7269 }
7270}
7271
7272impl AggregateUDFImpl for CypherPercentileDiscUdaf {
7273 fn as_any(&self) -> &dyn Any {
7274 self
7275 }
7276 fn name(&self) -> &str {
7277 "percentiledisc"
7278 }
7279 fn signature(&self) -> &Signature {
7280 &self.signature
7281 }
7282 fn return_type(&self, _args: &[DataType]) -> DFResult<DataType> {
7283 Ok(DataType::Float64)
7284 }
7285 fn accumulator(
7286 &self,
7287 _acc_args: datafusion::logical_expr::function::AccumulatorArgs,
7288 ) -> DFResult<Box<dyn DfAccumulator>> {
7289 Ok(Box::new(CypherPercentileDiscAccumulator {
7290 values: Vec::new(),
7291 percentile: None,
7292 }))
7293 }
7294 fn state_fields(
7295 &self,
7296 args: datafusion::logical_expr::function::StateFieldsArgs,
7297 ) -> DFResult<Vec<Arc<arrow::datatypes::Field>>> {
7298 Ok(vec![
7299 Arc::new(arrow::datatypes::Field::new(
7300 format!("{}_values", args.name),
7301 DataType::List(Arc::new(arrow::datatypes::Field::new(
7302 "item",
7303 DataType::Float64,
7304 true,
7305 ))),
7306 true,
7307 )),
7308 Arc::new(arrow::datatypes::Field::new(
7309 format!("{}_percentile", args.name),
7310 DataType::Float64,
7311 true,
7312 )),
7313 ])
7314 }
7315}
7316
7317#[derive(Debug)]
7318struct CypherPercentileDiscAccumulator {
7319 values: Vec<f64>,
7320 percentile: Option<f64>,
7321}
7322
7323impl CypherPercentileDiscAccumulator {
7324 fn extract_f64(arr: &ArrayRef, i: usize) -> Option<f64> {
7325 if arr.is_null(i) {
7326 return None;
7327 }
7328 match arr.data_type() {
7329 DataType::LargeBinary => {
7330 let lb = arr.as_any().downcast_ref::<LargeBinaryArray>()?;
7331 cv_bytes_as_f64(lb.value(i))
7332 }
7333 DataType::Int64 => {
7334 let a = arr.as_any().downcast_ref::<Int64Array>()?;
7335 Some(a.value(i) as f64)
7336 }
7337 DataType::Float64 => {
7338 let a = arr.as_any().downcast_ref::<Float64Array>()?;
7339 Some(a.value(i))
7340 }
7341 DataType::Int32 => {
7342 let a = arr.as_any().downcast_ref::<Int32Array>()?;
7343 Some(a.value(i) as f64)
7344 }
7345 DataType::Float32 => {
7346 let a = arr.as_any().downcast_ref::<Float32Array>()?;
7347 Some(a.value(i) as f64)
7348 }
7349 _ => None,
7350 }
7351 }
7352
7353 fn extract_percentile(arr: &ArrayRef, i: usize) -> Option<f64> {
7354 if arr.is_null(i) {
7355 return None;
7356 }
7357 match arr.data_type() {
7358 DataType::Float64 => {
7359 let a = arr.as_any().downcast_ref::<Float64Array>()?;
7360 Some(a.value(i))
7361 }
7362 DataType::Int64 => {
7363 let a = arr.as_any().downcast_ref::<Int64Array>()?;
7364 Some(a.value(i) as f64)
7365 }
7366 DataType::LargeBinary => {
7367 let lb = arr.as_any().downcast_ref::<LargeBinaryArray>()?;
7368 cv_bytes_as_f64(lb.value(i))
7369 }
7370 _ => None,
7371 }
7372 }
7373}
7374
7375impl DfAccumulator for CypherPercentileDiscAccumulator {
7376 fn update_batch(&mut self, values: &[ArrayRef]) -> DFResult<()> {
7377 let expr_arr = &values[0];
7378 let pct_arr = &values[1];
7379 for i in 0..expr_arr.len() {
7380 if self.percentile.is_none()
7382 && let Some(p) = Self::extract_percentile(pct_arr, i)
7383 {
7384 if !(0.0..=1.0).contains(&p) {
7385 return Err(datafusion::error::DataFusionError::Execution(
7386 "ArgumentError: NumberOutOfRange - percentileDisc(): percentile value must be between 0.0 and 1.0".to_string(),
7387 ));
7388 }
7389 self.percentile = Some(p);
7390 }
7391 if let Some(f) = Self::extract_f64(expr_arr, i) {
7392 self.values.push(f);
7393 }
7394 }
7395 Ok(())
7396 }
7397 fn evaluate(&mut self) -> DFResult<ScalarValue> {
7398 let pct = match self.percentile {
7399 Some(p) if !(0.0..=1.0).contains(&p) => {
7400 return Err(datafusion::error::DataFusionError::Execution(
7401 "ArgumentError: NumberOutOfRange - percentileDisc(): percentile value must be between 0.0 and 1.0".to_string(),
7402 ));
7403 }
7404 Some(p) => p,
7405 None => 0.0,
7406 };
7407 if self.values.is_empty() {
7408 return Ok(ScalarValue::Float64(None));
7409 }
7410 self.values
7411 .sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
7412 let n = self.values.len();
7413 let idx = (pct * (n as f64 - 1.0)).round() as usize;
7414 let idx = idx.min(n - 1);
7415 let result = self.values[idx];
7416 Ok(ScalarValue::Float64(Some(result)))
7417 }
7418 fn size(&self) -> usize {
7419 std::mem::size_of_val(self) + self.values.capacity() * 8
7420 }
7421 fn state(&mut self) -> DFResult<Vec<ScalarValue>> {
7422 let list_values: Vec<ScalarValue> = self
7424 .values
7425 .iter()
7426 .map(|f| ScalarValue::Float64(Some(*f)))
7427 .collect();
7428 let list_scalar = ScalarValue::List(ScalarValue::new_list(
7429 &list_values,
7430 &DataType::Float64,
7431 true,
7432 ));
7433 Ok(vec![list_scalar, ScalarValue::Float64(self.percentile)])
7434 }
7435 fn merge_batch(&mut self, states: &[ArrayRef]) -> DFResult<()> {
7436 let list_arr = &states[0];
7438 let pct_arr = &states[1];
7439 if self.percentile.is_none()
7441 && let Some(f64_arr) = pct_arr.as_any().downcast_ref::<Float64Array>()
7442 {
7443 for i in 0..f64_arr.len() {
7444 if !f64_arr.is_null(i) {
7445 self.percentile = Some(f64_arr.value(i));
7446 break;
7447 }
7448 }
7449 }
7450 if let Some(list_array) = list_arr.as_any().downcast_ref::<arrow_array::ListArray>() {
7452 for i in 0..list_array.len() {
7453 if list_array.is_null(i) {
7454 continue;
7455 }
7456 let inner = list_array.value(i);
7457 if let Some(f64_arr) = inner.as_any().downcast_ref::<Float64Array>() {
7458 for j in 0..f64_arr.len() {
7459 if !f64_arr.is_null(j) {
7460 self.values.push(f64_arr.value(j));
7461 }
7462 }
7463 }
7464 }
7465 }
7466 Ok(())
7467 }
7468}
7469
7470#[derive(Debug, Clone)]
7472struct CypherPercentileContUdaf {
7473 signature: Signature,
7474}
7475
7476impl CypherPercentileContUdaf {
7477 fn new() -> Self {
7478 Self {
7479 signature: Signature::new(TypeSignature::Any(2), Volatility::Immutable),
7480 }
7481 }
7482}
7483
7484impl PartialEq for CypherPercentileContUdaf {
7485 fn eq(&self, other: &Self) -> bool {
7486 self.signature == other.signature
7487 }
7488}
7489
7490impl Eq for CypherPercentileContUdaf {}
7491
7492impl Hash for CypherPercentileContUdaf {
7493 fn hash<H: Hasher>(&self, state: &mut H) {
7494 self.name().hash(state);
7495 }
7496}
7497
7498impl AggregateUDFImpl for CypherPercentileContUdaf {
7499 fn as_any(&self) -> &dyn Any {
7500 self
7501 }
7502 fn name(&self) -> &str {
7503 "percentilecont"
7504 }
7505 fn signature(&self) -> &Signature {
7506 &self.signature
7507 }
7508 fn return_type(&self, _args: &[DataType]) -> DFResult<DataType> {
7509 Ok(DataType::Float64)
7510 }
7511 fn accumulator(
7512 &self,
7513 _acc_args: datafusion::logical_expr::function::AccumulatorArgs,
7514 ) -> DFResult<Box<dyn DfAccumulator>> {
7515 Ok(Box::new(CypherPercentileContAccumulator {
7516 values: Vec::new(),
7517 percentile: None,
7518 }))
7519 }
7520 fn state_fields(
7521 &self,
7522 args: datafusion::logical_expr::function::StateFieldsArgs,
7523 ) -> DFResult<Vec<Arc<arrow::datatypes::Field>>> {
7524 Ok(vec![
7525 Arc::new(arrow::datatypes::Field::new(
7526 format!("{}_values", args.name),
7527 DataType::List(Arc::new(arrow::datatypes::Field::new(
7528 "item",
7529 DataType::Float64,
7530 true,
7531 ))),
7532 true,
7533 )),
7534 Arc::new(arrow::datatypes::Field::new(
7535 format!("{}_percentile", args.name),
7536 DataType::Float64,
7537 true,
7538 )),
7539 ])
7540 }
7541}
7542
7543#[derive(Debug)]
7544struct CypherPercentileContAccumulator {
7545 values: Vec<f64>,
7546 percentile: Option<f64>,
7547}
7548
7549impl DfAccumulator for CypherPercentileContAccumulator {
7550 fn update_batch(&mut self, values: &[ArrayRef]) -> DFResult<()> {
7551 let expr_arr = &values[0];
7552 let pct_arr = &values[1];
7553 for i in 0..expr_arr.len() {
7554 if self.percentile.is_none()
7555 && let Some(p) = CypherPercentileDiscAccumulator::extract_percentile(pct_arr, i)
7556 {
7557 if !(0.0..=1.0).contains(&p) {
7558 return Err(datafusion::error::DataFusionError::Execution(
7559 "ArgumentError: NumberOutOfRange - percentileCont(): percentile value must be between 0.0 and 1.0".to_string(),
7560 ));
7561 }
7562 self.percentile = Some(p);
7563 }
7564 if let Some(f) = CypherPercentileDiscAccumulator::extract_f64(expr_arr, i) {
7565 self.values.push(f);
7566 }
7567 }
7568 Ok(())
7569 }
7570 fn evaluate(&mut self) -> DFResult<ScalarValue> {
7571 let pct = match self.percentile {
7572 Some(p) if !(0.0..=1.0).contains(&p) => {
7573 return Err(datafusion::error::DataFusionError::Execution(
7574 "ArgumentError: NumberOutOfRange - percentileCont(): percentile value must be between 0.0 and 1.0".to_string(),
7575 ));
7576 }
7577 Some(p) => p,
7578 None => 0.0,
7579 };
7580 if self.values.is_empty() {
7581 return Ok(ScalarValue::Float64(None));
7582 }
7583 self.values
7584 .sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
7585 let n = self.values.len();
7586 if n == 1 {
7587 return Ok(ScalarValue::Float64(Some(self.values[0])));
7588 }
7589 let pos = pct * (n as f64 - 1.0);
7590 let lower = pos.floor() as usize;
7591 let upper = pos.ceil() as usize;
7592 let lower = lower.min(n - 1);
7593 let upper = upper.min(n - 1);
7594 if lower == upper {
7595 Ok(ScalarValue::Float64(Some(self.values[lower])))
7596 } else {
7597 let frac = pos - lower as f64;
7598 let result = self.values[lower] + frac * (self.values[upper] - self.values[lower]);
7599 Ok(ScalarValue::Float64(Some(result)))
7600 }
7601 }
7602 fn size(&self) -> usize {
7603 std::mem::size_of_val(self) + self.values.capacity() * 8
7604 }
7605 fn state(&mut self) -> DFResult<Vec<ScalarValue>> {
7606 let list_values: Vec<ScalarValue> = self
7607 .values
7608 .iter()
7609 .map(|f| ScalarValue::Float64(Some(*f)))
7610 .collect();
7611 let list_scalar = ScalarValue::List(ScalarValue::new_list(
7612 &list_values,
7613 &DataType::Float64,
7614 true,
7615 ));
7616 Ok(vec![list_scalar, ScalarValue::Float64(self.percentile)])
7617 }
7618 fn merge_batch(&mut self, states: &[ArrayRef]) -> DFResult<()> {
7619 let list_arr = &states[0];
7620 let pct_arr = &states[1];
7621 if self.percentile.is_none()
7622 && let Some(f64_arr) = pct_arr.as_any().downcast_ref::<Float64Array>()
7623 {
7624 for i in 0..f64_arr.len() {
7625 if !f64_arr.is_null(i) {
7626 self.percentile = Some(f64_arr.value(i));
7627 break;
7628 }
7629 }
7630 }
7631 if let Some(list_array) = list_arr.as_any().downcast_ref::<arrow_array::ListArray>() {
7632 for i in 0..list_array.len() {
7633 if list_array.is_null(i) {
7634 continue;
7635 }
7636 let inner = list_array.value(i);
7637 if let Some(f64_arr) = inner.as_any().downcast_ref::<Float64Array>() {
7638 for j in 0..f64_arr.len() {
7639 if !f64_arr.is_null(j) {
7640 self.values.push(f64_arr.value(j));
7641 }
7642 }
7643 }
7644 }
7645 }
7646 Ok(())
7647 }
7648}
7649
7650pub fn create_cypher_percentile_disc_udaf() -> AggregateUDF {
7651 AggregateUDF::from(CypherPercentileDiscUdaf::new())
7652}
7653
7654pub fn create_cypher_percentile_cont_udaf() -> AggregateUDF {
7655 AggregateUDF::from(CypherPercentileContUdaf::new())
7656}
7657
7658fn invoke_similarity_udf(
7668 func_name: &str,
7669 min_args: usize,
7670 args: ScalarFunctionArgs,
7671) -> DFResult<ColumnarValue> {
7672 let output_type = DataType::Float64;
7673 invoke_cypher_udf(args, &output_type, |val_args| {
7674 if val_args.len() < min_args {
7675 return Err(datafusion::error::DataFusionError::Execution(format!(
7676 "{} requires at least {} arguments",
7677 func_name, min_args
7678 )));
7679 }
7680 crate::similar_to::eval_similar_to_pure(&val_args[0], &val_args[1])
7681 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))
7682 })
7683}
7684
7685pub fn create_similar_to_udf() -> ScalarUDF {
7687 ScalarUDF::new_from_impl(SimilarToUdf::new())
7688}
7689
7690#[derive(Debug)]
7691struct SimilarToUdf {
7692 signature: Signature,
7693}
7694
7695impl SimilarToUdf {
7696 fn new() -> Self {
7697 Self {
7698 signature: Signature::new(TypeSignature::VariadicAny, Volatility::Immutable),
7699 }
7700 }
7701}
7702
7703impl_udf_eq_hash!(SimilarToUdf);
7704
7705impl ScalarUDFImpl for SimilarToUdf {
7706 fn as_any(&self) -> &dyn Any {
7707 self
7708 }
7709
7710 fn name(&self) -> &str {
7711 "similar_to"
7712 }
7713
7714 fn signature(&self) -> &Signature {
7715 &self.signature
7716 }
7717
7718 fn return_type(&self, _arg_types: &[DataType]) -> DFResult<DataType> {
7719 Ok(DataType::Float64)
7720 }
7721
7722 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DFResult<ColumnarValue> {
7723 invoke_similarity_udf("similar_to", 2, args)
7724 }
7725}
7726
7727pub fn create_vector_similarity_udf() -> ScalarUDF {
7729 ScalarUDF::new_from_impl(VectorSimilarityUdf::new())
7730}
7731
7732#[derive(Debug)]
7733struct VectorSimilarityUdf {
7734 signature: Signature,
7735}
7736
7737impl VectorSimilarityUdf {
7738 fn new() -> Self {
7739 Self {
7740 signature: Signature::new(TypeSignature::Any(2), Volatility::Immutable),
7741 }
7742 }
7743}
7744
7745impl_udf_eq_hash!(VectorSimilarityUdf);
7746
7747impl ScalarUDFImpl for VectorSimilarityUdf {
7748 fn as_any(&self) -> &dyn Any {
7749 self
7750 }
7751
7752 fn name(&self) -> &str {
7753 "vector_similarity"
7754 }
7755
7756 fn signature(&self) -> &Signature {
7757 &self.signature
7758 }
7759
7760 fn return_type(&self, _arg_types: &[DataType]) -> DFResult<DataType> {
7761 Ok(DataType::Float64)
7762 }
7763
7764 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DFResult<ColumnarValue> {
7765 invoke_similarity_udf("vector_similarity", 2, args)
7766 }
7767}
7768
7769#[cfg(test)]
7770mod tests {
7771 use super::*;
7772 use datafusion::execution::FunctionRegistry;
7773
    /// Registering the Cypher UDFs on a fresh session must make a sample of
    /// them resolvable by name, including the internal `_make_cypher_list`
    /// and `_cv_to_bool` helpers.
    #[test]
    fn test_register_udfs() {
        let ctx = SessionContext::new();
        register_cypher_udfs(&ctx).unwrap();

        // Spot-check user-facing functions.
        assert!(ctx.udf("id").is_ok());
        assert!(ctx.udf("type").is_ok());
        assert!(ctx.udf("keys").is_ok());
        assert!(ctx.udf("range").is_ok());
        // Spot-check internal helper functions.
        assert!(
            ctx.udf("_make_cypher_list").is_ok(),
            "_make_cypher_list UDF should be registered"
        );
        assert!(
            ctx.udf("_cv_to_bool").is_ok(),
            "_cv_to_bool UDF should be registered"
        );
    }
7794
7795 #[test]
7796 fn test_id_udf_signature() {
7797 let udf = create_id_udf();
7798 assert_eq!(udf.name(), "id");
7799 }
7800
7801 #[test]
7802 fn test_has_null_udf() {
7803 use datafusion::arrow::datatypes::{DataType, Field};
7804 use datafusion::config::ConfigOptions;
7805 use datafusion::scalar::ScalarValue;
7806 use std::sync::Arc;
7807
7808 let udf = create_has_null_udf();
7809
7810 let values = vec![
7812 ScalarValue::Int64(Some(1)),
7813 ScalarValue::Int64(Some(2)),
7814 ScalarValue::Int64(None),
7815 ];
7816
7817 let list_scalar = ScalarValue::List(ScalarValue::new_list(&values, &DataType::Int64, true));
7819
7820 let list_field = Arc::new(Field::new(
7821 "item",
7822 DataType::List(Arc::new(Field::new("item", DataType::Int64, true))),
7823 true,
7824 ));
7825
7826 let args = ScalarFunctionArgs {
7827 args: vec![ColumnarValue::Scalar(list_scalar)],
7828 arg_fields: vec![list_field],
7829 number_rows: 1,
7830 return_field: Arc::new(Field::new("result", DataType::Boolean, true)),
7831 config_options: Arc::new(ConfigOptions::default()),
7832 };
7833
7834 let result = udf.invoke_with_args(args).unwrap();
7835
7836 if let ColumnarValue::Scalar(ScalarValue::Boolean(Some(b))) = result {
7837 assert!(b, "has_null should return true for list with null");
7838 } else {
7839 panic!("Unexpected result: {:?}", result);
7840 }
7841 }
7842
7843 fn json_to_cv_bytes(val: &serde_json::Value) -> Vec<u8> {
7849 let uni_val: uni_common::Value = val.clone().into();
7850 uni_common::cypher_value_codec::encode(&uni_val)
7851 }
7852
7853 fn make_multi_scalar_args(scalars: Vec<ScalarValue>) -> ScalarFunctionArgs {
7862 make_multi_scalar_args_with_return(scalars, DataType::LargeBinary)
7863 }
7864
7865 fn make_multi_scalar_args_with_return(
7866 scalars: Vec<ScalarValue>,
7867 return_type: DataType,
7868 ) -> ScalarFunctionArgs {
7869 use datafusion::arrow::datatypes::Field;
7870 use datafusion::config::ConfigOptions;
7871
7872 let arg_fields: Vec<_> = scalars
7873 .iter()
7874 .enumerate()
7875 .map(|(i, s)| Arc::new(Field::new(format!("arg{i}"), s.data_type(), true)))
7876 .collect();
7877 let args: Vec<_> = scalars.into_iter().map(ColumnarValue::Scalar).collect();
7878 ScalarFunctionArgs {
7879 args,
7880 arg_fields,
7881 number_rows: 1,
7882 return_field: Arc::new(Field::new("result", return_type, true)),
7883 config_options: Arc::new(ConfigOptions::default()),
7884 }
7885 }
7886
7887 fn decode_cv_scalar(cv: &ColumnarValue) -> serde_json::Value {
7889 match cv {
7890 ColumnarValue::Scalar(ScalarValue::LargeBinary(Some(bytes))) => {
7891 let val = uni_common::cypher_value_codec::decode(bytes)
7892 .expect("failed to decode CypherValue output");
7893 val.into()
7894 }
7895 other => panic!("expected LargeBinary scalar, got {other:?}"),
7896 }
7897 }
7898
7899 #[test]
7900 fn test_make_cypher_list_scalars() {
7901 let udf = create_make_cypher_list_udf();
7902 let args = make_multi_scalar_args(vec![
7903 ScalarValue::Int64(Some(1)),
7904 ScalarValue::Float64(Some(3.21)),
7905 ScalarValue::Utf8(Some("hello".to_string())),
7906 ScalarValue::Boolean(Some(true)),
7907 ScalarValue::Null,
7908 ]);
7909 let result = udf.invoke_with_args(args).unwrap();
7910 let json = decode_cv_scalar(&result);
7911 let arr = json.as_array().expect("should be array");
7912 assert_eq!(arr.len(), 5);
7913 assert_eq!(arr[0], serde_json::json!(1));
7914 assert_eq!(arr[1], serde_json::json!(3.21));
7915 assert_eq!(arr[2], serde_json::json!("hello"));
7916 assert_eq!(arr[3], serde_json::json!(true));
7917 assert!(arr[4].is_null());
7918 }
7919
7920 #[test]
7921 fn test_make_cypher_list_empty() {
7922 let udf = create_make_cypher_list_udf();
7923 let args = make_multi_scalar_args(vec![]);
7924 let result = udf.invoke_with_args(args).unwrap();
7925 let json = decode_cv_scalar(&result);
7926 let arr = json.as_array().expect("should be array");
7927 assert!(arr.is_empty());
7928 }
7929
7930 #[test]
7931 fn test_make_cypher_list_single() {
7932 let udf = create_make_cypher_list_udf();
7933 let args = make_multi_scalar_args(vec![ScalarValue::Int64(Some(42))]);
7934 let result = udf.invoke_with_args(args).unwrap();
7935 let json = decode_cv_scalar(&result);
7936 let arr = json.as_array().expect("should be array");
7937 assert_eq!(arr.len(), 1);
7938 assert_eq!(arr[0], serde_json::json!(42));
7939 }
7940
7941 #[test]
7942 fn test_make_cypher_list_nested_cypher_value() {
7943 let udf = create_make_cypher_list_udf();
7944 let nested_bytes = json_to_cv_bytes(&serde_json::json!([1, 2]));
7946 let args = make_multi_scalar_args(vec![
7947 ScalarValue::LargeBinary(Some(nested_bytes)),
7948 ScalarValue::Int64(Some(3)),
7949 ]);
7950 let result = udf.invoke_with_args(args).unwrap();
7951 let json = decode_cv_scalar(&result);
7952 let arr = json.as_array().expect("should be array");
7953 assert_eq!(arr.len(), 2);
7954 assert_eq!(arr[0], serde_json::json!([1, 2]));
7955 assert_eq!(arr[1], serde_json::json!(3));
7956 }
7957
7958 fn make_cypher_in_args(
7964 element: &serde_json::Value,
7965 list: &serde_json::Value,
7966 ) -> ScalarFunctionArgs {
7967 make_multi_scalar_args_with_return(
7968 vec![
7969 ScalarValue::LargeBinary(Some(json_to_cv_bytes(element))),
7970 ScalarValue::LargeBinary(Some(json_to_cv_bytes(list))),
7971 ],
7972 DataType::Boolean,
7973 )
7974 }
7975
7976 #[test]
7977 fn test_cypher_in_found() {
7978 let udf = create_cypher_in_udf();
7979 let args = make_cypher_in_args(&serde_json::json!(3), &serde_json::json!([1, 2, 3]));
7980 let result = udf.invoke_with_args(args).unwrap();
7981 match result {
7982 ColumnarValue::Scalar(ScalarValue::Boolean(Some(b))) => assert!(b),
7983 other => panic!("expected Boolean(true), got {other:?}"),
7984 }
7985 }
7986
7987 #[test]
7988 fn test_cypher_in_not_found() {
7989 let udf = create_cypher_in_udf();
7990 let args = make_cypher_in_args(&serde_json::json!(4), &serde_json::json!([1, 2, 3]));
7991 let result = udf.invoke_with_args(args).unwrap();
7992 match result {
7993 ColumnarValue::Scalar(ScalarValue::Boolean(Some(b))) => assert!(!b),
7994 other => panic!("expected Boolean(false), got {other:?}"),
7995 }
7996 }
7997
7998 #[test]
7999 fn test_cypher_in_null_list() {
8000 let udf = create_cypher_in_udf();
8001 let args = make_multi_scalar_args_with_return(
8002 vec![
8003 ScalarValue::LargeBinary(Some(json_to_cv_bytes(&serde_json::json!(1)))),
8004 ScalarValue::LargeBinary(Some(json_to_cv_bytes(&serde_json::json!(null)))),
8005 ],
8006 DataType::Boolean,
8007 );
8008 let result = udf.invoke_with_args(args).unwrap();
8009 match result {
8010 ColumnarValue::Scalar(ScalarValue::Boolean(None)) => {} other => panic!("expected Boolean(None) for null list, got {other:?}"),
8012 }
8013 }
8014
8015 #[test]
8016 fn test_cypher_in_null_element_nonempty() {
8017 let udf = create_cypher_in_udf();
8018 let args = make_multi_scalar_args_with_return(
8019 vec![
8020 ScalarValue::LargeBinary(Some(json_to_cv_bytes(&serde_json::json!(null)))),
8021 ScalarValue::LargeBinary(Some(json_to_cv_bytes(&serde_json::json!([1, 2])))),
8022 ],
8023 DataType::Boolean,
8024 );
8025 let result = udf.invoke_with_args(args).unwrap();
8026 match result {
8027 ColumnarValue::Scalar(ScalarValue::Boolean(None)) => {} other => panic!("expected Boolean(None) for null IN non-empty list, got {other:?}"),
8029 }
8030 }
8031
8032 #[test]
8033 fn test_cypher_in_null_element_empty() {
8034 let udf = create_cypher_in_udf();
8035 let args = make_multi_scalar_args_with_return(
8036 vec![
8037 ScalarValue::LargeBinary(Some(json_to_cv_bytes(&serde_json::json!(null)))),
8038 ScalarValue::LargeBinary(Some(json_to_cv_bytes(&serde_json::json!([])))),
8039 ],
8040 DataType::Boolean,
8041 );
8042 let result = udf.invoke_with_args(args).unwrap();
8043 match result {
8044 ColumnarValue::Scalar(ScalarValue::Boolean(Some(b))) => assert!(!b),
8045 other => panic!("expected Boolean(false) for null IN [], got {other:?}"),
8046 }
8047 }
8048
8049 #[test]
8050 fn test_cypher_in_not_found_with_null() {
8051 let udf = create_cypher_in_udf();
8052 let args = make_cypher_in_args(&serde_json::json!(4), &serde_json::json!([1, null, 3]));
8053 let result = udf.invoke_with_args(args).unwrap();
8054 match result {
8055 ColumnarValue::Scalar(ScalarValue::Boolean(None)) => {} other => panic!("expected Boolean(None) for 4 IN [1,null,3], got {other:?}"),
8057 }
8058 }
8059
8060 #[test]
8061 fn test_cypher_in_cross_type_int_float() {
8062 let udf = create_cypher_in_udf();
8063 let args = make_cypher_in_args(&serde_json::json!(1), &serde_json::json!([1.0, 2.0]));
8064 let result = udf.invoke_with_args(args).unwrap();
8065 match result {
8066 ColumnarValue::Scalar(ScalarValue::Boolean(Some(b))) => assert!(b),
8067 other => panic!("expected Boolean(true) for 1 IN [1.0, 2.0], got {other:?}"),
8068 }
8069 }
8070
8071 #[test]
8076 fn test_list_concat_basic() {
8077 let udf = create_cypher_list_concat_udf();
8078 let args = make_multi_scalar_args(vec![
8079 ScalarValue::LargeBinary(Some(json_to_cv_bytes(&serde_json::json!([1, 2])))),
8080 ScalarValue::LargeBinary(Some(json_to_cv_bytes(&serde_json::json!([3, 4])))),
8081 ]);
8082 let result = udf.invoke_with_args(args).unwrap();
8083 let json = decode_cv_scalar(&result);
8084 assert_eq!(json, serde_json::json!([1, 2, 3, 4]));
8085 }
8086
8087 #[test]
8088 fn test_list_concat_empty() {
8089 let udf = create_cypher_list_concat_udf();
8090 let args = make_multi_scalar_args(vec![
8091 ScalarValue::LargeBinary(Some(json_to_cv_bytes(&serde_json::json!([])))),
8092 ScalarValue::LargeBinary(Some(json_to_cv_bytes(&serde_json::json!([1])))),
8093 ]);
8094 let result = udf.invoke_with_args(args).unwrap();
8095 let json = decode_cv_scalar(&result);
8096 assert_eq!(json, serde_json::json!([1]));
8097 }
8098
8099 #[test]
8100 fn test_list_concat_null_left() {
8101 let udf = create_cypher_list_concat_udf();
8102 let args = make_multi_scalar_args(vec![
8103 ScalarValue::LargeBinary(Some(json_to_cv_bytes(&serde_json::json!(null)))),
8104 ScalarValue::LargeBinary(Some(json_to_cv_bytes(&serde_json::json!([1])))),
8105 ]);
8106 let result = udf.invoke_with_args(args).unwrap();
8107 match result {
8108 ColumnarValue::Scalar(ScalarValue::LargeBinary(Some(bytes))) => {
8109 let uni_val = uni_common::cypher_value_codec::decode(&bytes).expect("decode");
8110 let json: serde_json::Value = uni_val.into();
8111 assert!(json.is_null(), "expected null, got {json}");
8112 }
8113 ColumnarValue::Scalar(ScalarValue::LargeBinary(None)) => {} other => panic!("expected null result, got {other:?}"),
8115 }
8116 }
8117
8118 #[test]
8119 fn test_list_concat_null_right() {
8120 let udf = create_cypher_list_concat_udf();
8121 let args = make_multi_scalar_args(vec![
8122 ScalarValue::LargeBinary(Some(json_to_cv_bytes(&serde_json::json!([1])))),
8123 ScalarValue::LargeBinary(Some(json_to_cv_bytes(&serde_json::json!(null)))),
8124 ]);
8125 let result = udf.invoke_with_args(args).unwrap();
8126 match result {
8127 ColumnarValue::Scalar(ScalarValue::LargeBinary(Some(bytes))) => {
8128 let uni_val = uni_common::cypher_value_codec::decode(&bytes).expect("decode");
8129 let json: serde_json::Value = uni_val.into();
8130 assert!(json.is_null(), "expected null, got {json}");
8131 }
8132 ColumnarValue::Scalar(ScalarValue::LargeBinary(None)) => {}
8133 other => panic!("expected null result, got {other:?}"),
8134 }
8135 }
8136
8137 #[test]
8142 fn test_list_append_scalar() {
8143 let udf = create_cypher_list_append_udf();
8144 let args = make_multi_scalar_args(vec![
8145 ScalarValue::LargeBinary(Some(json_to_cv_bytes(&serde_json::json!([1, 2])))),
8146 ScalarValue::LargeBinary(Some(json_to_cv_bytes(&serde_json::json!(3)))),
8147 ]);
8148 let result = udf.invoke_with_args(args).unwrap();
8149 let json = decode_cv_scalar(&result);
8150 assert_eq!(json, serde_json::json!([1, 2, 3]));
8151 }
8152
8153 #[test]
8154 fn test_list_prepend_scalar() {
8155 let udf = create_cypher_list_append_udf();
8156 let args = make_multi_scalar_args(vec![
8157 ScalarValue::LargeBinary(Some(json_to_cv_bytes(&serde_json::json!(3)))),
8158 ScalarValue::LargeBinary(Some(json_to_cv_bytes(&serde_json::json!([1, 2])))),
8159 ]);
8160 let result = udf.invoke_with_args(args).unwrap();
8161 let json = decode_cv_scalar(&result);
8162 assert_eq!(json, serde_json::json!([3, 1, 2]));
8163 }
8164
8165 #[test]
8166 fn test_list_append_null_list() {
8167 let udf = create_cypher_list_append_udf();
8168 let args = make_multi_scalar_args(vec![
8169 ScalarValue::LargeBinary(Some(json_to_cv_bytes(&serde_json::json!(null)))),
8170 ScalarValue::LargeBinary(Some(json_to_cv_bytes(&serde_json::json!(3)))),
8171 ]);
8172 let result = udf.invoke_with_args(args).unwrap();
8173 match result {
8174 ColumnarValue::Scalar(ScalarValue::LargeBinary(Some(bytes))) => {
8175 let uni_val = uni_common::cypher_value_codec::decode(&bytes).expect("decode");
8176 let json: serde_json::Value = uni_val.into();
8177 assert!(json.is_null(), "expected null, got {json}");
8178 }
8179 ColumnarValue::Scalar(ScalarValue::LargeBinary(None)) => {}
8180 other => panic!("expected null result, got {other:?}"),
8181 }
8182 }
8183
8184 #[test]
8185 fn test_list_append_null_scalar() {
8186 let udf = create_cypher_list_append_udf();
8187 let args = make_multi_scalar_args(vec![
8188 ScalarValue::LargeBinary(Some(json_to_cv_bytes(&serde_json::json!([1, 2])))),
8189 ScalarValue::LargeBinary(Some(json_to_cv_bytes(&serde_json::json!(null)))),
8190 ]);
8191 let result = udf.invoke_with_args(args).unwrap();
8192 match result {
8193 ColumnarValue::Scalar(ScalarValue::LargeBinary(Some(bytes))) => {
8194 let uni_val = uni_common::cypher_value_codec::decode(&bytes).expect("decode");
8195 let json: serde_json::Value = uni_val.into();
8196 assert!(json.is_null(), "expected null, got {json}");
8197 }
8198 ColumnarValue::Scalar(ScalarValue::LargeBinary(None)) => {}
8199 other => panic!("expected null result, got {other:?}"),
8200 }
8201 }
8202
8203 #[test]
8208 fn test_sort_key_cross_type_ordering() {
8209 use uni_common::core::id::{Eid, Vid};
8212 use uni_common::{Edge, Node, Path, TemporalValue, Value};
8213
8214 let map_val = Value::Map([("a".to_string(), Value::String("map".to_string()))].into());
8215 let node_val = Value::Node(Node {
8216 vid: Vid::new(1),
8217 labels: vec!["L".to_string()],
8218 properties: Default::default(),
8219 });
8220 let edge_val = Value::Edge(Edge {
8221 eid: Eid::new(1),
8222 edge_type: "T".to_string(),
8223 src: Vid::new(1),
8224 dst: Vid::new(2),
8225 properties: Default::default(),
8226 });
8227 let list_val = Value::List(vec![Value::Int(1)]);
8228 let path_val = Value::Path(Path {
8229 nodes: vec![Node {
8230 vid: Vid::new(1),
8231 labels: vec!["L".to_string()],
8232 properties: Default::default(),
8233 }],
8234 edges: vec![],
8235 });
8236 let string_val = Value::String("hello".to_string());
8237 let bool_val = Value::Bool(false);
8238 let temporal_val = Value::Temporal(TemporalValue::Date {
8239 days_since_epoch: 1000,
8240 });
8241 let number_val = Value::Int(42);
8242 let nan_val = Value::Float(f64::NAN);
8243 let null_val = Value::Null;
8244
8245 let values = vec![
8246 &map_val,
8247 &node_val,
8248 &edge_val,
8249 &list_val,
8250 &path_val,
8251 &string_val,
8252 &bool_val,
8253 &temporal_val,
8254 &number_val,
8255 &nan_val,
8256 &null_val,
8257 ];
8258
8259 let keys: Vec<Vec<u8>> = values.iter().map(|v| encode_cypher_sort_key(v)).collect();
8260
8261 for i in 0..keys.len() - 1 {
8263 assert!(
8264 keys[i] < keys[i + 1],
8265 "Expected sort_key({:?}) < sort_key({:?}), but {:?} >= {:?}",
8266 values[i],
8267 values[i + 1],
8268 keys[i],
8269 keys[i + 1]
8270 );
8271 }
8272 }
8273
8274 #[test]
8275 fn test_sort_key_numbers() {
8276 let neg_inf = encode_cypher_sort_key(&Value::Float(f64::NEG_INFINITY));
8277 let neg_100 = encode_cypher_sort_key(&Value::Float(-100.0));
8278 let neg_1 = encode_cypher_sort_key(&Value::Int(-1));
8279 let zero_int = encode_cypher_sort_key(&Value::Int(0));
8280 let zero_float = encode_cypher_sort_key(&Value::Float(0.0));
8281 let one_int = encode_cypher_sort_key(&Value::Int(1));
8282 let one_float = encode_cypher_sort_key(&Value::Float(1.0));
8283 let hundred = encode_cypher_sort_key(&Value::Int(100));
8284 let pos_inf = encode_cypher_sort_key(&Value::Float(f64::INFINITY));
8285 let nan = encode_cypher_sort_key(&Value::Float(f64::NAN));
8286
8287 assert!(neg_inf < neg_100, "-inf < -100");
8288 assert!(neg_100 < neg_1, "-100 < -1");
8289 assert!(neg_1 < zero_int, "-1 < 0");
8290 assert_eq!(zero_int, zero_float, "0 int == 0.0 float");
8291 assert!(zero_int < one_int, "0 < 1");
8292 assert_eq!(one_int, one_float, "1 int == 1.0 float");
8293 assert!(one_int < hundred, "1 < 100");
8294 assert!(hundred < pos_inf, "100 < +inf");
8295 assert!(pos_inf < nan, "+inf < NaN");
8297 }
8298
8299 #[test]
8300 fn test_sort_key_booleans() {
8301 let f = encode_cypher_sort_key(&Value::Bool(false));
8302 let t = encode_cypher_sort_key(&Value::Bool(true));
8303 assert!(f < t, "false < true");
8304 }
8305
8306 #[test]
8307 fn test_sort_key_strings() {
8308 let empty = encode_cypher_sort_key(&Value::String(String::new()));
8309 let a = encode_cypher_sort_key(&Value::String("a".to_string()));
8310 let ab = encode_cypher_sort_key(&Value::String("ab".to_string()));
8311 let b = encode_cypher_sort_key(&Value::String("b".to_string()));
8312
8313 assert!(empty < a, "'' < 'a'");
8314 assert!(a < ab, "'a' < 'ab'");
8315 assert!(ab < b, "'ab' < 'b'");
8316 }
8317
8318 #[test]
8319 fn test_sort_key_lists() {
8320 let empty = encode_cypher_sort_key(&Value::List(vec![]));
8321 let one = encode_cypher_sort_key(&Value::List(vec![Value::Int(1)]));
8322 let one_two = encode_cypher_sort_key(&Value::List(vec![Value::Int(1), Value::Int(2)]));
8323 let two = encode_cypher_sort_key(&Value::List(vec![Value::Int(2)]));
8324
8325 assert!(empty < one, "[] < [1]");
8326 assert!(one < one_two, "[1] < [1,2]");
8327 assert!(one_two < two, "[1,2] < [2]");
8328 }
8329
8330 #[test]
8331 fn test_sort_key_temporal() {
8332 use uni_common::TemporalValue;
8333
8334 let date1 = encode_cypher_sort_key(&Value::Temporal(TemporalValue::Date {
8335 days_since_epoch: 100,
8336 }));
8337 let date2 = encode_cypher_sort_key(&Value::Temporal(TemporalValue::Date {
8338 days_since_epoch: 200,
8339 }));
8340 assert!(date1 < date2, "earlier date < later date");
8341
8342 let date = encode_cypher_sort_key(&Value::Temporal(TemporalValue::Date {
8344 days_since_epoch: i32::MAX,
8345 }));
8346 let local_time = encode_cypher_sort_key(&Value::Temporal(TemporalValue::LocalTime {
8347 nanos_since_midnight: 0,
8348 }));
8349 assert!(date < local_time, "Date < LocalTime (by variant rank)");
8350 }
8351
8352 #[test]
8353 fn test_sort_key_nested_lists() {
8354 let inner_a = Value::List(vec![Value::Int(1)]);
8355 let inner_b = Value::List(vec![Value::Int(2)]);
8356
8357 let list_a = encode_cypher_sort_key(&Value::List(vec![inner_a.clone()]));
8358 let list_b = encode_cypher_sort_key(&Value::List(vec![inner_b.clone()]));
8359
8360 assert!(list_a < list_b, "[[1]] < [[2]]");
8361 }
8362
8363 #[test]
8364 fn test_sort_key_null_handling() {
8365 let null_key = encode_cypher_sort_key(&Value::Null);
8366 assert_eq!(null_key, vec![0x0A], "Null produces [0x0A]");
8367
8368 let number_key = encode_cypher_sort_key(&Value::Int(42));
8370 assert!(number_key < null_key, "number < null");
8371 }
8372
8373 #[test]
8374 fn test_byte_stuff_roundtrip() {
8375 let s1 = Value::String("a\x00b".to_string());
8377 let s2 = Value::String("a\x00c".to_string());
8378 let s3 = Value::String("a\x01".to_string());
8379
8380 let k1 = encode_cypher_sort_key(&s1);
8381 let k2 = encode_cypher_sort_key(&s2);
8382 let k3 = encode_cypher_sort_key(&s3);
8383
8384 assert!(k1 < k2, "a\\x00b < a\\x00c");
8385 assert!(k1 < k3, "a\\x00b < a\\x01");
8388 }
8389
8390 #[test]
8391 fn test_sort_key_order_preserving_f64() {
8392 let vals = [f64::NEG_INFINITY, -1.0, -0.0, 0.0, 1.0, f64::INFINITY];
8394 let encoded: Vec<[u8; 8]> = vals
8395 .iter()
8396 .map(|f| encode_order_preserving_f64(*f))
8397 .collect();
8398
8399 for i in 0..encoded.len() - 1 {
8400 assert!(
8401 encoded[i] <= encoded[i + 1],
8402 "encode({}) should <= encode({}), got {:?} vs {:?}",
8403 vals[i],
8404 vals[i + 1],
8405 encoded[i],
8406 encoded[i + 1]
8407 );
8408 }
8409 }
8410
8411 #[test]
8415 fn test_sort_key_string_as_temporal_time_with_offset() {
8416 let tv = sort_key_string_as_temporal("12:35:15+05:00")
8417 .expect("should parse Time with positive offset");
8418 match tv {
8419 uni_common::TemporalValue::Time {
8420 nanos_since_midnight,
8421 offset_seconds,
8422 } => {
8423 assert_eq!(offset_seconds, 5 * 3600, "offset should be +05:00 = 18000s");
8424 let expected_nanos = (12 * 3600 + 35 * 60 + 15) * 1_000_000_000i64;
8426 assert_eq!(nanos_since_midnight, expected_nanos);
8427 }
8428 other => panic!("expected TemporalValue::Time, got {other:?}"),
8429 }
8430 }
8431
8432 #[test]
8433 fn test_sort_key_string_as_temporal_time_negative_offset() {
8434 let tv = sort_key_string_as_temporal("10:35:00-08:00")
8435 .expect("should parse Time with negative offset");
8436 match tv {
8437 uni_common::TemporalValue::Time {
8438 nanos_since_midnight,
8439 offset_seconds,
8440 } => {
8441 assert_eq!(
8442 offset_seconds,
8443 -8 * 3600,
8444 "offset should be -08:00 = -28800s"
8445 );
8446 let expected_nanos = (10 * 3600 + 35 * 60) * 1_000_000_000i64;
8447 assert_eq!(nanos_since_midnight, expected_nanos);
8448 }
8449 other => panic!("expected TemporalValue::Time, got {other:?}"),
8450 }
8451 }
8452
8453 #[test]
8454 fn test_sort_key_string_as_temporal_date() {
8455 use super::super::expr_eval::temporal_from_value;
8456 let tv = temporal_from_value(&Value::String("2024-01-15".into()))
8457 .expect("should parse Date string");
8458 match tv {
8459 uni_common::TemporalValue::Date { days_since_epoch } => {
8460 assert!(days_since_epoch > 0, "2024-01-15 should be after epoch");
8462 }
8463 other => panic!("expected TemporalValue::Date, got {other:?}"),
8464 }
8465 }
8466}