1use std::{fmt::Debug, iter::zip, sync::Arc};
19
20use arrow_array::{Array, ArrayRef, StructArray};
21use arrow_buffer::NullBuffer;
22use arrow_schema::{DataType, Field, FieldRef};
23use datafusion_common::config::ConfigOptions;
24use datafusion_common::{
25 cast::{as_string_view_array, as_struct_array},
26 exec_err, DataFusionError, Result, ScalarValue,
27};
28use datafusion_expr::{Accumulator, ColumnarValue};
29use sedona_common::sedona_internal_err;
30use sedona_schema::{crs::deserialize_crs, datatypes::SedonaType, matchers::ArgMatcher};
31
32use crate::aggregate_udf::{IntoSedonaAccumulatorRefs, SedonaAccumulator, SedonaAccumulatorRef};
33use crate::scalar_udf::{IntoScalarKernelRefs, ScalarKernelRef, SedonaScalarKernel};
34
35#[derive(Debug)]
58pub struct ItemCrsKernel {
59 inner: ScalarKernelRef,
60}
61
62impl ItemCrsKernel {
63 pub fn new_ref(inner: ScalarKernelRef) -> ScalarKernelRef {
69 Arc::new(Self { inner })
70 }
71
72 pub fn wrap_impl(inner: impl IntoScalarKernelRefs) -> Vec<ScalarKernelRef> {
78 let kernels = inner.into_scalar_kernel_refs();
79
80 let mut out = Vec::with_capacity(kernels.len() * 2);
81
82 for inner_kernel in &kernels {
84 out.push(ItemCrsKernel::new_ref(inner_kernel.clone()));
85 }
86
87 for inner_kernel in kernels {
88 out.push(inner_kernel);
89 }
90
91 out
92 }
93}
94
95impl SedonaScalarKernel for ItemCrsKernel {
96 fn return_type(&self, args: &[SedonaType]) -> Result<Option<SedonaType>> {
97 return_type_handle_item_crs(self.inner.as_ref(), args)
98 }
99
100 fn invoke_batch_from_args(
101 &self,
102 arg_types: &[SedonaType],
103 args: &[ColumnarValue],
104 return_type: &SedonaType,
105 num_rows: usize,
106 config_options: Option<&ConfigOptions>,
107 ) -> Result<ColumnarValue> {
108 invoke_handle_item_crs(
109 self.inner.as_ref(),
110 arg_types,
111 args,
112 return_type,
113 num_rows,
114 config_options,
115 )
116 }
117
118 fn invoke_batch(
119 &self,
120 _arg_types: &[SedonaType],
121 _args: &[ColumnarValue],
122 ) -> Result<ColumnarValue> {
123 sedona_internal_err!("Should not be called because invoke_batch_from_args() is implemented")
124 }
125}
126
127#[derive(Debug)]
142pub struct ItemCrsSedonaAccumulator {
143 inner: SedonaAccumulatorRef,
144}
145
146impl ItemCrsSedonaAccumulator {
147 pub fn new_ref(inner: SedonaAccumulatorRef) -> SedonaAccumulatorRef {
153 Arc::new(Self { inner })
154 }
155
156 pub fn wrap_impl(inner: impl IntoSedonaAccumulatorRefs) -> Vec<SedonaAccumulatorRef> {
162 let accumulators = inner.into_sedona_accumulator_refs();
163
164 let mut out = Vec::with_capacity(accumulators.len() * 2);
165
166 for inner_accumulator in &accumulators {
168 out.push(ItemCrsSedonaAccumulator::new_ref(inner_accumulator.clone()));
169 }
170
171 for inner_accumulator in accumulators {
172 out.push(inner_accumulator);
173 }
174
175 out
176 }
177}
178
179impl SedonaAccumulator for ItemCrsSedonaAccumulator {
180 fn return_type(&self, args: &[SedonaType]) -> Result<Option<SedonaType>> {
181 if args.len() != 1 {
184 return Ok(None);
185 }
186
187 if !ArgMatcher::is_item_crs().match_type(&args[0]) {
189 return Ok(None);
190 }
191
192 let item_arg_types = args
194 .iter()
195 .map(|arg_type| {
196 parse_item_crs_arg_type_strip_crs(arg_type).map(|(item_type, _)| item_type)
197 })
198 .collect::<Result<Vec<_>>>()?;
199
200 if let Some(item_type) = self.inner.return_type(&item_arg_types)? {
202 let geo_matcher = ArgMatcher::is_geometry_or_geography();
203
204 if geo_matcher.match_type(&item_type) {
207 Ok(Some(SedonaType::new_item_crs(&item_type)?))
208 } else {
209 Ok(Some(item_type))
210 }
211 } else {
212 Ok(None)
213 }
214 }
215
216 fn accumulator(
217 &self,
218 args: &[SedonaType],
219 output_type: &SedonaType,
220 ) -> Result<Box<dyn datafusion_expr::Accumulator>> {
221 let item_arg_types = args
223 .iter()
224 .map(|arg_type| {
225 parse_item_crs_arg_type_strip_crs(arg_type).map(|(item_type, _)| item_type)
226 })
227 .collect::<Result<Vec<_>>>()?;
228
229 let (item_output_type, _) = parse_item_crs_arg_type(output_type)?;
231
232 let inner = self.inner.accumulator(&item_arg_types, &item_output_type)?;
234
235 Ok(Box::new(ItemCrsAccumulator {
236 inner,
237 item_output_type,
238 crs: None,
239 }))
240 }
241
242 fn state_fields(&self, args: &[SedonaType]) -> Result<Vec<FieldRef>> {
243 let mut fields = self.inner.state_fields(args)?;
245 fields.push(Field::new("group_crs", DataType::Utf8View, true).into());
246 Ok(fields)
247 }
248}
249
250#[derive(Debug)]
251struct ItemCrsAccumulator {
252 inner: Box<dyn Accumulator>,
254 item_output_type: SedonaType,
256 crs: Option<String>,
260}
261
262impl Accumulator for ItemCrsAccumulator {
263 fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
264 let struct_array = as_struct_array(&values[0])?;
266 let item_array = struct_array.column(0).clone();
267 let crs_array = as_string_view_array(struct_array.column(1))?;
268
269 if let Some(struct_nulls) = struct_array.nulls() {
271 for (is_valid, crs_value) in zip(struct_nulls, crs_array.iter()) {
273 if is_valid {
274 self.merge_crs(crs_value.unwrap_or("0"))?;
275 }
276 }
277 } else {
278 for crs_value in crs_array.iter() {
280 self.merge_crs(crs_value.unwrap_or("0"))?;
281 }
282 }
283
284 self.inner.update_batch(&[item_array])?;
286 Ok(())
287 }
288
289 fn evaluate(&mut self) -> Result<ScalarValue> {
290 let inner_result = self.inner.evaluate()?;
291
292 if !matches!(
294 self.item_output_type,
295 SedonaType::Wkb(_, _) | SedonaType::WkbView(_, _)
296 ) {
297 return Ok(inner_result);
298 }
299
300 let crs_value = match &self.crs {
304 Some(s) if s == "0" => None,
305 Some(s) => Some(s.clone()),
306 None => None,
307 };
308
309 let item_crs_result = make_item_crs(
311 &self.item_output_type,
312 ColumnarValue::Scalar(inner_result),
313 &ColumnarValue::Scalar(ScalarValue::Utf8View(crs_value)),
314 None,
315 )?;
316
317 match item_crs_result {
318 ColumnarValue::Scalar(scalar) => Ok(scalar),
319 ColumnarValue::Array(_) => {
320 sedona_internal_err!("Expected scalar result from make_item_crs")
321 }
322 }
323 }
324
325 fn size(&self) -> usize {
326 self.inner.size() + size_of::<ItemCrsAccumulator>()
327 }
328
329 fn state(&mut self) -> Result<Vec<ScalarValue>> {
330 let mut inner_state = self.inner.state()?;
331 inner_state.push(ScalarValue::Utf8View(self.crs.clone()));
332 Ok(inner_state)
333 }
334
335 fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
336 if states.is_empty() {
338 return sedona_internal_err!("Expected at least one state field");
339 }
340 let crs_array = as_string_view_array(states.last().unwrap())?;
341
342 for crs_str in crs_array.iter().flatten() {
344 self.merge_crs(crs_str)?;
345 }
346
347 let inner_states = &states[..states.len() - 1];
349 self.inner.merge_batch(inner_states)
350 }
351}
352
353impl ItemCrsAccumulator {
354 fn merge_crs(&mut self, crs_str: &str) -> Result<()> {
360 match &self.crs {
361 None => {
362 self.crs = Some(crs_str.to_string());
364 Ok(())
365 }
366 Some(existing) if existing == crs_str => {
367 Ok(())
369 }
370 Some(existing) => {
371 let existing_crs = deserialize_crs(existing)?;
373 let new_crs = deserialize_crs(crs_str)?;
374 if existing_crs == new_crs {
375 Ok(())
376 } else {
377 let existing_displ = existing_crs
378 .map(|c| c.to_string())
379 .unwrap_or("None".to_string());
380 let new_displ = new_crs.map(|c| c.to_string()).unwrap_or("None".to_string());
381 exec_err!("CRS values not equal: {existing_displ} vs {new_displ}")
382 }
383 }
384 }
385 }
386}
387
388fn return_type_handle_item_crs(
400 kernel: &dyn SedonaScalarKernel,
401 arg_types: &[SedonaType],
402) -> Result<Option<SedonaType>> {
403 let item_crs_matcher = ArgMatcher::is_item_crs();
404
405 if !arg_types
407 .iter()
408 .any(|arg_type| item_crs_matcher.match_type(arg_type))
409 {
410 return Ok(None);
411 }
412
413 let item_arg_types = arg_types
416 .iter()
417 .map(|arg_type| parse_item_crs_arg_type_strip_crs(arg_type).map(|(item_type, _)| item_type))
418 .collect::<Result<Vec<_>>>()?;
419
420 let scalar_args_none = (0..arg_types.len())
423 .map(|_| None)
424 .collect::<Vec<Option<&ScalarValue>>>();
425
426 if let Some(item_type) =
430 kernel.return_type_from_args_and_scalars(&item_arg_types, &scalar_args_none)?
431 {
432 let geo_matcher = ArgMatcher::is_geometry_or_geography();
433 if geo_matcher.match_type(&item_type) {
434 Ok(Some(SedonaType::new_item_crs(&item_type)?))
435 } else {
436 Ok(Some(item_type))
437 }
438 } else {
439 Ok(None)
440 }
441}
442
443fn invoke_handle_item_crs(
451 kernel: &dyn SedonaScalarKernel,
452 arg_types: &[SedonaType],
453 args: &[ColumnarValue],
454 return_type: &SedonaType,
455 num_rows: usize,
456 config_options: Option<&ConfigOptions>,
457) -> Result<ColumnarValue> {
458 let arg_types_unwrapped = arg_types
462 .iter()
463 .map(parse_item_crs_arg_type)
464 .collect::<Result<Vec<_>>>()?;
465
466 let args_unwrapped = zip(&arg_types_unwrapped, args)
467 .map(|(arg_type, arg)| {
468 let (item_type, crs_type) = arg_type;
469 parse_item_crs_arg(item_type, crs_type, arg)
470 })
471 .collect::<Result<Vec<_>>>()?;
472
473 let crs_args = args_unwrapped
474 .iter()
475 .flat_map(|(_, crs_arg)| crs_arg)
476 .collect::<Vec<_>>();
477
478 let crs_result = ensure_crs_args_equal(&crs_args)?;
479
480 let item_types = arg_types_unwrapped
481 .iter()
482 .map(|(item_type, _)| item_type.clone())
483 .collect::<Vec<_>>();
484 let item_args = args_unwrapped
485 .iter()
486 .map(|(item_arg, _)| item_arg.clone())
487 .collect::<Vec<_>>();
488
489 let item_arg_types_no_crs = arg_types
490 .iter()
491 .map(|arg_type| parse_item_crs_arg_type_strip_crs(arg_type).map(|(item_type, _)| item_type))
492 .collect::<Result<Vec<_>>>()?;
493 let out_item_type = match kernel.return_type(&item_arg_types_no_crs)? {
494 Some(matched_item_type) => matched_item_type,
495 None => return sedona_internal_err!("Expected inner kernel to match types {item_types:?}"),
496 };
497
498 let item_result = kernel.invoke_batch_from_args(
499 &item_types,
500 &item_args,
501 return_type,
502 num_rows,
503 config_options,
504 )?;
505
506 if ArgMatcher::is_geometry_or_geography().match_type(&out_item_type) {
507 make_item_crs(&out_item_type, item_result, crs_result, None)
508 } else {
509 Ok(item_result)
510 }
511}
512
513pub fn make_item_crs(
518 item_type: &SedonaType,
519 item_result: ColumnarValue,
520 crs_result: &ColumnarValue,
521 extra_nulls: Option<&NullBuffer>,
522) -> Result<ColumnarValue> {
523 let out_fields = vec![
524 item_type.to_storage_field("item", true)?,
525 Field::new("crs", DataType::Utf8View, true),
526 ];
527
528 let scalar_result = matches!(
529 (&item_result, crs_result),
530 (ColumnarValue::Scalar(_), ColumnarValue::Scalar(_))
531 );
532
533 let item_crs_arrays = ColumnarValue::values_to_arrays(&[item_result, crs_result.clone()])?;
534 let item_array = &item_crs_arrays[0];
535 let crs_array = &item_crs_arrays[1];
536 let nulls = NullBuffer::union(item_array.nulls(), extra_nulls);
537
538 let item_crs_array = StructArray::new(
539 out_fields.into(),
540 vec![item_array.clone(), crs_array.clone()],
541 nulls,
542 );
543
544 if scalar_result {
545 Ok(ScalarValue::Struct(Arc::new(item_crs_array)).into())
546 } else {
547 Ok(ColumnarValue::Array(Arc::new(item_crs_array)))
548 }
549}
550
551pub fn parse_item_crs_arg_type(
555 sedona_type: &SedonaType,
556) -> Result<(SedonaType, Option<SedonaType>)> {
557 if let SedonaType::Arrow(DataType::Struct(fields)) = sedona_type {
558 let field_names = fields.iter().map(|f| f.name()).collect::<Vec<_>>();
559 if field_names != ["item", "crs"] {
560 return Ok((sedona_type.clone(), None));
561 }
562
563 let item = SedonaType::from_storage_field(&fields[0])?;
564 let crs = SedonaType::from_storage_field(&fields[1])?;
565 Ok((item, Some(crs)))
566 } else {
567 Ok((sedona_type.clone(), None))
568 }
569}
570
571pub fn parse_item_crs_arg_type_strip_crs(
576 sedona_type: &SedonaType,
577) -> Result<(SedonaType, Option<SedonaType>)> {
578 match sedona_type {
579 SedonaType::Wkb(edges, _) => Ok((SedonaType::Wkb(*edges, None), None)),
580 SedonaType::WkbView(edges, _) => Ok((SedonaType::WkbView(*edges, None), None)),
581 SedonaType::Arrow(DataType::Struct(fields))
582 if fields.iter().map(|f| f.name()).collect::<Vec<_>>() == vec!["item", "crs"] =>
583 {
584 let item = SedonaType::from_storage_field(&fields[0])?;
585 let crs = SedonaType::from_storage_field(&fields[1])?;
586 Ok((item, Some(crs)))
587 }
588 other => Ok((other.clone(), None)),
589 }
590}
591
592pub fn parse_item_crs_arg(
595 item_type: &SedonaType,
596 crs_type: &Option<SedonaType>,
597 arg: &ColumnarValue,
598) -> Result<(ColumnarValue, Option<ColumnarValue>)> {
599 if crs_type.is_some() {
600 return match arg {
601 ColumnarValue::Array(array) => {
602 let struct_array = as_struct_array(array)?;
603 Ok((
604 ColumnarValue::Array(struct_array.column(0).clone()),
605 Some(ColumnarValue::Array(struct_array.column(1).clone())),
606 ))
607 }
608 ColumnarValue::Scalar(scalar_value) => {
609 if let ScalarValue::Struct(struct_array) = scalar_value {
610 let item_scalar = ScalarValue::try_from_array(struct_array.column(0), 0)?;
611 let crs_scalar = ScalarValue::try_from_array(struct_array.column(1), 0)?;
612 Ok((
613 ColumnarValue::Scalar(item_scalar),
614 Some(ColumnarValue::Scalar(crs_scalar)),
615 ))
616 } else {
617 sedona_internal_err!(
618 "Expected struct scalar for item_crs but got {}",
619 scalar_value
620 )
621 }
622 }
623 };
624 }
625
626 match item_type {
627 SedonaType::Wkb(_, crs) | SedonaType::WkbView(_, crs) => {
628 let crs_scalar = if let Some(crs) = crs {
629 if let Some(auth_code) = crs.to_authority_code()? {
630 ScalarValue::Utf8View(Some(auth_code))
631 } else {
632 ScalarValue::Utf8View(Some(crs.to_json()))
633 }
634 } else {
635 ScalarValue::Utf8View(None)
636 };
637
638 Ok((arg.clone(), Some(ColumnarValue::Scalar(crs_scalar))))
639 }
640 _ => Ok((arg.clone(), None)),
641 }
642}
643
644fn ensure_crs_args_equal<'a>(crs_args: &[&'a ColumnarValue]) -> Result<&'a ColumnarValue> {
646 match crs_args.len() {
647 0 => sedona_internal_err!("Zero CRS arguments as input to item_crs"),
648 1 => Ok(crs_args[0]),
649 _ => {
650 let crs_args_string = crs_args
651 .iter()
652 .map(|arg| arg.cast_to(&DataType::Utf8View, None))
653 .collect::<Result<Vec<_>>>()?;
654 let crs_arrays = ColumnarValue::values_to_arrays(&crs_args_string)?;
655 for i in 1..crs_arrays.len() {
656 ensure_crs_string_arrays_equal2(&crs_arrays[i - 1], &crs_arrays[i])?
657 }
658
659 Ok(crs_args[0])
660 }
661 }
662}
663
664fn ensure_crs_string_arrays_equal2(lhs: &ArrayRef, rhs: &ArrayRef) -> Result<()> {
667 for (lhs_item, rhs_item) in zip(as_string_view_array(lhs)?, as_string_view_array(rhs)?) {
668 if lhs_item == rhs_item {
669 continue;
671 }
672
673 if let (Some(lhs_item_str), Some(rhs_item_str)) = (lhs_item, rhs_item) {
675 let lhs_crs = deserialize_crs(lhs_item_str)?;
676 let rhs_crs = deserialize_crs(rhs_item_str)?;
677 if lhs_crs == rhs_crs {
678 continue;
679 }
680 }
681
682 if lhs_item != rhs_item {
683 return Err(DataFusionError::Execution(format!(
684 "CRS values not equal: {lhs_item:?} vs {rhs_item:?}",
685 )));
686 }
687 }
688
689 Ok(())
690}
691
#[cfg(test)]
mod test {
    use datafusion_expr::ScalarUDF;
    use rstest::rstest;
    use sedona_schema::{
        crs::lnglat,
        datatypes::{Edges, SedonaType, WKB_GEOMETRY, WKB_GEOMETRY_ITEM_CRS},
    };
    use sedona_testing::{
        create::create_array_item_crs, create::create_scalar_item_crs, testers::ScalarUdfTester,
    };

