sedona_expr/
spatial_filter.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17use std::{collections::HashMap, sync::Arc};
18
19use arrow_schema::{DataType, Schema};
20use datafusion_common::{DataFusionError, Result, ScalarValue};
21use datafusion_expr::Operator;
22use datafusion_physical_expr::{
23    expressions::{BinaryExpr, Column, Literal},
24    PhysicalExpr, ScalarFunctionExpr,
25};
26use geo_traits::Dimensions;
27use sedona_common::sedona_internal_err;
28use sedona_geometry::{
29    bounding_box::BoundingBox,
30    bounds::wkb_bounds_xy,
31    interval::{Interval, IntervalTrait},
32};
33use sedona_schema::{datatypes::SedonaType, matchers::ArgMatcher, schema::SedonaSchema};
34
35use crate::{
36    statistics::GeoStatistics,
37    utils::{parse_distance_predicate, ParsedDistancePredicate},
38};
39
/// Simplified parsed spatial filter
///
/// This enumerator represents a parsed version of the [PhysicalExpr] provided as a
/// filter to an implementation of a table provider or file opener. It is intended
/// as a means by which to process an arbitrary PhysicalExpr against column statistics
/// to attempt pruning unnecessary files or parts of files specifically with respect
/// to a spatial filter (i.e., non-spatial filters are left to the underlying
/// implementation).
#[derive(Debug, Clone)]
pub enum SpatialFilter {
    /// ST_Intersects(\<column\>, \<literal\>) or ST_Intersects(\<literal\>, \<column\>)
    ///
    /// Can only be true where the column's bounding box intersects the given
    /// bounding box. Also used for other predicates that imply such an
    /// intersection between the column and the literal.
    Intersects(Column, BoundingBox),
    /// ST_Covers(\<column\>, \<literal\>), i.e., the column must cover the literal
    ///
    /// Can only be true where the column's bounding box contains the given
    /// bounding box.
    Covers(Column, BoundingBox),
    /// ST_HasZ(\<column\>)
    HasZ(Column),
    /// Logical AND
    And(Box<SpatialFilter>, Box<SpatialFilter>),
    /// Logical OR
    Or(Box<SpatialFilter>, Box<SpatialFilter>),
    /// A literal FALSE, which is never true
    LiteralFalse,
    /// An expression we don't know about, which we assume could be true
    Unknown,
}
65
66impl SpatialFilter {
67    /// Compute the maximum extent of a filter for a specific column index
68    ///
69    /// Some spatial file formats have the ability to push down a bounding box
70    /// into an index. This function allows deriving that bounding box based
71    /// on what DataFusion provides, which is a physical expression.
72    ///
73    /// Note that this always succeeds; however, for a non-spatial expression or
74    /// a non-spatial expression that is unsupported, the full bounding box is
75    /// returned.
76    pub fn filter_bbox(&self, column_name: &str) -> BoundingBox {
77        match self {
78            SpatialFilter::Intersects(column, bounding_box)
79            | SpatialFilter::Covers(column, bounding_box) => {
80                if column.name() == column_name {
81                    return bounding_box.clone();
82                }
83            }
84            SpatialFilter::And(lhs, rhs) => {
85                let lhs_box = lhs.filter_bbox(column_name);
86                let rhs_box = rhs.filter_bbox(column_name);
87                if let Ok(bounds) = lhs_box.intersection(&rhs_box) {
88                    return bounds;
89                }
90            }
91            SpatialFilter::Or(lhs, rhs) => {
92                let mut bounds = lhs.filter_bbox(column_name);
93                bounds.update_box(&rhs.filter_bbox(column_name));
94                return bounds;
95            }
96            SpatialFilter::LiteralFalse => {
97                return BoundingBox::xy(Interval::empty(), Interval::empty())
98            }
99            SpatialFilter::HasZ(_) | SpatialFilter::Unknown => {}
100        }
101
102        BoundingBox::xy(Interval::full(), Interval::full())
103    }
104
105    /// Returns true if there is any chance the expression might be true
106    ///
107    /// In other words, returns false if and only if the expression is guaranteed
108    /// to be false.
109    pub fn evaluate(&self, table_stats: &TableGeoStatistics) -> Result<bool> {
110        self.evaluate_internal(table_stats)
111    }
112
113    fn evaluate_internal(&self, table_stats: &TableGeoStatistics) -> Result<bool> {
114        match self {
115            SpatialFilter::Intersects(column, bounds) => Ok(Self::evaluate_intersects_bbox(
116                table_stats.get(column)?,
117                bounds,
118            )),
119            SpatialFilter::Covers(column, bounds) => {
120                Ok(Self::evaluate_covers_bbox(table_stats.get(column)?, bounds))
121            }
122            SpatialFilter::HasZ(column) => Ok(Self::evaluate_has_z(table_stats.get(column)?)),
123            SpatialFilter::And(lhs, rhs) => Self::evaluate_and(lhs, rhs, table_stats),
124            SpatialFilter::Or(lhs, rhs) => Self::evaluate_or(lhs, rhs, table_stats),
125            SpatialFilter::LiteralFalse => Ok(false),
126            SpatialFilter::Unknown => Ok(true),
127        }
128    }
129
130    fn evaluate_intersects_bbox(column_stats: &GeoStatistics, bounds: &BoundingBox) -> bool {
131        if let Some(bbox) = column_stats.bbox() {
132            bbox.intersects(bounds)
133        } else {
134            true
135        }
136    }
137
138    fn evaluate_covers_bbox(column_stats: &GeoStatistics, bounds: &BoundingBox) -> bool {
139        if let Some(bbox) = column_stats.bbox() {
140            bbox.contains(bounds)
141        } else {
142            true
143        }
144    }
145
146    fn evaluate_has_z(column_stats: &GeoStatistics) -> bool {
147        if let Some(bbox) = column_stats.bbox() {
148            if let Some(z) = bbox.z() {
149                if z.is_empty() {
150                    return false;
151                }
152            }
153        }
154
155        if let Some(geometry_types) = column_stats.geometry_types() {
156            for geometry_type in geometry_types {
157                match geometry_type.dimensions() {
158                    Dimensions::Xyz | Dimensions::Xyzm => return true,
159                    _ => {}
160                }
161            }
162
163            return false;
164        }
165
166        true
167    }
168
169    fn evaluate_and(lhs: &Self, rhs: &Self, table_stats: &TableGeoStatistics) -> Result<bool> {
170        let maybe_lhs = lhs.evaluate_internal(table_stats)?;
171        let maybe_rhs = rhs.evaluate_internal(table_stats)?;
172        Ok(maybe_lhs && maybe_rhs)
173    }
174
175    fn evaluate_or(lhs: &Self, rhs: &Self, table_stats: &TableGeoStatistics) -> Result<bool> {
176        let maybe_lhs = lhs.evaluate_internal(table_stats)?;
177        let maybe_rhs = rhs.evaluate_internal(table_stats)?;
178        Ok(maybe_lhs || maybe_rhs)
179    }
180
181    /// Construct a SpatialPredicate from a [PhysicalExpr]
182    ///
183    /// Parses expr to extract known expressions we can evaluate against statistics.
184    pub fn try_from_expr(expr: &Arc<dyn PhysicalExpr>) -> Result<Self> {
185        if let Some(spatial_filter) = Self::try_from_range_predicate(expr)? {
186            Ok(spatial_filter)
187        } else if let Some(spatial_filter) = Self::try_from_distance_predicate(expr)? {
188            Ok(spatial_filter)
189        } else if let Some(binary_expr) = expr.as_any().downcast_ref::<BinaryExpr>() {
190            match binary_expr.op() {
191                Operator::And => Ok(Self::And(
192                    Box::new(Self::try_from_expr(binary_expr.left())?),
193                    Box::new(Self::try_from_expr(binary_expr.right())?),
194                )),
195                Operator::Or => Ok(Self::Or(
196                    Box::new(Self::try_from_expr(binary_expr.left())?),
197                    Box::new(Self::try_from_expr(binary_expr.right())?),
198                )),
199                // Not a binary expression we know about
200                _ => Ok(Self::Unknown),
201            }
202        } else if let Some(literal) = expr.as_any().downcast_ref::<Literal>() {
203            if let ScalarValue::Boolean(Some(value)) = literal.value() {
204                match value {
205                    true => Ok(Self::Unknown),
206                    false => Ok(Self::LiteralFalse),
207                }
208            } else {
209                // Not a literal we know about
210                Ok(Self::Unknown)
211            }
212        } else {
213            // Not an expression we know about
214            Ok(Self::Unknown)
215        }
216    }
217
218    fn try_from_range_predicate(expr: &Arc<dyn PhysicalExpr>) -> Result<Option<Self>> {
219        let Some(scalar_fun) = expr.as_any().downcast_ref::<ScalarFunctionExpr>() else {
220            return Ok(None);
221        };
222
223        let raw_args = scalar_fun.args();
224        let args = parse_args(raw_args);
225        let fun_name = scalar_fun.fun().name();
226        match fun_name {
227            "st_intersects" | "st_touches" | "st_crosses" | "st_overlaps" => {
228                if args.len() != 2 {
229                    return sedona_internal_err!("unexpected argument count in filter evaluation");
230                }
231
232                match (&args[0], &args[1]) {
233                    (ArgRef::Col(column), ArgRef::Lit(literal))
234                    | (ArgRef::Lit(literal), ArgRef::Col(column)) => {
235                        if !is_prunable_geospatial_literal(literal) {
236                            return Ok(Some(Self::Unknown));
237                        }
238                        match literal_bounds(literal) {
239                            Ok(literal_bounds) => {
240                                Ok(Some(Self::Intersects(column.clone(), literal_bounds)))
241                            }
242                            Err(e) => Err(DataFusionError::External(Box::new(e))),
243                        }
244                    }
245                    // Not between a literal and a column
246                    _ => Ok(Some(Self::Unknown)),
247                }
248            }
249            "st_equals" => {
250                if args.len() != 2 {
251                    return sedona_internal_err!("unexpected argument count in filter evaluation");
252                }
253
254                match (&args[0], &args[1]) {
255                    (ArgRef::Col(column), ArgRef::Lit(literal))
256                    | (ArgRef::Lit(literal), ArgRef::Col(column)) => {
257                        if !is_prunable_geospatial_literal(literal) {
258                            return Ok(Some(Self::Unknown));
259                        }
260                        match literal_bounds(literal) {
261                            Ok(literal_bounds) => {
262                                Ok(Some(Self::Covers(column.clone(), literal_bounds)))
263                            }
264                            Err(e) => Err(DataFusionError::External(Box::new(e))),
265                        }
266                    }
267                    // Not between a literal and a column
268                    _ => Ok(Some(Self::Unknown)),
269                }
270            }
271            "st_within" | "st_covered_by" | "st_coveredby" => {
272                if args.len() != 2 {
273                    return sedona_internal_err!("unexpected argument count in filter evaluation");
274                }
275
276                match (&args[0], &args[1]) {
277                    (ArgRef::Col(column), ArgRef::Lit(literal)) => {
278                        // column within/covered_by literal -> Intersects filter
279                        if !is_prunable_geospatial_literal(literal) {
280                            return Ok(Some(Self::Unknown));
281                        }
282                        match literal_bounds(literal) {
283                            Ok(literal_bounds) => {
284                                Ok(Some(Self::Intersects(column.clone(), literal_bounds)))
285                            }
286                            Err(e) => Err(DataFusionError::External(Box::new(e))),
287                        }
288                    }
289                    (ArgRef::Lit(literal), ArgRef::Col(column)) => {
290                        // literal within/covered_by column -> Covers filter
291                        if !is_prunable_geospatial_literal(literal) {
292                            return Ok(Some(Self::Unknown));
293                        }
294                        match literal_bounds(literal) {
295                            Ok(literal_bounds) => {
296                                Ok(Some(Self::Covers(column.clone(), literal_bounds)))
297                            }
298                            Err(e) => Err(DataFusionError::External(Box::new(e))),
299                        }
300                    }
301                    // Not between a literal and a column
302                    _ => Ok(Some(Self::Unknown)),
303                }
304            }
305            "st_contains" | "st_covers" => {
306                if args.len() != 2 {
307                    return sedona_internal_err!("unexpected argument count in filter evaluation");
308                }
309
310                match (&args[0], &args[1]) {
311                    (ArgRef::Col(column), ArgRef::Lit(literal)) => {
312                        // column contains/covers literal -> Covers filter
313                        // (column's bbox must fully cover literal's bbox)
314                        if !is_prunable_geospatial_literal(literal) {
315                            return Ok(Some(Self::Unknown));
316                        }
317                        match literal_bounds(literal) {
318                            Ok(literal_bounds) => {
319                                Ok(Some(Self::Covers(column.clone(), literal_bounds)))
320                            }
321                            Err(e) => Err(DataFusionError::External(Box::new(e))),
322                        }
323                    }
324                    (ArgRef::Lit(literal), ArgRef::Col(column)) => {
325                        // literal contains/covers column -> Intersects filter
326                        // (if literal contains column, they must at least intersect)
327                        if !is_prunable_geospatial_literal(literal) {
328                            return Ok(Some(Self::Unknown));
329                        }
330                        match literal_bounds(literal) {
331                            Ok(literal_bounds) => {
332                                Ok(Some(Self::Intersects(column.clone(), literal_bounds)))
333                            }
334                            Err(e) => Err(DataFusionError::External(Box::new(e))),
335                        }
336                    }
337                    // Not between a literal and a column
338                    _ => Ok(Some(Self::Unknown)),
339                }
340            }
341            "st_hasz" => {
342                if args.len() != 1 {
343                    return sedona_internal_err!("unexpected argument count in filter evaluation");
344                }
345
346                match &args[0] {
347                    ArgRef::Col(column) => Ok(Some(Self::HasZ(column.clone()))),
348                    _ => Ok(Some(Self::Unknown)),
349                }
350            }
351            _ => Ok(None),
352        }
353    }
354
355    fn try_from_distance_predicate(expr: &Arc<dyn PhysicalExpr>) -> Result<Option<Self>> {
356        let Some(ParsedDistancePredicate {
357            arg0,
358            arg1,
359            arg_distance,
360        }) = parse_distance_predicate(expr)
361        else {
362            return Ok(None);
363        };
364
365        let raw_args = [arg0, arg1, arg_distance];
366        let args = parse_args(&raw_args);
367
368        match (&args[0], &args[1], &args[2]) {
369            (ArgRef::Col(column), ArgRef::Lit(literal), ArgRef::Lit(distance))
370            | (ArgRef::Lit(literal), ArgRef::Col(column), ArgRef::Lit(distance)) => {
371                if !is_prunable_geospatial_literal(literal) {
372                    return Ok(Some(Self::Unknown));
373                }
374                match (
375                    literal_bounds(literal),
376                    distance.value().cast_to(&DataType::Float64)?,
377                ) {
378                    (Ok(literal_bounds), distance_scalar_value) => {
379                        let ScalarValue::Float64(Some(dist)) = distance_scalar_value else {
380                            return Ok(None);
381                        };
382                        if dist.is_nan() || dist < 0.0 {
383                            return Ok(None);
384                        }
385                        let expanded_bounds = literal_bounds.expand_by(dist);
386                        Ok(Some(Self::Intersects(column.clone(), expanded_bounds)))
387                    }
388                    (Err(e), _) => Err(DataFusionError::External(Box::new(e))),
389                }
390            }
391            // Not between a literal and a column
392            _ => Ok(Some(Self::Unknown)),
393        }
394    }
395}
396
/// Table GeoStatistics
///
/// Enables providing a collection of GeoStatistics to [SpatialFilter::evaluate]
/// such that attempts to access out-of-bounds values result in a readable
/// error.
pub enum TableGeoStatistics {
    /// Provide statistics for every Column in the table. These must be
    /// [GeoStatistics::unspecified] for non-spatial columns.
    ///
    /// These are resolved using [Column::index].
    ByPosition(Vec<GeoStatistics>),

