Skip to main content

lance_index/scalar/
expression.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{
5    ops::Bound,
6    sync::{Arc, LazyLock},
7};
8
9use arrow::array::BinaryBuilder;
10use arrow_array::{Array, RecordBatch, UInt32Array};
11use arrow_schema::{DataType, Field, Schema, SchemaRef};
12use async_recursion::async_recursion;
13use async_trait::async_trait;
14use datafusion_common::ScalarValue;
15use datafusion_expr::{
16    expr::{InList, ScalarFunction},
17    Between, BinaryExpr, Expr, Operator, ReturnFieldArgs, ScalarUDF,
18};
19use tokio::try_join;
20
21use super::{
22    AnyQuery, BloomFilterQuery, GeoQuery, LabelListQuery, MetricsCollector, RelationQuery,
23    SargableQuery, ScalarIndex, SearchResult, TextQuery, TokenQuery,
24};
25use lance_core::{
26    utils::mask::{NullableRowAddrMask, RowAddrMask},
27    Error, Result,
28};
29use lance_datafusion::{expr::safe_coerce_scalar, planner::Planner};
30use roaring::RoaringBitmap;
31use snafu::location;
32use tracing::instrument;
33
/// Depth limit constant (500).
///
/// NOTE(review): this constant is not referenced in the visible part of the file; presumably it
/// caps how deeply nested a filter expression may be before index parsing gives up — confirm
/// against its use sites.
const MAX_DEPTH: usize = 500;
35
/// An indexed expression consists of a scalar index query with a post-scan filter
///
/// When a user wants to filter the data returned by a scan we may be able to use
/// one or more scalar indices to reduce the amount of data we load from the disk.
///
/// For example, if a user provides the filter "x = 7", and we have a scalar index
/// on x, then we can possibly identify the exact row that the user desires with our
/// index.  A full-table scan can then turn into a take operation fetching the rows
/// desired.  This would create an IndexedExpression with a scalar_query but no
/// refine.
///
/// If the user asked for "type = 'dog' && z = 3" and we had a scalar index on the
/// "type" column then we could convert this to an indexed scan for "type='dog'"
/// followed by an in-memory filter for z=3.  This would create an IndexedExpression
/// with both a scalar_query AND a refine.
///
/// Finally, if the user asked for "z = 3" and we do not have a scalar index on the
/// "z" column then we must fall back to an IndexedExpression with no scalar_query and
/// only a refine.
///
/// Combination rules (implemented by `and`, `maybe_or` and `maybe_not`):
///
/// - Two IndexedExpressions can always be AND'd together: the scalar queries are AND'd
///   with each other and the refine expressions are AND'd with each other.
/// - Two IndexedExpressions can only be OR'd together through the index when both are
///   scalar_query only (no refine on either side).
/// - An IndexedExpression cannot be negated if it has both a refine and a scalar_query,
///   or if its scalar_query needs a recheck.
///
/// When an operation cannot be performed we fall back to the original expression-only
/// (refine-only) representation
#[derive(Debug, PartialEq)]
pub struct IndexedExpression {
    /// The portion of the query that can be satisfied by scalar indices
    /// (`None` when no index applies)
    pub scalar_query: Option<ScalarIndexExpr>,
    /// The portion of the query that cannot be satisfied by scalar indices and must be
    /// evaluated as a regular filter (`None` when there is nothing left to filter)
    pub refine_expr: Option<Expr>,
}
70
71pub trait ScalarQueryParser: std::fmt::Debug + Send + Sync {
72    /// Visit a between expression
73    ///
74    /// Returns an IndexedExpression if the index can accelerate between expressions
75    fn visit_between(
76        &self,
77        column: &str,
78        low: &Bound<ScalarValue>,
79        high: &Bound<ScalarValue>,
80    ) -> Option<IndexedExpression>;
81    /// Visit an in list expression
82    ///
83    /// Returns an IndexedExpression if the index can accelerate in list expressions
84    fn visit_in_list(&self, column: &str, in_list: &[ScalarValue]) -> Option<IndexedExpression>;
85    /// Visit an is bool expression
86    ///
87    /// Returns an IndexedExpression if the index can accelerate is bool expressions
88    fn visit_is_bool(&self, column: &str, value: bool) -> Option<IndexedExpression>;
89    /// Visit an is null expression
90    ///
91    /// Returns an IndexedExpression if the index can accelerate is null expressions
92    fn visit_is_null(&self, column: &str) -> Option<IndexedExpression>;
93    /// Visit a comparison expression
94    ///
95    /// Returns an IndexedExpression if the index can accelerate comparison expressions
96    fn visit_comparison(
97        &self,
98        column: &str,
99        value: &ScalarValue,
100        op: &Operator,
101    ) -> Option<IndexedExpression>;
102    /// Visit a scalar function expression
103    ///
104    /// Returns an IndexedExpression if the index can accelerate the given scalar function.
105    /// For example, an ngram index can accelerate the contains function.
106    fn visit_scalar_function(
107        &self,
108        column: &str,
109        data_type: &DataType,
110        func: &ScalarUDF,
111        args: &[Expr],
112    ) -> Option<IndexedExpression>;
113
114    /// Visits a potential reference to a column
115    ///
116    /// This function is a little different from the other visitors.  It is used to test if a potential
117    /// column reference is a reference the index handles.
118    ///
119    /// Most indexes are designed to run on references to the indexed column.  For example, if a query
120    /// is "x = 7" and we have a scalar index on "x" then we apply the index to the "x" column reference.
121    ///
122    /// However, some indexes are designed to run on projections of the indexed column.  For example,
123    /// if a query is "json_extract(json, '$.name') = 'books'" and we have a JSON index on the "json" column
124    /// then we apply the index to the projection of the "json" column.
125    ///
126    /// This function is used to test if a potential column reference is a reference the index handles.
127    /// The default implementation matches column references but this can be overridden by indexes that
128    /// handle projections.
129    ///
130    /// The function is also passed in the data type of the column and should return the data type of the
131    /// reference.  Normally this is the same as the input for a direct column reference and possibly something
132    /// different for a projection.  E.g. a JSON column (LargeBinary) might be projected to a string or float
133    ///
134    /// Note: higher logic in the expression parser already limits references to either Expr::Column or Expr::ScalarFunction
135    /// where the first argument is an Expr::Column.  If your projection doesn't fit that mold then the
136    /// expression parser will need to be modified.
137    fn is_valid_reference(&self, func: &Expr, data_type: &DataType) -> Option<DataType> {
138        match func {
139            Expr::Column(_) => Some(data_type.clone()),
140            _ => None,
141        }
142    }
143}
144
145/// A generic parser that wraps multiple scalar query parsers
146///
147/// It will search each parser in order and return the first non-None result
148#[derive(Debug)]
149pub struct MultiQueryParser {
150    parsers: Vec<Box<dyn ScalarQueryParser>>,
151}
152
153impl MultiQueryParser {
154    /// Create a new MultiQueryParser with a single parser
155    pub fn single(parser: Box<dyn ScalarQueryParser>) -> Self {
156        Self {
157            parsers: vec![parser],
158        }
159    }
160
161    /// Add a new parser to the MultiQueryParser
162    pub fn add(&mut self, other: Box<dyn ScalarQueryParser>) {
163        self.parsers.push(other);
164    }
165}
166
167impl ScalarQueryParser for MultiQueryParser {
168    fn visit_between(
169        &self,
170        column: &str,
171        low: &Bound<ScalarValue>,
172        high: &Bound<ScalarValue>,
173    ) -> Option<IndexedExpression> {
174        self.parsers
175            .iter()
176            .find_map(|parser| parser.visit_between(column, low, high))
177    }
178    fn visit_in_list(&self, column: &str, in_list: &[ScalarValue]) -> Option<IndexedExpression> {
179        self.parsers
180            .iter()
181            .find_map(|parser| parser.visit_in_list(column, in_list))
182    }
183    fn visit_is_bool(&self, column: &str, value: bool) -> Option<IndexedExpression> {
184        self.parsers
185            .iter()
186            .find_map(|parser| parser.visit_is_bool(column, value))
187    }
188    fn visit_is_null(&self, column: &str) -> Option<IndexedExpression> {
189        self.parsers
190            .iter()
191            .find_map(|parser| parser.visit_is_null(column))
192    }
193    fn visit_comparison(
194        &self,
195        column: &str,
196        value: &ScalarValue,
197        op: &Operator,
198    ) -> Option<IndexedExpression> {
199        self.parsers
200            .iter()
201            .find_map(|parser| parser.visit_comparison(column, value, op))
202    }
203    fn visit_scalar_function(
204        &self,
205        column: &str,
206        data_type: &DataType,
207        func: &ScalarUDF,
208        args: &[Expr],
209    ) -> Option<IndexedExpression> {
210        self.parsers
211            .iter()
212            .find_map(|parser| parser.visit_scalar_function(column, data_type, func, args))
213    }
214    /// TODO(low-priority): This is maybe not quite right.  We should filter down the list of parsers based
215    /// on those that consider the reference valid.  Instead what we are doing is checking all parsers if any one
216    /// parser considers the reference valid.
217    ///
218    /// This will be a problem if the user creates two indexes (e.g. btree and json) on the same column and those two
219    /// indexes have different reference schemes.
220    fn is_valid_reference(&self, func: &Expr, data_type: &DataType) -> Option<DataType> {
221        self.parsers
222            .iter()
223            .find_map(|parser| parser.is_valid_reference(func, data_type))
224    }
225}
226
/// Query parser for indices that answer SARGable predicates: equality, ranges,
/// IN lists, boolean tests and IS NULL.
#[derive(Debug)]
pub struct SargableQueryParser {
    /// Name of the index the produced queries target
    index_name: String,
    /// Forwarded as `needs_recheck` on every query this parser produces
    needs_recheck: bool,
}