    use crate::scalar_udf::{SedonaScalarUDF, SimpleSedonaScalarKernel};

    use super::*;

    /// Builds a UDF named "fun" from an `ItemCrsKernel` wrapping a kernel that
    /// matches (any, geometry), declares `out_type` as its output and returns
    /// its first argument.
    fn test_udf(out_type: SedonaType) -> ScalarUDF {
        let matcher = ArgMatcher::new(
            vec![ArgMatcher::is_any(), ArgMatcher::is_geometry()],
            out_type,
        );
        let first_arg_kernel = SimpleSedonaScalarKernel::new_ref(
            matcher,
            Arc::new(|_arg_types, args| Ok(args[0].clone())),
        );

        let wrapped_kernel = ItemCrsKernel::new_ref(first_arg_kernel);
        SedonaScalarUDF::from_impl("fun", wrapped_kernel).into()
    }

    #[test]
    fn item_crs_kernel_no_match() {
        // Without any item_crs argument the wrapper must not match
        let udf = test_udf(WKB_GEOMETRY);
        let tester = ScalarUdfTester::new(udf, vec![WKB_GEOMETRY, WKB_GEOMETRY]);

        let err = tester.return_type().unwrap_err();
        assert_eq!(
            err.message(),
            "fun(geometry, geometry): No kernel matching arguments"
        );
    }

