Skip to main content

lance_index/
scalar.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Scalar indices for metadata search & filtering
5
6use arrow::buffer::{OffsetBuffer, ScalarBuffer};
7use arrow_array::{BooleanArray, ListArray, RecordBatch, UInt64Array};
8use arrow_schema::{Field, Schema};
9use async_trait::async_trait;
10use bytes::Bytes;
11use datafusion::functions::string::contains::ContainsFunc;
12use datafusion::functions_nested::array_has;
13use datafusion::physical_plan::SendableRecordBatchStream;
14use datafusion_common::{Column, scalar::ScalarValue};
15use std::collections::{HashMap, HashSet};
16use std::fmt::Debug;
17use std::{any::Any, ops::Bound, sync::Arc};
18
19use datafusion_expr::Expr;
20use datafusion_expr::expr::ScalarFunction;
21use deepsize::DeepSizeOf;
22use inverted::query::{FtsQuery, FtsQueryNode, FtsSearchParams, MatchQuery, fill_fts_query_column};
23use lance_core::utils::mask::{NullableRowAddrSet, RowAddrTreeMap, RowSetOps};
24use lance_core::{Error, Result};
25use roaring::RoaringBitmap;
26use serde::Serialize;
27
28use crate::metrics::MetricsCollector;
29use crate::scalar::registry::TrainingCriteria;
30use crate::{Index, IndexParams, IndexType};
31pub use lance_table::format::IndexFile;
32
33pub mod bitmap;
34pub mod bloomfilter;
35pub mod btree;
36pub mod expression;
37pub mod inverted;
38pub mod json;
39pub mod label_list;
40pub mod lance_format;
41pub mod ngram;
42pub mod registry;
43#[cfg(feature = "geo")]
44pub mod rtree;
45pub mod zoned;
46pub mod zonemap;
47
48use crate::frag_reuse::FragReuseIndex;
49pub use inverted::tokenizer::InvertedIndexParams;
50use lance_datafusion::udf::CONTAINS_TOKENS_UDF;
51
/// Sentinel index name shared by all scalar index parameter types
/// (returned by [`IndexParams::index_name`] for [`ScalarIndexParams`]).
pub const LANCE_SCALAR_INDEX: &str = "__lance_scalar_index";
53
/// Builtin index types supported by the Lance library
///
/// This is primarily for convenience to avoid a bunch of string
/// constants and provide some auto-complete.  This type should not
/// be used in the manifest as plugins cannot add new entries.
///
/// The canonical string name of each variant is given by [`BuiltinIndexType::as_str`].
#[derive(Debug, Clone, PartialEq, Eq, DeepSizeOf)]
pub enum BuiltinIndexType {
    BTree,
    Bitmap,
    LabelList,
    NGram,
    ZoneMap,
    BloomFilter,
    RTree,
    Inverted,
}
70
71impl BuiltinIndexType {
72    pub fn as_str(&self) -> &str {
73        match self {
74            Self::BTree => "btree",
75            Self::Bitmap => "bitmap",
76            Self::LabelList => "labellist",
77            Self::NGram => "ngram",
78            Self::ZoneMap => "zonemap",
79            Self::Inverted => "inverted",
80            Self::BloomFilter => "bloomfilter",
81            Self::RTree => "rtree",
82        }
83    }
84}
85
86impl TryFrom<IndexType> for BuiltinIndexType {
87    type Error = Error;
88
89    fn try_from(value: IndexType) -> Result<Self> {
90        match value {
91            IndexType::BTree => Ok(Self::BTree),
92            IndexType::Bitmap => Ok(Self::Bitmap),
93            IndexType::LabelList => Ok(Self::LabelList),
94            IndexType::NGram => Ok(Self::NGram),
95            IndexType::ZoneMap => Ok(Self::ZoneMap),
96            IndexType::Inverted => Ok(Self::Inverted),
97            IndexType::BloomFilter => Ok(Self::BloomFilter),
98            IndexType::RTree => Ok(Self::RTree),
99            _ => Err(Error::index("Invalid index type".to_string())),
100        }
101    }
102}
103
/// Parameters for creating a scalar index.
#[derive(Debug, Clone, PartialEq)]
pub struct ScalarIndexParams {
    /// The type of index to create
    ///
    /// Plugins may add additional index types.  Index type lookup is case-insensitive.
    pub index_type: String,
    /// The parameters to train the index
    ///
    /// This should be a JSON string.  The contents of the JSON string will be specific to the
    /// index type.  If not set, then default parameters will be used for the index type.
    pub params: Option<String>,
}
116
117impl Default for ScalarIndexParams {
118    fn default() -> Self {
119        Self {
120            index_type: BuiltinIndexType::BTree.as_str().to_string(),
121            params: None,
122        }
123    }
124}
125
126impl ScalarIndexParams {
127    /// Creates a new ScalarIndexParams from one of the builtin index types
128    pub fn for_builtin(index_type: BuiltinIndexType) -> Self {
129        Self {
130            index_type: index_type.as_str().to_string(),
131            params: None,
132        }
133    }
134
135    /// Create a new ScalarIndexParams with the given index type
136    pub fn new(index_type: String) -> Self {
137        Self {
138            index_type,
139            params: None,
140        }
141    }
142
143    /// Set the parameters for the index
144    pub fn with_params<ParamsType: Serialize>(mut self, params: &ParamsType) -> Self {
145        self.params = Some(serde_json::to_string(params).unwrap());
146        self
147    }
148}
149
impl IndexParams for ScalarIndexParams {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    /// All scalar index params share the generic scalar index name;
    /// the concrete type is carried in `self.index_type`.
    fn index_name(&self) -> &str {
        LANCE_SCALAR_INDEX
    }
}
159
impl IndexParams for InvertedIndexParams {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    /// Inverted (FTS) index params register under a dedicated name rather
    /// than the generic scalar index name.
    fn index_name(&self) -> &str {
        "INVERTED"
    }
}
169
/// Trait for storing an index (or parts of an index) into storage
#[async_trait]
pub trait IndexWriter: Send {
    /// Writes a record batch into the file, returning the 0-based index of the batch in the file
    ///
    /// E.g. if this is the third time this is called this method will return 2
    async fn write_record_batch(&mut self, batch: RecordBatch) -> Result<u64>;
    /// Adds a global buffer and returns its index.
    ///
    /// The default implementation returns a not-supported error; writers that
    /// support global buffers must override this.
    async fn add_global_buffer(&mut self, _data: Bytes) -> Result<u32> {
        Err(Error::not_supported(
            "global buffers are not supported by this index writer",
        ))
    }
    /// Finishes writing the file and closes the file
    async fn finish(&mut self) -> Result<()>;
    /// Finishes writing the file and closes the file with additional metadata
    async fn finish_with_metadata(&mut self, metadata: HashMap<String, String>) -> Result<()>;
}
188
/// Trait for reading an index (or parts of an index) from storage
#[async_trait]
pub trait IndexReader: Send + Sync {
    /// Read the n-th record batch from the file
    ///
    /// `batch_size` determines how the file is partitioned into batches and
    /// must be consistent with the value passed to [`IndexReader::num_batches`].
    async fn read_record_batch(&self, n: u64, batch_size: u64) -> Result<RecordBatch>;
    /// Reads a global buffer by index.
    ///
    /// The default implementation returns a not-supported error; readers that
    /// support global buffers must override this.
    async fn read_global_buffer(&self, _index: u32) -> Result<Bytes> {
        Err(Error::not_supported(
            "global buffers are not supported by this index reader",
        ))
    }
    /// Read the range of rows from the file.
    /// If projection is Some, only return the columns in the projection,
    /// nested columns like Some(&["x.y"]) are not supported.
    /// If projection is None, return all columns.
    async fn read_range(
        &self,
        range: std::ops::Range<usize>,
        projection: Option<&[&str]>,
    ) -> Result<RecordBatch>;
    /// Return the number of batches in the file
    ///
    /// The count depends on `batch_size` (the same size used for
    /// [`IndexReader::read_record_batch`]).
    async fn num_batches(&self, batch_size: u64) -> u32;
    /// Return the number of rows in the file
    fn num_rows(&self) -> usize;
    /// Return the metadata of the file
    fn schema(&self) -> &lance_core::datatypes::Schema;
}
216
/// Trait abstracting I/O away from index logic
///
/// Scalar indices are currently serialized as indexable arrow record batches stored in
/// named "files".  The index store is responsible for serializing and deserializing
/// these batches into file data (e.g. as .lance files or .parquet files, etc.)
#[async_trait]
pub trait IndexStore: std::fmt::Debug + Send + Sync + DeepSizeOf {
    /// Cast to `Any` to allow for downcasting to a concrete store type
    fn as_any(&self) -> &dyn Any;
    /// Clone the store into a new reference-counted handle
    fn clone_arc(&self) -> Arc<dyn IndexStore>;