impl SargableQueryParser {
    /// Create a parser for the index called `index_name`
    pub fn new(index_name: String, needs_recheck: bool) -> Self {
        Self {
            needs_recheck,
            index_name,
        }
    }
}
242
243impl ScalarQueryParser for SargableQueryParser {
244    fn is_valid_reference(&self, func: &Expr, data_type: &DataType) -> Option<DataType> {
245        match func {
246            Expr::Column(_) => Some(data_type.clone()),
247            // Also accept get_field expressions for nested field access
248            Expr::ScalarFunction(udf) if udf.name() == "get_field" => Some(data_type.clone()),
249            _ => None,
250        }
251    }
252
253    fn visit_between(
254        &self,
255        column: &str,
256        low: &Bound<ScalarValue>,
257        high: &Bound<ScalarValue>,
258    ) -> Option<IndexedExpression> {
259        if let Bound::Included(val) | Bound::Excluded(val) = low {
260            if val.is_null() {
261                return None;
262            }
263        }
264        if let Bound::Included(val) | Bound::Excluded(val) = high {
265            if val.is_null() {
266                return None;
267            }
268        }
269        let query = SargableQuery::Range(low.clone(), high.clone());
270        Some(IndexedExpression::index_query_with_recheck(
271            column.to_string(),
272            self.index_name.clone(),
273            Arc::new(query),
274            self.needs_recheck,
275        ))
276    }
277
278    fn visit_in_list(&self, column: &str, in_list: &[ScalarValue]) -> Option<IndexedExpression> {
279        if in_list.iter().any(|val| val.is_null()) {
280            return None;
281        }
282        let query = SargableQuery::IsIn(in_list.to_vec());
283        Some(IndexedExpression::index_query_with_recheck(
284            column.to_string(),
285            self.index_name.clone(),
286            Arc::new(query),
287            self.needs_recheck,
288        ))
289    }
290
291    fn visit_is_bool(&self, column: &str, value: bool) -> Option<IndexedExpression> {
292        Some(IndexedExpression::index_query_with_recheck(
293            column.to_string(),
294            self.index_name.clone(),
295            Arc::new(SargableQuery::Equals(ScalarValue::Boolean(Some(value)))),
296            self.needs_recheck,
297        ))
298    }
299
300    fn visit_is_null(&self, column: &str) -> Option<IndexedExpression> {
301        Some(IndexedExpression::index_query_with_recheck(
302            column.to_string(),
303            self.index_name.clone(),
304            Arc::new(SargableQuery::IsNull()),
305            self.needs_recheck,
306        ))
307    }
308
309    fn visit_comparison(
310        &self,
311        column: &str,
312        value: &ScalarValue,
313        op: &Operator,
314    ) -> Option<IndexedExpression> {
315        if value.is_null() {
316            return None;
317        }
318        let query = match op {
319            Operator::Lt => SargableQuery::Range(Bound::Unbounded, Bound::Excluded(value.clone())),
320            Operator::LtEq => {
321                SargableQuery::Range(Bound::Unbounded, Bound::Included(value.clone()))
322            }
323            Operator::Gt => SargableQuery::Range(Bound::Excluded(value.clone()), Bound::Unbounded),
324            Operator::GtEq => {
325                SargableQuery::Range(Bound::Included(value.clone()), Bound::Unbounded)
326            }
327            Operator::Eq => SargableQuery::Equals(value.clone()),
328            // This will be negated by the caller
329            Operator::NotEq => SargableQuery::Equals(value.clone()),
330            _ => unreachable!(),
331        };
332        Some(IndexedExpression::index_query_with_recheck(
333            column.to_string(),
334            self.index_name.clone(),
335            Arc::new(query),
336            self.needs_recheck,
337        ))
338    }
339
340    fn visit_scalar_function(
341        &self,
342        _: &str,
343        _: &DataType,
344        _: &ScalarUDF,
345        _: &[Expr],
346    ) -> Option<IndexedExpression> {
347        None
348    }
349}
350
/// Query parser for bloom filter indices.
///
/// A bloom filter can only test membership, so this parser answers equality, `IN` lists,
/// boolean tests and IS NULL, and declines everything else.
#[derive(Debug)]
pub struct BloomFilterQueryParser {
    /// Name of the index the produced queries target
    index_name: String,
    /// Forwarded as `needs_recheck` on every query this parser produces
    needs_recheck: bool,
}

impl BloomFilterQueryParser {
    /// Create a parser for the bloom filter index called `index_name`
    pub fn new(index_name: String, needs_recheck: bool) -> Self {
        Self {
            needs_recheck,
            index_name,
        }
    }
}
366
367impl ScalarQueryParser for BloomFilterQueryParser {
368    fn visit_between(
369        &self,
370        _: &str,
371        _: &Bound<ScalarValue>,
372        _: &Bound<ScalarValue>,
373    ) -> Option<IndexedExpression> {
374        // Bloom filters don't support range queries
375        None
376    }
377
378    fn visit_in_list(&self, column: &str, in_list: &[ScalarValue]) -> Option<IndexedExpression> {
379        let query = BloomFilterQuery::IsIn(in_list.to_vec());
380        Some(IndexedExpression::index_query_with_recheck(
381            column.to_string(),
382            self.index_name.clone(),
383            Arc::new(query),
384            self.needs_recheck,
385        ))
386    }
387
388    fn visit_is_bool(&self, column: &str, value: bool) -> Option<IndexedExpression> {
389        Some(IndexedExpression::index_query_with_recheck(
390            column.to_string(),
391            self.index_name.clone(),
392            Arc::new(BloomFilterQuery::Equals(ScalarValue::Boolean(Some(value)))),
393            self.needs_recheck,
394        ))
395    }
396
397    fn visit_is_null(&self, column: &str) -> Option<IndexedExpression> {
398        Some(IndexedExpression::index_query_with_recheck(
399            column.to_string(),
400            self.index_name.clone(),
401            Arc::new(BloomFilterQuery::IsNull()),
402            self.needs_recheck,
403        ))
404    }
405
406    fn visit_comparison(
407        &self,
408        column: &str,
409        value: &ScalarValue,
410        op: &Operator,
411    ) -> Option<IndexedExpression> {
412        let query = match op {
413            // Bloom filters only support equality comparisons
414            Operator::Eq => BloomFilterQuery::Equals(value.clone()),
415            // This will be negated by the caller
416            Operator::NotEq => BloomFilterQuery::Equals(value.clone()),
417            // Bloom filters don't support range operations
418            _ => return None,
419        };
420        Some(IndexedExpression::index_query_with_recheck(
421            column.to_string(),
422            self.index_name.clone(),
423            Arc::new(query),
424            self.needs_recheck,
425        ))
426    }
427
428    fn visit_scalar_function(
429        &self,
430        _: &str,
431        _: &DataType,
432        _: &ScalarUDF,
433        _: &[Expr],
434    ) -> Option<IndexedExpression> {
435        // Bloom filters don't support scalar functions
436        None
437    }
438}
439
/// Query parser for label-list indices.
///
/// These indices answer `array_has`, `array_has_any` and `array_has_all` predicates on
/// list columns.
#[derive(Debug)]
pub struct LabelListQueryParser {
    /// Name of the index the produced queries target
    index_name: String,
}

impl LabelListQueryParser {
    /// Create a parser for the label-list index called `index_name`
    pub fn new(index_name: String) -> Self {
        let parser = Self { index_name };
        parser
    }
}
451
452impl ScalarQueryParser for LabelListQueryParser {
453    fn visit_between(
454        &self,
455        _: &str,
456        _: &Bound<ScalarValue>,
457        _: &Bound<ScalarValue>,
458    ) -> Option<IndexedExpression> {
459        None
460    }
461
462    fn visit_in_list(&self, _: &str, _: &[ScalarValue]) -> Option<IndexedExpression> {
463        None
464    }
465
466    fn visit_is_bool(&self, _: &str, _: bool) -> Option<IndexedExpression> {
467        None
468    }
469
470    fn visit_is_null(&self, _: &str) -> Option<IndexedExpression> {
471        None
472    }
473
474    fn visit_comparison(
475        &self,
476        _: &str,
477        _: &ScalarValue,
478        _: &Operator,
479    ) -> Option<IndexedExpression> {
480        None
481    }
482
483    fn visit_scalar_function(
484        &self,
485        column: &str,
486        data_type: &DataType,
487        func: &ScalarUDF,
488        args: &[Expr],
489    ) -> Option<IndexedExpression> {
490        if args.len() != 2 {
491            return None;
492        }
493        // DataFusion normalizes array_contains to array_has
494        if func.name() == "array_has" {
495            let inner_type = match data_type {
496                DataType::List(field) | DataType::LargeList(field) => field.data_type(),
497                _ => return None,
498            };
499            let scalar = maybe_scalar(&args[1], inner_type)?;
500            // array_has(..., NULL) returns no matches in datafusion, but the index would
501            // match rows containing NULL. Fallback to match datafusion behavior.
502            if scalar.is_null() {
503                return None;
504            }
505            let query = LabelListQuery::HasAnyLabel(vec![scalar]);
506            return Some(IndexedExpression::index_query(
507                column.to_string(),
508                self.index_name.clone(),
509                Arc::new(query),
510            ));
511        }
512
513        let label_list = maybe_scalar(&args[1], data_type)?;
514        if let ScalarValue::List(list_arr) = label_list {
515            let list_values = list_arr.values();
516            let mut scalars = Vec::with_capacity(list_values.len());
517            for idx in 0..list_values.len() {
518                scalars.push(ScalarValue::try_from_array(list_values.as_ref(), idx).ok()?);
519            }
520            if func.name() == "array_has_all" {
521                let query = LabelListQuery::HasAllLabels(scalars);
522                Some(IndexedExpression::index_query(
523                    column.to_string(),
524                    self.index_name.clone(),
525                    Arc::new(query),
526                ))
527            } else if func.name() == "array_has_any" {
528                let query = LabelListQuery::HasAnyLabel(scalars);
529                Some(IndexedExpression::index_query(
530                    column.to_string(),
531                    self.index_name.clone(),
532                    Arc::new(query),
533                ))
534            } else {
535                None
536            }
537        } else {
538            None
539        }
540    }
541}
542
/// Query parser for indices that can answer substring searches (the `contains` function).
#[derive(Debug, Clone)]
pub struct TextQueryParser {
    /// Name of the index the produced queries target
    index_name: String,
    /// Forwarded as `needs_recheck` on every query this parser produces
    needs_recheck: bool,
}

impl TextQueryParser {
    /// Create a parser for the text index called `index_name`
    pub fn new(index_name: String, needs_recheck: bool) -> Self {
        Self {
            needs_recheck,
            index_name,
        }
    }
}
558
559impl ScalarQueryParser for TextQueryParser {
560    fn visit_between(
561        &self,
562        _: &str,
563        _: &Bound<ScalarValue>,
564        _: &Bound<ScalarValue>,
565    ) -> Option<IndexedExpression> {
566        None
567    }
568
569    fn visit_in_list(&self, _: &str, _: &[ScalarValue]) -> Option<IndexedExpression> {
570        None
571    }
572
573    fn visit_is_bool(&self, _: &str, _: bool) -> Option<IndexedExpression> {
574        None
575    }
576
577    fn visit_is_null(&self, _: &str) -> Option<IndexedExpression> {
578        None
579    }
580
581    fn visit_comparison(
582        &self,
583        _: &str,
584        _: &ScalarValue,
585        _: &Operator,
586    ) -> Option<IndexedExpression> {
587        None
588    }
589
590    fn visit_scalar_function(
591        &self,
592        column: &str,
593        data_type: &DataType,
594        func: &ScalarUDF,
595        args: &[Expr],
596    ) -> Option<IndexedExpression> {
597        if args.len() != 2 {
598            return None;
599        }
600        let scalar = maybe_scalar(&args[1], data_type)?;
601        match scalar {
602            ScalarValue::Utf8(Some(scalar_str)) | ScalarValue::LargeUtf8(Some(scalar_str)) => {
603                if func.name() == "contains" {
604                    let query = TextQuery::StringContains(scalar_str);
605                    Some(IndexedExpression::index_query_with_recheck(
606                        column.to_string(),
607                        self.index_name.clone(),
608                        Arc::new(query),
609                        self.needs_recheck,
610                    ))
611                } else {
612                    None
613                }
614            }
615            _ => {
616                // If the scalar is not a string, we cannot handle it
617                None
618            }
619        }
620    }
621}
622
/// Query parser for full-text indices that answer the `contains_tokens` function.
#[derive(Debug, Clone)]
pub struct FtsQueryParser {
    /// Name of the index the produced queries target
    index_name: String,
}

