Skip to main content

sedona_expr/
item_crs.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::{fmt::Debug, iter::zip, sync::Arc};
19
20use arrow_array::{Array, ArrayRef, StructArray};
21use arrow_buffer::NullBuffer;
22use arrow_schema::{DataType, Field, FieldRef};
23use datafusion_common::config::ConfigOptions;
24use datafusion_common::{
25    cast::{as_string_view_array, as_struct_array},
26    exec_err, DataFusionError, Result, ScalarValue,
27};
28use datafusion_expr::{Accumulator, ColumnarValue};
29use sedona_common::sedona_internal_err;
30use sedona_schema::{crs::deserialize_crs, datatypes::SedonaType, matchers::ArgMatcher};
31
32use crate::aggregate_udf::{IntoSedonaAccumulatorRefs, SedonaAccumulator, SedonaAccumulatorRef};
33use crate::scalar_udf::{IntoScalarKernelRefs, ScalarKernelRef, SedonaScalarKernel};
34
35/// Wrap a [SedonaScalarKernel] to provide Item CRS type support
36///
37/// Most kernels that operate on geometry or geography in some way
38/// can also support Item CRS inputs:
39///
40/// - Functions that return a non-spatial type whose value does not
41///   depend on the input CRS only need to operate on the `item` portion
42///   of any item_crs input (e.g., ST_Area()).
43/// - Functions that accept two or more spatial arguments must have
44///   compatible CRSes.
45/// - Functions that return a geometry or geography must also return
46///   an item_crs type where the output CRSes are propagated from the
47///   input.
48///
49/// This kernel provides an automatic wrapper enforcing these rules.
50/// It is appropriate for most functions except:
51///
52/// - Functions whose return value depends on the CRS
53/// - Functions whose return value depends on the value of a scalar
54///   argument
55/// - Functions whose return CRS is not strictly propagated from the
56///   CRSes of the arguments.
57#[derive(Debug)]
58pub struct ItemCrsKernel {
59    inner: ScalarKernelRef,
60}
61
62impl ItemCrsKernel {
63    /// Create a new [ScalarKernelRef] wrapping the input
64    ///
65    /// The resulting kernel matches arguments of the input with ItemCrs inputs
66    /// but not those of the original kernel (i.e., a function needs both kernels
67    /// to support both type-level and item-level CRSes).
68    pub fn new_ref(inner: ScalarKernelRef) -> ScalarKernelRef {
69        Arc::new(Self { inner })
70    }
71
72    /// Wrap a vector of kernels by appending all ItemCrs versions followed by
73    /// the contents of inner
74    ///
75    /// This is the recommended way to add kernels when all of them should support
76    /// ItemCrs inputs.
77    pub fn wrap_impl(inner: impl IntoScalarKernelRefs) -> Vec<ScalarKernelRef> {
78        let kernels = inner.into_scalar_kernel_refs();
79
80        let mut out = Vec::with_capacity(kernels.len() * 2);
81
82        // Add ItemCrsKernels first (so they will be resolved last)
83        for inner_kernel in &kernels {
84            out.push(ItemCrsKernel::new_ref(inner_kernel.clone()));
85        }
86
87        for inner_kernel in kernels {
88            out.push(inner_kernel);
89        }
90
91        out
92    }
93}
94
95impl SedonaScalarKernel for ItemCrsKernel {
96    fn return_type(&self, args: &[SedonaType]) -> Result<Option<SedonaType>> {
97        return_type_handle_item_crs(self.inner.as_ref(), args)
98    }
99
100    fn invoke_batch_from_args(
101        &self,
102        arg_types: &[SedonaType],
103        args: &[ColumnarValue],
104        return_type: &SedonaType,
105        num_rows: usize,
106        config_options: Option<&ConfigOptions>,
107    ) -> Result<ColumnarValue> {
108        invoke_handle_item_crs(
109            self.inner.as_ref(),
110            arg_types,
111            args,
112            return_type,
113            num_rows,
114            config_options,
115        )
116    }
117
118    fn invoke_batch(
119        &self,
120        _arg_types: &[SedonaType],
121        _args: &[ColumnarValue],
122    ) -> Result<ColumnarValue> {
123        sedona_internal_err!("Should not be called because invoke_batch_from_args() is implemented")
124    }
125}
126
127/// Wrap a [SedonaAccumulator] to provide Item CRS type support
128///
129/// Most accumulators that operate on geometry or geography in some way
130/// can also support Item CRS inputs:
131///
132/// - Accumulators that return a non-spatial type whose value does not
133///   depend on the input CRS only need to operate on the `item` portion
134///   of any item_crs input (e.g., ST_Analyze_Agg()).
135/// - Accumulators that return a geometry or geography must also return
136///   an item_crs type where the output CRSes are propagated from the
137///   input.
138/// - CRSes within a single group must be compatible
139///
140/// This accumulator provides an automatic wrapper enforcing these rules.
141#[derive(Debug)]
142pub struct ItemCrsSedonaAccumulator {
143    inner: SedonaAccumulatorRef,
144}
145
146impl ItemCrsSedonaAccumulator {
147    /// Create a new [SedonaAccumulatorRef] wrapping the input
148    ///
149    /// The resulting accumulator matches arguments of the input with ItemCrs inputs
150    /// but not those of the original accumulator (i.e., an aggregate function needs both
151    /// accumulators to support both type-level and item-level CRSes).
152    pub fn new_ref(inner: SedonaAccumulatorRef) -> SedonaAccumulatorRef {
153        Arc::new(Self { inner })
154    }
155
156    /// Wrap a vector of accumulators by appending all ItemCrs versions followed by
157    /// the contents of inner
158    ///
159    /// This is the recommended way to add accumulators when all of them should support
160    /// ItemCrs inputs.
161    pub fn wrap_impl(inner: impl IntoSedonaAccumulatorRefs) -> Vec<SedonaAccumulatorRef> {
162        let accumulators = inner.into_sedona_accumulator_refs();
163
164        let mut out = Vec::with_capacity(accumulators.len() * 2);
165
166        // Add ItemCrsAccumulators first (so they will be resolved last)
167        for inner_accumulator in &accumulators {
168            out.push(ItemCrsSedonaAccumulator::new_ref(inner_accumulator.clone()));
169        }
170
171        for inner_accumulator in accumulators {
172            out.push(inner_accumulator);
173        }
174
175        out
176    }
177}
178
179impl SedonaAccumulator for ItemCrsSedonaAccumulator {
180    fn return_type(&self, args: &[SedonaType]) -> Result<Option<SedonaType>> {
181        // We don't have any functions we can test this with yet, so for the moment only support
182        // single-argument aggregations (slightly simpler).
183        if args.len() != 1 {
184            return Ok(None);
185        }
186
187        // This implementation doesn't apply to non-item crs types
188        if !ArgMatcher::is_item_crs().match_type(&args[0]) {
189            return Ok(None);
190        }
191
192        // Strip any CRS that might be present from the input type
193        let item_arg_types = args
194            .iter()
195            .map(|arg_type| {
196                parse_item_crs_arg_type_strip_crs(arg_type).map(|(item_type, _)| item_type)
197            })
198            .collect::<Result<Vec<_>>>()?;
199
200        // Resolve the inner accumulator's return type.
201        if let Some(item_type) = self.inner.return_type(&item_arg_types)? {
202            let geo_matcher = ArgMatcher::is_geometry_or_geography();
203
204            // If the inner output is item_crs, the output must also be item_crs. Otherwise
205            // the output is left as is.
206            if geo_matcher.match_type(&item_type) {
207                Ok(Some(SedonaType::new_item_crs(&item_type)?))
208            } else {
209                Ok(Some(item_type))
210            }
211        } else {
212            Ok(None)
213        }
214    }
215
216    fn accumulator(
217        &self,
218        args: &[SedonaType],
219        output_type: &SedonaType,
220    ) -> Result<Box<dyn datafusion_expr::Accumulator>> {
221        // Strip any CRS that might be present from the input type
222        let item_arg_types = args
223            .iter()
224            .map(|arg_type| {
225                parse_item_crs_arg_type_strip_crs(arg_type).map(|(item_type, _)| item_type)
226            })
227            .collect::<Result<Vec<_>>>()?;
228
229        // Extract the item output type from the item_crs output type
230        let (item_output_type, _) = parse_item_crs_arg_type(output_type)?;
231
232        // Create the inner accumulator
233        let inner = self.inner.accumulator(&item_arg_types, &item_output_type)?;
234
235        Ok(Box::new(ItemCrsAccumulator {
236            inner,
237            item_output_type,
238            crs: None,
239        }))
240    }
241
242    fn state_fields(&self, args: &[SedonaType]) -> Result<Vec<FieldRef>> {
243        // We need an extra state field to track the CRS of each group
244        let mut fields = self.inner.state_fields(args)?;
245        fields.push(Field::new("group_crs", DataType::Utf8View, true).into());
246        Ok(fields)
247    }
248}
249
250#[derive(Debug)]
251struct ItemCrsAccumulator {
252    /// The wrapped inner accumulator
253    inner: Box<dyn Accumulator>,
254    /// The item output type (without the item_crs wrapper)
255    item_output_type: SedonaType,
256    /// If any rows have been encountered, the CRS (the literal string "0" is used
257    /// as a sentinel for "no CRS" because we have to serialize it (and None is
258    /// reserved for "we haven't seen any rows yet"))
259    crs: Option<String>,
260}
261
262impl Accumulator for ItemCrsAccumulator {
263    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
264        // The input is an item_crs struct array; extract the item and crs columns
265        let struct_array = as_struct_array(&values[0])?;
266        let item_array = struct_array.column(0).clone();
267        let crs_array = as_string_view_array(struct_array.column(1))?;
268
269        // Check and track CRS values
270        if let Some(struct_nulls) = struct_array.nulls() {
271            // Skip CRS values for null items
272            for (is_valid, crs_value) in zip(struct_nulls, crs_array.iter()) {
273                if is_valid {
274                    self.merge_crs(crs_value.unwrap_or("0"))?;
275                }
276            }
277        } else {
278            // No nulls
279            for crs_value in crs_array.iter() {
280                self.merge_crs(crs_value.unwrap_or("0"))?;
281            }
282        }
283
284        // Update the inner accumulator with just the item portion
285        self.inner.update_batch(&[item_array])?;
286        Ok(())
287    }
288
289    fn evaluate(&mut self) -> Result<ScalarValue> {
290        let inner_result = self.inner.evaluate()?;
291
292        // If the output type is not geometry or geography we can just return it
293        if !matches!(
294            self.item_output_type,
295            SedonaType::Wkb(_, _) | SedonaType::WkbView(_, _)
296        ) {
297            return Ok(inner_result);
298        }
299
300        // Otherwise, prepare the item_crs result
301
302        // Convert the sentinel back to None
303        let crs_value = match &self.crs {
304            Some(s) if s == "0" => None,
305            Some(s) => Some(s.clone()),
306            None => None,
307        };
308
309        // Create the item_crs struct scalar
310        let item_crs_result = make_item_crs(
311            &self.item_output_type,
312            ColumnarValue::Scalar(inner_result),
313            &ColumnarValue::Scalar(ScalarValue::Utf8View(crs_value)),
314            None,
315        )?;
316
317        match item_crs_result {
318            ColumnarValue::Scalar(scalar) => Ok(scalar),
319            ColumnarValue::Array(_) => {
320                sedona_internal_err!("Expected scalar result from make_item_crs")
321            }
322        }
323    }
324
325    fn size(&self) -> usize {
326        self.inner.size() + size_of::<ItemCrsAccumulator>()
327    }
328
329    fn state(&mut self) -> Result<Vec<ScalarValue>> {
330        let mut inner_state = self.inner.state()?;
331        inner_state.push(ScalarValue::Utf8View(self.crs.clone()));
332        Ok(inner_state)
333    }
334
335    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
336        // The CRS field is the last element of states
337        if states.is_empty() {
338            return sedona_internal_err!("Expected at least one state field");
339        }
340        let crs_array = as_string_view_array(states.last().unwrap())?;
341
342        // Check and merge CRS values from the state
343        for crs_str in crs_array.iter().flatten() {
344            self.merge_crs(crs_str)?;
345        }
346
347        // Merge the inner state (excluding the CRS field)
348        let inner_states = &states[..states.len() - 1];
349        self.inner.merge_batch(inner_states)
350    }
351}
352
353impl ItemCrsAccumulator {
354    /// Merge a CRS value into the accumulator's tracked CRS
355    ///
356    /// Ensures all CRS values are compatible. Here "0" means an explicit
357    /// null crs. This is because we have to serialize it somehow and None
358    /// is reserved for the "we haven't seen a CRS yet".
359    fn merge_crs(&mut self, crs_str: &str) -> Result<()> {
360        match &self.crs {
361            None => {
362                // First CRS value encountered
363                self.crs = Some(crs_str.to_string());
364                Ok(())
365            }
366            Some(existing) if existing == crs_str => {
367                // CRS is byte-for-byte equal, nothing to do
368                Ok(())
369            }
370            Some(existing) => {
371                // Check if CRSes are semantically equal
372                let existing_crs = deserialize_crs(existing)?;
373                let new_crs = deserialize_crs(crs_str)?;
374                if existing_crs == new_crs {
375                    Ok(())
376                } else {
377                    let existing_displ = existing_crs
378                        .map(|c| c.to_string())
379                        .unwrap_or("None".to_string());
380                    let new_displ = new_crs.map(|c| c.to_string()).unwrap_or("None".to_string());
381                    exec_err!("CRS values not equal: {existing_displ} vs {new_displ}")
382                }
383            }
384        }
385    }
386}
387
388/// Calculate a return type based on the underlying kernel
389///
390/// This function extracts the item portion of any item_crs input and
391/// passes the result to the underlying kernel's return type implementation.
392/// If the underlying kernel is going to return a geometry or geography type,
393/// we wrap it in an item_crs type.
394///
395/// This function does not pass on input scalars, because those types of
396/// functions as used in SedonaDB typically assign a type-level CRS.
397/// Functions that use scalar inputs to calculate an output type need
398/// to implement an item_crs implementation themselves.
399fn return_type_handle_item_crs(
400    kernel: &dyn SedonaScalarKernel,
401    arg_types: &[SedonaType],
402) -> Result<Option<SedonaType>> {
403    let item_crs_matcher = ArgMatcher::is_item_crs();
404
405    // If there are no item_crs arguments, this kernel never applies.
406    if !arg_types
407        .iter()
408        .any(|arg_type| item_crs_matcher.match_type(arg_type))
409    {
410        return Ok(None);
411    }
412
413    // Extract the item types. This also strips the type-level CRS for any non item-crs
414    // type, because any resulting geometry type should be CRS free.
415    let item_arg_types = arg_types
416        .iter()
417        .map(|arg_type| parse_item_crs_arg_type_strip_crs(arg_type).map(|(item_type, _)| item_type))
418        .collect::<Result<Vec<_>>>()?;
419
420    // Any kernel that uses scalars to determine the output type is spurious here, so we
421    // pretend that there aren't any for the purposes of computing the type.
422    let scalar_args_none = (0..arg_types.len())
423        .map(|_| None)
424        .collect::<Vec<Option<&ScalarValue>>>();
425
426    // If the wrapped kernel matches and returns a geometry type, that geometry type will be an
427    // item/crs type. The new_item_crs() function handles stripping any CRS that might be present
428    // in the output type.
429    if let Some(item_type) =
430        kernel.return_type_from_args_and_scalars(&item_arg_types, &scalar_args_none)?
431    {
432        let geo_matcher = ArgMatcher::is_geometry_or_geography();
433        if geo_matcher.match_type(&item_type) {
434            Ok(Some(SedonaType::new_item_crs(&item_type)?))
435        } else {
436            Ok(Some(item_type))
437        }
438    } else {
439        Ok(None)
440    }
441}
442
443/// Execute an underlying kernel
444///
445/// This function handles invoking the underlying kernel, which operates
446/// only on the `item` portion of the `item_crs` type. Before executing,
447/// this function handles ensuring that all CRSes are compatible, and,
448/// if necessary, wrap a geometry or geography output in an item_crs
449/// type.
450fn invoke_handle_item_crs(
451    kernel: &dyn SedonaScalarKernel,
452    arg_types: &[SedonaType],
453    args: &[ColumnarValue],
454    return_type: &SedonaType,
455    num_rows: usize,
456    config_options: Option<&ConfigOptions>,
457) -> Result<ColumnarValue> {
458    // Separate the argument types into item and Option<crs>
459    // Don't strip the CRSes because we need them to compare with
460    // the item-level CRSes to ensure they are equal.
461    let arg_types_unwrapped = arg_types
462        .iter()
463        .map(parse_item_crs_arg_type)
464        .collect::<Result<Vec<_>>>()?;
465
466    let args_unwrapped = zip(&arg_types_unwrapped, args)
467        .map(|(arg_type, arg)| {
468            let (item_type, crs_type) = arg_type;
469            parse_item_crs_arg(item_type, crs_type, arg)
470        })
471        .collect::<Result<Vec<_>>>()?;
472
473    let crs_args = args_unwrapped
474        .iter()
475        .flat_map(|(_, crs_arg)| crs_arg)
476        .collect::<Vec<_>>();
477
478    let crs_result = ensure_crs_args_equal(&crs_args)?;
479
480    let item_types = arg_types_unwrapped
481        .iter()
482        .map(|(item_type, _)| item_type.clone())
483        .collect::<Vec<_>>();
484    let item_args = args_unwrapped
485        .iter()
486        .map(|(item_arg, _)| item_arg.clone())
487        .collect::<Vec<_>>();
488
489    let item_arg_types_no_crs = arg_types
490        .iter()
491        .map(|arg_type| parse_item_crs_arg_type_strip_crs(arg_type).map(|(item_type, _)| item_type))
492        .collect::<Result<Vec<_>>>()?;
493    let out_item_type = match kernel.return_type(&item_arg_types_no_crs)? {
494        Some(matched_item_type) => matched_item_type,
495        None => return sedona_internal_err!("Expected inner kernel to match types {item_types:?}"),
496    };
497
498    let item_result = kernel.invoke_batch_from_args(
499        &item_types,
500        &item_args,
501        return_type,
502        num_rows,
503        config_options,
504    )?;
505
506    if ArgMatcher::is_geometry_or_geography().match_type(&out_item_type) {
507        make_item_crs(&out_item_type, item_result, crs_result, None)
508    } else {
509        Ok(item_result)
510    }
511}
512
513/// Create a new item_crs struct [ColumnarValue]
514///
515/// Optionally provide extra nulls (e.g., to satisfy a scalar function contract
516/// where null inputs -> null outputs).
517pub fn make_item_crs(
518    item_type: &SedonaType,
519    item_result: ColumnarValue,
520    crs_result: &ColumnarValue,
521    extra_nulls: Option<&NullBuffer>,
522) -> Result<ColumnarValue> {
523    let out_fields = vec![
524        item_type.to_storage_field("item", true)?,
525        Field::new("crs", DataType::Utf8View, true),
526    ];
527
528    let scalar_result = matches!(
529        (&item_result, crs_result),
530        (ColumnarValue::Scalar(_), ColumnarValue::Scalar(_))
531    );
532
533    let item_crs_arrays = ColumnarValue::values_to_arrays(&[item_result, crs_result.clone()])?;
534    let item_array = &item_crs_arrays[0];
535    let crs_array = &item_crs_arrays[1];
536    let nulls = NullBuffer::union(item_array.nulls(), extra_nulls);
537
538    let item_crs_array = StructArray::new(
539        out_fields.into(),
540        vec![item_array.clone(), crs_array.clone()],
541        nulls,
542    );
543
544    if scalar_result {
545        Ok(ScalarValue::Struct(Arc::new(item_crs_array)).into())
546    } else {
547        Ok(ColumnarValue::Array(Arc::new(item_crs_array)))
548    }
549}
550
551/// Given an input type, separate it into an item and crs type (if the input
552/// is an item_crs type). Otherwise, just return the item type as is and return a
553/// CRS type of None.
554pub fn parse_item_crs_arg_type(
555    sedona_type: &SedonaType,
556) -> Result<(SedonaType, Option<SedonaType>)> {
557    if let SedonaType::Arrow(DataType::Struct(fields)) = sedona_type {
558        let field_names = fields.iter().map(|f| f.name()).collect::<Vec<_>>();
559        if field_names != ["item", "crs"] {
560            return Ok((sedona_type.clone(), None));
561        }
562
563        let item = SedonaType::from_storage_field(&fields[0])?;
564        let crs = SedonaType::from_storage_field(&fields[1])?;
565        Ok((item, Some(crs)))
566    } else {
567        Ok((sedona_type.clone(), None))
568    }
569}
570
571/// Given an input type, separate it into an item and crs type (if the input
572/// is an item_crs type). Otherwise, just return the item type as is. This
573/// version strips the CRS, which we need to do here before passing it to the
574/// underlying kernel (which expects all input CRSes to match).
575pub fn parse_item_crs_arg_type_strip_crs(
576    sedona_type: &SedonaType,
577) -> Result<(SedonaType, Option<SedonaType>)> {
578    match sedona_type {
579        SedonaType::Wkb(edges, _) => Ok((SedonaType::Wkb(*edges, None), None)),
580        SedonaType::WkbView(edges, _) => Ok((SedonaType::WkbView(*edges, None), None)),
581        SedonaType::Arrow(DataType::Struct(fields))
582            if fields.iter().map(|f| f.name()).collect::<Vec<_>>() == vec!["item", "crs"] =>
583        {
584            let item = SedonaType::from_storage_field(&fields[0])?;
585            let crs = SedonaType::from_storage_field(&fields[1])?;
586            Ok((item, Some(crs)))
587        }
588        other => Ok((other.clone(), None)),
589    }
590}
591
592/// Separate an argument into the item and its crs (if applicable). This
593/// operates on the result of parse_item_crs_arg_type().
594pub fn parse_item_crs_arg(
595    item_type: &SedonaType,
596    crs_type: &Option<SedonaType>,
597    arg: &ColumnarValue,
598) -> Result<(ColumnarValue, Option<ColumnarValue>)> {
599    if crs_type.is_some() {
600        return match arg {
601            ColumnarValue::Array(array) => {
602                let struct_array = as_struct_array(array)?;
603                Ok((
604                    ColumnarValue::Array(struct_array.column(0).clone()),
605                    Some(ColumnarValue::Array(struct_array.column(1).clone())),
606                ))
607            }
608            ColumnarValue::Scalar(scalar_value) => {
609                if let ScalarValue::Struct(struct_array) = scalar_value {
610                    let item_scalar = ScalarValue::try_from_array(struct_array.column(0), 0)?;
611                    let crs_scalar = ScalarValue::try_from_array(struct_array.column(1), 0)?;
612                    Ok((
613                        ColumnarValue::Scalar(item_scalar),
614                        Some(ColumnarValue::Scalar(crs_scalar)),
615                    ))
616                } else {
617                    sedona_internal_err!(
618                        "Expected struct scalar for item_crs but got {}",
619                        scalar_value
620                    )
621                }
622            }
623        };
624    }
625
626    match item_type {
627        SedonaType::Wkb(_, crs) | SedonaType::WkbView(_, crs) => {
628            let crs_scalar = if let Some(crs) = crs {
629                if let Some(auth_code) = crs.to_authority_code()? {
630                    ScalarValue::Utf8View(Some(auth_code))
631                } else {
632                    ScalarValue::Utf8View(Some(crs.to_json()))
633                }
634            } else {
635                ScalarValue::Utf8View(None)
636            };
637
638            Ok((arg.clone(), Some(ColumnarValue::Scalar(crs_scalar))))
639        }
640        _ => Ok((arg.clone(), None)),
641    }
642}
643
644/// Ensures values representing CRSes all represent equivalent CRS values
645fn ensure_crs_args_equal<'a>(crs_args: &[&'a ColumnarValue]) -> Result<&'a ColumnarValue> {
646    match crs_args.len() {
647        0 => sedona_internal_err!("Zero CRS arguments as input to item_crs"),
648        1 => Ok(crs_args[0]),
649        _ => {
650            let crs_args_string = crs_args
651                .iter()
652                .map(|arg| arg.cast_to(&DataType::Utf8View, None))
653                .collect::<Result<Vec<_>>>()?;
654            let crs_arrays = ColumnarValue::values_to_arrays(&crs_args_string)?;
655            for i in 1..crs_arrays.len() {
656                ensure_crs_string_arrays_equal2(&crs_arrays[i - 1], &crs_arrays[i])?
657            }
658
659            Ok(crs_args[0])
660        }
661    }
662}
663
664// Checks two string view arrays for equality when each represents a string representation
665// of a CRS
666fn ensure_crs_string_arrays_equal2(lhs: &ArrayRef, rhs: &ArrayRef) -> Result<()> {
667    for (lhs_item, rhs_item) in zip(as_string_view_array(lhs)?, as_string_view_array(rhs)?) {
668        if lhs_item == rhs_item {
669            // First check for byte-for-byte equality (faster and most likely)
670            continue;
671        }
672
673        // Check the deserialized CRS values for equality
674        if let (Some(lhs_item_str), Some(rhs_item_str)) = (lhs_item, rhs_item) {
675            let lhs_crs = deserialize_crs(lhs_item_str)?;
676            let rhs_crs = deserialize_crs(rhs_item_str)?;
677            if lhs_crs == rhs_crs {
678                continue;
679            }
680        }
681
682        if lhs_item != rhs_item {
683            return Err(DataFusionError::Execution(format!(
684                "CRS values not equal: {lhs_item:?} vs {rhs_item:?}",
685            )));
686        }
687    }
688
689    Ok(())
690}
691
692#[cfg(test)]
693mod test {
694    use datafusion_expr::ScalarUDF;
695    use rstest::rstest;
696    use sedona_schema::{
697        crs::lnglat,
698        datatypes::{Edges, SedonaType, WKB_GEOMETRY, WKB_GEOMETRY_ITEM_CRS},
699    };
700    use sedona_testing::{
701        create::create_array_item_crs, create::create_scalar_item_crs, testers::ScalarUdfTester,
702    };
703
704    use crate::scalar_udf::{SedonaScalarUDF, SimpleSedonaScalarKernel};
705
706    use super::*;
707
708    // A test function of something + geometry -> out_type
709    fn test_udf(out_type: SedonaType) -> ScalarUDF {
710        let geom_to_geom_kernel = SimpleSedonaScalarKernel::new_ref(
711            ArgMatcher::new(
712                vec![ArgMatcher::is_any(), ArgMatcher::is_geometry()],
713                out_type,
714            ),
715            Arc::new(|_arg_types, args| Ok(args[0].clone())),
716        );
717
718        let crsified_kernel = ItemCrsKernel::new_ref(geom_to_geom_kernel);
719        SedonaScalarUDF::from_impl("fun", crsified_kernel.clone()).into()
720    }
721
722    #[test]
723    fn item_crs_kernel_no_match() {
724        // A call with geometry + geometry should fail (this case would be handled by the
725        // original kernel, not the item_crs kernel)
726        let tester = ScalarUdfTester::new(test_udf(WKB_GEOMETRY), vec![WKB_GEOMETRY, WKB_GEOMETRY]);
727        let err = tester.return_type().unwrap_err();
728        assert_eq!(
729            err.message(),
730            "fun(geometry, geometry): No kernel matching arguments"
731        );
732    }
733
734    #[rstest]
735    fn item_crs_kernel_basic(
736        #[values(
737            (WKB_GEOMETRY, WKB_GEOMETRY_ITEM_CRS.clone()),
738            (WKB_GEOMETRY_ITEM_CRS.clone(), WKB_GEOMETRY),
739            (WKB_GEOMETRY_ITEM_CRS.clone(), WKB_GEOMETRY_ITEM_CRS.clone())
740        )]
741        arg_types: (SedonaType, SedonaType),
742    ) {
743        // A call with geometry + item_crs or both item_crs should return item_crs
744        let tester = ScalarUdfTester::new(test_udf(WKB_GEOMETRY), vec![arg_types.0, arg_types.1]);
745        tester.assert_return_type(WKB_GEOMETRY_ITEM_CRS.clone());
746        let result = tester
747            .invoke_scalar_scalar("POINT (0 1)", "POINT (1 2)")
748            .unwrap();
749        assert_eq!(
750            result,
751            create_scalar_item_crs(Some("POINT (0 1)"), None, &WKB_GEOMETRY)
752        );
753    }
754
755    #[test]
756    fn item_crs_kernel_crs_values() {
757        let tester = ScalarUdfTester::new(
758            test_udf(WKB_GEOMETRY),
759            vec![WKB_GEOMETRY_ITEM_CRS.clone(), WKB_GEOMETRY_ITEM_CRS.clone()],
760        );
761        tester.assert_return_type(WKB_GEOMETRY_ITEM_CRS.clone());
762
763        let scalar_item_crs_4326 =
764            create_scalar_item_crs(Some("POINT (0 1)"), Some("EPSG:4326"), &WKB_GEOMETRY);
765        let scalar_item_crs_crs84 =
766            create_scalar_item_crs(Some("POINT (0 1)"), Some("OGC:CRS84"), &WKB_GEOMETRY);
767        let scalar_item_crs_3857 =
768            create_scalar_item_crs(Some("POINT (0 1)"), Some("EPSG:3857"), &WKB_GEOMETRY);
769
770        // Should be able to execute when both arguments have an equal
771        // (but not necessarily identical) CRS
772        let result = tester
773            .invoke_scalar_scalar(scalar_item_crs_4326.clone(), scalar_item_crs_crs84.clone())
774            .unwrap();
775        assert_eq!(result, scalar_item_crs_4326);
776
777        // We should get an error when the CRSes are not compatible
778        let err = tester
779            .invoke_scalar_scalar(scalar_item_crs_4326.clone(), scalar_item_crs_3857.clone())
780            .unwrap_err();
781        assert_eq!(
782            err.message(),
783            "CRS values not equal: Some(\"EPSG:4326\") vs Some(\"EPSG:3857\")"
784        );
785    }
786
787    #[test]
788    fn item_crs_kernel_crs_types() {
789        let scalar_item_crs_4326 =
790            create_scalar_item_crs(Some("POINT (0 1)"), Some("EPSG:4326"), &WKB_GEOMETRY);
791        let scalar_item_crs_crs84 =
792            create_scalar_item_crs(Some("POINT (0 1)"), Some("OGC:CRS84"), &WKB_GEOMETRY);
793        let scalar_item_crs_3857 =
794            create_scalar_item_crs(Some("POINT (0 1)"), Some("EPSG:3857"), &WKB_GEOMETRY);
795
796        let sedona_type_lnglat = SedonaType::Wkb(Edges::Planar, lnglat());
797        let tester = ScalarUdfTester::new(
798            test_udf(WKB_GEOMETRY),
799            vec![WKB_GEOMETRY_ITEM_CRS.clone(), sedona_type_lnglat.clone()],
800        );
801        tester.assert_return_type(WKB_GEOMETRY_ITEM_CRS.clone());
802
803        // We should be able to execute item_crs + geometry when the crs compares equal
804        let result = tester
805            .invoke_scalar_scalar(scalar_item_crs_4326.clone(), "POINT (3 4)")
806            .unwrap();
807        assert_eq!(result, scalar_item_crs_4326);
808
809        let result = tester
810            .invoke_scalar_scalar(scalar_item_crs_crs84.clone(), "POINT (3 4)")
811            .unwrap();
812        assert_eq!(result, scalar_item_crs_crs84);
813
814        // We should get an error when the CRSes are not compatible
815        let err = tester
816            .invoke_scalar_scalar(scalar_item_crs_3857.clone(), "POINT (3 4)")
817            .unwrap_err();
818        assert_eq!(
819            err.message(),
820            "CRS values not equal: Some(\"EPSG:3857\") vs Some(\"OGC:CRS84\")"
821        );
822    }
823
824    #[test]
825    fn item_crs_kernel_arrays() {
826        let tester = ScalarUdfTester::new(
827            test_udf(WKB_GEOMETRY),
828            vec![WKB_GEOMETRY_ITEM_CRS.clone(), WKB_GEOMETRY_ITEM_CRS.clone()],
829        );
830
831        let array_item_crs_lnglat = create_array_item_crs(
832            &[
833                Some("POINT (0 1)"),
834                Some("POINT (2 3)"),
835                Some("POINT (3 4)"),
836            ],
837            [Some("EPSG:4326"), Some("EPSG:4326"), Some("EPSG:4326")],
838            &WKB_GEOMETRY,
839        );
840        let scalar_item_crs_4326 =
841            create_scalar_item_crs(Some("POINT (0 1)"), Some("EPSG:4326"), &WKB_GEOMETRY);
842        let scalar_item_crs_3857 =
843            create_scalar_item_crs(Some("POINT (0 1)"), Some("EPSG:3857"), &WKB_GEOMETRY);
844
845        // This should succeed when all CRS combinations are compatible
846        let result = tester
847            .invoke_array_scalar(array_item_crs_lnglat.clone(), scalar_item_crs_4326.clone())
848            .unwrap();
849        assert_eq!(&result, &array_item_crs_lnglat);
850
851        // This should fail otherwise
852        let err = tester
853            .invoke_array_scalar(array_item_crs_lnglat.clone(), scalar_item_crs_3857.clone())
854            .unwrap_err();
855        assert_eq!(
856            err.message(),
857            "CRS values not equal: Some(\"EPSG:4326\") vs Some(\"EPSG:3857\")"
858        );
859    }
860
861    #[test]
862    fn item_crs_kernel_non_spatial_args_and_result() {
863        let scalar_item_crs =
864            create_scalar_item_crs(Some("POINT (0 1)"), Some("EPSG:4326"), &WKB_GEOMETRY);
865
866        let tester = ScalarUdfTester::new(
867            test_udf(SedonaType::Arrow(DataType::Int32)),
868            vec![
869                SedonaType::Arrow(DataType::Int32),
870                WKB_GEOMETRY_ITEM_CRS.clone(),
871            ],
872        );
873        tester.assert_return_type(DataType::Int32);
874
875        let result = tester.invoke_scalar_scalar(1234, scalar_item_crs).unwrap();
876        assert_eq!(result, ScalarValue::Int32(Some(1234)))
877    }
878
879    #[test]
880    fn crs_args_equal() {
881        // Zero args
882        let err = ensure_crs_args_equal(&[]).unwrap_err();
883        assert!(err.message().contains("Zero CRS arguments"));
884
885        let crs_lnglat = ColumnarValue::Scalar(ScalarValue::Utf8(Some("EPSG:4326".to_string())));
886        let crs_also_lnglat =
887            ColumnarValue::Scalar(ScalarValue::Utf8(Some("OGC:CRS84".to_string())));
888        let crs_other = ColumnarValue::Scalar(ScalarValue::Utf8(Some("EPSG:3857".to_string())));
889
890        // One arg
891        let result_one_arg = ensure_crs_args_equal(&[&crs_lnglat]).unwrap();
892        assert!(std::ptr::eq(result_one_arg, &crs_lnglat));
893
894        // Two args (equal)
895        let result_two_args = ensure_crs_args_equal(&[&crs_lnglat, &crs_also_lnglat]).unwrap();
896        assert!(std::ptr::eq(result_two_args, &crs_lnglat));
897
898        // Two args (not equal)
899        let err = ensure_crs_args_equal(&[&crs_lnglat, &crs_other]).unwrap_err();
900        assert_eq!(
901            err.message(),
902            "CRS values not equal: Some(\"EPSG:4326\") vs Some(\"EPSG:3857\")"
903        );
904    }
905}