Skip to main content

sedona_functions/
executor.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17use std::iter::zip;
18
19use arrow_array::ArrayRef;
20use arrow_schema::DataType;
21use datafusion_common::cast::{as_binary_array, as_binary_view_array, as_struct_array};
22use datafusion_common::error::Result;
23use datafusion_common::{DataFusionError, ScalarValue};
24use datafusion_expr::ColumnarValue;
25use sedona_common::sedona_internal_err;
26use sedona_schema::datatypes::SedonaType;
27use wkb::reader::Wkb;
28
/// Helper for writing general kernel implementations with geometry
///
/// The [GenericExecutor] wraps a set of arguments and their types and provides helpers
/// to make writing general compute functions less verbose. Broadly, kernel implementations
/// must consider multiple input data types (e.g., Wkb/WkbView or Float32/Float64) and
/// multiple combinations of Array or ScalarValue inputs. This executor is generic on
/// a [GeometryFactory] to support iterating over geometries from multiple libraries;
/// however, the most commonly used version is the [WkbExecutor].
///
/// The pattern supported by the [GenericExecutor] is:
///
/// - Create a [GenericExecutor] with `new()`
/// - Create an Arrow builder of the appropriate output type using
///   `with_capacity(executor.num_iterations())`
/// - Use `execute_wkb_void()` (or `execute_wkb_wkb_void()` for two geometry
///   arguments) with a lambda whose contents appends to the builder
/// - Use `finish()` to build the output [ColumnarValue].
///
/// When all arguments are scalars, the `execute_*()` methods will perform one iteration
/// and `finish()` will return a [ColumnarValue::Scalar]. Otherwise, the output will
/// be a [ColumnarValue::Array] built from `num_iterations()` iterations. This is true
/// even if a geometry scalar is passed with a non-geometry argument. Non-geometry
/// arrays are typically cheaper to cast to a concrete type (e.g., cast a numeric
/// to float64 or an integer to int64) and cheaper to access by element or iterator
/// compared to most geometry operations that require them.
///
/// The [GenericExecutor] is not built to be completely general and UDF implementers are
/// free to use other mechanisms to implement UDFs with geometry arguments. The balance
/// between optimizing iteration speed, minimizing dispatch overhead, and maximizing
/// readability of kernel implementations is difficult to achieve and future utilities
/// may provide alternatives optimized for a different balance than was chosen here.
///
/// This executor accepts two factories (supporting kernels accepting 0, 1, or 2
/// geometry arguments), which can also support implementations that wish to "prepare"
/// one side or the other (e.g., for binary predicates).
///
/// A critical optimization is iterating over two arguments where one argument is a
/// scalar. In this case, [GeometryFactory::try_from_wkb] is called exactly once on the
/// scalar side (e.g., whatever parsing needs to occur for the scalar only occurs once).
pub struct GenericExecutor<'a, 'b, Factory0, Factory1> {
    /// The [SedonaType] of each argument (`arg_types[i]` describes `args[i]`)
    pub arg_types: &'a [SedonaType],
    /// The argument values, each of which is either an array or a scalar
    pub args: &'b [ColumnarValue],
    /// Number of iterations, computed once from `args` in `new()`
    num_iterations: usize,
    /// Factory used to create geometries from the first argument
    factory0: Factory0,
    /// Factory used to create geometries from the second argument
    factory1: Factory1,
}