    #[rstest]
    fn item_crs_kernel_basic(
        #[values(
            (WKB_GEOMETRY, WKB_GEOMETRY_ITEM_CRS.clone()),
            (WKB_GEOMETRY_ITEM_CRS.clone(), WKB_GEOMETRY),
            (WKB_GEOMETRY_ITEM_CRS.clone(), WKB_GEOMETRY_ITEM_CRS.clone())
        )]
        arg_types: (SedonaType, SedonaType),
    ) {
        let (lhs_type, rhs_type) = arg_types;
        let tester = ScalarUdfTester::new(test_udf(WKB_GEOMETRY), vec![lhs_type, rhs_type]);
        tester.assert_return_type(WKB_GEOMETRY_ITEM_CRS.clone());

        let expected = create_scalar_item_crs(Some("POINT (0 1)"), None, &WKB_GEOMETRY);
        let actual = tester
            .invoke_scalar_scalar("POINT (0 1)", "POINT (1 2)")
            .unwrap();
        assert_eq!(actual, expected);
    }

    #[test]
    fn item_crs_kernel_crs_values() {
        let item_crs_type = WKB_GEOMETRY_ITEM_CRS.clone();
        let tester = ScalarUdfTester::new(
            test_udf(WKB_GEOMETRY),
            vec![item_crs_type.clone(), item_crs_type.clone()],
        );
        tester.assert_return_type(WKB_GEOMETRY_ITEM_CRS.clone());

        let point_4326 =
            create_scalar_item_crs(Some("POINT (0 1)"), Some("EPSG:4326"), &WKB_GEOMETRY);
        let point_crs84 =
            create_scalar_item_crs(Some("POINT (0 1)"), Some("OGC:CRS84"), &WKB_GEOMETRY);
        let point_3857 =
            create_scalar_item_crs(Some("POINT (0 1)"), Some("EPSG:3857"), &WKB_GEOMETRY);

        // EPSG:4326 and OGC:CRS84 are accepted together; the first CRS is kept
        let equivalent_result = tester
            .invoke_scalar_scalar(point_4326.clone(), point_crs84.clone())
            .unwrap();
        assert_eq!(equivalent_result, point_4326);

        // EPSG:4326 and EPSG:3857 are rejected
        let mismatch_err = tester
            .invoke_scalar_scalar(point_4326.clone(), point_3857.clone())
            .unwrap_err();
        assert_eq!(
            mismatch_err.message(),
            "CRS values not equal: Some(\"EPSG:4326\") vs Some(\"EPSG:3857\")"
        );
    }

    #[test]
    fn item_crs_kernel_crs_types() {
        let point_4326 =
            create_scalar_item_crs(Some("POINT (0 1)"), Some("EPSG:4326"), &WKB_GEOMETRY);
        let point_crs84 =
            create_scalar_item_crs(Some("POINT (0 1)"), Some("OGC:CRS84"), &WKB_GEOMETRY);
        let point_3857 =
            create_scalar_item_crs(Some("POINT (0 1)"), Some("EPSG:3857"), &WKB_GEOMETRY);

        // Second argument carries its CRS at the type level instead of per item
        let lnglat_type = SedonaType::Wkb(Edges::Planar, lnglat());
        let tester = ScalarUdfTester::new(
            test_udf(WKB_GEOMETRY),
            vec![WKB_GEOMETRY_ITEM_CRS.clone(), lnglat_type.clone()],
        );
        tester.assert_return_type(WKB_GEOMETRY_ITEM_CRS.clone());

        let result_4326 = tester
            .invoke_scalar_scalar(point_4326.clone(), "POINT (3 4)")
            .unwrap();
        assert_eq!(result_4326, point_4326);

        let result_crs84 = tester
            .invoke_scalar_scalar(point_crs84.clone(), "POINT (3 4)")
            .unwrap();
        assert_eq!(result_crs84, point_crs84);

        let mismatch_err = tester
            .invoke_scalar_scalar(point_3857.clone(), "POINT (3 4)")
            .unwrap_err();
        assert_eq!(
            mismatch_err.message(),
            "CRS values not equal: Some(\"EPSG:3857\") vs Some(\"OGC:CRS84\")"
        );
    }

    #[test]
    fn item_crs_kernel_arrays() {
        let tester = ScalarUdfTester::new(
            test_udf(WKB_GEOMETRY),
            vec![WKB_GEOMETRY_ITEM_CRS.clone(), WKB_GEOMETRY_ITEM_CRS.clone()],
        );

        let points_4326 = create_array_item_crs(
            &[
                Some("POINT (0 1)"),
                Some("POINT (2 3)"),
                Some("POINT (3 4)"),
            ],
            [Some("EPSG:4326"), Some("EPSG:4326"), Some("EPSG:4326")],
            &WKB_GEOMETRY,
        );
        let point_4326 =
            create_scalar_item_crs(Some("POINT (0 1)"), Some("EPSG:4326"), &WKB_GEOMETRY);
        let point_3857 =
            create_scalar_item_crs(Some("POINT (0 1)"), Some("EPSG:3857"), &WKB_GEOMETRY);

        // Array and scalar with the same CRS: the array comes back unchanged
        let matching_result = tester
            .invoke_array_scalar(points_4326.clone(), point_4326.clone())
            .unwrap();
        assert_eq!(&matching_result, &points_4326);

        // Array and scalar with different CRSes: error
        let mismatch_err = tester
            .invoke_array_scalar(points_4326.clone(), point_3857.clone())
            .unwrap_err();
        assert_eq!(
            mismatch_err.message(),
            "CRS values not equal: Some(\"EPSG:4326\") vs Some(\"EPSG:3857\")"
        );
    }

    #[test]
    fn item_crs_kernel_non_spatial_args_and_result() {
        let int32_type = SedonaType::Arrow(DataType::Int32);
        let tester = ScalarUdfTester::new(
            test_udf(int32_type.clone()),
            vec![int32_type.clone(), WKB_GEOMETRY_ITEM_CRS.clone()],
        );
        // A non-spatial output is not wrapped as item_crs
        tester.assert_return_type(DataType::Int32);

        let point_4326 =
            create_scalar_item_crs(Some("POINT (0 1)"), Some("EPSG:4326"), &WKB_GEOMETRY);
        let result = tester.invoke_scalar_scalar(1234, point_4326).unwrap();
        assert_eq!(result, ScalarValue::Int32(Some(1234)))
    }

    #[test]
    fn crs_args_equal() {
        // Zero arguments is an error
        let empty_err = ensure_crs_args_equal(&[]).unwrap_err();
        assert!(empty_err.message().contains("Zero CRS arguments"));

        let utf8_crs =
            |crs: &str| ColumnarValue::Scalar(ScalarValue::Utf8(Some(crs.to_string())));
        let crs_lnglat = utf8_crs("EPSG:4326");
        let crs_also_lnglat = utf8_crs("OGC:CRS84");
        let crs_other = utf8_crs("EPSG:3857");

        // A single argument is handed back as is
        let single = ensure_crs_args_equal(&[&crs_lnglat]).unwrap();
        assert!(std::ptr::eq(single, &crs_lnglat));

        // Equivalent CRSes: the first argument is handed back
        let equivalent = ensure_crs_args_equal(&[&crs_lnglat, &crs_also_lnglat]).unwrap();
        assert!(std::ptr::eq(equivalent, &crs_lnglat));

        // Different CRSes: error
        let mismatch_err = ensure_crs_args_equal(&[&crs_lnglat, &crs_other]).unwrap_err();
        assert_eq!(
            mismatch_err.message(),
            "CRS values not equal: Some(\"EPSG:4326\") vs Some(\"EPSG:3857\")"
        );
    }
}