    /// Provide statistics for specific named columns. Columns not included
    /// are treated as [GeoStatistics::unspecified].
    ///
    /// These are resolved using [Column::name]. This may be used for logical
    /// expressions (where columns are resolved by name) or as a workaround
    /// for physical expressions where the index is relative to a projected
    /// schema <https://github.com/apache/sedona-db/issues/389>.
    ByName(HashMap<String, GeoStatistics>),
}
418
419impl TableGeoStatistics {
420    /// Construct TableGeoStatistics with no columns
421    pub fn empty() -> Self {
422        TableGeoStatistics::ByPosition(vec![])
423    }
424
425    /// Construct TableGeoStatistics from a slice of all column statistics and a schema
426    pub fn try_from_stats_and_schema(
427        column_stats: &[GeoStatistics],
428        schema: &Schema,
429    ) -> Result<Self> {
430        let mut stats_map = HashMap::new();
431        for i in schema.geometry_column_indices()? {
432            stats_map.insert(schema.field(i).name().to_string(), column_stats[i].clone());
433        }
434        Ok(Self::ByName(stats_map))
435    }
436
437    /// For a given [Column], obtain [GeoStatistics]
438    ///
439    /// This will error if the provided statistics have an index out of bounds.
440    /// Names that cannot be resolved will be treated as unspecified.
441    fn get(&self, column: &Column) -> Result<&GeoStatistics> {
442        match self {
443            Self::ByPosition(items) => {
444                if column.index() >= items.len() {
445                    sedona_internal_err!(
446                        "Can't obtain GeoStatistics for column at index {} from schema with {} columns",
447                        column.index(),
448                        items.len()
449                    )
450                } else {
451                    Ok(&items[column.index()])
452                }
453            }
454            Self::ByName(items) => {
455                if let Some(item) = items.get(column.name()) {
456                    Ok(item)
457                } else {
458                    Ok(&GeoStatistics::UNSPECIFIED)
459                }
460            }
461        }
462    }
463}
464
465// Useful for testing (create from a single GeoStatistics)
466impl From<GeoStatistics> for TableGeoStatistics {
467    fn from(value: GeoStatistics) -> Self {
468        TableGeoStatistics::ByPosition(vec![value])
469    }
470}
471
/// Internal utility to help match physical expression types
///
/// Produced by `parse_args`, which classifies each argument by downcasting it.
enum ArgRef<'a> {
    /// The argument is a [Column] (stored as a clone)
    Col(Column),
    /// The argument is a [Literal] (borrowed from the original expression)
    Lit(&'a Literal),
    /// The argument is neither a column nor a literal
    Other,
}
478
479/// Our current spatial data pruning implementation does not correctly handle geography data.
480/// We therefore only consider geometry data type for pruning.
481fn is_prunable_geospatial_literal(literal: &Literal) -> bool {
482    let Ok(literal_field) = literal.return_field(&Schema::empty()) else {
483        return false;
484    };
485    let Ok(sedona_type) = SedonaType::from_storage_field(&literal_field) else {
486        return false;
487    };
488    let matcher = ArgMatcher::is_geometry();
489    matcher.match_type(&sedona_type)
490}
491
492fn literal_bounds(literal: &Literal) -> Result<BoundingBox> {
493    let literal_field = literal.return_field(&Schema::empty())?;
494    let sedona_type = SedonaType::from_storage_field(&literal_field)?;
495    match &sedona_type {
496        SedonaType::Wkb(_, _) | SedonaType::WkbView(_, _) => match literal.value() {
497            ScalarValue::Binary(maybe_vec) | ScalarValue::BinaryView(maybe_vec) => {
498                if let Some(vec) = maybe_vec {
499                    return wkb_bounds_xy(vec).map_err(|e| DataFusionError::External(Box::new(e)));
500                }
501            }
502            _ => {}
503        },
504        _ => {}
505    }
506
507    sedona_internal_err!("Unexpected scalar type in filter expression ({literal:?})")
508}
509
510fn parse_args(args: &[Arc<dyn PhysicalExpr>]) -> Vec<ArgRef<'_>> {
511    args.iter()
512        .map(|arg| {
513            if let Some(column) = arg.as_any().downcast_ref::<Column>() {
514                ArgRef::Col(column.clone())
515            } else if let Some(literal) = arg.as_any().downcast_ref::<Literal>() {
516                ArgRef::Lit(literal)
517            } else {
518                ArgRef::Other
519            }
520        })
521        .collect::<Vec<_>>()
522}
523
524#[cfg(test)]
525mod test {
526    use arrow_schema::{DataType, Field};
527    use datafusion_common::config::ConfigOptions;
528    use datafusion_expr::{ScalarUDF, Signature, SimpleScalarUDF, Volatility};
529    use rstest::rstest;
530    use sedona_geometry::{bounding_box::BoundingBox, interval::Interval};
531    use sedona_schema::datatypes::{WKB_GEOGRAPHY, WKB_GEOMETRY};
532    use sedona_testing::create::create_scalar;
533
534    use super::*;
535
536    fn dummy_st_hasz() -> ScalarUDF {
537        SimpleScalarUDF::new_with_signature(
538            "st_hasz",
539            Signature::any(2, Volatility::Immutable),
540            DataType::Boolean,
541            Arc::new(|_args| Ok(ScalarValue::Boolean(Some(true)).into())),
542        )
543        .into()
544    }
545
546    fn dummy_unrelated() -> ScalarUDF {
547        SimpleScalarUDF::new_with_signature(
548            "st_not_a_predicate",
549            Signature::any(2, Volatility::Immutable),
550            DataType::Boolean,
551            Arc::new(|_args| Ok(ScalarValue::Boolean(Some(true)).into())),
552        )
553        .into()
554    }
555
556    fn create_dummy_spatial_function(name: &str, arg_count: usize) -> ScalarUDF {
557        SimpleScalarUDF::new_with_signature(
558            name,
559            Signature::any(arg_count, Volatility::Immutable),
560            DataType::Boolean,
561            Arc::new(|_args| Ok(ScalarValue::Boolean(Some(true)).into())),
562        )
563        .into()
564    }
565
566    #[test]
567    fn predicate_intersects() {
568        let storage_field = WKB_GEOMETRY.to_storage_field("", true).unwrap();
569        let literal = Literal::new_with_metadata(
570            create_scalar(Some("POINT (1 2)"), &WKB_GEOMETRY),
571            Some(storage_field.metadata().into()),
572        );
573        let bounds = literal_bounds(&literal).unwrap();
574
575        let stats_no_info = TableGeoStatistics::from(GeoStatistics::unspecified());
576        let stats_intersecting = TableGeoStatistics::from(
577            GeoStatistics::unspecified().with_bbox(Some(BoundingBox::xy((0.5, 1.5), (1.5, 2.5)))),
578        );
579        let col0 = Column::new("col0", 0);
580
581        assert!(SpatialFilter::Intersects(col0.clone(), bounds.clone())
582            .evaluate(&stats_no_info)
583            .unwrap());
584        assert!(SpatialFilter::Intersects(col0.clone(), bounds.clone())
585            .evaluate(&stats_intersecting)
586            .unwrap());
587
588        let stats_empty_bbox = TableGeoStatistics::from(
589            GeoStatistics::unspecified()
590                .with_bbox(Some(BoundingBox::xy(Interval::empty(), Interval::empty()))),
591        );
592
593        assert!(!SpatialFilter::Intersects(col0.clone(), bounds.clone())
594            .evaluate(&stats_empty_bbox)
595            .unwrap());
596
597        let unrelated_literal = Literal::new(ScalarValue::Null);
598
599        let err = literal_bounds(&unrelated_literal).unwrap_err();
600        assert!(err
601            .message()
602            .contains("Unexpected scalar type in filter expression"));
603    }
604
605    #[test]
606    fn predicate_covers() {
607        let storage_field = WKB_GEOMETRY.to_storage_field("", true).unwrap();
608        let literal = Literal::new_with_metadata(
609            create_scalar(Some("POLYGON ((0 0, 4 0, 4 4, 0 4, 0 0))"), &WKB_GEOMETRY),
610            Some(storage_field.metadata().into()),
611        );
612        let bounds = literal_bounds(&literal).unwrap();
613
614        let stats_no_info = TableGeoStatistics::from(GeoStatistics::unspecified());
615        let stats_covered = TableGeoStatistics::from(
616            GeoStatistics::unspecified().with_bbox(Some(BoundingBox::xy((0, 4), (0, 4)))),
617        );
618        let stats_not_covered = TableGeoStatistics::from(
619            GeoStatistics::unspecified().with_bbox(Some(BoundingBox::xy((3.0, 3.0), (5.0, 5.0)))),
620        );
621        let col0 = Column::new("col0", 0);
622
623        // Covers should return true when column bbox is fully contained in literal bounds
624        assert!(SpatialFilter::Covers(col0.clone(), bounds.clone())
625            .evaluate(&stats_no_info)
626            .unwrap());
627        assert!(SpatialFilter::Covers(col0.clone(), bounds.clone())
628            .evaluate(&stats_covered)
629            .unwrap());
630        assert!(!SpatialFilter::Covers(col0.clone(), bounds.clone())
631            .evaluate(&stats_not_covered)
632            .unwrap());
633    }
634
635    #[test]
636    fn predicate_has_z() {
637        let col0 = Column::new("col0", 0);
638        let has_z = SpatialFilter::HasZ(col0.clone());
639
640        let stats_z_geometry_types = TableGeoStatistics::from(
641            GeoStatistics::unspecified()
642                .try_with_str_geometry_types(Some(&["POINT Z"]))
643                .unwrap(),
644        );
645        let stats_z_bbox = TableGeoStatistics::from(GeoStatistics::unspecified().with_bbox(Some(
646            BoundingBox::xyzm((0, 1), (2, 3), Some((4, 5).into()), None),
647        )));
648        let stats_no_info = TableGeoStatistics::from(GeoStatistics::unspecified());
649
650        assert!(has_z.evaluate(&stats_z_geometry_types).unwrap());
651        assert!(has_z.evaluate(&stats_z_bbox).unwrap());
652        assert!(has_z.evaluate(&stats_no_info).unwrap());
653
654        let stats_no_z_geometry_types = TableGeoStatistics::from(
655            GeoStatistics::unspecified()
656                .try_with_str_geometry_types(Some(&["POINT"]))
657                .unwrap(),
658        );
659        let stats_no_z_bbox =
660            TableGeoStatistics::from(GeoStatistics::unspecified().with_bbox(Some(
661                BoundingBox::xyzm((0, 1), (2, 3), Some(Interval::empty()), None),
662            )));
663
664        assert!(!has_z.evaluate(&stats_no_z_geometry_types).unwrap());
665        assert!(!has_z.evaluate(&stats_no_z_bbox).unwrap());
666    }
667
668    #[test]
669    fn predicate_other() {
670        assert!(!SpatialFilter::LiteralFalse
671            .evaluate(&TableGeoStatistics::empty())
672            .unwrap());
673        assert!(SpatialFilter::Unknown
674            .evaluate(&TableGeoStatistics::empty())
675            .unwrap());
676
677        assert!(SpatialFilter::And(
678            Box::new(SpatialFilter::Unknown),
679            Box::new(SpatialFilter::Unknown)
680        )
681        .evaluate(&TableGeoStatistics::empty())
682        .unwrap());
683
684        assert!(!SpatialFilter::And(
685            Box::new(SpatialFilter::Unknown),
686            Box::new(SpatialFilter::LiteralFalse)
687        )
688        .evaluate(&TableGeoStatistics::empty())
689        .unwrap());
690
691        assert!(SpatialFilter::Or(
692            Box::new(SpatialFilter::Unknown),
693            Box::new(SpatialFilter::Unknown)
694        )
695        .evaluate(&TableGeoStatistics::empty())
696        .unwrap());
697
698        assert!(SpatialFilter::Or(
699            Box::new(SpatialFilter::Unknown),
700            Box::new(SpatialFilter::LiteralFalse)
701        )
702        .evaluate(&TableGeoStatistics::empty())
703        .unwrap());
704
705        assert!(!SpatialFilter::Or(
706            Box::new(SpatialFilter::LiteralFalse),
707            Box::new(SpatialFilter::LiteralFalse)
708        )
709        .evaluate(&TableGeoStatistics::empty())
710        .unwrap());
711    }
712
713    #[test]
714    fn predicate_from_expr_errors() {
715        let literal: Arc<dyn PhysicalExpr> = Arc::new(Literal::new(ScalarValue::Null));
716        let unrelated = dummy_unrelated();
717
718        // Not a scalar function
719        assert!(matches!(
720            SpatialFilter::try_from_expr(&literal).unwrap(),
721            SpatialFilter::Unknown
722        ));
723
724        // Not a predicate
725        let expr_no_args: Arc<dyn PhysicalExpr> = Arc::new(ScalarFunctionExpr::new(
726            "intersects",
727            Arc::new(unrelated),
728            vec![],
729            Arc::new(Field::new("", DataType::Boolean, true)),
730            Arc::new(ConfigOptions::default()),
731        ));
732        assert!(matches!(
733            SpatialFilter::try_from_expr(&expr_no_args).unwrap(),
734            SpatialFilter::Unknown
735        ));
736    }
737
738    #[rstest]
739    fn predicate_from_expr_commutative_intersects_functions(
740        #[values("st_intersects", "st_touches", "st_crosses", "st_overlaps")] func_name: &str,
741    ) {
742        let column: Arc<dyn PhysicalExpr> = Arc::new(Column::new("geometry", 0));
743        let storage_field = WKB_GEOMETRY.to_storage_field("", true).unwrap();
744        let literal: Arc<dyn PhysicalExpr> = Arc::new(Literal::new_with_metadata(
745            create_scalar(Some("POLYGON ((0 0, 2 0, 2 2, 0 2, 0 0))"), &WKB_GEOMETRY),
746            Some(storage_field.metadata().into()),
747        ));
748
749        // Test functions that should result in Intersects filter
750        let func = create_dummy_spatial_function(func_name, 2);
751        let expr: Arc<dyn PhysicalExpr> = Arc::new(ScalarFunctionExpr::new(
752            func_name,
753            Arc::new(func.clone()),
754            vec![column.clone(), literal.clone()],
755            Arc::new(Field::new("", DataType::Boolean, true)),
756            Arc::new(ConfigOptions::default()),
757        ));
758        let predicate = SpatialFilter::try_from_expr(&expr).unwrap();
759        assert!(
760            matches!(predicate, SpatialFilter::Intersects(_, _)),
761            "Function {func_name} should produce Intersects filter"
762        );
763
764        // Test reversed argument order
765        let expr_reversed: Arc<dyn PhysicalExpr> = Arc::new(ScalarFunctionExpr::new(
766            func_name,
767            Arc::new(func),
768            vec![literal.clone(), column.clone()],
769            Arc::new(Field::new("", DataType::Boolean, true)),
770            Arc::new(ConfigOptions::default()),
771        ));
772        let predicate_reversed = SpatialFilter::try_from_expr(&expr_reversed).unwrap();
773        assert!(
774            matches!(predicate_reversed, SpatialFilter::Intersects(_, _)),
775            "Function {func_name} with reversed args should produce Intersects filter"
776        );
777    }
778
779    #[rstest]
780    fn predicate_from_expr_equals_function(#[values("st_equals")] func_name: &str) {
781        let column: Arc<dyn PhysicalExpr> = Arc::new(Column::new("geometry", 0));
782        let storage_field = WKB_GEOMETRY.to_storage_field("", true).unwrap();
783        let literal: Arc<dyn PhysicalExpr> = Arc::new(Literal::new_with_metadata(
784            create_scalar(Some("POLYGON ((0 0, 2 0, 2 2, 0 2, 0 0))"), &WKB_GEOMETRY),
785            Some(storage_field.metadata().into()),
786        ));
787
788        // Test functions that should result in Covers filter
789        let func = create_dummy_spatial_function(func_name, 2);
790        let expr: Arc<dyn PhysicalExpr> = Arc::new(ScalarFunctionExpr::new(
791            func_name,
792            Arc::new(func.clone()),
793            vec![column.clone(), literal.clone()],
794            Arc::new(Field::new("", DataType::Boolean, true)),
795            Arc::new(ConfigOptions::default()),
796        ));
797        let predicate = SpatialFilter::try_from_expr(&expr).unwrap();
798        assert!(
799            matches!(predicate, SpatialFilter::Covers(_, _)),
800            "Function {func_name} should produce Covers filter"
801        );
802
803        // Test reversed argument order
804        let expr_reversed: Arc<dyn PhysicalExpr> = Arc::new(ScalarFunctionExpr::new(
805            func_name,
806            Arc::new(func),
807            vec![literal.clone(), column.clone()],
808            Arc::new(Field::new("", DataType::Boolean, true)),
809            Arc::new(ConfigOptions::default()),
810        ));
811        let predicate_reversed = SpatialFilter::try_from_expr(&expr_reversed).unwrap();
812        assert!(
813            matches!(predicate_reversed, SpatialFilter::Covers(_, _)),
814            "Function {func_name} with reversed args should produce Covers filter"
815        );
816    }
817
818    #[rstest]
819    fn predicate_from_expr_within_covered_by_functions(
820        #[values("st_within", "st_covered_by", "st_coveredby")] func_name: &str,
821    ) {
822        let column: Arc<dyn PhysicalExpr> = Arc::new(Column::new("geometry", 0));
823        let storage_field = WKB_GEOMETRY.to_storage_field("", true).unwrap();
824        let literal: Arc<dyn PhysicalExpr> = Arc::new(Literal::new_with_metadata(
825            create_scalar(Some("POLYGON ((0 0, 2 0, 2 2, 0 2, 0 0))"), &WKB_GEOMETRY),
826            Some(storage_field.metadata().into()),
827        ));
828
829        // Test functions that should result in CoveredBy filter when column is first arg
830        let func = create_dummy_spatial_function(func_name, 2);
831        let expr: Arc<dyn PhysicalExpr> = Arc::new(ScalarFunctionExpr::new(
832            func_name,
833            Arc::new(func.clone()),
834            vec![column.clone(), literal.clone()],
835            Arc::new(Field::new("", DataType::Boolean, true)),
836            Arc::new(ConfigOptions::default()),
837        ));
838        let predicate = SpatialFilter::try_from_expr(&expr).unwrap();
839        assert!(
840            matches!(predicate, SpatialFilter::Intersects(_, _)),
841            "Function {func_name} should produce Intersects filter"
842        );
843
844        // Test reversed argument order: should be converted to Intersects filter
845        let expr_reversed: Arc<dyn PhysicalExpr> = Arc::new(ScalarFunctionExpr::new(
846            func_name,
847            Arc::new(func),
848            vec![literal.clone(), column.clone()],
849            Arc::new(Field::new("", DataType::Boolean, true)),
850            Arc::new(ConfigOptions::default()),
851        ));
852        let predicate_reversed = SpatialFilter::try_from_expr(&expr_reversed).unwrap();
853        assert!(
854            matches!(predicate_reversed, SpatialFilter::Covers(_, _)),
855            "Function {func_name} with reversed args should produce Covers filter"
856        );
857    }
858
859    #[rstest]
860    fn predicate_from_expr_contains_covers_functions(
861        #[values("st_contains", "st_covers")] func_name: &str,
862    ) {
863        let column: Arc<dyn PhysicalExpr> = Arc::new(Column::new("geometry", 0));
864        let storage_field = WKB_GEOMETRY.to_storage_field("", true).unwrap();
865        let literal: Arc<dyn PhysicalExpr> = Arc::new(Literal::new_with_metadata(
866            create_scalar(Some("POLYGON ((0 0, 2 0, 2 2, 0 2, 0 0))"), &WKB_GEOMETRY),
867            Some(storage_field.metadata().into()),
868        ));
869
870        // Test functions that should result in CoveredBy filter when column is first arg
871        // (column contains/covers literal -> column's bbox must fully contain literal's bbox)
872        let func = create_dummy_spatial_function(func_name, 2);
873        let expr: Arc<dyn PhysicalExpr> = Arc::new(ScalarFunctionExpr::new(
874            func_name,
875            Arc::new(func.clone()),
876            vec![column.clone(), literal.clone()],
877            Arc::new(Field::new("", DataType::Boolean, true)),
878            Arc::new(ConfigOptions::default()),
879        ));
880        let predicate = SpatialFilter::try_from_expr(&expr).unwrap();
881        assert!(
882            matches!(predicate, SpatialFilter::Covers(_, _)),
883            "Function {func_name} should produce CoveredBy filter"
884        );
885
886        // Test reversed argument order: should be converted to Intersects filter
887        // (literal contains/covers column -> they must at least intersect)
888        let expr_reversed: Arc<dyn PhysicalExpr> = Arc::new(ScalarFunctionExpr::new(
889            func_name,
890            Arc::new(func),
891            vec![literal.clone(), column.clone()],
892            Arc::new(Field::new("", DataType::Boolean, true)),
893            Arc::new(ConfigOptions::default()),
894        ));
895        let predicate_reversed = SpatialFilter::try_from_expr(&expr_reversed).unwrap();
896        assert!(
897            matches!(predicate_reversed, SpatialFilter::Intersects(_, _)),
898            "Function {func_name} with reversed args should produce Intersects filter"
899        );
900    }
901
902    #[test]
903    fn predicate_from_expr_distance_functions() {
904        let column: Arc<dyn PhysicalExpr> = Arc::new(Column::new("geometry", 0));
905        let storage_field = WKB_GEOMETRY.to_storage_field("", true).unwrap();
906        let literal: Arc<dyn PhysicalExpr> = Arc::new(Literal::new_with_metadata(
907            create_scalar(Some("POINT (1 2)"), &WKB_GEOMETRY),
908            Some(storage_field.metadata().into()),
909        ));
910        let distance_literal: Arc<dyn PhysicalExpr> =
911            Arc::new(Literal::new(ScalarValue::Float64(Some(100.0))));
912
913        // Test ST_DWithin function
914        let st_dwithin = create_dummy_spatial_function("st_dwithin", 3);
915        let dwithin_expr: Arc<dyn PhysicalExpr> = Arc::new(ScalarFunctionExpr::new(
916            "st_dwithin",
917            Arc::new(st_dwithin.clone()),
918            vec![column.clone(), literal.clone(), distance_literal.clone()],
919            Arc::new(Field::new("", DataType::Boolean, true)),
920            Arc::new(ConfigOptions::default()),
921        ));
922        let predicate = SpatialFilter::try_from_expr(&dwithin_expr).unwrap();
923        assert!(
924            matches!(predicate, SpatialFilter::Intersects(_, _)),
925            "ST_DWithin should produce Intersects filter with expanded bounds"
926        );
927
928        // Test ST_DWithin with reversed geometry arguments
929        let dwithin_expr_reversed: Arc<dyn PhysicalExpr> = Arc::new(ScalarFunctionExpr::new(
930            "st_dwithin",
931            Arc::new(st_dwithin),
932            vec![literal.clone(), column.clone(), distance_literal.clone()],
933            Arc::new(Field::new("", DataType::Boolean, true)),
934            Arc::new(ConfigOptions::default()),
935        ));
936        let predicate_reversed = SpatialFilter::try_from_expr(&dwithin_expr_reversed).unwrap();
937        assert!(
938            matches!(predicate_reversed, SpatialFilter::Intersects(_, _)),
939            "ST_DWithin with reversed args should produce Intersects filter"
940        );
941
942        // Test ST_Distance <= threshold
943        let st_distance = create_dummy_spatial_function("st_distance", 2);
944        let distance_expr: Arc<dyn PhysicalExpr> = Arc::new(ScalarFunctionExpr::new(
945            "st_distance",
946            Arc::new(st_distance.clone()),
947            vec![column.clone(), literal.clone()],
948            Arc::new(Field::new("", DataType::Boolean, true)),
949            Arc::new(ConfigOptions::default()),
950        ));
951        let comparison_expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
952            distance_expr.clone(),
953            Operator::LtEq,
954            distance_literal.clone(),
955        ));
956        let predicate = SpatialFilter::try_from_expr(&comparison_expr).unwrap();
957        assert!(
958            matches!(predicate, SpatialFilter::Intersects(_, _)),
959            "ST_Distance <= threshold should produce Intersects filter"
960        );
961
962        // Test threshold >= ST_Distance
963        let comparison_expr_reversed: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
964            distance_literal.clone(),
965            Operator::GtEq,
966            distance_expr.clone(),
967        ));
968        let predicate_reversed = SpatialFilter::try_from_expr(&comparison_expr_reversed).unwrap();
969        assert!(
970            matches!(predicate_reversed, SpatialFilter::Intersects(_, _)),
971            "threshold >= ST_Distance should produce Intersects filter"
972        );
973
974        // Test with negative distance (should be treated as Unknown)
975        let negative_distance: Arc<dyn PhysicalExpr> =
976            Arc::new(Literal::new(ScalarValue::Float64(Some(-10.0))));
977        let st_dwithin = create_dummy_spatial_function("st_dwithin", 3);
978        let dwithin_expr: Arc<dyn PhysicalExpr> = Arc::new(ScalarFunctionExpr::new(
979            "st_dwithin",
980            Arc::new(st_dwithin.clone()),
981            vec![column.clone(), literal.clone(), negative_distance],
982            Arc::new(Field::new("", DataType::Boolean, true)),
983            Arc::new(ConfigOptions::default()),
984        ));
985        let predicate = SpatialFilter::try_from_expr(&dwithin_expr).unwrap();
986        assert!(
987            matches!(predicate, SpatialFilter::Unknown),
988            "Negative distance should result in Unknown filter"
989        );
990
991        // Test with NaN distance (should be treated as Unknown)
992        let nan_distance: Arc<dyn PhysicalExpr> =
993            Arc::new(Literal::new(ScalarValue::Float64(Some(f64::NAN))));
994        let dwithin_expr_nan: Arc<dyn PhysicalExpr> = Arc::new(ScalarFunctionExpr::new(
995            "st_dwithin",
996            Arc::new(st_dwithin),
997            vec![column.clone(), literal.clone(), nan_distance],
998            Arc::new(Field::new("", DataType::Boolean, true)),
999            Arc::new(ConfigOptions::default()),
1000        ));
1001        let predicate_nan = SpatialFilter::try_from_expr(&dwithin_expr_nan).unwrap();
1002        assert!(
1003            matches!(predicate_nan, SpatialFilter::Unknown),
1004            "NaN distance should result in Unknown filter"
1005        );
1006    }
1007
1008    #[rstest]
1009    fn predicate_from_spatial_relation_function_errors(
1010        #[values(
1011            "st_intersects",
1012            "st_equals",
1013            "st_touches",
1014            "st_contains",
1015            "st_covers",
1016            "st_within",
1017            "st_covered_by",
1018            "st_coveredby",
1019            "st_crosses",
1020            "st_overlaps"
1021        )]
1022        func_name: &str,
1023    ) {
1024        let literal: Arc<dyn PhysicalExpr> = Arc::new(Literal::new(ScalarValue::Null));
1025        let st_intersects = create_dummy_spatial_function(func_name, 2);
1026
1027        // Wrong number of args
1028        let expr_no_args: Arc<dyn PhysicalExpr> = Arc::new(ScalarFunctionExpr::new(
1029            "intersects",
1030            Arc::new(st_intersects.clone()),
1031            vec![],
1032            Arc::new(Field::new("", DataType::Boolean, true)),
1033            Arc::new(ConfigOptions::default()),
1034        ));
1035        assert!(SpatialFilter::try_from_expr(&expr_no_args)
1036            .unwrap_err()
1037            .message()
1038            .contains("unexpected argument count"));
1039
1040        // Unsupported arg types
1041        let expr_wrong_types: Arc<dyn PhysicalExpr> = Arc::new(ScalarFunctionExpr::new(
1042            "intersects",
1043            Arc::new(st_intersects.clone()),
1044            vec![literal.clone(), literal.clone()],
1045            Arc::new(Field::new("", DataType::Boolean, true)),
1046            Arc::new(ConfigOptions::default()),
1047        ));
1048        assert!(matches!(
1049            SpatialFilter::try_from_expr(&expr_wrong_types).unwrap(),
1050            SpatialFilter::Unknown
1051        ));
1052    }
1053
1054    #[rstest]
1055    fn range_predicate_involving_geography_should_be_transformed_to_unknown(
1056        #[values(
1057            "st_intersects",
1058            "st_equals",
1059            "st_touches",
1060            "st_contains",
1061            "st_covers",
1062            "st_within",
1063            "st_covered_by",
1064            "st_coveredby",
1065            "st_crosses",
1066            "st_overlaps"
1067        )]
1068        func_name: &str,
1069    ) {
1070        let column: Arc<dyn PhysicalExpr> = Arc::new(Column::new("geometry", 0));
1071        let storage_field = WKB_GEOGRAPHY.to_storage_field("", true).unwrap();
1072        let literal: Arc<dyn PhysicalExpr> = Arc::new(Literal::new_with_metadata(
1073            create_scalar(Some("POLYGON ((0 0, 2 0, 2 2, 0 2, 0 0))"), &WKB_GEOGRAPHY),
1074            Some(storage_field.metadata().into()),
1075        ));
1076
1077        let func = create_dummy_spatial_function(func_name, 2);
1078        let expr: Arc<dyn PhysicalExpr> = Arc::new(ScalarFunctionExpr::new(
1079            func_name,
1080            Arc::new(func.clone()),
1081            vec![column.clone(), literal.clone()],
1082            Arc::new(Field::new("", DataType::Boolean, true)),
1083            Arc::new(ConfigOptions::default()),
1084        ));
1085        let predicate = SpatialFilter::try_from_expr(&expr).unwrap();
1086        assert!(
1087            matches!(predicate, SpatialFilter::Unknown),
1088            "Function {func_name} involving geography should produce Unknown filter"
1089        );
1090    }
1091
1092    #[test]
1093    fn distance_predicate_involving_geography_should_be_transformed_to_unknown() {
1094        let column: Arc<dyn PhysicalExpr> = Arc::new(Column::new("geometry", 0));
1095        let storage_field = WKB_GEOGRAPHY.to_storage_field("", true).unwrap();
1096        let literal: Arc<dyn PhysicalExpr> = Arc::new(Literal::new_with_metadata(
1097            create_scalar(Some("POINT (1 2)"), &WKB_GEOGRAPHY),
1098            Some(storage_field.metadata().into()),
1099        ));
1100        let distance_literal: Arc<dyn PhysicalExpr> =
1101            Arc::new(Literal::new(ScalarValue::Float64(Some(100.0))));
1102
1103        // Test ST_DWithin function
1104        let st_dwithin = create_dummy_spatial_function("st_dwithin", 3);
1105        let dwithin_expr: Arc<dyn PhysicalExpr> = Arc::new(ScalarFunctionExpr::new(
1106            "st_dwithin",
1107            Arc::new(st_dwithin.clone()),
1108            vec![column.clone(), literal.clone(), distance_literal.clone()],
1109            Arc::new(Field::new("", DataType::Boolean, true)),
1110            Arc::new(ConfigOptions::default()),
1111        ));
1112        let predicate = SpatialFilter::try_from_expr(&dwithin_expr).unwrap();
1113        assert!(
1114            matches!(predicate, SpatialFilter::Unknown),
1115            "ST_DWithin involving geography should produce Unknown filter"
1116        );
1117
1118        // Test ST_DWithin with reversed geometry arguments
1119        let dwithin_expr_reversed: Arc<dyn PhysicalExpr> = Arc::new(ScalarFunctionExpr::new(
1120            "st_dwithin",
1121            Arc::new(st_dwithin),
1122            vec![literal.clone(), column.clone(), distance_literal.clone()],
1123            Arc::new(Field::new("", DataType::Boolean, true)),
1124            Arc::new(ConfigOptions::default()),
1125        ));
1126        let predicate_reversed = SpatialFilter::try_from_expr(&dwithin_expr_reversed).unwrap();
1127        assert!(
1128            matches!(predicate_reversed, SpatialFilter::Unknown),
1129            "ST_DWithin involving geography should produce Unknown filter"
1130        );
1131
1132        // Test ST_Distance <= threshold
1133        let st_distance = create_dummy_spatial_function("st_distance", 2);
1134        let distance_expr: Arc<dyn PhysicalExpr> = Arc::new(ScalarFunctionExpr::new(
1135            "st_distance",
1136            Arc::new(st_distance.clone()),
1137            vec![column.clone(), literal.clone()],
1138            Arc::new(Field::new("", DataType::Boolean, true)),
1139            Arc::new(ConfigOptions::default()),
1140        ));
1141        let comparison_expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
1142            distance_expr.clone(),
1143            Operator::LtEq,
1144            distance_literal.clone(),
1145        ));
1146        let predicate = SpatialFilter::try_from_expr(&comparison_expr).unwrap();
1147        assert!(
1148            matches!(predicate, SpatialFilter::Unknown),
1149            "ST_Distance <= threshold involving geography should produce Unknown filter"
1150        );
1151
1152        // Test threshold >= ST_Distance
1153        let comparison_expr_reversed: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
1154            distance_literal.clone(),
1155            Operator::GtEq,
1156            distance_expr.clone(),
1157        ));
1158        let predicate_reversed = SpatialFilter::try_from_expr(&comparison_expr_reversed).unwrap();
1159        assert!(
1160            matches!(predicate_reversed, SpatialFilter::Unknown),
1161            "threshold >= ST_Distance involving geography should produce Unknown filter"
1162        );
1163    }
1164
1165    #[test]
1166    fn predicate_from_expr_has_z() {
1167        let column: Arc<dyn PhysicalExpr> = Arc::new(Column::new("geometry", 0));
1168        let has_z = dummy_st_hasz();
1169
1170        let expr: Arc<dyn PhysicalExpr> = Arc::new(ScalarFunctionExpr::new(
1171            "has_z",
1172            Arc::new(has_z.clone()),
1173            vec![column.clone()],
1174            Arc::new(Field::new("", DataType::Boolean, true)),
1175            Arc::new(ConfigOptions::default()),
1176        ));
1177        let predicate = SpatialFilter::try_from_expr(&expr).unwrap();
1178        assert!(matches!(predicate, SpatialFilter::HasZ(_)));
1179    }
1180
1181    #[test]
1182    fn predicate_from_has_z_errors() {
1183        let literal: Arc<dyn PhysicalExpr> = Arc::new(Literal::new(ScalarValue::Null));
1184        let has_z = dummy_st_hasz();
1185
1186        let expr_no_args: Arc<dyn PhysicalExpr> = Arc::new(ScalarFunctionExpr::new(
1187            "has_z",
1188            Arc::new(has_z.clone()),
1189            vec![],
1190            Arc::new(Field::new("", DataType::Boolean, true)),
1191            Arc::new(ConfigOptions::default()),
1192        ));
1193        assert!(SpatialFilter::try_from_expr(&expr_no_args)
1194            .unwrap_err()
1195            .message()
1196            .contains("unexpected argument count"));
1197
1198        // Wrong arg types
1199        let expr_wrong_types: Arc<dyn PhysicalExpr> = Arc::new(ScalarFunctionExpr::new(
1200            "intersects",
1201            Arc::new(has_z.clone()),
1202            vec![literal.clone()],
1203            Arc::new(Field::new("", DataType::Boolean, true)),
1204            Arc::new(ConfigOptions::default()),
1205        ));
1206        assert!(matches!(
1207            SpatialFilter::try_from_expr(&expr_wrong_types).unwrap(),
1208            SpatialFilter::Unknown
1209        ));
1210    }
1211
1212    #[test]
1213    fn predicate_from_binary() {
1214        let literal_false: Arc<dyn PhysicalExpr> =
1215            Arc::new(Literal::new(ScalarValue::Boolean(Some(false))));
1216        let literal_true: Arc<dyn PhysicalExpr> =
1217            Arc::new(Literal::new(ScalarValue::Boolean(Some(true))));
1218        let binary_and: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
1219            literal_false.clone(),
1220            Operator::And,
1221            literal_true.clone(),
1222        ));
1223        let binary_or: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
1224            literal_false.clone(),
1225            Operator::Or,
1226            literal_true.clone(),
1227        ));
1228
1229        if let SpatialFilter::And(lhs, rhs) = SpatialFilter::try_from_expr(&binary_and).unwrap() {
1230            assert!(matches!(*lhs, SpatialFilter::LiteralFalse));
1231            assert!(matches!(*rhs, SpatialFilter::Unknown));
1232        } else {
1233            panic!("Parse incorrect!")
1234        }
1235
1236        if let SpatialFilter::Or(lhs, rhs) = SpatialFilter::try_from_expr(&binary_or).unwrap() {
1237            assert!(matches!(*lhs, SpatialFilter::LiteralFalse));
1238            assert!(matches!(*rhs, SpatialFilter::Unknown));
1239        } else {
1240            panic!("Parse incorrect!")
1241        }
1242    }
1243
1244    #[test]
1245    fn table_geo_stats_position() {
1246        let column_stats =
1247            GeoStatistics::unspecified().with_bbox(Some(BoundingBox::xy((0.5, 1.5), (1.5, 2.5))));
1248        let table_stats = TableGeoStatistics::from(column_stats.clone());
1249
1250        assert_eq!(
1251            table_stats.get(&Column::new("col0", 0)).unwrap(),
1252            &column_stats
1253        );
1254        assert!(table_stats.get(&Column::new("col1", 1)).is_err());
1255    }
1256
1257    #[test]
1258    fn table_geo_stats_name() {
1259        let geo_stats =
1260            GeoStatistics::unspecified().with_bbox(Some(BoundingBox::xy((0.5, 1.5), (1.5, 2.5))));
1261        let schema = Schema::new(vec![
1262            Field::new("col0", DataType::Binary, true),
1263            WKB_GEOMETRY.to_storage_field("col1", true).unwrap(),
1264        ]);
1265        let table_stats = TableGeoStatistics::try_from_stats_and_schema(
1266            &[GeoStatistics::UNSPECIFIED, geo_stats.clone()],
1267            &schema,
1268        )
1269        .unwrap();
1270
1271        assert_eq!(
1272            table_stats.get(&Column::new("col0", usize::MAX)).unwrap(),
1273            &GeoStatistics::UNSPECIFIED
1274        );
1275        assert_eq!(
1276            table_stats.get(&Column::new("col1", usize::MAX)).unwrap(),
1277            &geo_stats
1278        );
1279        assert_eq!(
1280            table_stats
1281                .get(&Column::new("col_not_in_schema", usize::MAX))
1282                .unwrap(),
1283            &GeoStatistics::UNSPECIFIED
1284        );
1285    }
1286
1287    #[test]
1288    fn bounding_box() {
1289        let col_zero = Column::new("foofy", 0);
1290        let bbox_02 = BoundingBox::xy((0, 2), (0, 2));
1291        let bbox_13 = BoundingBox::xy((1, 3), (1, 3));
1292
1293        assert_eq!(
1294            SpatialFilter::Intersects(col_zero.clone(), bbox_02.clone()).filter_bbox("foofy"),
1295            bbox_02
1296        );
1297
1298        assert_eq!(
1299            SpatialFilter::Covers(col_zero.clone(), bbox_02.clone()).filter_bbox("foofy"),
1300            bbox_02
1301        );
1302
1303        assert_eq!(
1304            SpatialFilter::LiteralFalse.filter_bbox("foofy"),
1305            BoundingBox::xy(Interval::empty(), Interval::empty())
1306        );
1307        assert_eq!(
1308            SpatialFilter::HasZ(col_zero.clone()).filter_bbox("foofy"),
1309            BoundingBox::xy(Interval::full(), Interval::full())
1310        );
1311        assert_eq!(
1312            SpatialFilter::Unknown.filter_bbox("foofy"),
1313            BoundingBox::xy(Interval::full(), Interval::full())
1314        );
1315
1316        let intersects_02 = SpatialFilter::Intersects(col_zero.clone(), bbox_02.clone());
1317        let intersects_13 = SpatialFilter::Intersects(col_zero.clone(), bbox_13.clone());
1318        assert_eq!(
1319            SpatialFilter::And(
1320                Box::new(intersects_02.clone()),
1321                Box::new(intersects_13.clone())
1322            )
1323            .filter_bbox("foofy"),
1324            BoundingBox::xy((1, 2), (1, 2))
1325        );
1326
1327        assert_eq!(
1328            SpatialFilter::Or(
1329                Box::new(intersects_02.clone()),
1330                Box::new(intersects_13.clone())
1331            )
1332            .filter_bbox("foofy"),
1333            BoundingBox::xy((0, 3), (0, 3))
1334        );
1335    }
1336}