    /// Suggested I/O parallelism for the store
    fn io_parallelism(&self) -> usize;

    /// Create a new file and return a writer to store data in the file
    ///
    /// `schema` describes the record batches that will be written via the
    /// returned [`IndexWriter`].
    async fn new_index_file(&self, name: &str, schema: Arc<Schema>)
    -> Result<Box<dyn IndexWriter>>;

    /// Open an existing file for retrieval
    async fn open_index_file(&self, name: &str) -> Result<Arc<dyn IndexReader>>;

    /// Copy a range of batches from an index file from this store to another
    ///
    /// This is often useful when remapping or updating
    async fn copy_index_file(&self, name: &str, dest_store: &dyn IndexStore) -> Result<()>;

    /// Rename an index file
    async fn rename_index_file(&self, name: &str, new_name: &str) -> Result<()>;

    /// Delete an index file (used in the tmp spill store to keep tmp size down)
    async fn delete_index_file(&self, name: &str) -> Result<()>;

    /// List all files in the index directory with their sizes.
    ///
    /// Returns a list of (relative_path, size_bytes) tuples.
    /// Used to capture file metadata after index creation/modification.
    async fn list_files_with_sizes(&self) -> Result<Vec<IndexFile>>;
}
254
/// Different scalar indices may support different kinds of queries
///
/// For example, a btree index can support a wide range of queries (e.g. x > 7)
/// while an index based on FTS only supports queries like "x LIKE 'foo'"
///
/// This trait is used when we need an object that can represent any kind of query
///
/// Note: if you are implementing this trait for a query type then you probably also
/// need to implement the [crate::scalar::expression::ScalarQueryParser] trait to
/// create instances of your query at parse time.
pub trait AnyQuery: std::fmt::Debug + Any + Send + Sync {
    /// Cast the query as Any to allow for downcasting
    fn as_any(&self) -> &dyn Any;
    /// Format the query as a string for display purposes
    fn format(&self, col: &str) -> String;
    /// Convert the query to a datafusion expression
    fn to_expr(&self, col: String) -> Expr;
    /// Compare this query to another query
    ///
    /// Implementations typically downcast `other` to `Self` and return false
    /// if the types differ (see the impls in this module).
    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool;
}
275
/// Equality of query trait objects delegates to the type-aware
/// [`AnyQuery::dyn_eq`] hook so that differently-typed queries compare unequal.
impl PartialEq for dyn AnyQuery {
    fn eq(&self, other: &Self) -> bool {
        self.dyn_eq(other)
    }
}
/// A full text search query
#[derive(Debug, Clone, PartialEq)]
pub struct FullTextSearchQuery {
    /// The query tree to execute (match / phrase / compound queries)
    pub query: FtsQuery,