impl FtsQueryParser {
    /// Create a parser for the full-text index called `name`
    pub fn new(name: String) -> Self {
        let index_name = name;
        Self { index_name }
    }
}
634
635impl ScalarQueryParser for FtsQueryParser {
636    fn visit_between(
637        &self,
638        _: &str,
639        _: &Bound<ScalarValue>,
640        _: &Bound<ScalarValue>,
641    ) -> Option<IndexedExpression> {
642        None
643    }
644
645    fn visit_in_list(&self, _: &str, _: &[ScalarValue]) -> Option<IndexedExpression> {
646        None
647    }
648
649    fn visit_is_bool(&self, _: &str, _: bool) -> Option<IndexedExpression> {
650        None
651    }
652
653    fn visit_is_null(&self, _: &str) -> Option<IndexedExpression> {
654        None
655    }
656
657    fn visit_comparison(
658        &self,
659        _: &str,
660        _: &ScalarValue,
661        _: &Operator,
662    ) -> Option<IndexedExpression> {
663        None
664    }
665
666    fn visit_scalar_function(
667        &self,
668        column: &str,
669        data_type: &DataType,
670        func: &ScalarUDF,
671        args: &[Expr],
672    ) -> Option<IndexedExpression> {
673        if args.len() != 2 {
674            return None;
675        }
676        let scalar = maybe_scalar(&args[1], data_type)?;
677        if let ScalarValue::Utf8(Some(scalar_str)) = scalar {
678            if func.name() == "contains_tokens" {
679                let query = TokenQuery::TokensContains(scalar_str);
680                return Some(IndexedExpression::index_query(
681                    column.to_string(),
682                    self.index_name.clone(),
683                    Arc::new(query),
684                ));
685            }
686        }
687        None
688    }
689}
690
/// Query parser for geo indices.
///
/// Spatial relationship functions (`st_intersects`, `st_contains`, ...) and IS NULL are
/// translated into geo index queries.
#[derive(Debug, Clone)]
pub struct GeoQueryParser {
    /// Name of the index the produced queries target
    index_name: String,
}

