Skip to main content

sedona_spatial_join/
operand_evaluator.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17use core::fmt;
18use std::{mem::transmute, sync::Arc};
19
20use arrow_array::{Array, ArrayRef, Float64Array, RecordBatch};
21use arrow_schema::DataType;
22use datafusion_common::{utils::proxy::VecAllocExt, JoinSide, Result, ScalarValue};
23use datafusion_expr::ColumnarValue;
24use datafusion_physical_expr::PhysicalExpr;
25use float_next_after::NextAfter;
26use geo_index::rtree::util::f64_box_to_f32;
27use geo_types::{coord, Rect};
28use sedona_functions::executor::IterGeo;
29use sedona_geo_generic_alg::BoundingRect;
30use sedona_schema::datatypes::SedonaType;
31use wkb::reader::Wkb;
32
33use sedona_common::option::SpatialJoinOptions;
34use sedona_common::sedona_internal_err;
35
36use crate::{
37    spatial_predicate::{DistancePredicate, KNNPredicate, RelationPredicate, SpatialPredicate},
38    utils::arrow_utils::get_array_memory_size,
39};
40
41/// Operand evaluator is for evaluating the operands of a spatial predicate. It can be a distance
42/// operand evaluator or a relation operand evaluator.
43pub(crate) trait OperandEvaluator: fmt::Debug + Send + Sync {
44    /// Evaluate the spatial predicate operand on the build side.
45    fn evaluate_build(&self, batch: &RecordBatch) -> Result<EvaluatedGeometryArray> {
46        let geom_expr = self.build_side_expr()?;
47        evaluate_with_rects(batch, &geom_expr)
48    }
49
50    /// Evaluate the spatial predicate operand on the probe side.
51    fn evaluate_probe(&self, batch: &RecordBatch) -> Result<EvaluatedGeometryArray> {
52        let geom_expr = self.probe_side_expr()?;
53        evaluate_with_rects(batch, &geom_expr)
54    }
55
56    /// Resolve the distance operand for a given row.
57    fn resolve_distance(
58        &self,
59        _build_distance: &Option<ColumnarValue>,
60        _build_row_idx: usize,
61        _probe_distance: &Option<f64>,
62    ) -> Result<Option<f64>> {
63        Ok(None)
64    }
65
66    /// Get the expression for the build side.
67    fn build_side_expr(&self) -> Result<Arc<dyn PhysicalExpr>>;
68
69    /// Get the expression for the probe side.
70    fn probe_side_expr(&self) -> Result<Arc<dyn PhysicalExpr>>;
71}
72
73/// Create a spatial predicate evaluator for the spatial predicate.
74pub(crate) fn create_operand_evaluator(
75    predicate: &SpatialPredicate,
76    options: SpatialJoinOptions,
77) -> Arc<dyn OperandEvaluator> {
78    match predicate {
79        SpatialPredicate::Distance(predicate) => {
80            Arc::new(DistanceOperandEvaluator::new(predicate.clone(), options))
81        }
82        SpatialPredicate::Relation(predicate) => {
83            Arc::new(RelationOperandEvaluator::new(predicate.clone(), options))
84        }
85        SpatialPredicate::KNearestNeighbors(predicate) => {
86            Arc::new(KNNOperandEvaluator::new(predicate.clone()))
87        }
88    }
89}
90
/// Result of evaluating a geometry expression over a batch.
///
/// Holds the evaluated geometry array together with per-row bounding rects, parsed WKB views
/// and, for distance predicates, the evaluated distance operand.
pub struct EvaluatedGeometryArray {
    /// Sedona type of `geometry_array`.
    pub sedona_type: SedonaType,
    /// The array of geometries produced by evaluating the geometry expression.
    ///
    /// NOTE(review): `wkbs` borrows from this array's buffers. Reassigning this public field on
    /// a live value could release those buffers while `wkbs` still points into them, so treat
    /// it as read-only after construction.
    pub geometry_array: ArrayRef,
    /// The rects of the geometries in the geometry array, one entry per geometry.
    /// A rect is `None` for empty or null geometries.
    pub rects: Vec<Option<Rect<f32>>>,
    /// The distance value produced by evaluating the distance expression, if any.
    pub distance: Option<ColumnarValue>,
    /// WKBs of the geometries in `geometry_array`. The WKB values reference buffers inside the
    /// geometry array, but they are only exposed as `Wkb<'a>`, where `'a` is the lifetime of a
    /// borrow of the `EvaluatedGeometryArray`, which keeps the interface safe. The buffers in
    /// `geometry_array` are allocated on the heap and are not moved when the
    /// `EvaluatedGeometryArray` is moved, so no pinning is needed.
    wkbs: Vec<Option<Wkb<'static>>>,
}
108
109impl EvaluatedGeometryArray {
110    pub fn try_new(geometry_array: ArrayRef, sedona_type: &SedonaType) -> Result<Self> {
111        let num_rows = geometry_array.len();
112        let mut rect_vec = Vec::with_capacity(num_rows);
113        let mut wkbs = Vec::with_capacity(num_rows);
114        geometry_array.iter_as_wkb(sedona_type, num_rows, |wkb_opt| {
115            let rect_opt = if let Some(wkb) = &wkb_opt {
116                if let Some(rect) = wkb.bounding_rect() {
117                    let min = rect.min();
118                    let max = rect.max();
119                    // f64_box_to_f32 will ensure the resulting `f32` box is no smaller than the `f64` box.
120                    let (min_x, min_y, max_x, max_y) = f64_box_to_f32(min.x, min.y, max.x, max.y);
121                    let rect = Rect::new(coord!(x: min_x, y: min_y), coord!(x: max_x, y: max_y));
122                    Some(rect)
123                } else {
124                    None
125                }
126            } else {
127                None
128            };
129            rect_vec.push(rect_opt);
130            wkbs.push(wkb_opt);
131            Ok(())
132        })?;
133
134        // Safety: The wkbs must reference buffers inside the `geometry_array`. Since the `geometry_array` and
135        // `wkbs` are both owned by the `EvaluatedGeometryArray`, so they have the same lifetime. We'll never
136        // have a situation where the `EvaluatedGeometryArray` is dropped while the `wkbs` are still in use
137        // (guaranteed by the scope of the `wkbs` field and lifetime signature of the `wkbs` method).
138        let wkbs = wkbs
139            .into_iter()
140            .map(|wkb| wkb.map(|wkb| unsafe { transmute(wkb) }))
141            .collect();
142        Ok(Self {
143            sedona_type: sedona_type.clone(),
144            geometry_array,
145            rects: rect_vec,
146            distance: None,
147            wkbs,
148        })
149    }
150
151    /// Get the WKBs of the geometries in the geometry array.
152    pub fn wkbs(&self) -> &Vec<Option<Wkb<'_>>> {
153        // The returned WKBs are guaranteed to be valid for the lifetime of the GeometryBatchResult,
154        // because the WKBs reference buffers inside `geometry_array`, which is guaranteed to be valid
155        // for the lifetime of the GeometryBatchResult. We shorten the lifetime of the WKBs from 'static
156        // to '_, so that the caller can use the WKBs without worrying about the lifetime.
157        &self.wkbs
158    }
159
160    pub fn in_mem_size(&self) -> Result<usize> {
161        let geom_array_size = get_array_memory_size(&self.geometry_array)?;
162
163        let distance_in_mem_size = match &self.distance {
164            Some(ColumnarValue::Array(array)) => get_array_memory_size(array)?,
165            _ => 8,
166        };
167
168        // Note: this is not an accurate, because wkbs has inner Vecs. However, the size of inner vecs
169        // should be small, so the inaccuracy does not matter too much.
170        let wkb_vec_size = self.wkbs.allocated_size();
171
172        Ok(geom_array_size + self.rects.allocated_size() + distance_in_mem_size + wkb_vec_size)
173    }
174}
175
176/// Evaluator for a relation predicate.
177#[derive(Debug)]
178struct RelationOperandEvaluator {
179    inner: RelationPredicate,
180    _options: SpatialJoinOptions,
181}
182
183impl RelationOperandEvaluator {
184    pub fn new(inner: RelationPredicate, options: SpatialJoinOptions) -> Self {
185        Self {
186            inner,
187            _options: options,
188        }
189    }
190}
191
192/// Evaluator for a distance predicate.
193#[derive(Debug)]
194struct DistanceOperandEvaluator {
195    inner: DistancePredicate,
196    _options: SpatialJoinOptions,
197}
198
199impl DistanceOperandEvaluator {
200    pub fn new(inner: DistancePredicate, options: SpatialJoinOptions) -> Self {
201        Self {
202            inner,
203            _options: options,
204        }
205    }
206}
207
208fn evaluate_with_rects(
209    batch: &RecordBatch,
210    geom_expr: &Arc<dyn PhysicalExpr>,
211) -> Result<EvaluatedGeometryArray> {
212    let geometry_columnar_value = geom_expr.evaluate(batch)?;
213    let num_rows = batch.num_rows();
214    let geometry_array = geometry_columnar_value.to_array(num_rows)?;
215    let sedona_type =
216        SedonaType::from_storage_field(geom_expr.return_field(&batch.schema())?.as_ref())?;
217    EvaluatedGeometryArray::try_new(geometry_array, &sedona_type)
218}
219
220impl DistanceOperandEvaluator {
221    fn evaluate_with_rects(
222        &self,
223        batch: &RecordBatch,
224        geom_expr: &Arc<dyn PhysicalExpr>,
225        side: JoinSide,
226    ) -> Result<EvaluatedGeometryArray> {
227        let mut result = evaluate_with_rects(batch, geom_expr)?;
228
229        let should_expand = match side {
230            JoinSide::Left => self.inner.distance_side == JoinSide::Left,
231            JoinSide::Right => self.inner.distance_side != JoinSide::Left,
232            JoinSide::None => unreachable!(),
233        };
234
235        if !should_expand {
236            return Ok(result);
237        }
238
239        // Expand the vec by distance
240        let distance_columnar_value = self.inner.distance.evaluate(batch)?;
241        // No timezone conversion needed for distance; pass None as cast_options explicitly.
242        let distance_columnar_value = distance_columnar_value.cast_to(&DataType::Float64, None)?;
243        match &distance_columnar_value {
244            ColumnarValue::Scalar(ScalarValue::Float64(Some(distance))) => {
245                result.rects.iter_mut().for_each(|rect_opt| {
246                    if let Some(rect) = rect_opt {
247                        expand_rect_in_place(rect, *distance);
248                    };
249                });
250            }
251            ColumnarValue::Scalar(ScalarValue::Float64(None)) => {
252                // Distance expression evaluates to NULL, the resulting distance should be NULL as well.
253                result.rects.clear();
254            }
255            ColumnarValue::Array(array) => {
256                if let Some(array) = array.as_any().downcast_ref::<Float64Array>() {
257                    for (geom_idx, rect_opt) in result.rects.iter_mut().enumerate() {
258                        if !array.is_null(geom_idx) {
259                            let dist = array.value(geom_idx);
260                            if let Some(rect) = rect_opt {
261                                expand_rect_in_place(rect, dist);
262                            };
263                        }
264                    }
265                } else {
266                    return sedona_internal_err!("Distance columnar value is not a Float64Array");
267                }
268            }
269            _ => {
270                return sedona_internal_err!("Distance columnar value is not a Float64");
271            }
272        }
273
274        result.distance = Some(distance_columnar_value);
275        Ok(result)
276    }
277}
278
279pub(crate) fn distance_value_at(
280    distance_columnar_value: &ColumnarValue,
281    i: usize,
282) -> Result<Option<f64>> {
283    match distance_columnar_value {
284        ColumnarValue::Scalar(ScalarValue::Float64(dist_opt)) => Ok(*dist_opt),
285        ColumnarValue::Array(array) => {
286            if let Some(array) = array.as_any().downcast_ref::<Float64Array>() {
287                if array.is_null(i) {
288                    Ok(None)
289                } else {
290                    Ok(Some(array.value(i)))
291                }
292            } else {
293                sedona_internal_err!("Distance columnar value is not a Float64Array")
294            }
295        }
296        _ => sedona_internal_err!("Distance columnar value is not a Float64"),
297    }
298}
299
300fn expand_rect_in_place(rect: &mut Rect<f32>, distance: f64) {
301    let mut min = rect.min();
302    let mut max = rect.max();
303    let mut distance_f32 = distance as f32;
304    // distance_f32 may be smaller than the original f64 value due to loss of precision.
305    // We need to expand the rect using next_after to ensure that the rect expansion
306    // is always inclusive, otherwise we may miss some query results.
307    if (distance_f32 as f64) < distance {
308        distance_f32 = distance_f32.next_after(f32::INFINITY);
309    }
310    min.x -= distance_f32;
311    min.y -= distance_f32;
312    max.x += distance_f32;
313    max.y += distance_f32;
314    rect.set_min(min);
315    rect.set_max(max);
316}
317
318impl OperandEvaluator for DistanceOperandEvaluator {
319    fn evaluate_build(&self, batch: &RecordBatch) -> Result<EvaluatedGeometryArray> {
320        let geom_expr = self.build_side_expr()?;
321        self.evaluate_with_rects(batch, &geom_expr, JoinSide::Left)
322    }
323
324    fn evaluate_probe(&self, batch: &RecordBatch) -> Result<EvaluatedGeometryArray> {
325        let geom_expr = self.probe_side_expr()?;
326        self.evaluate_with_rects(batch, &geom_expr, JoinSide::Right)
327    }
328
329    fn build_side_expr(&self) -> Result<Arc<dyn PhysicalExpr>> {
330        Ok(Arc::clone(&self.inner.left))
331    }
332
333    fn probe_side_expr(&self) -> Result<Arc<dyn PhysicalExpr>> {
334        Ok(Arc::clone(&self.inner.right))
335    }
336
337    fn resolve_distance(
338        &self,
339        build_distance: &Option<ColumnarValue>,
340        build_row_idx: usize,
341        probe_distance: &Option<f64>,
342    ) -> Result<Option<f64>> {
343        match self.inner.distance_side {
344            JoinSide::Left => {
345                let Some(distance) = build_distance else {
346                    return Ok(None);
347                };
348                distance_value_at(distance, build_row_idx)
349            }
350            JoinSide::Right | JoinSide::None => Ok(*probe_distance),
351        }
352    }
353}
354
355impl OperandEvaluator for RelationOperandEvaluator {
356    fn build_side_expr(&self) -> Result<Arc<dyn PhysicalExpr>> {
357        Ok(Arc::clone(&self.inner.left))
358    }
359
360    fn probe_side_expr(&self) -> Result<Arc<dyn PhysicalExpr>> {
361        Ok(Arc::clone(&self.inner.right))
362    }
363}
364
365/// KNN operand evaluator for evaluating the KNN predicate.
366#[derive(Debug)]
367struct KNNOperandEvaluator {
368    inner: KNNPredicate,
369}
370
371impl KNNOperandEvaluator {
372    fn new(inner: KNNPredicate) -> Self {
373        Self { inner }
374    }
375}
376
377impl OperandEvaluator for KNNOperandEvaluator {
378    fn build_side_expr(&self) -> Result<Arc<dyn PhysicalExpr>> {
379        // For KNN, the right side (objects/candidates) is the build side
380        Ok(Arc::clone(&self.inner.right))
381    }
382
383    fn probe_side_expr(&self) -> Result<Arc<dyn PhysicalExpr>> {
384        // For KNN, the left side (queries) is the probe side
385        Ok(Arc::clone(&self.inner.left))
386    }
387
388    /// Resolve the k value for KNN operation
389    fn resolve_distance(
390        &self,
391        _build_distance: &Option<ColumnarValue>,
392        _build_row_idx: usize,
393        _probe_distance: &Option<f64>,
394    ) -> Result<Option<f64>> {
395        // NOTE: We do not support distance-based refinement for KNN predicates in the refiner phase.
396        Ok(None)
397    }
398}