    /// The maximum number of results to return
    ///
    /// `None` means no limit.
    pub limit: Option<i64>,

    /// The wand factor to use for ranking
    /// if None, use the default value of 1.0
    /// Increasing this value will reduce the recall and improve the performance
    /// 1.0 is the value that would give the best performance without recall loss
    pub wand_factor: Option<f32>,
}
295
296impl FullTextSearchQuery {
297    /// Create a new terms query
298    pub fn new(query: String) -> Self {
299        let query = MatchQuery::new(query).into();
300        Self {
301            query,
302            limit: None,
303            wand_factor: None,
304        }
305    }
306
307    /// Create a new fuzzy query
308    pub fn new_fuzzy(term: String, max_distance: Option<u32>) -> Self {
309        let query = MatchQuery::new(term).with_fuzziness(max_distance).into();
310        Self {
311            query,
312            limit: None,
313            wand_factor: None,
314        }
315    }
316
317    /// Create a new compound query
318    pub fn new_query(query: FtsQuery) -> Self {
319        Self {
320            query,
321            limit: None,
322            wand_factor: None,
323        }
324    }
325
326    /// Set the column to search over
327    /// This is available for only MatchQuery and PhraseQuery
328    pub fn with_column(mut self, column: String) -> Result<Self> {
329        self.query = fill_fts_query_column(&self.query, &[column], true)?;
330        Ok(self)
331    }
332
333    /// Set the column to search over
334    /// This is available for only MatchQuery
335    pub fn with_columns(mut self, columns: &[String]) -> Result<Self> {
336        self.query = fill_fts_query_column(&self.query, columns, true)?;
337        Ok(self)
338    }
339
340    /// limit the number of results to return
341    /// if None, return all results
342    pub fn limit(mut self, limit: Option<i64>) -> Self {
343        self.limit = limit;
344        self
345    }
346
347    pub fn wand_factor(mut self, wand_factor: Option<f32>) -> Self {
348        self.wand_factor = wand_factor;
349        self
350    }
351
352    pub fn columns(&self) -> HashSet<String> {
353        self.query.columns()
354    }
355
356    pub fn params(&self) -> FtsSearchParams {
357        FtsSearchParams::new()
358            .with_limit(self.limit.map(|limit| limit as usize))
359            .with_wand_factor(self.wand_factor.unwrap_or(1.0))
360    }
361}
362
/// A query that a basic scalar index (e.g. btree / bitmap) can satisfy
///
/// This is a subset of expression operators that is often referred to as the
/// "sargable" operators
///
/// Note that negation is not included.  Negation should be applied later.  For
/// example, to invert an equality query (e.g. all rows where the value is not 7)
/// you can grab all rows where the value = 7 and then do an inverted take (or use
/// a block list instead of an allow list for prefiltering)
#[derive(Debug, Clone, PartialEq)]
pub enum SargableQuery {
    /// Retrieve all row ids where the value is in the given [min, max) range
    ///
    /// Either bound may be `Unbounded`, inclusive, or exclusive.
    Range(Bound<ScalarValue>, Bound<ScalarValue>),
    /// Retrieve all row ids where the value is in the given set of values
    IsIn(Vec<ScalarValue>),
    /// Retrieve all row ids where the value is exactly the given value
    Equals(ScalarValue),
    /// Retrieve all row ids where the value matches the given full text search query
    FullTextSearch(FullTextSearchQuery),
    /// Retrieve all row ids where the value is null
    IsNull(),
    /// Retrieve all row ids where the value matches LIKE 'prefix%' pattern
    /// This is used for both explicit LIKE expressions and starts_with() function calls
    LikePrefix(ScalarValue),
}
388
impl AnyQuery for SargableQuery {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Renders the query as a human-readable predicate over column `col`.
    fn format(&self, col: &str) -> String {
        match self {
            // A range prints each non-unbounded bound; both bounds present
            // renders as a conjunction.
            Self::Range(lower, upper) => match (lower, upper) {
                // No bounds at all matches everything.
                (Bound::Unbounded, Bound::Unbounded) => "true".to_string(),
                (Bound::Unbounded, Bound::Included(rhs)) => format!("{} <= {}", col, rhs),
                (Bound::Unbounded, Bound::Excluded(rhs)) => format!("{} < {}", col, rhs),
                (Bound::Included(lhs), Bound::Unbounded) => format!("{} >= {}", col, lhs),
                (Bound::Included(lhs), Bound::Included(rhs)) => {
                    format!("{} >= {} && {} <= {}", col, lhs, col, rhs)
                }
                (Bound::Included(lhs), Bound::Excluded(rhs)) => {
                    format!("{} >= {} && {} < {}", col, lhs, col, rhs)
                }
                (Bound::Excluded(lhs), Bound::Unbounded) => format!("{} > {}", col, lhs),
                (Bound::Excluded(lhs), Bound::Included(rhs)) => {
                    format!("{} > {} && {} <= {}", col, lhs, col, rhs)
                }
                (Bound::Excluded(lhs), Bound::Excluded(rhs)) => {
                    format!("{} > {} && {} < {}", col, lhs, col, rhs)
                }
            },
            Self::IsIn(values) => {
                format!(
                    "{} IN [{}]",
                    col,
                    values
                        .iter()
                        .map(|val| val.to_string())
                        .collect::<Vec<_>>()
                        .join(",")
                )
            }
            Self::FullTextSearch(query) => {
                format!("fts({})", query.query)
            }
            Self::IsNull() => {
                format!("{} IS NULL", col)
            }
            Self::Equals(val) => {
                format!("{} = {}", col, val)
            }
            Self::LikePrefix(prefix) => {
                format!("{} LIKE '{}%'", col, prefix)
            }
        }
    }