impl GeoQueryParser {
    /// Create a parser for the geo index called `index_name`
    pub fn new(index_name: String) -> Self {
        let parser = Self { index_name };
        parser
    }
}
702
703impl ScalarQueryParser for GeoQueryParser {
704    fn visit_between(
705        &self,
706        _: &str,
707        _: &Bound<ScalarValue>,
708        _: &Bound<ScalarValue>,
709    ) -> Option<IndexedExpression> {
710        None
711    }
712
713    fn visit_in_list(&self, _: &str, _: &[ScalarValue]) -> Option<IndexedExpression> {
714        None
715    }
716
717    fn visit_is_bool(&self, _: &str, _: bool) -> Option<IndexedExpression> {
718        None
719    }
720
721    fn visit_is_null(&self, column: &str) -> Option<IndexedExpression> {
722        Some(IndexedExpression::index_query_with_recheck(
723            column.to_string(),
724            self.index_name.clone(),
725            Arc::new(GeoQuery::IsNull),
726            true,
727        ))
728    }
729
730    fn visit_comparison(
731        &self,
732        _: &str,
733        _: &ScalarValue,
734        _: &Operator,
735    ) -> Option<IndexedExpression> {
736        None
737    }
738
739    fn visit_scalar_function(
740        &self,
741        column: &str,
742        _data_type: &DataType,
743        func: &ScalarUDF,
744        args: &[Expr],
745    ) -> Option<IndexedExpression> {
746        if (func.name() == "st_intersects"
747            || func.name() == "st_contains"
748            || func.name() == "st_within"
749            || func.name() == "st_touches"
750            || func.name() == "st_crosses"
751            || func.name() == "st_overlaps"
752            || func.name() == "st_covers"
753            || func.name() == "st_coveredby")
754            && args.len() == 2
755        {
756            let left_arg = &args[0];
757            let right_arg = &args[1];
758            return match (left_arg, right_arg) {
759                (Expr::Literal(left_value, metadata), Expr::Column(_)) => {
760                    let mut field = Field::new("_geo", left_value.data_type(), false);
761                    if let Some(metadata) = metadata {
762                        field = field.with_metadata(metadata.to_hashmap());
763                    }
764                    let query = GeoQuery::IntersectQuery(RelationQuery {
765                        value: left_value.clone(),
766                        field,
767                    });
768                    Some(IndexedExpression::index_query_with_recheck(
769                        column.to_string(),
770                        self.index_name.clone(),
771                        Arc::new(query),
772                        true,
773                    ))
774                }
775                (Expr::Column(_), Expr::Literal(right_value, metadata)) => {
776                    let mut field = Field::new("_geo", right_value.data_type(), false);
777                    if let Some(metadata) = metadata {
778                        field = field.with_metadata(metadata.to_hashmap());
779                    }
780                    let query = GeoQuery::IntersectQuery(RelationQuery {
781                        value: right_value.clone(),
782                        field,
783                    });
784                    Some(IndexedExpression::index_query_with_recheck(
785                        column.to_string(),
786                        self.index_name.clone(),
787                        Arc::new(query),
788                        true,
789                    ))
790                }
791                _ => None,
792            };
793        }
794        None
795    }
796}
797
798impl IndexedExpression {
799    /// Create an expression that only does refine
800    fn refine_only(refine_expr: Expr) -> Self {
801        Self {
802            scalar_query: None,
803            refine_expr: Some(refine_expr),
804        }
805    }
806
807    /// Create an expression that is only an index query
808    fn index_query(column: String, index_name: String, query: Arc<dyn AnyQuery>) -> Self {
809        Self {
810            scalar_query: Some(ScalarIndexExpr::Query(ScalarIndexSearch {
811                column,
812                index_name,
813                query,
814                needs_recheck: false, // Default to false, will be set by parser
815            })),
816            refine_expr: None,
817        }
818    }
819
820    /// Create an expression that is only an index query with explicit needs_recheck
821    fn index_query_with_recheck(
822        column: String,
823        index_name: String,
824        query: Arc<dyn AnyQuery>,
825        needs_recheck: bool,
826    ) -> Self {
827        Self {
828            scalar_query: Some(ScalarIndexExpr::Query(ScalarIndexSearch {
829                column,
830                index_name,
831                query,
832                needs_recheck,
833            })),
834            refine_expr: None,
835        }
836    }
837
838    /// Try and negate the expression
839    ///
840    /// If the expression contains both an index query and a refine expression then it
841    /// cannot be negated today and None will be returned (we give up trying to use indices)
842    fn maybe_not(self) -> Option<Self> {
843        match (self.scalar_query, self.refine_expr) {
844            (Some(_), Some(_)) => None,
845            (Some(scalar_query), None) => {
846                if scalar_query.needs_recheck() {
847                    return None;
848                }
849                Some(Self {
850                    scalar_query: Some(ScalarIndexExpr::Not(Box::new(scalar_query))),
851                    refine_expr: None,
852                })
853            }
854            (None, Some(refine_expr)) => Some(Self {
855                scalar_query: None,
856                refine_expr: Some(Expr::Not(Box::new(refine_expr))),
857            }),
858            (None, None) => panic!("Empty node should not occur"),
859        }
860    }
861
862    /// Perform a logical AND of two indexed expressions
863    ///
864    /// This is straightforward because we can just AND the individual parts
865    /// because (A && B) && (C && D) == (A && C) && (B && D)
866    fn and(self, other: Self) -> Self {
867        let scalar_query = match (self.scalar_query, other.scalar_query) {
868            (Some(scalar_query), Some(other_scalar_query)) => Some(ScalarIndexExpr::And(
869                Box::new(scalar_query),
870                Box::new(other_scalar_query),
871            )),
872            (Some(scalar_query), None) => Some(scalar_query),
873            (None, Some(scalar_query)) => Some(scalar_query),
874            (None, None) => None,
875        };
876        let refine_expr = match (self.refine_expr, other.refine_expr) {
877            (Some(refine_expr), Some(other_refine_expr)) => {
878                Some(refine_expr.and(other_refine_expr))
879            }
880            (Some(refine_expr), None) => Some(refine_expr),
881            (None, Some(refine_expr)) => Some(refine_expr),
882            (None, None) => None,
883        };
884        Self {
885            scalar_query,
886            refine_expr,
887        }
888    }
889
890    /// Try and perform a logical OR of two indexed expressions
891    ///
892    /// This is a bit tricky because something like:
893    ///   (color == 'blue' AND size < 20) OR (color == 'green' AND size < 50)
894    /// is not equivalent to:
895    ///   (color == 'blue' OR color == 'green') AND (size < 20 OR size < 50)
896    fn maybe_or(self, other: Self) -> Option<Self> {
897        // If either expression is missing a scalar_query then we need to load all rows from
898        // the database and so we short-circuit and return None
899        let scalar_query = self.scalar_query?;
900        let other_scalar_query = other.scalar_query?;
901        let scalar_query = Some(ScalarIndexExpr::Or(
902            Box::new(scalar_query),
903            Box::new(other_scalar_query),
904        ));
905
906        let refine_expr = match (self.refine_expr, other.refine_expr) {
907            // TODO
908            //
909            // To handle these cases we need a way of going back from a scalar expression query to a logical DF expression (perhaps
910            // we can store the expression that led to the creation of the query)
911            //
912            // For example, imagine we have something like "(color == 'blue' AND size < 20) OR (color == 'green' AND size < 50)"
913            //
914            // We can do an indexed load of all rows matching "color == 'blue' OR color == 'green'" but then we need to
915            // refine that load with the full original expression which, at the moment, we no longer have.
916            (Some(_), Some(_)) => {
917                return None;
918            }
919            (Some(_), None) => {
920                return None;
921            }
922            (None, Some(_)) => {
923                return None;
924            }
925            (None, None) => None,
926        };
927        Some(Self {
928            scalar_query,
929            refine_expr,
930        })
931    }
932
933    fn refine(self, expr: Expr) -> Self {
934        match self.refine_expr {
935            Some(refine_expr) => Self {
936                scalar_query: self.scalar_query,
937                refine_expr: Some(refine_expr.and(expr)),
938            },
939            None => Self {
940                scalar_query: self.scalar_query,
941                refine_expr: Some(expr),
942            },
943        }
944    }
945}
946
/// A trait implemented by anything that can load indices by name
///
/// This is used during the evaluation of an index expression: each
/// `ScalarIndexExpr::Query` leaf loads its index through this trait and then searches it.
#[async_trait]
pub trait ScalarIndexLoader: Send + Sync {
    /// Load the index with the given name
    ///
    /// * `column` - the column being searched (`ScalarIndexSearch::column`)
    /// * `index_name` - the name of the index to load (`ScalarIndexSearch::index_name`)
    /// * `metrics` - metrics collector the loader may report to
    ///
    /// Returns a shared handle to the loaded index, or an error if it cannot be loaded.
    async fn load_index(
        &self,
        column: &str,
        index_name: &str,
        metrics: &dyn MetricsCollector,
    ) -> Result<Arc<dyn ScalarIndex>>;
}
960
/// This represents a search into a scalar index
///
/// Note: the `PartialEq` impl for this type compares `column`, `index_name` and `query`;
/// `needs_recheck` is not part of the comparison.
#[derive(Debug, Clone)]
pub struct ScalarIndexSearch {
    /// The column to search.  It is passed to `ScalarIndexLoader::load_index` when the search
    /// is evaluated, and it is used when rendering the query (`Display`) and when converting
    /// it back into a DataFusion `Expr`.
    pub column: String,
    /// The name of the index to search
    pub index_name: String,
    /// The query to search for
    pub query: Arc<dyn AnyQuery>,
    /// If true, the query results are inexact and will need a recheck
    pub needs_recheck: bool,
}
973
974impl PartialEq for ScalarIndexSearch {
975    fn eq(&self, other: &Self) -> bool {
976        self.column == other.column
977            && self.index_name == other.index_name
978            && self.query.as_ref().eq(other.query.as_ref())
979    }
980}
981
982/// This represents a lookup into one or more scalar indices
983///
984/// This is a tree of operations because we may need to logically combine or
985/// modify the results of scalar lookups
986#[derive(Debug, Clone)]
987pub enum ScalarIndexExpr {
988    Not(Box<Self>),
989    And(Box<Self>, Box<Self>),
990    Or(Box<Self>, Box<Self>),
991    Query(ScalarIndexSearch),
992}
993
994impl PartialEq for ScalarIndexExpr {
995    fn eq(&self, other: &Self) -> bool {
996        match (self, other) {
997            (Self::Not(l0), Self::Not(r0)) => l0 == r0,
998            (Self::And(l0, l1), Self::And(r0, r1)) => l0 == r0 && l1 == r1,
999            (Self::Or(l0, l1), Self::Or(r0, r1)) => l0 == r0 && l1 == r1,
1000            (Self::Query(l_search), Self::Query(r_search)) => l_search == r_search,
1001            _ => false,
1002        }
1003    }
1004}
1005
1006impl std::fmt::Display for ScalarIndexExpr {
1007    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1008        match self {
1009            Self::Not(inner) => write!(f, "NOT({})", inner),
1010            Self::And(lhs, rhs) => write!(f, "AND({},{})", lhs, rhs),
1011            Self::Or(lhs, rhs) => write!(f, "OR({},{})", lhs, rhs),
1012            Self::Query(search) => write!(
1013                f,
1014                "[{}]@{}",
1015                search.query.format(&search.column),
1016                search.index_name
1017            ),
1018        }
1019    }
1020}
1021
1022/// When we evaluate a scalar index query we return a batch with three columns and two rows
1023///
1024/// The first column has the block list and allow list
1025/// The second column tells if the result is least/exact/more (we repeat the discriminant twice)
1026/// The third column has the fragments covered bitmap in the first row and null in the second row
1027pub static INDEX_EXPR_RESULT_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
1028    Arc::new(Schema::new(vec![
1029        Field::new("result".to_string(), DataType::Binary, true),
1030        Field::new("discriminant".to_string(), DataType::UInt32, true),
1031        Field::new("fragments_covered".to_string(), DataType::Binary, true),
1032    ]))
1033});
1034
/// Intermediate result of evaluating a [`ScalarIndexExpr`], holding [`NullableRowAddrMask`]s.
///
/// The variants have the same meaning as the corresponding [`IndexExprResult`] variants.
/// NOT / AND / OR are applied to values of this type while the tree is evaluated; null rows
/// are only dropped at the end, when converting into an [`IndexExprResult`].
#[derive(Debug)]
enum NullableIndexExprResult {
    /// The mask is exactly the answer
    Exact(NullableRowAddrMask),
    /// The answer is contained in the mask; candidate rows need a recheck
    AtMost(NullableRowAddrMask),
    /// The answer contains the mask but may include further rows
    AtLeast(NullableRowAddrMask),
}
1041
1042impl From<SearchResult> for NullableIndexExprResult {
1043    fn from(result: SearchResult) -> Self {
1044        match result {
1045            SearchResult::Exact(mask) => Self::Exact(NullableRowAddrMask::AllowList(mask)),
1046            SearchResult::AtMost(mask) => Self::AtMost(NullableRowAddrMask::AllowList(mask)),
1047            SearchResult::AtLeast(mask) => Self::AtLeast(NullableRowAddrMask::AllowList(mask)),
1048        }
1049    }
1050}
1051
1052impl std::ops::BitAnd<Self> for NullableIndexExprResult {
1053    type Output = Self;
1054
1055    fn bitand(self, rhs: Self) -> Self {
1056        match (self, rhs) {
1057            (Self::Exact(lhs), Self::Exact(rhs)) => Self::Exact(lhs & rhs),
1058            (Self::Exact(lhs), Self::AtMost(rhs)) | (Self::AtMost(lhs), Self::Exact(rhs)) => {
1059                Self::AtMost(lhs & rhs)
1060            }
1061            (Self::Exact(exact), Self::AtLeast(_)) | (Self::AtLeast(_), Self::Exact(exact)) => {
1062                // We could do better here, elements in both lhs and rhs are known
1063                // to be true and don't require a recheck.  We only need to recheck
1064                // elements in lhs that are not in rhs
1065                Self::AtMost(exact)
1066            }
1067            (Self::AtMost(lhs), Self::AtMost(rhs)) => Self::AtMost(lhs & rhs),
1068            (Self::AtLeast(lhs), Self::AtLeast(rhs)) => Self::AtLeast(lhs & rhs),
1069            (Self::AtMost(most), Self::AtLeast(_)) | (Self::AtLeast(_), Self::AtMost(most)) => {
1070                Self::AtMost(most)
1071            }
1072        }
1073    }
1074}
1075
1076impl std::ops::BitOr<Self> for NullableIndexExprResult {
1077    type Output = Self;
1078
1079    fn bitor(self, rhs: Self) -> Self {
1080        match (self, rhs) {
1081            (Self::Exact(lhs), Self::Exact(rhs)) => Self::Exact(lhs | rhs),
1082            (Self::Exact(lhs), Self::AtMost(rhs)) | (Self::AtMost(rhs), Self::Exact(lhs)) => {
1083                // We could do better here, elements in lhs are known to be true
1084                // and don't require a recheck.  We only need to recheck elements
1085                // in rhs that are not in lhs
1086                Self::AtMost(lhs | rhs)
1087            }
1088            (Self::Exact(lhs), Self::AtLeast(rhs)) | (Self::AtLeast(rhs), Self::Exact(lhs)) => {
1089                Self::AtLeast(lhs | rhs)
1090            }
1091            (Self::AtMost(lhs), Self::AtMost(rhs)) => Self::AtMost(lhs | rhs),
1092            (Self::AtLeast(lhs), Self::AtLeast(rhs)) => Self::AtLeast(lhs | rhs),
1093            (Self::AtMost(_), Self::AtLeast(least)) | (Self::AtLeast(least), Self::AtMost(_)) => {
1094                Self::AtLeast(least)
1095            }
1096        }
1097    }
1098}
1099
1100impl NullableIndexExprResult {
1101    pub fn drop_nulls(self) -> IndexExprResult {
1102        match self {
1103            Self::Exact(mask) => IndexExprResult::Exact(mask.drop_nulls()),
1104            Self::AtMost(mask) => IndexExprResult::AtMost(mask.drop_nulls()),
1105            Self::AtLeast(mask) => IndexExprResult::AtLeast(mask.drop_nulls()),
1106        }
1107    }
1108}
1109
/// The result of evaluating a [`ScalarIndexExpr`]: a row address mask together with how
/// precisely that mask matches the filter.
#[derive(Debug)]
pub enum IndexExprResult {
    /// The answer is exactly the rows in the allow list minus the rows in the block list.
    Exact(RowAddrMask),
    /// The answer is at most the rows in the allow list minus the rows in the block list.
    /// Some of the rows in the allow list may not be in the result and will need to be filtered
    /// by a recheck.  Every row in the block list is definitely not in the result.
    AtMost(RowAddrMask),
    /// The answer is at least the rows in the allow list minus the rows in the block list.
    /// Some of the rows in the block list might be in the result.  Every row in the allow list is
    /// definitely in the result.
    AtLeast(RowAddrMask),
}
1123
1124impl IndexExprResult {
1125    pub fn row_addr_mask(&self) -> &RowAddrMask {
1126        match self {
1127            Self::Exact(mask) => mask,
1128            Self::AtMost(mask) => mask,
1129            Self::AtLeast(mask) => mask,
1130        }
1131    }
1132
1133    pub fn discriminant(&self) -> u32 {
1134        match self {
1135            Self::Exact(_) => 0,
1136            Self::AtMost(_) => 1,
1137            Self::AtLeast(_) => 2,
1138        }
1139    }
1140
1141    pub fn from_parts(mask: RowAddrMask, discriminant: u32) -> Result<Self> {
1142        match discriminant {
1143            0 => Ok(Self::Exact(mask)),
1144            1 => Ok(Self::AtMost(mask)),
1145            2 => Ok(Self::AtLeast(mask)),
1146            _ => Err(Error::InvalidInput {
1147                source: format!("Invalid IndexExprResult discriminant: {}", discriminant).into(),
1148                location: location!(),
1149            }),
1150        }
1151    }
1152
1153    #[instrument(skip_all)]
1154    pub fn serialize_to_arrow(
1155        &self,
1156        fragments_covered_by_result: &RoaringBitmap,
1157    ) -> Result<RecordBatch> {
1158        let row_addr_mask = self.row_addr_mask();
1159        let row_addr_mask_arr = row_addr_mask.into_arrow()?;
1160        let discriminant = self.discriminant();
1161        let discriminant_arr =
1162            Arc::new(UInt32Array::from(vec![discriminant, discriminant])) as Arc<dyn Array>;
1163        let mut fragments_covered_builder = BinaryBuilder::new();
1164        let fragments_covered_bytes_len = fragments_covered_by_result.serialized_size();
1165        let mut fragments_covered_bytes = Vec::with_capacity(fragments_covered_bytes_len);
1166        fragments_covered_by_result.serialize_into(&mut fragments_covered_bytes)?;
1167        fragments_covered_builder.append_value(fragments_covered_bytes);
1168        fragments_covered_builder.append_null();
1169        let fragments_covered_arr = Arc::new(fragments_covered_builder.finish()) as Arc<dyn Array>;
1170        Ok(RecordBatch::try_new(
1171            INDEX_EXPR_RESULT_SCHEMA.clone(),
1172            vec![
1173                Arc::new(row_addr_mask_arr),
1174                Arc::new(discriminant_arr),
1175                Arc::new(fragments_covered_arr),
1176            ],
1177        )?)
1178    }
1179}
1180
1181impl ScalarIndexExpr {
1182    /// Evaluates the scalar index expression
1183    ///
1184    /// This will result in loading one or more scalar indices and searching them
1185    ///
1186    /// TODO: We could potentially try and be smarter about reusing loaded indices for
1187    /// any situations where the session cache has been disabled.
1188    #[async_recursion]
1189    async fn evaluate_impl(
1190        &self,
1191        index_loader: &dyn ScalarIndexLoader,
1192        metrics: &dyn MetricsCollector,
1193    ) -> Result<NullableIndexExprResult> {
1194        match self {
1195            Self::Not(inner) => {
1196                let result = inner.evaluate_impl(index_loader, metrics).await?;
1197                // Flip certainty: NOT(AtMost) → AtLeast, NOT(AtLeast) → AtMost
1198                Ok(match result {
1199                    NullableIndexExprResult::Exact(mask) => NullableIndexExprResult::Exact(!mask),
1200                    NullableIndexExprResult::AtMost(mask) => {
1201                        NullableIndexExprResult::AtLeast(!mask)
1202                    }
1203                    NullableIndexExprResult::AtLeast(mask) => {
1204                        NullableIndexExprResult::AtMost(!mask)
1205                    }
1206                })
1207            }
1208            Self::And(lhs, rhs) => {
1209                let lhs_result = lhs.evaluate_impl(index_loader, metrics);
1210                let rhs_result = rhs.evaluate_impl(index_loader, metrics);
1211                let (lhs_result, rhs_result) = try_join!(lhs_result, rhs_result)?;
1212                Ok(lhs_result & rhs_result)
1213            }
1214            Self::Or(lhs, rhs) => {
1215                let lhs_result = lhs.evaluate_impl(index_loader, metrics);
1216                let rhs_result = rhs.evaluate_impl(index_loader, metrics);
1217                let (lhs_result, rhs_result) = try_join!(lhs_result, rhs_result)?;
1218                Ok(lhs_result | rhs_result)
1219            }
1220            Self::Query(search) => {
1221                let index = index_loader
1222                    .load_index(&search.column, &search.index_name, metrics)
1223                    .await?;
1224                let search_result = index.search(search.query.as_ref(), metrics).await?;
1225                Ok(search_result.into())
1226            }
1227        }
1228    }
1229
1230    #[instrument(level = "debug", skip_all)]
1231    pub async fn evaluate(
1232        &self,
1233        index_loader: &dyn ScalarIndexLoader,
1234        metrics: &dyn MetricsCollector,
1235    ) -> Result<IndexExprResult> {
1236        Ok(self
1237            .evaluate_impl(index_loader, metrics)
1238            .await?
1239            .drop_nulls())
1240    }
1241
1242    pub fn to_expr(&self) -> Expr {
1243        match self {
1244            Self::Not(inner) => Expr::Not(inner.to_expr().into()),
1245            Self::And(lhs, rhs) => {
1246                let lhs = lhs.to_expr();
1247                let rhs = rhs.to_expr();
1248                lhs.and(rhs)
1249            }
1250            Self::Or(lhs, rhs) => {
1251                let lhs = lhs.to_expr();
1252                let rhs = rhs.to_expr();
1253                lhs.or(rhs)
1254            }
1255            Self::Query(search) => search.query.to_expr(search.column.clone()),
1256        }
1257    }
1258
1259    pub fn needs_recheck(&self) -> bool {
1260        match self {
1261            Self::Not(inner) => inner.needs_recheck(),
1262            Self::And(lhs, rhs) | Self::Or(lhs, rhs) => lhs.needs_recheck() || rhs.needs_recheck(),
1263            Self::Query(search) => search.needs_recheck,
1264        }
1265    }
1266}
1267
1268// Extract a column from the expression, if it is a column, or None
1269fn maybe_column(expr: &Expr) -> Option<&str> {
1270    match expr {
1271        Expr::Column(col) => Some(&col.name),
1272        _ => None,
1273    }
1274}
1275
1276// Extract the full nested column path from a get_field expression chain
1277// For example: get_field(get_field(metadata, "status"), "code") -> "metadata.status.code"
1278fn extract_nested_column_path(expr: &Expr) -> Option<String> {
1279    let mut current_expr = expr;
1280    let mut parts = Vec::new();
1281
1282    // Walk up the get_field chain
1283    loop {
1284        match current_expr {
1285            Expr::ScalarFunction(udf) if udf.name() == "get_field" => {
1286                if udf.args.len() != 2 {
1287                    return None;
1288                }
1289                // Extract the field name from the second argument
1290                // The Literal now has two fields: ScalarValue and Option<FieldMetadata>
1291                if let Expr::Literal(ScalarValue::Utf8(Some(field_name)), _) = &udf.args[1] {
1292                    parts.push(field_name.clone());
1293                } else {
1294                    return None;
1295                }
1296                // Move up to the parent expression
1297                current_expr = &udf.args[0];
1298            }
1299            Expr::Column(col) => {
1300                // We've reached the base column
1301                parts.push(col.name.clone());
1302                break;
1303            }
1304            _ => {
1305                return None;
1306            }
1307        }
1308    }
1309
1310    // Reverse to get the correct order (parent.child.grandchild)
1311    parts.reverse();
1312
1313    // Format the path correctly
1314    let field_refs: Vec<&str> = parts.iter().map(|s| s.as_str()).collect();
1315    Some(lance_core::datatypes::format_field_path(&field_refs))
1316}
1317
1318// Extract a column from the expression, if it is a column, and we have an index for that column, or None
1319//
1320// There's two ways to get a column.  First, the obvious way, is a
1321// simple column reference (e.g. x = 7).  Second, a more complex way,
1322// is some kind of projection into a column (e.g. json_extract(json, '$.name')).
1323// Third way is nested field access (e.g. get_field(metadata, "status.code"))
1324fn maybe_indexed_column<'b>(
1325    expr: &Expr,
1326    index_info: &'b dyn IndexInformationProvider,
1327) -> Option<(String, DataType, &'b dyn ScalarQueryParser)> {
1328    // First try to extract the full nested column path for get_field expressions
1329    if let Some(nested_path) = extract_nested_column_path(expr) {
1330        if let Some((data_type, parser)) = index_info.get_index(&nested_path) {
1331            if let Some(data_type) = parser.is_valid_reference(expr, data_type) {
1332                return Some((nested_path, data_type, parser));
1333            }
1334        }
1335    }
1336
1337    match expr {
1338        Expr::Column(col) => {
1339            let col = col.name.as_str();
1340            let (data_type, parser) = index_info.get_index(col)?;
1341            if let Some(data_type) = parser.is_valid_reference(expr, data_type) {
1342                Some((col.to_string(), data_type, parser))
1343            } else {
1344                None
1345            }
1346        }
1347        Expr::ScalarFunction(udf) => {
1348            if udf.args.is_empty() {
1349                return None;
1350            }
1351            // For non-get_field functions, fall back to old behavior
1352            let col = maybe_column(&udf.args[0])?;
1353            let (data_type, parser) = index_info.get_index(col)?;
1354            if let Some(data_type) = parser.is_valid_reference(expr, data_type) {
1355                Some((col.to_string(), data_type, parser))
1356            } else {
1357                None
1358            }
1359        }
1360        _ => None,
1361    }
1362}
1363
1364// Extract a literal scalar value from an expression, if it is a literal, or None
1365fn maybe_scalar(expr: &Expr, expected_type: &DataType) -> Option<ScalarValue> {
1366    match expr {
1367        Expr::Literal(value, _) => safe_coerce_scalar(value, expected_type),
1368        // Some literals can't be expressed in datafusion's SQL and can only be expressed with
1369        // a cast.  For example, there is no way to express a fixed-size-binary literal (which is
1370        // commonly used for UUID).  As a result the expression could look like...
1371        //
1372        // col = arrow_cast(value, 'fixed_size_binary(16)')
1373        //
1374        // In this case we need to extract the value, apply the cast, and then test the casted value
1375        Expr::Cast(cast) => match cast.expr.as_ref() {
1376            Expr::Literal(value, _) => {
1377                let casted = value.cast_to(&cast.data_type).ok()?;
1378                safe_coerce_scalar(&casted, expected_type)
1379            }
1380            _ => None,
1381        },
1382        Expr::ScalarFunction(scalar_function) => {
1383            if scalar_function.name() == "arrow_cast" {
1384                if scalar_function.args.len() != 2 {
1385                    return None;
1386                }
1387                match (&scalar_function.args[0], &scalar_function.args[1]) {
1388                    (Expr::Literal(value, _), Expr::Literal(cast_type, _)) => {
1389                        let target_type = scalar_function
1390                            .func
1391                            .return_field_from_args(ReturnFieldArgs {
1392                                arg_fields: &[
1393                                    Arc::new(Field::new("expression", value.data_type(), false)),
1394                                    Arc::new(Field::new("datatype", cast_type.data_type(), false)),
1395                                ],
1396                                scalar_arguments: &[Some(value), Some(cast_type)],
1397                            })
1398                            .ok()?;
1399                        let casted = value.cast_to(target_type.data_type()).ok()?;
1400                        safe_coerce_scalar(&casted, expected_type)
1401                    }
1402                    _ => None,
1403                }
1404            } else {
1405                None
1406            }
1407        }
1408        _ => None,
1409    }
1410}
1411
1412// Extract a list of scalar values from an expression, if it is a list of scalar values, or None
1413fn maybe_scalar_list(exprs: &Vec<Expr>, expected_type: &DataType) -> Option<Vec<ScalarValue>> {
1414    let mut scalar_values = Vec::with_capacity(exprs.len());
1415    for expr in exprs {
1416        match maybe_scalar(expr, expected_type) {
1417            Some(scalar_val) => {
1418                scalar_values.push(scalar_val);
1419            }
1420            None => {
1421                return None;
1422            }
1423        }
1424    }
1425    Some(scalar_values)
1426}
1427
1428fn visit_between(
1429    between: &Between,
1430    index_info: &dyn IndexInformationProvider,
1431) -> Option<IndexedExpression> {
1432    let (column, col_type, query_parser) = maybe_indexed_column(&between.expr, index_info)?;
1433    let low = maybe_scalar(&between.low, &col_type)?;
1434    let high = maybe_scalar(&between.high, &col_type)?;
1435
1436    let indexed_expr =
1437        query_parser.visit_between(&column, &Bound::Included(low), &Bound::Included(high))?;
1438
1439    if between.negated {
1440        indexed_expr.maybe_not()
1441    } else {
1442        Some(indexed_expr)
1443    }
1444}
1445
1446fn visit_in_list(
1447    in_list: &InList,
1448    index_info: &dyn IndexInformationProvider,
1449) -> Option<IndexedExpression> {
1450    let (column, col_type, query_parser) = maybe_indexed_column(&in_list.expr, index_info)?;
1451    let values = maybe_scalar_list(&in_list.list, &col_type)?;
1452
1453    let indexed_expr = query_parser.visit_in_list(&column, &values)?;
1454
1455    if in_list.negated {
1456        indexed_expr.maybe_not()
1457    } else {
1458        Some(indexed_expr)
1459    }
1460}
1461
1462fn visit_is_bool(
1463    expr: &Expr,
1464    index_info: &dyn IndexInformationProvider,
1465    value: bool,
1466) -> Option<IndexedExpression> {
1467    let (column, col_type, query_parser) = maybe_indexed_column(expr, index_info)?;
1468    if col_type != DataType::Boolean {
1469        None
1470    } else {
1471        query_parser.visit_is_bool(&column, value)
1472    }
1473}
1474
1475// A column can be a valid indexed expression if the column is boolean (e.g. 'WHERE on_sale')
1476fn visit_column(
1477    col: &Expr,
1478    index_info: &dyn IndexInformationProvider,
1479) -> Option<IndexedExpression> {
1480    let (column, col_type, query_parser) = maybe_indexed_column(col, index_info)?;
1481    if col_type != DataType::Boolean {
1482        None
1483    } else {
1484        query_parser.visit_is_bool(&column, true)
1485    }
1486}
1487
1488fn visit_is_null(
1489    expr: &Expr,
1490    index_info: &dyn IndexInformationProvider,
1491    negated: bool,
1492) -> Option<IndexedExpression> {
1493    let (column, _, query_parser) = maybe_indexed_column(expr, index_info)?;
1494    let indexed_expr = query_parser.visit_is_null(&column)?;
1495    if negated {
1496        indexed_expr.maybe_not()
1497    } else {
1498        Some(indexed_expr)
1499    }
1500}
1501
1502fn visit_not(
1503    expr: &Expr,
1504    index_info: &dyn IndexInformationProvider,
1505    depth: usize,
1506) -> Result<Option<IndexedExpression>> {
1507    let node = visit_node(expr, index_info, depth + 1)?;
1508    Ok(node.and_then(|node| node.maybe_not()))
1509}
1510
1511fn visit_comparison(
1512    expr: &BinaryExpr,
1513    index_info: &dyn IndexInformationProvider,
1514) -> Option<IndexedExpression> {
1515    let left_col = maybe_indexed_column(&expr.left, index_info);
1516    if let Some((column, col_type, query_parser)) = left_col {
1517        let scalar = maybe_scalar(&expr.right, &col_type)?;
1518        query_parser.visit_comparison(&column, &scalar, &expr.op)
1519    } else {
1520        // Datafusion's query simplifier will canonicalize expressions and so we shouldn't reach this case.  If, for some reason, we
1521        // do reach this case we can handle it in the future by inverting expr.op and swapping the left and right sides
1522        None
1523    }
1524}
1525
1526fn maybe_range(
1527    expr: &BinaryExpr,
1528    index_info: &dyn IndexInformationProvider,
1529) -> Option<IndexedExpression> {
1530    let left_expr = match expr.left.as_ref() {
1531        Expr::BinaryExpr(binary_expr) => Some(binary_expr),
1532        _ => None,
1533    }?;
1534    let right_expr = match expr.right.as_ref() {
1535        Expr::BinaryExpr(binary_expr) => Some(binary_expr),
1536        _ => None,
1537    }?;
1538
1539    let (left_col, dt, parser) = maybe_indexed_column(&left_expr.left, index_info)?;
1540    let right_col = maybe_column(&right_expr.left)?;
1541
1542    if left_col != right_col {
1543        return None;
1544    }
1545
1546    let left_value = maybe_scalar(&left_expr.right, &dt)?;
1547    let right_value = maybe_scalar(&right_expr.right, &dt)?;
1548
1549    let (low, high) = match (left_expr.op, right_expr.op) {
1550        // x >= a && x <= b
1551        (Operator::GtEq, Operator::LtEq) => {
1552            (Bound::Included(left_value), Bound::Included(right_value))
1553        }
1554        // x >= a && x < b
1555        (Operator::GtEq, Operator::Lt) => {
1556            (Bound::Included(left_value), Bound::Excluded(right_value))
1557        }
1558        // x > a && x <= b
1559        (Operator::Gt, Operator::LtEq) => {
1560            (Bound::Excluded(left_value), Bound::Included(right_value))
1561        }
1562        // x > a && x < b
1563        (Operator::Gt, Operator::Lt) => (Bound::Excluded(left_value), Bound::Excluded(right_value)),
1564        // x <= a && x >= b
1565        (Operator::LtEq, Operator::GtEq) => {
1566            (Bound::Included(right_value), Bound::Included(left_value))
1567        }
1568        // x <= a && x > b
1569        (Operator::LtEq, Operator::Gt) => {
1570            (Bound::Included(right_value), Bound::Excluded(left_value))
1571        }
1572        // x < a && x >= b
1573        (Operator::Lt, Operator::GtEq) => {
1574            (Bound::Excluded(right_value), Bound::Included(left_value))
1575        }
1576        // x < a && x > b
1577        (Operator::Lt, Operator::Gt) => (Bound::Excluded(right_value), Bound::Excluded(left_value)),
1578        _ => return None,
1579    };
1580
1581    parser.visit_between(&left_col, &low, &high)
1582}
1583
1584fn visit_and(
1585    expr: &BinaryExpr,
1586    index_info: &dyn IndexInformationProvider,
1587    depth: usize,
1588) -> Result<Option<IndexedExpression>> {
1589    // Many scalar indices can efficiently handle a BETWEEN query as a single search and this
1590    // can be much more efficient than two separate range queries.  As an optimization we check
1591    // to see if this is a between query and, if so, we handle it as a single query
1592    //
1593    // Note: We can't rely on users writing the SQL BETWEEN operator because:
1594    //   * Some users won't realize it's an option or a good idea
1595    //   * Datafusion's simplifier will rewrite the BETWEEN operator into two separate range queries
1596    if let Some(range_expr) = maybe_range(expr, index_info) {
1597        return Ok(Some(range_expr));
1598    }
1599
1600    let left = visit_node(&expr.left, index_info, depth + 1)?;
1601    let right = visit_node(&expr.right, index_info, depth + 1)?;
1602    Ok(match (left, right) {
1603        (Some(left), Some(right)) => Some(left.and(right)),
1604        (Some(left), None) => Some(left.refine((*expr.right).clone())),
1605        (None, Some(right)) => Some(right.refine((*expr.left).clone())),
1606        (None, None) => None,
1607    })
1608}
1609
1610fn visit_or(
1611    expr: &BinaryExpr,
1612    index_info: &dyn IndexInformationProvider,
1613    depth: usize,
1614) -> Result<Option<IndexedExpression>> {
1615    let left = visit_node(&expr.left, index_info, depth + 1)?;
1616    let right = visit_node(&expr.right, index_info, depth + 1)?;
1617    Ok(match (left, right) {
1618        (Some(left), Some(right)) => left.maybe_or(right),
1619        // If one side can use an index and the other side cannot then
1620        // we must abandon the entire thing.  For example, consider the
1621        // query "color == 'blue' or size > 10" where color is indexed but
1622        // size is not.  It's entirely possible that size > 10 matches every
1623        // row in our database.  There is nothing we can do except a full scan
1624        (Some(_), None) => None,
1625        (None, Some(_)) => None,
1626        (None, None) => None,
1627    })
1628}
1629
1630fn visit_binary_expr(
1631    expr: &BinaryExpr,
1632    index_info: &dyn IndexInformationProvider,
1633    depth: usize,
1634) -> Result<Option<IndexedExpression>> {
1635    match &expr.op {
1636        Operator::Lt | Operator::LtEq | Operator::Gt | Operator::GtEq | Operator::Eq => {
1637            Ok(visit_comparison(expr, index_info))
1638        }
1639        // visit_comparison will maybe create an Eq query which we negate
1640        Operator::NotEq => Ok(visit_comparison(expr, index_info).and_then(|node| node.maybe_not())),
1641        Operator::And => visit_and(expr, index_info, depth),
1642        Operator::Or => visit_or(expr, index_info, depth),
1643        _ => Ok(None),
1644    }
1645}
1646
1647fn visit_scalar_fn(
1648    scalar_fn: &ScalarFunction,
1649    index_info: &dyn IndexInformationProvider,
1650) -> Option<IndexedExpression> {
1651    if scalar_fn.args.is_empty() {
1652        return None;
1653    }
1654    let (col, data_type, query_parser) = maybe_indexed_column(&scalar_fn.args[0], index_info)?;
1655    query_parser.visit_scalar_function(&col, &data_type, &scalar_fn.func, &scalar_fn.args)
1656}
1657
1658fn visit_node(
1659    expr: &Expr,
1660    index_info: &dyn IndexInformationProvider,
1661    depth: usize,
1662) -> Result<Option<IndexedExpression>> {
1663    if depth >= MAX_DEPTH {
1664        return Err(Error::invalid_input(
1665            format!(
1666                "the filter expression is too long, lance limit the max number of conditions to {}",
1667                MAX_DEPTH
1668            ),
1669            location!(),
1670        ));
1671    }
1672    match expr {
1673        Expr::Between(between) => Ok(visit_between(between, index_info)),
1674        Expr::Alias(alias) => visit_node(alias.expr.as_ref(), index_info, depth),
1675        Expr::Column(_) => Ok(visit_column(expr, index_info)),
1676        Expr::InList(in_list) => Ok(visit_in_list(in_list, index_info)),
1677        Expr::IsFalse(expr) => Ok(visit_is_bool(expr.as_ref(), index_info, false)),
1678        Expr::IsTrue(expr) => Ok(visit_is_bool(expr.as_ref(), index_info, true)),
1679        Expr::IsNull(expr) => Ok(visit_is_null(expr.as_ref(), index_info, false)),
1680        Expr::IsNotNull(expr) => Ok(visit_is_null(expr.as_ref(), index_info, true)),
1681        Expr::Not(expr) => visit_not(expr.as_ref(), index_info, depth),
1682        Expr::BinaryExpr(binary_expr) => visit_binary_expr(binary_expr, index_info, depth),
1683        Expr::ScalarFunction(scalar_fn) => Ok(visit_scalar_fn(scalar_fn, index_info)),
1684        _ => Ok(None),
1685    }
1686}
1687
/// A trait to be used in `apply_scalar_indices` to inform the function which columns are indexed
pub trait IndexInformationProvider {
    /// Check if an index exists for `col` and, if so, return the data type of col
    /// as well as a query parser that can parse queries for that column
    ///
    /// Returns `None` when `col` has no scalar index.
    fn get_index(&self, col: &str) -> Option<(&DataType, &dyn ScalarQueryParser)>;
}
1694
1695/// Attempt to split a filter expression into a search of scalar indexes and an
1696///   optional post-search refinement query
1697pub fn apply_scalar_indices(
1698    expr: Expr,
1699    index_info: &dyn IndexInformationProvider,
1700) -> Result<IndexedExpression> {
1701    Ok(visit_node(&expr, index_info, 0)?.unwrap_or(IndexedExpression::refine_only(expr)))
1702}
1703
/// How a filter will be applied: an optional scalar-index search plus an
/// optional expression evaluated after (or instead of) that search.
#[derive(Clone, Default, Debug)]
pub struct FilterPlan {
    /// Scalar index search to run, if any part of the filter could use an index
    pub index_query: Option<ScalarIndexExpr>,
    /// True if the index query is guaranteed to return exact results
    // NOTE(review): the derived `Default` sets this to `false`, while
    // `FilterPlan::empty()` and `FilterPlan::new_refine_only()` set it to `true`;
    // confirm which value callers expect when there is no index query.
    pub skip_recheck: bool,
    /// Portion of the filter that must still be evaluated after the index search
    pub refine_expr: Option<Expr>,
    /// The complete filter expression the plan was built from; `make_refine_only`
    /// falls back to evaluating this in full
    pub full_expr: Option<Expr>,
}
1712
1713impl FilterPlan {
1714    pub fn empty() -> Self {
1715        Self {
1716            index_query: None,
1717            skip_recheck: true,
1718            refine_expr: None,
1719            full_expr: None,
1720        }
1721    }
1722
1723    pub fn new_refine_only(expr: Expr) -> Self {
1724        Self {
1725            index_query: None,
1726            skip_recheck: true,
1727            refine_expr: Some(expr.clone()),
1728            full_expr: Some(expr),
1729        }
1730    }
1731
1732    pub fn is_empty(&self) -> bool {
1733        self.refine_expr.is_none() && self.index_query.is_none()
1734    }
1735
1736    pub fn all_columns(&self) -> Vec<String> {
1737        self.full_expr
1738            .as_ref()
1739            .map(Planner::column_names_in_expr)
1740            .unwrap_or_default()
1741    }
1742
1743    pub fn refine_columns(&self) -> Vec<String> {
1744        self.refine_expr
1745            .as_ref()
1746            .map(Planner::column_names_in_expr)
1747            .unwrap_or_default()
1748    }
1749
1750    /// Return true if this has a refine step, regardless of the status of prefilter
1751    pub fn has_refine(&self) -> bool {
1752        self.refine_expr.is_some()
1753    }
1754
1755    /// Return true if this has a scalar index query
1756    pub fn has_index_query(&self) -> bool {
1757        self.index_query.is_some()
1758    }
1759
1760    pub fn has_any_filter(&self) -> bool {
1761        self.refine_expr.is_some() || self.index_query.is_some()
1762    }
1763
1764    pub fn make_refine_only(&mut self) {
1765        self.index_query = None;
1766        self.refine_expr = self.full_expr.clone();
1767    }
1768
1769    /// Return true if there is no refine or recheck of any kind and there is an index query
1770    pub fn is_exact_index_search(&self) -> bool {
1771        self.index_query.is_some() && self.refine_expr.is_none() && self.skip_recheck
1772    }
1773}
1774
/// Extension trait that adds scalar-index-aware filter planning to a planner.
pub trait PlannerIndexExt {
    /// Determine how to apply a provided filter
    ///
    /// We optimize the filter expression.  We then
    /// split the optimized expression into a portion that can be
    /// satisfied by an index search (of one or more indices) and
    /// a refine portion that must be applied after the index search
    ///
    /// * `filter` - the filter expression to plan
    /// * `index_info` - reports which columns have scalar indices
    /// * `use_scalar_index` - when false, no index search is attempted and the
    ///   whole (optimized) filter becomes the refine expression
    fn create_filter_plan(
        &self,
        filter: Expr,
        index_info: &dyn IndexInformationProvider,
        use_scalar_index: bool,
    ) -> Result<FilterPlan>;
}
1789
1790impl PlannerIndexExt for Planner {
1791    fn create_filter_plan(
1792        &self,
1793        filter: Expr,
1794        index_info: &dyn IndexInformationProvider,
1795        use_scalar_index: bool,
1796    ) -> Result<FilterPlan> {
1797        let logical_expr = self.optimize_expr(filter)?;
1798        if use_scalar_index {
1799            let indexed_expr = apply_scalar_indices(logical_expr.clone(), index_info)?;
1800            let mut skip_recheck = false;
1801            if let Some(scalar_query) = indexed_expr.scalar_query.as_ref() {
1802                skip_recheck = !scalar_query.needs_recheck();
1803            }
1804            Ok(FilterPlan {
1805                index_query: indexed_expr.scalar_query,
1806                refine_expr: indexed_expr.refine_expr,
1807                full_expr: Some(logical_expr),
1808                skip_recheck,
1809            })
1810        } else {
1811            Ok(FilterPlan {
1812                index_query: None,
1813                skip_recheck: true,
1814                refine_expr: Some(logical_expr.clone()),
1815                full_expr: Some(logical_expr),
1816            })
1817        }
1818    }
1819}
1820
// Unit tests for translating DataFusion filter expressions into scalar index
// searches (`apply_scalar_indices`) and for how inexact index results combine.
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use arrow_schema::{Field, Schema};
    use chrono::Utc;
    use datafusion_common::{Column, DFSchema};
    use datafusion_expr::execution_props::ExecutionProps;
    use datafusion_expr::simplify::SimplifyContext;
    use lance_datafusion::exec::{get_session_context, LanceExecutionOptions};

    use crate::scalar::json::{JsonQuery, JsonQueryParser};

    use super::*;

    /// Index metadata for one mock column: its declared type and its query parser.
    struct ColInfo {
        data_type: DataType,
        parser: Box<dyn ScalarQueryParser>,
    }

    impl ColInfo {
        fn new(data_type: DataType, parser: Box<dyn ScalarQueryParser>) -> Self {
            Self { data_type, parser }
        }
    }

    /// An `IndexInformationProvider` backed by an in-memory map from column name
    /// to index metadata.
    struct MockIndexInfoProvider {
        indexed_columns: HashMap<String, ColInfo>,
    }

    impl MockIndexInfoProvider {
        fn new(indexed_columns: Vec<(&str, ColInfo)>) -> Self {
            Self {
                indexed_columns: HashMap::from_iter(
                    indexed_columns
                        .into_iter()
                        .map(|(s, ty)| (s.to_string(), ty)),
                ),
            }
        }
    }

    impl IndexInformationProvider for MockIndexInfoProvider {
        fn get_index(&self, col: &str) -> Option<(&DataType, &dyn ScalarQueryParser)> {
            self.indexed_columns
                .get(col)
                .map(|col_info| (&col_info.data_type, col_info.parser.as_ref()))
        }
    }

    /// Parse `expr` against a fixed test schema, optionally run the DataFusion
    /// expression simplifier over it (`optimize`), and compare the output of
    /// `apply_scalar_indices` with `expected`.
    ///
    /// When `expected` is `None`, asserts that no index query was produced and
    /// that the refine expression is the entire (possibly simplified) input.
    fn check(
        index_info: &dyn IndexInformationProvider,
        expr: &str,
        expected: Option<IndexedExpression>,
        optimize: bool,
    ) {
        let schema = Schema::new(vec![
            Field::new("color", DataType::Utf8, false),
            Field::new("size", DataType::Float32, false),
            Field::new("aisle", DataType::UInt32, false),
            Field::new("on_sale", DataType::Boolean, false),
            Field::new("price", DataType::Float32, false),
            Field::new("json", DataType::LargeBinary, false),
        ]);
        let df_schema: DFSchema = schema.try_into().unwrap();

        let ctx = get_session_context(&LanceExecutionOptions::default());
        let state = ctx.state();
        let mut expr = state.create_logical_expr(expr, &df_schema).unwrap();
        if optimize {
            let props = ExecutionProps::new().with_query_execution_start_time(Utc::now());
            let simplify_context = SimplifyContext::new(&props).with_schema(Arc::new(df_schema));
            let simplifier =
                datafusion::optimizer::simplify_expressions::ExprSimplifier::new(simplify_context);
            expr = simplifier.simplify(expr).unwrap();
        }

        let actual = apply_scalar_indices(expr.clone(), index_info).unwrap();
        if let Some(expected) = expected {
            assert_eq!(actual, expected);
        } else {
            assert!(actual.scalar_query.is_none());
            assert_eq!(actual.refine_expr.unwrap(), expr);
        }
    }

    /// Assert that `expr` (not simplified) cannot use any index.
    fn check_no_index(index_info: &dyn IndexInformationProvider, expr: &str) {
        check(index_info, expr, None, false)
    }

    /// Assert that `expr` (not simplified) becomes exactly one search of the
    /// index `{col}_idx` on `col` with `query`, and no refine step.
    fn check_simple(
        index_info: &dyn IndexInformationProvider,
        expr: &str,
        col: &str,
        query: impl AnyQuery,
    ) {
        check(
            index_info,
            expr,
            Some(IndexedExpression::index_query(
                col.to_string(),
                format!("{}_idx", col),
                Arc::new(query),
            )),
            false,
        )
    }

    /// Like `check_simple` for a `SargableQuery`, but runs the expression
    /// simplifier first (`optimize = true`).
    fn check_range(
        index_info: &dyn IndexInformationProvider,
        expr: &str,
        col: &str,
        query: SargableQuery,
    ) {
        check(
            index_info,
            expr,
            Some(IndexedExpression::index_query(
                col.to_string(),
                format!("{}_idx", col),
                Arc::new(query),
            )),
            true,
        )
    }

    /// Assert that `expr` (not simplified) becomes the negation (`maybe_not`) of a
    /// single search of the index `{col}_idx` on `col` with `query`.
    fn check_simple_negated(
        index_info: &dyn IndexInformationProvider,
        expr: &str,
        col: &str,
        query: SargableQuery,
    ) {
        check(
            index_info,
            expr,
            Some(
                IndexedExpression::index_query(
                    col.to_string(),
                    format!("{}_idx", col),
                    Arc::new(query),
                )
                .maybe_not()
                .unwrap(),
            ),
            false,
        )
    }

    /// Map many filter shapes to index searches using a mock provider in which
    /// every schema column except `size` is indexed.
    #[test]
    fn test_expressions() {
        let index_info = MockIndexInfoProvider::new(vec![
            (
                "color",
                ColInfo::new(
                    DataType::Utf8,
                    Box::new(SargableQueryParser::new("color_idx".to_string(), false)),
                ),
            ),
            (
                "aisle",
                ColInfo::new(
                    DataType::UInt32,
                    Box::new(SargableQueryParser::new("aisle_idx".to_string(), false)),
                ),
            ),
            (
                "on_sale",
                ColInfo::new(
                    DataType::Boolean,
                    Box::new(SargableQueryParser::new("on_sale_idx".to_string(), false)),
                ),
            ),
            (
                "price",
                ColInfo::new(
                    DataType::Float32,
                    Box::new(SargableQueryParser::new("price_idx".to_string(), false)),
                ),
            ),
            (
                "json",
                ColInfo::new(
                    DataType::LargeBinary,
                    Box::new(JsonQueryParser::new(
                        "$.name".to_string(),
                        Box::new(SargableQueryParser::new("json_idx".to_string(), false)),
                    )),
                ),
            ),
        ]);

        check_simple(
            &index_info,
            "json_extract(json, '$.name') = 'foo'",
            "json",
            JsonQuery::new(
                Arc::new(SargableQuery::Equals(ScalarValue::Utf8(Some(
                    "foo".to_string(),
                )))),
                "$.name".to_string(),
            ),
        );

        check_no_index(&index_info, "size BETWEEN 5 AND 10");
        // Cast case.  We will cast 5 (an int64) to Int16 and then coerce to UInt32
        check_simple(
            &index_info,
            "aisle = arrow_cast(5, 'Int16')",
            "aisle",
            SargableQuery::Equals(ScalarValue::UInt32(Some(5))),
        );
        // 5 different ways of writing BETWEEN (all should be recognized)
        check_range(
            &index_info,
            "aisle BETWEEN 5 AND 10",
            "aisle",
            SargableQuery::Range(
                Bound::Included(ScalarValue::UInt32(Some(5))),
                Bound::Included(ScalarValue::UInt32(Some(10))),
            ),
        );
        check_range(
            &index_info,
            "aisle >= 5 AND aisle <= 10",
            "aisle",
            SargableQuery::Range(
                Bound::Included(ScalarValue::UInt32(Some(5))),
                Bound::Included(ScalarValue::UInt32(Some(10))),
            ),
        );

        check_range(
            &index_info,
            "aisle <= 10 AND aisle >= 5",
            "aisle",
            SargableQuery::Range(
                Bound::Included(ScalarValue::UInt32(Some(5))),
                Bound::Included(ScalarValue::UInt32(Some(10))),
            ),
        );

        check_range(
            &index_info,
            "5 <= aisle AND 10 >= aisle",
            "aisle",
            SargableQuery::Range(
                Bound::Included(ScalarValue::UInt32(Some(5))),
                Bound::Included(ScalarValue::UInt32(Some(10))),
            ),
        );

        check_range(
            &index_info,
            "10 >= aisle AND 5 <= aisle",
            "aisle",
            SargableQuery::Range(
                Bound::Included(ScalarValue::UInt32(Some(5))),
                Bound::Included(ScalarValue::UInt32(Some(10))),
            ),
        );
        check_simple(
            &index_info,
            "on_sale IS TRUE",
            "on_sale",
            SargableQuery::Equals(ScalarValue::Boolean(Some(true))),
        );
        check_simple(
            &index_info,
            "on_sale",
            "on_sale",
            SargableQuery::Equals(ScalarValue::Boolean(Some(true))),
        );
        check_simple_negated(
            &index_info,
            "NOT on_sale",
            "on_sale",
            SargableQuery::Equals(ScalarValue::Boolean(Some(true))),
        );
        check_simple(
            &index_info,
            "on_sale IS FALSE",
            "on_sale",
            SargableQuery::Equals(ScalarValue::Boolean(Some(false))),
        );
        check_simple_negated(
            &index_info,
            "aisle NOT BETWEEN 5 AND 10",
            "aisle",
            SargableQuery::Range(
                Bound::Included(ScalarValue::UInt32(Some(5))),
                Bound::Included(ScalarValue::UInt32(Some(10))),
            ),
        );
        // Small in-list (in-list with 3 or fewer items optimizes into or-chain)
        check_simple(
            &index_info,
            "aisle IN (5, 6, 7)",
            "aisle",
            SargableQuery::IsIn(vec![
                ScalarValue::UInt32(Some(5)),
                ScalarValue::UInt32(Some(6)),
                ScalarValue::UInt32(Some(7)),
            ]),
        );
        check_simple_negated(
            &index_info,
            "NOT aisle IN (5, 6, 7)",
            "aisle",
            SargableQuery::IsIn(vec![
                ScalarValue::UInt32(Some(5)),
                ScalarValue::UInt32(Some(6)),
                ScalarValue::UInt32(Some(7)),
            ]),
        );
        check_simple_negated(
            &index_info,
            "aisle NOT IN (5, 6, 7)",
            "aisle",
            SargableQuery::IsIn(vec![
                ScalarValue::UInt32(Some(5)),
                ScalarValue::UInt32(Some(6)),
                ScalarValue::UInt32(Some(7)),
            ]),
        );
        check_simple(
            &index_info,
            "aisle IN (5, 6, 7, 8, 9)",
            "aisle",
            SargableQuery::IsIn(vec![
                ScalarValue::UInt32(Some(5)),
                ScalarValue::UInt32(Some(6)),
                ScalarValue::UInt32(Some(7)),
                ScalarValue::UInt32(Some(8)),
                ScalarValue::UInt32(Some(9)),
            ]),
        );
        check_simple_negated(
            &index_info,
            "NOT aisle IN (5, 6, 7, 8, 9)",
            "aisle",
            SargableQuery::IsIn(vec![
                ScalarValue::UInt32(Some(5)),
                ScalarValue::UInt32(Some(6)),
                ScalarValue::UInt32(Some(7)),
                ScalarValue::UInt32(Some(8)),
                ScalarValue::UInt32(Some(9)),
            ]),
        );
        check_simple_negated(
            &index_info,
            "aisle NOT IN (5, 6, 7, 8, 9)",
            "aisle",
            SargableQuery::IsIn(vec![
                ScalarValue::UInt32(Some(5)),
                ScalarValue::UInt32(Some(6)),
                ScalarValue::UInt32(Some(7)),
                ScalarValue::UInt32(Some(8)),
                ScalarValue::UInt32(Some(9)),
            ]),
        );
        check_simple(
            &index_info,
            "on_sale is false",
            "on_sale",
            SargableQuery::Equals(ScalarValue::Boolean(Some(false))),
        );
        check_simple(
            &index_info,
            "on_sale is true",
            "on_sale",
            SargableQuery::Equals(ScalarValue::Boolean(Some(true))),
        );
        check_simple(
            &index_info,
            "aisle < 10",
            "aisle",
            SargableQuery::Range(
                Bound::Unbounded,
                Bound::Excluded(ScalarValue::UInt32(Some(10))),
            ),
        );
        check_simple(
            &index_info,
            "aisle <= 10",
            "aisle",
            SargableQuery::Range(
                Bound::Unbounded,
                Bound::Included(ScalarValue::UInt32(Some(10))),
            ),
        );
        check_simple(
            &index_info,
            "aisle > 10",
            "aisle",
            SargableQuery::Range(
                Bound::Excluded(ScalarValue::UInt32(Some(10))),
                Bound::Unbounded,
            ),
        );
        // In the future we can handle this case if we need to.  For
        // now let's make sure we don't accidentally do the wrong thing
        // (we were getting this backwards in the past)
        check_no_index(&index_info, "10 > aisle");
        check_simple(
            &index_info,
            "aisle >= 10",
            "aisle",
            SargableQuery::Range(
                Bound::Included(ScalarValue::UInt32(Some(10))),
                Bound::Unbounded,
            ),
        );
        check_simple(
            &index_info,
            "aisle = 10",
            "aisle",
            SargableQuery::Equals(ScalarValue::UInt32(Some(10))),
        );
        check_simple_negated(
            &index_info,
            "aisle <> 10",
            "aisle",
            SargableQuery::Equals(ScalarValue::UInt32(Some(10))),
        );
        // Common compound case, AND'd clauses
        let left = Box::new(ScalarIndexExpr::Query(ScalarIndexSearch {
            column: "aisle".to_string(),
            index_name: "aisle_idx".to_string(),
            query: Arc::new(SargableQuery::Equals(ScalarValue::UInt32(Some(10)))),
            needs_recheck: false,
        }));
        let right = Box::new(ScalarIndexExpr::Query(ScalarIndexSearch {
            column: "color".to_string(),
            index_name: "color_idx".to_string(),
            query: Arc::new(SargableQuery::Equals(ScalarValue::Utf8(Some(
                "blue".to_string(),
            )))),
            needs_recheck: false,
        }));
        check(
            &index_info,
            "aisle = 10 AND color = 'blue'",
            Some(IndexedExpression {
                scalar_query: Some(ScalarIndexExpr::And(left.clone(), right.clone())),
                refine_expr: None,
            }),
            false,
        );
        // Compound AND's and not all of them are indexed columns
        let refine = Expr::Column(Column::new_unqualified("size")).gt(datafusion_expr::lit(30_i64));
        check(
            &index_info,
            "aisle = 10 AND color = 'blue' AND size > 30",
            Some(IndexedExpression {
                scalar_query: Some(ScalarIndexExpr::And(left.clone(), right.clone())),
                refine_expr: Some(refine.clone()),
            }),
            false,
        );
        // Compounded OR's where ALL columns are indexed
        check(
            &index_info,
            "aisle = 10 OR color = 'blue'",
            Some(IndexedExpression {
                scalar_query: Some(ScalarIndexExpr::Or(left.clone(), right.clone())),
                refine_expr: None,
            }),
            false,
        );
        // Compounded OR's with one or more unindexed columns
        check_no_index(&index_info, "aisle = 10 OR color = 'blue' OR size > 30");
        // AND'd group of OR
        check(
            &index_info,
            "(aisle = 10 OR color = 'blue') AND size > 30",
            Some(IndexedExpression {
                scalar_query: Some(ScalarIndexExpr::Or(left, right)),
                refine_expr: Some(refine),
            }),
            false,
        );
        // Examples of things that are not yet supported but should be supportable someday

        // OR'd group of refined index searches (see IndexedExpression::or for details)
        check_no_index(
            &index_info,
            "(aisle = 10 AND size > 30) OR (color = 'blue' AND size > 20)",
        );

        // Non-normalized arithmetic (can use expression simplification)
        check_no_index(&index_info, "aisle + 3 < 10");

        // Currently we assume that the return of an index search tells us which rows are
        // TRUE and all other rows are FALSE.  This will need to change but for now it is
        // safer to not support the following cases because the return value of non-matched
        // rows is NULL and not FALSE.
        check_no_index(&index_info, "aisle IN (5, 6, NULL)");
        // OR-list with NULL (in future DF version this will be optimized repr of
        // small in-list with NULL so let's get ready for it)
        check_no_index(&index_info, "aisle = 5 OR aisle = 6 OR NULL");
        check_no_index(&index_info, "aisle IN (5, 6, 7, 8, NULL)");
        check_no_index(&index_info, "aisle = NULL");
        check_no_index(&index_info, "aisle BETWEEN 5 AND NULL");
        check_no_index(&index_info, "aisle BETWEEN NULL AND 10");
    }

    /// Check how negation is expected to change the certainty of an index result.
    #[tokio::test]
    async fn test_not_flips_certainty() {
        use lance_core::utils::mask::{NullableRowAddrSet, RowAddrTreeMap};

        // Test that NOT flips certainty for inexact index results
        // This tests the implementation in evaluate_impl for Self::Not
        // NOTE(review): `apply_not` below is a local copy of that logic, so this
        // test never calls `evaluate_impl`; a regression there would not be caught.

        // Helper function that mirrors the NOT logic in evaluate_impl
        fn apply_not(result: NullableIndexExprResult) -> NullableIndexExprResult {
            match result {
                NullableIndexExprResult::Exact(mask) => NullableIndexExprResult::Exact(!mask),
                NullableIndexExprResult::AtMost(mask) => NullableIndexExprResult::AtLeast(!mask),
                NullableIndexExprResult::AtLeast(mask) => NullableIndexExprResult::AtMost(!mask),
            }
        }

        // AtMost: superset of matches (e.g., bloom filter says "might be in [1,2]")
        let at_most = NullableIndexExprResult::AtMost(NullableRowAddrMask::AllowList(
            NullableRowAddrSet::new(RowAddrTreeMap::from_iter(&[1, 2]), RowAddrTreeMap::new()),
        ));
        // NOT(AtMost) should be AtLeast (definitely NOT in [1,2], might be elsewhere)
        assert!(matches!(
            apply_not(at_most),
            NullableIndexExprResult::AtLeast(_)
        ));

        // AtLeast: subset of matches (e.g., definitely in [1,2], might be more)
        let at_least = NullableIndexExprResult::AtLeast(NullableRowAddrMask::AllowList(
            NullableRowAddrSet::new(RowAddrTreeMap::from_iter(&[1, 2]), RowAddrTreeMap::new()),
        ));
        // NOT(AtLeast) should be AtMost (might NOT be in [1,2], definitely elsewhere)
        assert!(matches!(
            apply_not(at_least),
            NullableIndexExprResult::AtMost(_)
        ));

        // Exact should stay Exact
        let exact = NullableIndexExprResult::Exact(NullableRowAddrMask::AllowList(
            NullableRowAddrSet::new(RowAddrTreeMap::from_iter(&[1, 2]), RowAddrTreeMap::new()),
        ));
        assert!(matches!(
            apply_not(exact),
            NullableIndexExprResult::Exact(_)
        ));
    }

    /// Check how `&` and `|` on index results propagate certainty
    /// (Exact / AtMost / AtLeast).
    #[tokio::test]
    async fn test_and_or_preserve_certainty() {
        use lance_core::utils::mask::{NullableRowAddrSet, RowAddrTreeMap};

        // Test that AND/OR correctly propagate certainty
        let make_at_most = || {
            NullableIndexExprResult::AtMost(NullableRowAddrMask::AllowList(
                NullableRowAddrSet::new(
                    RowAddrTreeMap::from_iter(&[1, 2, 3]),
                    RowAddrTreeMap::new(),
                ),
            ))
        };

        let make_at_least = || {
            NullableIndexExprResult::AtLeast(NullableRowAddrMask::AllowList(
                NullableRowAddrSet::new(
                    RowAddrTreeMap::from_iter(&[2, 3, 4]),
                    RowAddrTreeMap::new(),
                ),
            ))
        };

        let make_exact = || {
            NullableIndexExprResult::Exact(NullableRowAddrMask::AllowList(NullableRowAddrSet::new(
                RowAddrTreeMap::from_iter(&[1, 2]),
                RowAddrTreeMap::new(),
            )))
        };

        // AtMost & AtMost → AtMost
        assert!(matches!(
            make_at_most() & make_at_most(),
            NullableIndexExprResult::AtMost(_)
        ));

        // AtLeast & AtLeast → AtLeast
        assert!(matches!(
            make_at_least() & make_at_least(),
            NullableIndexExprResult::AtLeast(_)
        ));

        // AtMost & AtLeast → AtMost (superset remains superset)
        assert!(matches!(
            make_at_most() & make_at_least(),
            NullableIndexExprResult::AtMost(_)
        ));

        // AtMost | AtMost → AtMost
        assert!(matches!(
            make_at_most() | make_at_most(),
            NullableIndexExprResult::AtMost(_)
        ));

        // AtLeast | AtLeast → AtLeast
        assert!(matches!(
            make_at_least() | make_at_least(),
            NullableIndexExprResult::AtLeast(_)
        ));

        // AtMost | AtLeast → AtLeast (subset coverage guaranteed)
        assert!(matches!(
            make_at_most() | make_at_least(),
            NullableIndexExprResult::AtLeast(_)
        ));

        // Exact & AtMost → AtMost
        assert!(matches!(
            make_exact() & make_at_most(),
            NullableIndexExprResult::AtMost(_)
        ));

        // Exact | AtLeast → AtLeast
        assert!(matches!(
            make_exact() | make_at_least(),
            NullableIndexExprResult::AtLeast(_)
        ));
    }
}