/// Alias for an executor that iterates over geometries as [Wkb]
///
/// Both geometry arguments are parsed with the [WkbGeometryFactory].
pub type WkbExecutor<'a, 'b> = GenericExecutor<'a, 'b, WkbGeometryFactory, WkbGeometryFactory>;
77
78impl<'a, 'b, Factory0: GeometryFactory, Factory1: GeometryFactory>
79    GenericExecutor<'a, 'b, Factory0, Factory1>
80{
81    /// Create a new [GenericExecutor]
82    pub fn new(arg_types: &'a [SedonaType], args: &'b [ColumnarValue]) -> Self {
83        Self {
84            arg_types,
85            args,
86            num_iterations: Self::calc_num_iterations(args),
87            factory0: Factory0::default(),
88            factory1: Factory1::default(),
89        }
90    }
91
92    /// Return the number of iterations that will be performed `execute_*()` methods
93    ///
94    /// If all arguments are [ColumnarValue::Scalar]s, this will be one iteration.
95    /// Otherwise, it will be the length of the first array.
96    pub fn num_iterations(&self) -> usize {
97        self.num_iterations
98    }
99
100    /// Execute a function by iterating over [Wkb] scalars in the first argument
101    ///
102    /// Provides a mechanism to iterate over a geometry array by converting each to
103    /// a [Wkb] scalar. For [SedonaType::Wkb] and [SedonaType::WkbView] arrays, this
104    /// is the conversion that would normally happen. For other future supported geometry
105    /// array types, this may incur a cast of item-wise conversion overhead.
106    pub fn execute_wkb_void<F: FnMut(Option<Factory0::Geom<'b>>) -> Result<()>>(
107        &self,
108        mut func: F,
109    ) -> Result<()> {
110        let factory = Factory0::default();
111        match &self.args[0] {
112            ColumnarValue::Array(array) => {
113                array.iter_with_factory(&factory, &self.arg_types[0], self.num_iterations, func)
114            }
115            ColumnarValue::Scalar(scalar_value) => {
116                let wkb0 = scalar_value.scalar_from_factory(&self.factory0)?;
117                func(wkb0)
118            }
119        }
120    }
121
122    /// Execute a binary geometry function by iterating over [Wkb] scalars in the
123    /// first two arguments
124    ///
125    /// Provides a mechanism to iterate over two geometry arrays as pairs of [Wkb]
126    /// scalars. [SedonaType::Wkb] and [SedonaType::WkbView] arrays are iterated over
127    /// in place; however, future supported geometry array types may incur conversion
128    /// overhead.
129    pub fn execute_wkb_wkb_void<
130        F: FnMut(Option<&Factory0::Geom<'b>>, Option<&Factory1::Geom<'b>>) -> Result<()>,
131    >(
132        &self,
133        mut func: F,
134    ) -> Result<()> {
135        match (&self.args[0], &self.args[1]) {
136            (ColumnarValue::Array(array0), ColumnarValue::Array(array1)) => iter_wkb_wkb_array(
137                &self.factory0,
138                &self.factory1,
139                (&self.arg_types[0], &self.arg_types[1]),
140                (array0, array1),
141                func,
142            ),
143            (ColumnarValue::Array(array), ColumnarValue::Scalar(scalar_value)) => {
144                let wkb1 = scalar_value.scalar_from_factory(&self.factory1)?;
145                array.iter_with_factory(
146                    &self.factory0,
147                    &self.arg_types[0],
148                    self.num_iterations(),
149                    |wkb0| func(wkb0.as_ref(), wkb1.as_ref()),
150                )
151            }
152            (ColumnarValue::Scalar(scalar_value), ColumnarValue::Array(array)) => {
153                let wkb0 = scalar_value.scalar_from_factory(&self.factory0)?;
154                array.iter_with_factory(
155                    &self.factory1,
156                    &self.arg_types[1],
157                    self.num_iterations(),
158                    |wkb1| func(wkb0.as_ref(), wkb1.as_ref()),
159                )
160            }
161            (ColumnarValue::Scalar(scalar_value0), ColumnarValue::Scalar(scalar_value1)) => {
162                let wkb0 = scalar_value0.scalar_from_factory(&self.factory0)?;
163                let wkb1 = scalar_value1.scalar_from_factory(&self.factory1)?;
164                func(wkb0.as_ref(), wkb1.as_ref())
165            }
166        }
167    }
168
169    /// Finish an [ArrayRef] output as the appropriate [ColumnarValue]
170    ///
171    /// Converts the output of `finish()`ing an Arrow builder into a
172    /// [ColumnarValue::Scalar] if all arguments were scalars, or a
173    /// [ColumnarValue::Array] otherwise.
174    pub fn finish(&self, out: ArrayRef) -> Result<ColumnarValue> {
175        for arg in self.args {
176            match arg {
177                // If any argument was an array, we return an array
178                ColumnarValue::Array(_) => {
179                    return Ok(ColumnarValue::Array(out));
180                }
181                ColumnarValue::Scalar(_) => {}
182            }
183        }
184
185        // For all scalar arguments, we return a scalar
186        Ok(ColumnarValue::Scalar(ScalarValue::try_from_array(&out, 0)?))
187    }
188
189    /// Calculates the number of iterations that should happen based on the
190    /// argument ColumnarValue types
191    fn calc_num_iterations(args: &[ColumnarValue]) -> usize {
192        for arg in args {
193            match arg {
194                // If any argument is an array, we have to iterate array.len() times
195                ColumnarValue::Array(array) => {
196                    return array.len();
197                }
198                ColumnarValue::Scalar(_) => {}
199            }
200        }
201
202        // For all scalar arguments, we iterate once
203        1
204    }
205}
206
207/// Factory object to help the [GenericExecutor] iterate over
208/// various concrete geometry objects
209pub trait GeometryFactory: Default {
210    /// The concrete geometry type (e.g., [Wkb])
211    ///
212    /// Usually this is some type whose conversion from raw WKB bytes incurs some cost.
213    /// The lifetime ensure that types that are a "view" of their input [ArrayRef] or
214    /// [ScalarValue] can be used here; however, this type may also own its own data.
215    type Geom<'a>;
216
217    /// Parse bytes of WKB or EWKB into [GeometryFactory::Geom]
218    fn try_from_wkb<'a>(&self, wkb_bytes: &'a [u8]) -> Result<Self::Geom<'a>>;
219
220    /// Helper that calls [GeometryFactory::try_from_wkb] on an
221    /// `Option<>`.
222    fn try_from_maybe_wkb<'a>(
223        &self,
224        maybe_wkb_bytes: Option<&'a [u8]>,
225    ) -> Result<Option<Self::Geom<'a>>> {
226        match maybe_wkb_bytes {
227            Some(wkb_bytes) => Ok(Some(self.try_from_wkb(wkb_bytes)?)),
228            None => Ok(None),
229        }
230    }
231}
232
233/// A [GeometryFactory] whose geometry type is [Wkb]
234///
235/// Using this geometry factory iterates over items as references to [Wkb]
236/// objects (which are the fastest objects when iterating over WKB input
237/// that implement geo-traits).
238#[derive(Default)]
239pub struct WkbGeometryFactory {}
240
241impl GeometryFactory for WkbGeometryFactory {
242    type Geom<'a> = Wkb<'a>;
243
244    fn try_from_wkb<'a>(&self, wkb_bytes: &'a [u8]) -> Result<Self::Geom<'a>> {
245        wkb::reader::read_wkb(wkb_bytes).map_err(|e| DataFusionError::External(Box::new(e)))
246    }
247}
248
/// A [GeometryFactory] whose geometry type is raw WKB bytes
///
/// Using this geometry factory iterates over items as references to the raw underlying
/// bytes, which is useful for writing optimized kernels that do not need the full buffer to
/// be validated and/or parsed.
#[derive(Default)]
pub struct WkbBytesFactory {}