    /// Lowers the query to an equivalent DataFusion [`Expr`] over column `col`.
    fn to_expr(&self, col: String) -> Expr {
        let col_expr = Expr::Column(Column::new_unqualified(col));
        match self {
            Self::Range(lower, upper) => match (lower, upper) {
                // Fully unbounded range: constant `true`.
                (Bound::Unbounded, Bound::Unbounded) => {
                    Expr::Literal(ScalarValue::Boolean(Some(true)), None)
                }
                (Bound::Unbounded, Bound::Included(rhs)) => {
                    col_expr.lt_eq(Expr::Literal(rhs.clone(), None))
                }
                (Bound::Unbounded, Bound::Excluded(rhs)) => {
                    col_expr.lt(Expr::Literal(rhs.clone(), None))
                }
                (Bound::Included(lhs), Bound::Unbounded) => {
                    col_expr.gt_eq(Expr::Literal(lhs.clone(), None))
                }
                // Inclusive/inclusive maps to BETWEEN; other bound combinations
                // are expressed as an AND of two comparisons.
                (Bound::Included(lhs), Bound::Included(rhs)) => col_expr.between(
                    Expr::Literal(lhs.clone(), None),
                    Expr::Literal(rhs.clone(), None),
                ),
                (Bound::Included(lhs), Bound::Excluded(rhs)) => col_expr
                    .clone()
                    .gt_eq(Expr::Literal(lhs.clone(), None))
                    .and(col_expr.lt(Expr::Literal(rhs.clone(), None))),
                (Bound::Excluded(lhs), Bound::Unbounded) => {
                    col_expr.gt(Expr::Literal(lhs.clone(), None))
                }
                (Bound::Excluded(lhs), Bound::Included(rhs)) => col_expr
                    .clone()
                    .gt(Expr::Literal(lhs.clone(), None))
                    .and(col_expr.lt_eq(Expr::Literal(rhs.clone(), None))),
                (Bound::Excluded(lhs), Bound::Excluded(rhs)) => col_expr
                    .clone()
                    .gt(Expr::Literal(lhs.clone(), None))
                    .and(col_expr.lt(Expr::Literal(rhs.clone(), None))),
            },
            Self::IsIn(values) => col_expr.in_list(
                values
                    .iter()
                    .map(|val| Expr::Literal(val.clone(), None))
                    .collect::<Vec<_>>(),
                false,
            ),
            // NOTE(review): FTS is lowered here to a plain `col LIKE 'query'`
            // expression — presumably a placeholder/display form rather than
            // real FTS semantics; confirm before relying on this for evaluation.
            Self::FullTextSearch(query) => col_expr.like(Expr::Literal(
                ScalarValue::Utf8(Some(query.query.to_string())),
                None,
            )),
            Self::IsNull() => col_expr.is_null(),
            Self::Equals(value) => col_expr.eq(Expr::Literal(value.clone(), None)),
            Self::LikePrefix(prefix) => {
                // Append the '%' wildcard to the stored prefix to form the LIKE
                // pattern; non-string scalars are passed through unchanged.
                let pattern = match prefix {
                    ScalarValue::Utf8(Some(s)) => ScalarValue::Utf8(Some(format!("{}%", s))),
                    ScalarValue::LargeUtf8(Some(s)) => {
                        ScalarValue::LargeUtf8(Some(format!("{}%", s)))
                    }
                    other => other.clone(),
                };
                col_expr.like(Expr::Literal(pattern, None))
            }
        }
    }

    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
        match other.as_any().downcast_ref::<Self>() {
            Some(o) => self == o,
            None => false,
        }
    }
}
510
/// A query that a LabelListIndex can satisfy
#[derive(Debug, Clone, PartialEq)]
pub enum LabelListQuery {
    /// Retrieve all row ids where every label is in the list of values for the row
    HasAllLabels(Vec<ScalarValue>),
    /// Retrieve all row ids where at least one of the given labels is in the list of values for the row
    HasAnyLabel(Vec<ScalarValue>),
}
519
520impl AnyQuery for LabelListQuery {
521    fn as_any(&self) -> &dyn Any {
522        self
523    }
524
525    fn format(&self, col: &str) -> String {
526        format!("{}", self.to_expr(col.to_string()))
527    }
528
529    fn to_expr(&self, col: String) -> Expr {
530        match self {
531            Self::HasAllLabels(labels) => {
532                let labels_arr = ScalarValue::iter_to_array(labels.iter().cloned()).unwrap();
533                let offsets_buffer =
534                    OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, labels_arr.len() as i32]));
535                let labels_list = ListArray::try_new(
536                    Arc::new(Field::new("item", labels_arr.data_type().clone(), true)),
537                    offsets_buffer,
538                    labels_arr,
539                    None,
540                )
541                .unwrap();
542                let labels_arr = Arc::new(labels_list);
543                Expr::ScalarFunction(ScalarFunction {
544                    func: Arc::new(array_has::ArrayHasAll::new().into()),
545                    args: vec![
546                        Expr::Column(Column::new_unqualified(col)),
547                        Expr::Literal(ScalarValue::List(labels_arr), None),
548                    ],
549                })
550            }
551            Self::HasAnyLabel(labels) => {
552                let labels_arr = ScalarValue::iter_to_array(labels.iter().cloned()).unwrap();
553                let offsets_buffer =
554                    OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, labels_arr.len() as i32]));
555                let labels_list = ListArray::try_new(
556                    Arc::new(Field::new("item", labels_arr.data_type().clone(), true)),
557                    offsets_buffer,
558                    labels_arr,
559                    None,
560                )
561                .unwrap();
562                let labels_arr = Arc::new(labels_list);
563                Expr::ScalarFunction(ScalarFunction {
564                    func: Arc::new(array_has::ArrayHasAny::new().into()),
565                    args: vec![
566                        Expr::Column(Column::new_unqualified(col)),
567                        Expr::Literal(ScalarValue::List(labels_arr), None),
568                    ],
569                })
570            }
571        }
572    }
573
574    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
575        match other.as_any().downcast_ref::<Self>() {
576            Some(o) => self == o,
577            None => false,
578        }
579    }
580}
581
/// A query that a NGramIndex can satisfy
#[derive(Debug, Clone, PartialEq)]
pub enum TextQuery {
    /// Retrieve all row ids where the text contains the given string
    ///
    /// The match is case-sensitive (lowered to the `contains` function).
    StringContains(String),
    // TODO: In the future we should be able to do string-insensitive contains
    // as well as partial matches (e.g. LIKE 'foo%') and potentially even
    // some regular expressions
}
591
592impl AnyQuery for TextQuery {
593    fn as_any(&self) -> &dyn Any {
594        self
595    }
596
597    fn format(&self, col: &str) -> String {
598        format!("{}", self.to_expr(col.to_string()))
599    }
600
601    fn to_expr(&self, col: String) -> Expr {
602        match self {
603            Self::StringContains(substr) => Expr::ScalarFunction(ScalarFunction {
604                func: Arc::new(ContainsFunc::new().into()),
605                args: vec![
606                    Expr::Column(Column::new_unqualified(col)),
607                    Expr::Literal(ScalarValue::Utf8(Some(substr.clone())), None),
608                ],
609            }),
610        }
611    }
612
613    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
614        match other.as_any().downcast_ref::<Self>() {
615            Some(o) => self == o,
616            None => false,
617        }
618    }
619}
620
/// A query that a InvertedIndex can satisfy
#[derive(Debug, Clone, PartialEq)]
pub enum TokenQuery {
    /// Retrieve all row ids where the text contains all tokens parsed from given string. The tokens
    /// are separated by punctuations and white spaces.
    TokensContains(String),
}
628
/// A query that a BloomFilter index can satisfy
///
/// This is a subset of SargableQuery that only includes operations that bloom filters
/// can efficiently handle: equals, is_null, and is_in queries.
#[derive(Debug, Clone, PartialEq)]
pub enum BloomFilterQuery {
    /// Retrieve all row ids where the value is exactly the given value
    Equals(ScalarValue),
    /// Retrieve all row ids where the value is null
    IsNull(),
    /// Retrieve all row ids where the value is in the given set of values
    IsIn(Vec<ScalarValue>),
}
642
643impl AnyQuery for BloomFilterQuery {
644    fn as_any(&self) -> &dyn Any {
645        self
646    }
647
648    fn format(&self, col: &str) -> String {
649        match self {
650            Self::Equals(val) => {
651                format!("{} = {}", col, val)
652            }
653            Self::IsNull() => {
654                format!("{} IS NULL", col)
655            }
656            Self::IsIn(values) => {
657                format!(
658                    "{} IN [{}]",
659                    col,
660                    values
661                        .iter()
662                        .map(|val| val.to_string())
663                        .collect::<Vec<_>>()
664                        .join(",")
665                )
666            }
667        }
668    }
669
670    fn to_expr(&self, col: String) -> Expr {
671        let col_expr = Expr::Column(Column::new_unqualified(col));
672        match self {
673            Self::Equals(value) => col_expr.eq(Expr::Literal(value.clone(), None)),
674            Self::IsNull() => col_expr.is_null(),
675            Self::IsIn(values) => col_expr.in_list(
676                values
677                    .iter()
678                    .map(|val| Expr::Literal(val.clone(), None))
679                    .collect::<Vec<_>>(),
680                false,
681            ),
682        }
683    }
684
685    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
686        match other.as_any().downcast_ref::<Self>() {
687            Some(o) => self == o,
688            None => false,
689        }
690    }
691}
692
693impl AnyQuery for TokenQuery {
694    fn as_any(&self) -> &dyn Any {
695        self
696    }
697
698    fn format(&self, col: &str) -> String {
699        format!("{}", self.to_expr(col.to_string()))
700    }
701
702    fn to_expr(&self, col: String) -> Expr {
703        match self {
704            Self::TokensContains(substr) => Expr::ScalarFunction(ScalarFunction {
705                func: Arc::new(CONTAINS_TOKENS_UDF.clone()),
706                args: vec![
707                    Expr::Column(Column::new_unqualified(col)),
708                    Expr::Literal(ScalarValue::Utf8(Some(substr.clone())), None),
709                ],
710            }),
711        }
712    }
713
714    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
715        match other.as_any().downcast_ref::<Self>() {
716            Some(o) => self == o,
717            None => false,
718        }
719    }
720}
721
/// A spatial-relation predicate: a geometry value plus the field it applies to.
#[cfg(feature = "geo")]
#[derive(Debug, Clone, PartialEq)]
pub struct RelationQuery {
    /// The geometry value to test against
    pub value: ScalarValue,
    /// The field the relation is evaluated on
    pub field: Field,
}
728
/// A query that a Geo index can satisfy
#[cfg(feature = "geo")]
#[derive(Debug, Clone, PartialEq)]
pub enum GeoQuery {
    /// Retrieve all row ids whose geometry intersects the query geometry
    IntersectQuery(RelationQuery),
    /// Retrieve all row ids where the value is null
    IsNull,
}
736
#[cfg(feature = "geo")]
impl AnyQuery for GeoQuery {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Renders the query as human-readable text over column `col`.
    fn format(&self, col: &str) -> String {
        match self {
            Self::IntersectQuery(query) => {
                format!("Intersect({} {})", col, query.value)
            }
            Self::IsNull => {
                format!("{} IS NULL", col)
            }
        }
    }

