1use core::fmt;
18use std::{mem::transmute, sync::Arc};
19
20use arrow_array::{Array, ArrayRef, Float64Array, RecordBatch};
21use arrow_schema::DataType;
22use datafusion_common::{utils::proxy::VecAllocExt, JoinSide, Result, ScalarValue};
23use datafusion_expr::ColumnarValue;
24use datafusion_physical_expr::PhysicalExpr;
25use float_next_after::NextAfter;
26use geo_index::rtree::util::f64_box_to_f32;
27use geo_types::{coord, Rect};
28use sedona_functions::executor::IterGeo;
29use sedona_geo_generic_alg::BoundingRect;
30use sedona_schema::datatypes::SedonaType;
31use wkb::reader::Wkb;
32
33use sedona_common::option::SpatialJoinOptions;
34use sedona_common::sedona_internal_err;
35
36use crate::{
37 spatial_predicate::{DistancePredicate, KNNPredicate, RelationPredicate, SpatialPredicate},
38 utils::arrow_utils::get_array_memory_size,
39};
40
41pub(crate) trait OperandEvaluator: fmt::Debug + Send + Sync {
44 fn evaluate_build(&self, batch: &RecordBatch) -> Result<EvaluatedGeometryArray> {
46 let geom_expr = self.build_side_expr()?;
47 evaluate_with_rects(batch, &geom_expr)
48 }
49
50 fn evaluate_probe(&self, batch: &RecordBatch) -> Result<EvaluatedGeometryArray> {
52 let geom_expr = self.probe_side_expr()?;
53 evaluate_with_rects(batch, &geom_expr)
54 }
55
56 fn resolve_distance(
58 &self,
59 _build_distance: &Option<ColumnarValue>,
60 _build_row_idx: usize,
61 _probe_distance: &Option<f64>,
62 ) -> Result<Option<f64>> {
63 Ok(None)
64 }
65
66 fn build_side_expr(&self) -> Result<Arc<dyn PhysicalExpr>>;
68
69 fn probe_side_expr(&self) -> Result<Arc<dyn PhysicalExpr>>;
71}
72
73pub(crate) fn create_operand_evaluator(
75 predicate: &SpatialPredicate,
76 options: SpatialJoinOptions,
77) -> Arc<dyn OperandEvaluator> {
78 match predicate {
79 SpatialPredicate::Distance(predicate) => {
80 Arc::new(DistanceOperandEvaluator::new(predicate.clone(), options))
81 }
82 SpatialPredicate::Relation(predicate) => {
83 Arc::new(RelationOperandEvaluator::new(predicate.clone(), options))
84 }
85 SpatialPredicate::KNearestNeighbors(predicate) => {
86 Arc::new(KNNOperandEvaluator::new(predicate.clone()))
87 }
88 }
89}
90
91pub struct EvaluatedGeometryArray {
93 pub sedona_type: SedonaType,
95 pub geometry_array: ArrayRef,
97 pub rects: Vec<Option<Rect<f32>>>,
100 pub distance: Option<ColumnarValue>,
102 wkbs: Vec<Option<Wkb<'static>>>,
107}
108
109impl EvaluatedGeometryArray {
110 pub fn try_new(geometry_array: ArrayRef, sedona_type: &SedonaType) -> Result<Self> {
111 let num_rows = geometry_array.len();
112 let mut rect_vec = Vec::with_capacity(num_rows);
113 let mut wkbs = Vec::with_capacity(num_rows);
114 geometry_array.iter_as_wkb(sedona_type, num_rows, |wkb_opt| {
115 let rect_opt = if let Some(wkb) = &wkb_opt {
116 if let Some(rect) = wkb.bounding_rect() {
117 let min = rect.min();
118 let max = rect.max();
119 let (min_x, min_y, max_x, max_y) = f64_box_to_f32(min.x, min.y, max.x, max.y);
121 let rect = Rect::new(coord!(x: min_x, y: min_y), coord!(x: max_x, y: max_y));
122 Some(rect)
123 } else {
124 None
125 }
126 } else {
127 None
128 };
129 rect_vec.push(rect_opt);
130 wkbs.push(wkb_opt);
131 Ok(())
132 })?;
133
134 let wkbs = wkbs
139 .into_iter()
140 .map(|wkb| wkb.map(|wkb| unsafe { transmute(wkb) }))
141 .collect();
142 Ok(Self {
143 sedona_type: sedona_type.clone(),
144 geometry_array,
145 rects: rect_vec,
146 distance: None,
147 wkbs,
148 })
149 }
150
151 pub fn wkbs(&self) -> &Vec<Option<Wkb<'_>>> {
153 &self.wkbs
158 }
159
160 pub fn in_mem_size(&self) -> Result<usize> {
161 let geom_array_size = get_array_memory_size(&self.geometry_array)?;
162
163 let distance_in_mem_size = match &self.distance {
164 Some(ColumnarValue::Array(array)) => get_array_memory_size(array)?,
165 _ => 8,
166 };
167
168 let wkb_vec_size = self.wkbs.allocated_size();
171
172 Ok(geom_array_size + self.rects.allocated_size() + distance_in_mem_size + wkb_vec_size)
173 }
174}
175
176#[derive(Debug)]
178struct RelationOperandEvaluator {
179 inner: RelationPredicate,
180 _options: SpatialJoinOptions,
181}
182
183impl RelationOperandEvaluator {
184 pub fn new(inner: RelationPredicate, options: SpatialJoinOptions) -> Self {
185 Self {
186 inner,
187 _options: options,
188 }
189 }
190}
191
192#[derive(Debug)]
194struct DistanceOperandEvaluator {
195 inner: DistancePredicate,
196 _options: SpatialJoinOptions,
197}
198
199impl DistanceOperandEvaluator {
200 pub fn new(inner: DistancePredicate, options: SpatialJoinOptions) -> Self {
201 Self {
202 inner,
203 _options: options,
204 }
205 }
206}
207
208fn evaluate_with_rects(
209 batch: &RecordBatch,
210 geom_expr: &Arc<dyn PhysicalExpr>,
211) -> Result<EvaluatedGeometryArray> {
212 let geometry_columnar_value = geom_expr.evaluate(batch)?;
213 let num_rows = batch.num_rows();
214 let geometry_array = geometry_columnar_value.to_array(num_rows)?;
215 let sedona_type =
216 SedonaType::from_storage_field(geom_expr.return_field(&batch.schema())?.as_ref())?;
217 EvaluatedGeometryArray::try_new(geometry_array, &sedona_type)
218}
219
220impl DistanceOperandEvaluator {
221 fn evaluate_with_rects(
222 &self,
223 batch: &RecordBatch,
224 geom_expr: &Arc<dyn PhysicalExpr>,
225 side: JoinSide,
226 ) -> Result<EvaluatedGeometryArray> {
227 let mut result = evaluate_with_rects(batch, geom_expr)?;
228
229 let should_expand = match side {
230 JoinSide::Left => self.inner.distance_side == JoinSide::Left,
231 JoinSide::Right => self.inner.distance_side != JoinSide::Left,
232 JoinSide::None => unreachable!(),
233 };
234
235 if !should_expand {
236 return Ok(result);
237 }
238
239 let distance_columnar_value = self.inner.distance.evaluate(batch)?;
241 let distance_columnar_value = distance_columnar_value.cast_to(&DataType::Float64, None)?;
243 match &distance_columnar_value {
244 ColumnarValue::Scalar(ScalarValue::Float64(Some(distance))) => {
245 result.rects.iter_mut().for_each(|rect_opt| {
246 if let Some(rect) = rect_opt {
247 expand_rect_in_place(rect, *distance);
248 };
249 });
250 }
251 ColumnarValue::Scalar(ScalarValue::Float64(None)) => {
252 result.rects.clear();
254 }
255 ColumnarValue::Array(array) => {
256 if let Some(array) = array.as_any().downcast_ref::<Float64Array>() {
257 for (geom_idx, rect_opt) in result.rects.iter_mut().enumerate() {
258 if !array.is_null(geom_idx) {
259 let dist = array.value(geom_idx);
260 if let Some(rect) = rect_opt {
261 expand_rect_in_place(rect, dist);
262 };
263 }
264 }
265 } else {
266 return sedona_internal_err!("Distance columnar value is not a Float64Array");
267 }
268 }
269 _ => {
270 return sedona_internal_err!("Distance columnar value is not a Float64");
271 }
272 }
273
274 result.distance = Some(distance_columnar_value);
275 Ok(result)
276 }
277}
278
279pub(crate) fn distance_value_at(
280 distance_columnar_value: &ColumnarValue,
281 i: usize,
282) -> Result<Option<f64>> {
283 match distance_columnar_value {
284 ColumnarValue::Scalar(ScalarValue::Float64(dist_opt)) => Ok(*dist_opt),
285 ColumnarValue::Array(array) => {
286 if let Some(array) = array.as_any().downcast_ref::<Float64Array>() {
287 if array.is_null(i) {
288 Ok(None)
289 } else {
290 Ok(Some(array.value(i)))
291 }
292 } else {
293 sedona_internal_err!("Distance columnar value is not a Float64Array")
294 }
295 }
296 _ => sedona_internal_err!("Distance columnar value is not a Float64"),
297 }
298}
299
300fn expand_rect_in_place(rect: &mut Rect<f32>, distance: f64) {
301 let mut min = rect.min();
302 let mut max = rect.max();
303 let mut distance_f32 = distance as f32;
304 if (distance_f32 as f64) < distance {
308 distance_f32 = distance_f32.next_after(f32::INFINITY);
309 }
310 min.x -= distance_f32;
311 min.y -= distance_f32;
312 max.x += distance_f32;
313 max.y += distance_f32;
314 rect.set_min(min);
315 rect.set_max(max);
316}
317
318impl OperandEvaluator for DistanceOperandEvaluator {
319 fn evaluate_build(&self, batch: &RecordBatch) -> Result<EvaluatedGeometryArray> {
320 let geom_expr = self.build_side_expr()?;
321 self.evaluate_with_rects(batch, &geom_expr, JoinSide::Left)
322 }
323
324 fn evaluate_probe(&self, batch: &RecordBatch) -> Result<EvaluatedGeometryArray> {
325 let geom_expr = self.probe_side_expr()?;
326 self.evaluate_with_rects(batch, &geom_expr, JoinSide::Right)
327 }
328
329 fn build_side_expr(&self) -> Result<Arc<dyn PhysicalExpr>> {
330 Ok(Arc::clone(&self.inner.left))
331 }
332
333 fn probe_side_expr(&self) -> Result<Arc<dyn PhysicalExpr>> {
334 Ok(Arc::clone(&self.inner.right))
335 }
336
337 fn resolve_distance(
338 &self,
339 build_distance: &Option<ColumnarValue>,
340 build_row_idx: usize,
341 probe_distance: &Option<f64>,
342 ) -> Result<Option<f64>> {
343 match self.inner.distance_side {
344 JoinSide::Left => {
345 let Some(distance) = build_distance else {
346 return Ok(None);
347 };
348 distance_value_at(distance, build_row_idx)
349 }
350 JoinSide::Right | JoinSide::None => Ok(*probe_distance),
351 }
352 }
353}
354
355impl OperandEvaluator for RelationOperandEvaluator {
356 fn build_side_expr(&self) -> Result<Arc<dyn PhysicalExpr>> {
357 Ok(Arc::clone(&self.inner.left))
358 }
359
360 fn probe_side_expr(&self) -> Result<Arc<dyn PhysicalExpr>> {
361 Ok(Arc::clone(&self.inner.right))
362 }
363}
364
365#[derive(Debug)]
367struct KNNOperandEvaluator {
368 inner: KNNPredicate,
369}
370
371impl KNNOperandEvaluator {
372 fn new(inner: KNNPredicate) -> Self {
373 Self { inner }
374 }
375}
376
377impl OperandEvaluator for KNNOperandEvaluator {
378 fn build_side_expr(&self) -> Result<Arc<dyn PhysicalExpr>> {
379 Ok(Arc::clone(&self.inner.right))
381 }
382
383 fn probe_side_expr(&self) -> Result<Arc<dyn PhysicalExpr>> {
384 Ok(Arc::clone(&self.inner.left))
386 }
387
388 fn resolve_distance(
390 &self,
391 _build_distance: &Option<ColumnarValue>,
392 _build_row_idx: usize,
393 _probe_distance: &Option<f64>,
394 ) -> Result<Option<f64>> {
395 Ok(None)
397 }
398}