impl GeometryFactory for WkbBytesFactory {
    type Geom<'a> = &'a [u8];

    /// Return the input bytes unchanged; this never fails and performs no validation
    fn try_from_wkb<'a>(&self, wkb_bytes: &'a [u8]) -> Result<Self::Geom<'a>> {
        Ok(wkb_bytes)
    }
}

/// Alias for an executor that iterates over geometries as their raw [Wkb] bytes.
///
/// This [GenericExecutor] implementation provides more optimization opportunities,
/// but it requires additional manual processing of the raw [Wkb] bytes compared to
/// the [WkbExecutor].
pub type WkbBytesExecutor<'a, 'b> = GenericExecutor<'a, 'b, WkbBytesFactory, WkbBytesFactory>;
271
272/// Trait for iterating over a container type as geometry scalars
273///
274/// Currently the only scalar type supported is [Wkb]; however, for future
275/// geometry array types it may make sense to offer other scalar types over
276/// which to iterate.
277pub trait IterGeo {
278    fn iter_as_wkb_bytes<'a, F: FnMut(Option<&'a [u8]>) -> Result<()>>(
279        &'a self,
280        sedona_type: &SedonaType,
281        num_iterations: usize,
282        func: F,
283    ) -> Result<()>;
284
285    fn iter_with_factory<
286        'a,
287        Factory: GeometryFactory,
288        F: FnMut(Option<Factory::Geom<'a>>) -> Result<()>,
289    >(
290        &'a self,
291        factory: &Factory,
292        sedona_type: &SedonaType,
293        num_iterations: usize,
294        mut func: F,
295    ) -> Result<()> {
296        self.iter_as_wkb_bytes(
297            sedona_type,
298            num_iterations,
299            |maybe_bytes| match maybe_bytes {
300                Some(wkb_bytes) => {
301                    let geom = factory.try_from_wkb(wkb_bytes)?;
302                    func(Some(geom))
303                }
304                None => func(None),
305            },
306        )
307    }
308
309    /// Apply a function for each element of self as an optional [Wkb]
310    ///
311    /// The function will always be called num_iteration types to support
312    /// efficient iteration over scalar containers (e.g., so that implementations
313    /// can parse the Wkb once and reuse the object).
314    fn iter_as_wkb<'a, F: FnMut(Option<Wkb<'a>>) -> Result<()>>(
315        &'a self,
316        sedona_type: &SedonaType,
317        num_iterations: usize,
318        func: F,
319    ) -> Result<()> {
320        let factory = WkbGeometryFactory {};
321        self.iter_with_factory(&factory, sedona_type, num_iterations, func)
322    }
323}
324
325/// Trait for obtaining geometry scalars from containers
326///
327/// Currently this is internal and only implemented for the [ScalarValue].
328trait ScalarGeo {
329    fn scalar_as_wkb_bytes(&self) -> Result<Option<&[u8]>>;
330
331    fn scalar_from_factory<T: GeometryFactory>(&self, factory: &T) -> Result<Option<T::Geom<'_>>> {
332        factory.try_from_maybe_wkb(self.scalar_as_wkb_bytes()?)
333    }
334}
335
336impl IterGeo for ArrayRef {
337    fn iter_as_wkb_bytes<'a, F: FnMut(Option<&'a [u8]>) -> Result<()>>(
338        &'a self,
339        sedona_type: &SedonaType,
340        num_iterations: usize,
341        mut func: F,
342    ) -> Result<()> {
343        if num_iterations != self.len() {
344            return sedona_internal_err!(
345                "Expected {num_iterations} items but got Array with {} items",
346                self.len()
347            );
348        }
349
350        match sedona_type {
351            SedonaType::Arrow(DataType::Null) => {
352                for _ in 0..num_iterations {
353                    func(None)?;
354                }
355
356                Ok(())
357            }
358            SedonaType::Wkb(_, _) => iter_wkb_binary(as_binary_array(self)?, func),
359            SedonaType::WkbView(_, _) => iter_wkb_binary(as_binary_view_array(self)?, func),
360            SedonaType::Arrow(DataType::Struct(fields))
361                if fields.len() == 2 && fields[0].name() == "item" && fields[1].name() == "crs" =>
362            {
363                let struct_array = as_struct_array(self)?;
364                let item_type = SedonaType::from_storage_field(&fields[0])?;
365                struct_array
366                    .column(0)
367                    .iter_as_wkb_bytes(&item_type, num_iterations, func)
368            }
369            _ => {
370                // We could cast here as a fallback, iterate and cast per-element, or
371                // implement iter_as_something_else()/supports_iter_xxx() when more geo array types
372                // are supported.
373                sedona_internal_err!("Can't iterate over {:?} as Wkb", sedona_type)
374            }
375        }
376    }
377}
378
379impl ScalarGeo for ScalarValue {
380    fn scalar_as_wkb_bytes(&self) -> Result<Option<&[u8]>> {
381        match self {
382            ScalarValue::Binary(maybe_item)
383            | ScalarValue::BinaryView(maybe_item)
384            | ScalarValue::LargeBinary(maybe_item) => Ok(maybe_item.as_deref()),
385            ScalarValue::Null => Ok(None),
386            ScalarValue::Struct(s)
387                if s.fields().len() == 2
388                    && s.fields()[0].name() == "item"
389                    && s.fields()[1].name() == "crs" =>
390            {
391                let item_type = SedonaType::from_storage_field(&s.fields()[0])?;
392                let mut out = None;
393                s.column(0).iter_as_wkb_bytes(&item_type, 1, |v| {
394                    out = v;
395                    Ok(())
396                })?;
397
398                Ok(out)
399            }
400            _ => sedona_internal_err!("Can't iterate over {:?} ScalarValue as &[u8]", self),
401        }
402    }
403}
404
405/// Helper to dispatch binary iteration over two arrays. The Scalar/Array,
406/// Array/Scalar, and Scalar/Scalar case are handled using the unary iteration
407/// infrastructure.
408fn iter_wkb_wkb_array<
409    'a,
410    Factory0: GeometryFactory,
411    Factory1: GeometryFactory,
412    F: FnMut(Option<&Factory0::Geom<'a>>, Option<&Factory1::Geom<'a>>) -> Result<()>,
413>(
414    factory0: &Factory0,
415    factory1: &Factory1,
416    types: (&SedonaType, &SedonaType),
417    arrays: (&'a ArrayRef, &'a ArrayRef),
418    func: F,
419) -> Result<()> {
420    let (array0, array1) = arrays;
421    match types {
422        (SedonaType::Wkb(_, _), SedonaType::Wkb(_, _)) => iter_wkb_wkb_binary(
423            factory0,
424            factory1,
425            as_binary_array(array0)?,
426            as_binary_array(array1)?,
427            func,
428        ),
429        (SedonaType::Wkb(_, _), SedonaType::WkbView(_, _)) => iter_wkb_wkb_binary(
430            factory0,
431            factory1,
432            as_binary_array(array0)?,
433            as_binary_view_array(array1)?,
434            func,
435        ),
436        (SedonaType::WkbView(_, _), SedonaType::Wkb(_, _)) => iter_wkb_wkb_binary(
437            factory0,
438            factory1,
439            as_binary_view_array(array0)?,
440            as_binary_array(array1)?,
441            func,
442        ),
443        (SedonaType::WkbView(_, _), SedonaType::WkbView(_, _)) => iter_wkb_wkb_binary(
444            factory0,
445            factory1,
446            as_binary_view_array(array0)?,
447            as_binary_view_array(array1)?,
448            func,
449        ),
450        _ => {
451            // We could do casting of one or both sides to support other cases as they
452            // arise to manage the complexity/performance balance
453            sedona_internal_err!(
454                "Can't iterate over {:?} and {:?} arrays as a pair of Wkb scalars",
455                types.0,
456                types.1
457            )
458        }
459    }
460}
461
462/// Generic function to iterate over a pair of optional bytes providers
463/// (e.g., various concrete array types)
464fn iter_wkb_wkb_binary<
465    'a,
466    Factory0: GeometryFactory,
467    Factory1: GeometryFactory,
468    T0: IntoIterator<Item = Option<&'a [u8]>>,
469    T1: IntoIterator<Item = Option<&'a [u8]>>,
470    F: FnMut(Option<&Factory0::Geom<'a>>, Option<&Factory1::Geom<'a>>) -> Result<()>,
471>(
472    factory0: &Factory0,
473    factory1: &Factory1,
474    iterable0: T0,
475    iterable1: T1,
476    mut func: F,
477) -> Result<()> {
478    for (item0, item1) in zip(iterable0, iterable1) {
479        let wkb0 = factory0.try_from_maybe_wkb(item0)?;
480        let wkb1 = factory1.try_from_maybe_wkb(item1)?;
481        func(wkb0.as_ref(), wkb1.as_ref())?;
482    }
483
484    Ok(())
485}
486
487/// Generic function to iterate over a single provider of optional wkb byte slices
488/// (e.g., concrete array types)
489fn iter_wkb_binary<
490    'a,
491    T: IntoIterator<Item = Option<&'a [u8]>>,
492    F: FnMut(Option<&'a [u8]>) -> Result<()>,
493>(
494    iterable: T,
495    mut func: F,
496) -> Result<()> {
497    for item in iterable.into_iter() {
498        func(item)?;
499    }
500
501    Ok(())
502}
503
#[cfg(test)]
mod tests {
    use std::fmt::Write;
    use std::sync::Arc;

    use super::*;
    use arrow_array::{builder::BinaryBuilder, create_array};
    use arrow_schema::DataType;
    use datafusion_common::{cast::as_binary_view_array, scalar::ScalarValue};
    use datafusion_expr::ColumnarValue;
    use rstest::rstest;
    use sedona_schema::datatypes::{WKB_GEOMETRY, WKB_VIEW_GEOMETRY};

    /// Little-endian WKB for POINT (1 2)
    const POINT: [u8; 21] = [
        0x01, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xf0, 0x3f, 0x00, 0x00,
        0x00, 0x00, 0x00, 0x00, 0x00, 0x40,
    ];

    /// Append " {i}: " followed by the WKT of `maybe_geom` (or "None") to `wkt_out`
    fn write_to_test_output(
        wkt_out: &mut String,
        i: usize,
        maybe_geom: Option<&Wkb>,
    ) -> Result<()> {
        write!(wkt_out, " {i}: ").unwrap();
        match maybe_geom {
            Some(geom) => wkt::to_wkt::write_geometry(wkt_out, &geom).unwrap(),
            None => write!(wkt_out, "None").unwrap(),
        };

        Ok(())
    }

    #[test]
    fn wkb_array() {
        let mut builder = BinaryBuilder::new();
        builder.append_value(POINT);
        builder.append_null();
        builder.append_value(POINT);
        let wkb_array = builder.finish();
        let wkb_array_ref: ArrayRef = Arc::new(wkb_array.clone());

        // Binary storage: raw byte iteration
        let mut out = Vec::new();
        iter_wkb_binary(&wkb_array, |x| {
            out.push(x.is_some());
            Ok(())
        })
        .unwrap();
        assert_eq!(out, vec![true, false, true]);

        // Binary storage: iteration as Wkb
        let mut wkt_out = String::new();
        let mut i = 0;
        wkb_array_ref
            .iter_as_wkb(&WKB_GEOMETRY, 3, |maybe_geom| {
                write_to_test_output(&mut wkt_out, i, maybe_geom.as_ref()).unwrap();
                i += 1;
                Ok(())
            })
            .unwrap();
        assert_eq!(wkt_out, " 0: POINT(1 2) 1: None 2: POINT(1 2)");

        let wkb_view_array_ref = ColumnarValue::Array(wkb_array_ref)
            .cast_to(&DataType::BinaryView, None)
            .unwrap()
            .to_array(3)
            .unwrap();
        let wkb_view_array = as_binary_view_array(&wkb_view_array_ref).unwrap();

        // BinaryView storage: raw byte iteration
        let mut out = Vec::new();
        iter_wkb_binary(wkb_view_array, |x| {
            out.push(x.is_some());
            Ok(())
        })
        .unwrap();
        assert_eq!(out, vec![true, false, true]);

        // BinaryView storage: iteration as Wkb
        let mut wkt_out = String::new();
        let mut i = 0;
        wkb_view_array_ref
            .iter_as_wkb(&WKB_VIEW_GEOMETRY, 3, |maybe_geom| {
                write_to_test_output(&mut wkt_out, i, maybe_geom.as_ref()).unwrap();
                i += 1;
                Ok(())
            })
            .unwrap();
        assert_eq!(wkt_out, " 0: POINT(1 2) 1: None 2: POINT(1 2)");
    }

    #[rstest]
    fn wkb_wkb_types(
        #[values(
            (WKB_GEOMETRY, WKB_GEOMETRY),
            (WKB_GEOMETRY, WKB_VIEW_GEOMETRY),
            (WKB_VIEW_GEOMETRY, WKB_GEOMETRY),
            (WKB_VIEW_GEOMETRY, WKB_VIEW_GEOMETRY))
        ]
        types: (SedonaType, SedonaType),
    ) {
        let (left_type, right_type) = types;

        let mut builder = BinaryBuilder::new();
        builder.append_value(POINT);
        builder.append_null();
        builder.append_value(POINT);
        builder.append_null();
        let binary_array0 = builder.finish();

        let mut builder = BinaryBuilder::new();
        builder.append_value(POINT);
        builder.append_value(POINT);
        builder.append_null();
        builder.append_null();
        let binary_array1 = builder.finish();

        let value0 = ColumnarValue::Array(Arc::new(binary_array0.clone()))
            .cast_to(left_type.storage_type(), None)
            .unwrap();

        let value1 = ColumnarValue::Array(Arc::new(binary_array1.clone()))
            .cast_to(right_type.storage_type(), None)
            .unwrap();

        let arg_types = [left_type, right_type];
        let args = [value0, value1];
        let executor = WkbExecutor::new(&arg_types, &args);

        let mut wkt_out = String::new();
        let mut i = 0;
        executor
            .execute_wkb_wkb_void(|maybe_geom0, maybe_geom1| {
                write_to_test_output(&mut wkt_out, i, maybe_geom0)?;
                write_to_test_output(&mut wkt_out, i, maybe_geom1)?;
                i += 1;
                Ok(())
            })
            .unwrap();
        assert_eq!(
            wkt_out,
            " 0: POINT(1 2) 0: POINT(1 2) 1: None 1: POINT(1 2) 2: POINT(1 2) 2: None 3: None 3: None"
        );
    }

    #[test]
    fn wkb_wkb_scalar_array() {
        let mut builder = BinaryBuilder::new();
        builder.append_value(POINT);
        builder.append_null();
        let wkb_array = builder.finish();
        let wkb_array_value = ColumnarValue::Array(Arc::new(wkb_array));
        let wkb_scalar_value = ColumnarValue::Scalar(ScalarValue::Binary(Some(POINT.to_vec())));
        let wkb_scalar_null_value = ColumnarValue::Scalar(ScalarValue::Binary(None));

        // Array, Scalar
        let arg_types = [WKB_GEOMETRY, WKB_GEOMETRY];
        let args = [wkb_array_value.clone(), wkb_scalar_value.clone()];
        let executor = WkbExecutor::new(&arg_types, &args);

        let mut wkt_out = String::new();
        let mut i = 0;
        executor
            .execute_wkb_wkb_void(|maybe_geom0, maybe_geom1| {
                write_to_test_output(&mut wkt_out, i, maybe_geom0)?;
                write_to_test_output(&mut wkt_out, i, maybe_geom1)?;
                i += 1;
                Ok(())
            })
            .unwrap();
        assert_eq!(
            wkt_out,
            " 0: POINT(1 2) 0: POINT(1 2) 1: None 1: POINT(1 2)"
        );

        // Scalar, Array: the scalar is on the left, so the null array element
        // shows up on the right-hand side of the second pair
        let args = [wkb_scalar_value.clone(), wkb_array_value.clone()];
        let executor = WkbExecutor::new(&arg_types, &args);

        let mut wkt_out = String::new();
        let mut i = 0;
        executor
            .execute_wkb_wkb_void(|maybe_geom0, maybe_geom1| {
                write_to_test_output(&mut wkt_out, i, maybe_geom0)?;
                write_to_test_output(&mut wkt_out, i, maybe_geom1)?;
                i += 1;
                Ok(())
            })
            .unwrap();
        assert_eq!(
            wkt_out,
            " 0: POINT(1 2) 0: POINT(1 2) 1: POINT(1 2) 1: None"
        );

        // Array, Array
        let args = [wkb_array_value.clone(), wkb_array_value.clone()];
        let executor = WkbExecutor::new(&arg_types, &args);

        let mut wkt_out = String::new();
        let mut i = 0;
        executor
            .execute_wkb_wkb_void(|maybe_geom0, maybe_geom1| {
                write_to_test_output(&mut wkt_out, i, maybe_geom0)?;
                write_to_test_output(&mut wkt_out, i, maybe_geom1)?;
                i += 1;
                Ok(())
            })
            .unwrap();
        assert_eq!(wkt_out, " 0: POINT(1 2) 0: POINT(1 2) 1: None 1: None");

        // Scalar, Scalar (with a null on the left)
        let args = [wkb_scalar_null_value.clone(), wkb_scalar_value.clone()];
        let executor = WkbExecutor::new(&arg_types, &args);

        let mut wkt_out = String::new();
        let mut i = 0;
        executor
            .execute_wkb_wkb_void(|maybe_geom0, maybe_geom1| {
                write_to_test_output(&mut wkt_out, i, maybe_geom0)?;
                write_to_test_output(&mut wkt_out, i, maybe_geom1)?;
                i += 1;
                Ok(())
            })
            .unwrap();
        assert_eq!(wkt_out, " 0: None 0: POINT(1 2)");
    }

    #[test]
    fn wkb_array_errors() {
        // Unsupported storage type
        let not_geometry_array: ArrayRef = create_array!(Int32, [0, 1, 2]);
        let err = not_geometry_array
            .iter_as_wkb(&SedonaType::Arrow(DataType::Int32), 3, |_| unreachable!())
            .unwrap_err();
        assert!(err.message().contains("Can't iterate over"));

        // Requested iteration count does not match the array length
        let err = not_geometry_array
            .iter_as_wkb(
                &SedonaType::Arrow(DataType::Int32),
                1000000,
                |_| unreachable!(),
            )
            .unwrap_err();
        assert!(err
            .message()
            .contains("Expected 1000000 items but got Array with 3 items"));

        // Unsupported pair of storage types
        let factory0 = WkbGeometryFactory::default();
        let factory1 = WkbGeometryFactory::default();
        let err = iter_wkb_wkb_array(
            &factory0,
            &factory1,
            (
                &SedonaType::Arrow(DataType::Int32),
                &SedonaType::Arrow(DataType::Int32),
            ),
            (&not_geometry_array, &not_geometry_array),
            |_, _| unreachable!(),
        )
        .unwrap_err();
        assert!(err.message().contains(
            "Can't iterate over Arrow(Int32) and Arrow(Int32) arrays as a pair of Wkb scalars"
        ));
    }

    #[test]
    fn wkb_scalar() {
        let factory = WkbGeometryFactory::default();
        assert!(ScalarValue::Binary(None)
            .scalar_from_factory(&factory)
            .unwrap()
            .is_none());

        let mut wkt_out = String::new();
        let binary_scalar = ScalarValue::Binary(Some(POINT.to_vec()));

        let wkb_item = binary_scalar
            .scalar_from_factory(&factory)
            .unwrap()
            .unwrap();
        wkt::to_wkt::write_geometry(&mut wkt_out, &wkb_item).unwrap();
        assert_eq!(wkt_out, "POINT(1 2)");
        drop(wkb_item);

        let mut wkt_out = String::new();
        let binary_scalar = ScalarValue::BinaryView(Some(POINT.to_vec()));

        let wkb_item = binary_scalar
            .scalar_from_factory(&factory)
            .unwrap()
            .unwrap();
        wkt::to_wkt::write_geometry(&mut wkt_out, &wkb_item).unwrap();
        assert_eq!(wkt_out, "POINT(1 2)");
        drop(wkb_item);

        let null_item = ScalarValue::Null.scalar_from_factory(&factory).unwrap();
        assert!(null_item.is_none());

        // Empty (invalid) WKB is a parse error
        let err = ScalarValue::Binary(Some(vec![]))
            .scalar_from_factory(&factory)
            .unwrap_err();
        assert_eq!(err.message(), "failed to fill whole buffer");

        // Non-binary scalars are rejected
        let err = ScalarValue::Date32(Some(1))
            .scalar_from_factory(&factory)
            .unwrap_err();
        assert!(err.message().contains("Can't iterate over"));
    }
}