    /// Not yet implemented.
    ///
    /// # Panics
    ///
    /// Always panics via `todo!()` — geo queries currently cannot be lowered
    /// to a DataFusion expression.
    fn to_expr(&self, _col: String) -> Expr {
        todo!()
    }

    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
        match other.as_any().downcast_ref::<Self>() {
            Some(o) => self == o,
            None => false,
        }
    }
}
765
/// The result of a search operation against a scalar index
#[derive(Debug, PartialEq)]
pub enum SearchResult {
    /// The exact row ids that satisfy the query
    Exact(NullableRowAddrSet),
    /// Any row id satisfying the query will be in this set but not every
    /// row id in this set will satisfy the query, a further recheck step
    /// is needed
    AtMost(NullableRowAddrSet),
    /// All of the given row ids satisfy the query but there may be more
    ///
    /// No scalar index actually returns this today but it can arise from
    /// boolean operations (e.g. NOT(AtMost(x)) == AtLeast(NOT(x)))
    AtLeast(NullableRowAddrSet),
}
781
782impl SearchResult {
783    pub fn exact(row_ids: impl Into<RowAddrTreeMap>) -> Self {
784        Self::Exact(NullableRowAddrSet::new(row_ids.into(), Default::default()))
785    }
786
787    pub fn at_most(row_ids: impl Into<RowAddrTreeMap>) -> Self {
788        Self::AtMost(NullableRowAddrSet::new(row_ids.into(), Default::default()))
789    }
790
791    pub fn at_least(row_ids: impl Into<RowAddrTreeMap>) -> Self {
792        Self::AtLeast(NullableRowAddrSet::new(row_ids.into(), Default::default()))
793    }
794
795    pub fn with_nulls(self, nulls: impl Into<RowAddrTreeMap>) -> Self {
796        match self {
797            Self::Exact(row_ids) => Self::Exact(row_ids.with_nulls(nulls.into())),
798            Self::AtMost(row_ids) => Self::AtMost(row_ids.with_nulls(nulls.into())),
799            Self::AtLeast(row_ids) => Self::AtLeast(row_ids.with_nulls(nulls.into())),
800        }
801    }
802
803    pub fn row_addrs(&self) -> &NullableRowAddrSet {
804        match self {
805            Self::Exact(row_addrs) => row_addrs,
806            Self::AtMost(row_addrs) => row_addrs,
807            Self::AtLeast(row_addrs) => row_addrs,
808        }
809    }
810
811    pub fn is_exact(&self) -> bool {
812        matches!(self, Self::Exact(_))
813    }
814}
815
/// Brief information about an index that was created
pub struct CreatedIndex {
    /// The details of the index that was created
    ///
    /// These should be stored somewhere as they will be needed to
    /// load the index later.
    pub index_details: prost_types::Any,
    /// The version of the index that was created
    ///
    /// This can be used to determine if a reader is able to load the index.
    pub index_version: u32,
    /// List of files and their sizes for this index
    ///
    /// This enables skipping HEAD calls when opening indices and provides
    /// visibility into index storage size via describe_indices().
    ///
    /// `None` if file metadata was not captured.
    pub files: Option<Vec<IndexFile>>,
}
833
/// The criteria that specifies how to update an index
pub struct UpdateCriteria {
    /// If true, then we need to read the old data to update the index
    ///
    /// This should be avoided if possible but is left in for some legacy paths
    pub requires_old_data: bool,
    /// The criteria required for data (both old and new)
    pub data_criteria: TrainingCriteria,
}
843
/// Filter used when merging existing scalar-index rows during update.
///
/// The caller must pick a filter mode that matches the row-id semantics of the
/// dataset:
/// - address-style row IDs: fragment filtering is valid
/// - stable row IDs: use exact row-id membership instead
#[derive(Debug, Clone)]
pub enum OldIndexDataFilter {
    /// Keeps track of which fragments are still valid and which are no longer valid.
    ///
    /// This is valid for address-style row IDs, where the upper 32 bits of a
    /// row id encode the fragment id.
    Fragments {
        to_keep: RoaringBitmap,
        to_remove: RoaringBitmap,
    },
    /// Keep old rows whose row IDs are in this exact allow-list.
    ///
    /// This is required for stable row IDs, where row IDs are opaque and
    /// should not be interpreted as encoded row addresses.
    RowIds(RowAddrTreeMap),
}
865
866impl OldIndexDataFilter {
867    /// Build a boolean mask that keeps only row IDs selected by this filter.
868    pub fn filter_row_ids(&self, row_ids: &UInt64Array) -> BooleanArray {
869        match self {
870            Self::Fragments { to_keep, .. } => row_ids
871                .iter()
872                .map(|id| id.map(|id| to_keep.contains((id >> 32) as u32)))
873                .collect(),
874            Self::RowIds(valid_row_ids) => row_ids
875                .iter()
876                .map(|id| id.map(|id| valid_row_ids.contains(id)))
877                .collect(),
878        }
879    }
880}
881
882impl UpdateCriteria {
883    pub fn requires_old_data(data_criteria: TrainingCriteria) -> Self {
884        Self {
885            requires_old_data: true,
886            data_criteria,
887        }
888    }
889
890    pub fn only_new_data(data_criteria: TrainingCriteria) -> Self {
891        Self {
892            requires_old_data: false,
893            data_criteria,
894        }
895    }
896}
897
/// Compute the smallest string that is lexicographically greater than every
/// string starting with `prefix`, by bumping the rightmost incrementable
/// character. Returns `None` if no valid upper bound exists.
///
/// This is used for LIKE prefix queries to convert `LIKE 'foo%'` to the
/// half-open range `[foo, fop)`.
///
/// # UTF-8 and Unicode Handling
///
/// Works on Unicode code points rather than raw bytes. Because UTF-8 byte
/// ordering matches code-point ordering, bumping a code point yields the
/// correct byte-wise lexicographic successor.
///
/// A character that cannot be bumped (it would overflow past `U+10FFFF`)
/// causes the scan to move one position left; the surrogate range
/// `U+D800..=U+DFFF` is skipped entirely.
///
/// Examples:
/// - `"foo"` → `Some("fop")`
/// - `"café"` → `Some("cafê")`  (é U+00E9 → ê U+00EA)
/// - `"abc中"` → `Some("abc丮")` (中 U+4E2D → 丮 U+4E2E)
/// - `"cafÿ"` → `Some("cafĀ")` (ÿ U+00FF → Ā U+0100)
pub fn compute_next_prefix(prefix: &str) -> Option<String> {
    let code_points: Vec<char> = prefix.chars().collect();

    // Scan right-to-left for the first character with a successor; keep
    // everything to its left, append the successor, and drop the rest.
    // An empty prefix never enters the loop and falls through to `None`.
    for cut in (0..code_points.len()).rev() {
        if let Some(bumped) = next_unicode_char(code_points[cut]) {
            let mut successor = String::with_capacity(prefix.len());
            successor.extend(&code_points[..cut]);
            successor.push(bumped);
            return Some(successor);
        }
    }

    // Every character was already at the maximum scalar value (or the
    // prefix was empty): no finite upper bound.
    None
}

/// Return the next valid Unicode scalar value after `c`, skipping the
/// surrogate range `U+D800..=U+DFFF`; `None` for `U+10FFFF` (`char::MAX`).
fn next_unicode_char(c: char) -> Option<char> {
    // `c as u32` is at most 0x10FFFF, so this increment cannot overflow u32.
    let mut candidate = c as u32 + 1;
    if matches!(candidate, 0xD800..=0xDFFF) {
        candidate = 0xE000;
    }
    // `from_u32` rejects 0x11_0000 (the increment of char::MAX); surrogates
    // were already redirected above.
    char::from_u32(candidate)
}
953
/// A trait for a scalar index, a structure that can determine row ids that satisfy scalar queries
#[async_trait]
pub trait ScalarIndex: Send + Sync + std::fmt::Debug + Index + DeepSizeOf {
    /// Search the scalar index
    ///
    /// Returns all row ids that satisfy the query, these row ids are not necessarily ordered
    ///
    /// `query` is type-erased; implementations are expected to downcast it to
    /// the query type they support.
    async fn search(
        &self,
        query: &dyn AnyQuery,
        metrics: &dyn MetricsCollector,
    ) -> Result<SearchResult>;

    /// Returns true if the remap operation is supported
    fn can_remap(&self) -> bool;

    /// Remap the row ids, creating a new remapped version of this index in `dest_store`
    ///
    /// `mapping` maps old row ids to new row ids; a `None` value presumably
    /// marks the row as removed — confirm against implementations.
    async fn remap(
        &self,
        mapping: &HashMap<u64, Option<u64>>,
        dest_store: &dyn IndexStore,
    ) -> Result<CreatedIndex>;

    /// Add the new data into the index, creating an updated version of the index in `dest_store`
    ///
    /// If `old_data_filter` is provided, old index data will be filtered before
    /// merge according to the chosen filter mode.
    async fn update(
        &self,
        new_data: SendableRecordBatchStream,
        dest_store: &dyn IndexStore,
        old_data_filter: Option<OldIndexDataFilter>,
    ) -> Result<CreatedIndex>;

    /// Returns the criteria that will be used to update the index
    fn update_criteria(&self) -> UpdateCriteria;

    /// Derive the index parameters from the current index
    ///
    /// This returns a ScalarIndexParams that can be used to recreate an index
    /// with the same configuration on another dataset.
    fn derive_index_params(&self) -> Result<ScalarIndexParams>;
}