Skip to main content

lance_index/
scalar.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Scalar indices for metadata search & filtering
5
6use arrow::buffer::{OffsetBuffer, ScalarBuffer};
7use arrow_array::{BooleanArray, ListArray, RecordBatch, UInt64Array};
8use arrow_schema::{Field, Schema};
9use async_trait::async_trait;
10use bytes::Bytes;
11use datafusion::functions::string::contains::ContainsFunc;
12use datafusion::functions_nested::array_has;
13use datafusion::physical_plan::SendableRecordBatchStream;
14use datafusion_common::{Column, scalar::ScalarValue};
15use std::collections::{HashMap, HashSet};
16use std::fmt::Debug;
17use std::pin::Pin;
18use std::{any::Any, ops::Bound, sync::Arc};
19
20use datafusion_expr::Expr;
21use datafusion_expr::expr::ScalarFunction;
22use deepsize::DeepSizeOf;
23use inverted::query::{FtsQuery, FtsQueryNode, FtsSearchParams, MatchQuery, fill_fts_query_column};
24use lance_core::utils::mask::{NullableRowAddrSet, RowAddrTreeMap, RowSetOps};
25use lance_core::{Error, Result};
26use lance_io::stream::{RecordBatchStream, RecordBatchStreamAdapter};
27use roaring::RoaringBitmap;
28use serde::Serialize;
29
30use crate::metrics::MetricsCollector;
31use crate::scalar::registry::TrainingCriteria;
32use crate::{Index, IndexParams, IndexType};
33pub use lance_table::format::IndexFile;
34
35pub mod bitmap;
36pub mod bloomfilter;
37pub mod btree;
38pub mod expression;
39pub mod inverted;
40pub mod json;
41pub mod label_list;
42pub mod lance_format;
43pub mod ngram;
44pub mod registry;
45#[cfg(feature = "geo")]
46pub mod rtree;
47pub mod zoned;
48pub mod zonemap;
49
50use crate::frag_reuse::FragReuseIndex;
51pub use inverted::tokenizer::InvertedIndexParams;
52use lance_datafusion::udf::CONTAINS_TOKENS_UDF;
53
/// Index name reported by [`ScalarIndexParams`] through `IndexParams::index_name`.
pub const LANCE_SCALAR_INDEX: &str = "__lance_scalar_index";
55
56/// Builtin index types supported by the Lance library
57///
58/// This is primarily for convenience to avoid a bunch of string
59/// constants and provide some auto-complete.  This type should not
60/// be used in the manifest as plugins cannot add new entries.
61#[derive(Debug, Clone, PartialEq, Eq, DeepSizeOf)]
62pub enum BuiltinIndexType {
63    BTree,
64    Bitmap,
65    LabelList,
66    NGram,
67    ZoneMap,
68    BloomFilter,
69    RTree,
70    Inverted,
71}
72
73impl BuiltinIndexType {
74    pub fn as_str(&self) -> &str {
75        match self {
76            Self::BTree => "btree",
77            Self::Bitmap => "bitmap",
78            Self::LabelList => "labellist",
79            Self::NGram => "ngram",
80            Self::ZoneMap => "zonemap",
81            Self::Inverted => "inverted",
82            Self::BloomFilter => "bloomfilter",
83            Self::RTree => "rtree",
84        }
85    }
86}
87
88impl TryFrom<IndexType> for BuiltinIndexType {
89    type Error = Error;
90
91    fn try_from(value: IndexType) -> Result<Self> {
92        match value {
93            IndexType::BTree => Ok(Self::BTree),
94            IndexType::Bitmap => Ok(Self::Bitmap),
95            IndexType::LabelList => Ok(Self::LabelList),
96            IndexType::NGram => Ok(Self::NGram),
97            IndexType::ZoneMap => Ok(Self::ZoneMap),
98            IndexType::Inverted => Ok(Self::Inverted),
99            IndexType::BloomFilter => Ok(Self::BloomFilter),
100            IndexType::RTree => Ok(Self::RTree),
101            _ => Err(Error::index("Invalid index type".to_string())),
102        }
103    }
104}
105
/// Parameters describing which scalar index to build and how to train it
#[derive(Debug, Clone, PartialEq)]
pub struct ScalarIndexParams {
    /// Name of the index type to create
    ///
    /// Plugins may register additional index types.  The lookup of the index
    /// type is case-insensitive.
    pub index_type: String,
    /// Training parameters for the index
    ///
    /// Expected to be a JSON string whose contents depend on the index type.
    /// When unset, the index type's default parameters are used.
    pub params: Option<String>,
}
118
119impl Default for ScalarIndexParams {
120    fn default() -> Self {
121        Self {
122            index_type: BuiltinIndexType::BTree.as_str().to_string(),
123            params: None,
124        }
125    }
126}
127
128impl ScalarIndexParams {
129    /// Creates a new ScalarIndexParams from one of the builtin index types
130    pub fn for_builtin(index_type: BuiltinIndexType) -> Self {
131        Self {
132            index_type: index_type.as_str().to_string(),
133            params: None,
134        }
135    }
136
137    /// Create a new ScalarIndexParams with the given index type
138    pub fn new(index_type: String) -> Self {
139        Self {
140            index_type,
141            params: None,
142        }
143    }
144
145    /// Set the parameters for the index
146    pub fn with_params<ParamsType: Serialize>(mut self, params: &ParamsType) -> Self {
147        self.params = Some(serde_json::to_string(params).unwrap());
148        self
149    }
150}
151
152impl IndexParams for ScalarIndexParams {
153    fn as_any(&self) -> &dyn std::any::Any {
154        self
155    }
156
157    fn index_name(&self) -> &str {
158        LANCE_SCALAR_INDEX
159    }
160}
161
162impl IndexParams for InvertedIndexParams {
163    fn as_any(&self) -> &dyn std::any::Any {
164        self
165    }
166
167    fn index_name(&self) -> &str {
168        "INVERTED"
169    }
170}
171
172/// Trait for storing an index (or parts of an index) into storage
173#[async_trait]
174pub trait IndexWriter: Send {
175    /// Writes a record batch into the file, returning the 0-based index of the batch in the file
176    ///
177    /// E.g. if this is the third time this is called this method will return 2
178    async fn write_record_batch(&mut self, batch: RecordBatch) -> Result<u64>;
179    /// Adds a global buffer and returns its index.
180    async fn add_global_buffer(&mut self, _data: Bytes) -> Result<u32> {
181        Err(Error::not_supported(
182            "global buffers are not supported by this index writer",
183        ))
184    }
185    /// Finishes writing the file and closes the file
186    async fn finish(&mut self) -> Result<()>;
187    /// Finishes writing the file and closes the file with additional metadata
188    async fn finish_with_metadata(&mut self, metadata: HashMap<String, String>) -> Result<()>;
189}
190
191/// Trait for reading an index (or parts of an index) from storage
192#[async_trait]
193pub trait IndexReader: Send + Sync {
194    /// Read the n-th record batch from the file
195    async fn read_record_batch(&self, n: u64, batch_size: u64) -> Result<RecordBatch>;
196    /// Reads a global buffer by index.
197    async fn read_global_buffer(&self, _index: u32) -> Result<Bytes> {
198        Err(Error::not_supported(
199            "global buffers are not supported by this index reader",
200        ))
201    }
202    /// Read the range of rows from the file.
203    /// If projection is Some, only return the columns in the projection,
204    /// nested columns like Some(&["x.y"]) are not supported.
205    /// If projection is None, return all columns.
206    async fn read_range(
207        &self,
208        range: std::ops::Range<usize>,
209        projection: Option<&[&str]>,
210    ) -> Result<RecordBatch>;
211    /// Read a range of rows as a stream of record batches.
212    ///
213    /// This allows the caller to process rows incrementally without loading the
214    /// entire range into memory at once.
215    ///
216    /// The default implementation falls back to [`Self::read_range`] and wraps
217    /// the result in a single-item stream.
218    async fn read_range_stream(
219        &self,
220        range: std::ops::Range<usize>,
221        projection: Option<&[&str]>,
222    ) -> Result<Pin<Box<dyn RecordBatchStream>>> {
223        let batch = self.read_range(range, projection).await?;
224        let schema = batch.schema();
225        Ok(Box::pin(RecordBatchStreamAdapter::new(
226            schema,
227            futures::stream::once(async move { Ok(batch) }),
228        )))
229    }
230    /// Return the number of batches in the file
231    async fn num_batches(&self, batch_size: u64) -> u32;
232    /// Return the number of rows in the file
233    fn num_rows(&self) -> usize;
234    /// Return the metadata of the file
235    fn schema(&self) -> &lance_core::datatypes::Schema;
236}
237
238/// Trait abstracting I/O away from index logic
239///
240/// Scalar indices are currently serialized as indexable arrow record batches stored in
241/// named "files".  The index store is responsible for serializing and deserializing
242/// these batches into file data (e.g. as .lance files or .parquet files, etc.)
243#[async_trait]
244pub trait IndexStore: std::fmt::Debug + Send + Sync + DeepSizeOf {
245    fn as_any(&self) -> &dyn Any;
246    fn clone_arc(&self) -> Arc<dyn IndexStore>;
247
248    /// Suggested I/O parallelism for the store
249    fn io_parallelism(&self) -> usize;
250
251    /// Create a new file and return a writer to store data in the file
252    async fn new_index_file(&self, name: &str, schema: Arc<Schema>)
253    -> Result<Box<dyn IndexWriter>>;
254
255    /// Open an existing file for retrieval
256    async fn open_index_file(&self, name: &str) -> Result<Arc<dyn IndexReader>>;
257
258    /// Copy a range of batches from an index file from this store to another
259    ///
260    /// This is often useful when remapping or updating
261    async fn copy_index_file(&self, name: &str, dest_store: &dyn IndexStore) -> Result<()>;
262
263    /// Rename an index file
264    async fn rename_index_file(&self, name: &str, new_name: &str) -> Result<()>;
265
266    /// Delete an index file (used in the tmp spill store to keep tmp size down)
267    async fn delete_index_file(&self, name: &str) -> Result<()>;
268
269    /// List all files in the index directory with their sizes.
270    ///
271    /// Returns a list of (relative_path, size_bytes) tuples.
272    /// Used to capture file metadata after index creation/modification.
273    async fn list_files_with_sizes(&self) -> Result<Vec<IndexFile>>;
274}
275
276/// Different scalar indices may support different kinds of queries
277///
278/// For example, a btree index can support a wide range of queries (e.g. x > 7)
279/// while an index based on FTS only supports queries like "x LIKE 'foo'"
280///
281/// This trait is used when we need an object that can represent any kind of query
282///
283/// Note: if you are implementing this trait for a query type then you probably also
284/// need to implement the [crate::scalar::expression::ScalarQueryParser] trait to
285/// create instances of your query at parse time.
286pub trait AnyQuery: std::fmt::Debug + Any + Send + Sync {
287    /// Cast the query as Any to allow for downcasting
288    fn as_any(&self) -> &dyn Any;
289    /// Format the query as a string for display purposes
290    fn format(&self, col: &str) -> String;
291    /// Convert the query to a datafusion expression
292    fn to_expr(&self, col: String) -> Expr;
293    /// Compare this query to another query
294    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool;
295}
296
297impl PartialEq for dyn AnyQuery {
298    fn eq(&self, other: &Self) -> bool {
299        self.dyn_eq(other)
300    }
301}
302/// A full text search query
303#[derive(Debug, Clone, PartialEq)]
304pub struct FullTextSearchQuery {
305    pub query: FtsQuery,
306
307    /// The maximum number of results to return
308    pub limit: Option<i64>,
309
310    /// The wand factor to use for ranking
311    /// if None, use the default value of 1.0
312    /// Increasing this value will reduce the recall and improve the performance
313    /// 1.0 is the value that would give the best performance without recall loss
314    pub wand_factor: Option<f32>,
315}
316
317impl FullTextSearchQuery {
318    /// Create a new terms query
319    pub fn new(query: String) -> Self {
320        let query = MatchQuery::new(query).into();
321        Self {
322            query,
323            limit: None,
324            wand_factor: None,
325        }
326    }
327
328    /// Create a new fuzzy query
329    pub fn new_fuzzy(term: String, max_distance: Option<u32>) -> Self {
330        let query = MatchQuery::new(term).with_fuzziness(max_distance).into();
331        Self {
332            query,
333            limit: None,
334            wand_factor: None,
335        }
336    }
337
338    /// Create a new compound query
339    pub fn new_query(query: FtsQuery) -> Self {
340        Self {
341            query,
342            limit: None,
343            wand_factor: None,
344        }
345    }
346
347    /// Set the column to search over
348    /// This is available for only MatchQuery and PhraseQuery
349    pub fn with_column(mut self, column: String) -> Result<Self> {
350        self.query = fill_fts_query_column(&self.query, &[column], true)?;
351        Ok(self)
352    }
353
354    /// Set the column to search over
355    /// This is available for only MatchQuery
356    pub fn with_columns(mut self, columns: &[String]) -> Result<Self> {
357        self.query = fill_fts_query_column(&self.query, columns, true)?;
358        Ok(self)
359    }
360
361    /// limit the number of results to return
362    /// if None, return all results
363    pub fn limit(mut self, limit: Option<i64>) -> Self {
364        self.limit = limit;
365        self
366    }
367
368    pub fn wand_factor(mut self, wand_factor: Option<f32>) -> Self {
369        self.wand_factor = wand_factor;
370        self
371    }
372
373    pub fn columns(&self) -> HashSet<String> {
374        self.query.columns()
375    }
376
377    pub fn params(&self) -> FtsSearchParams {
378        FtsSearchParams::new()
379            .with_limit(self.limit.map(|limit| limit as usize))
380            .with_wand_factor(self.wand_factor.unwrap_or(1.0))
381    }
382}
383
384/// A query that a basic scalar index (e.g. btree / bitmap) can satisfy
385///
386/// This is a subset of expression operators that is often referred to as the
387/// "sargable" operators
388///
389/// Note that negation is not included.  Negation should be applied later.  For
390/// example, to invert an equality query (e.g. all rows where the value is not 7)
391/// you can grab all rows where the value = 7 and then do an inverted take (or use
392/// a block list instead of an allow list for prefiltering)
393#[derive(Debug, Clone, PartialEq)]
394pub enum SargableQuery {
395    /// Retrieve all row ids where the value is in the given [min, max) range
396    Range(Bound<ScalarValue>, Bound<ScalarValue>),
397    /// Retrieve all row ids where the value is in the given set of values
398    IsIn(Vec<ScalarValue>),
399    /// Retrieve all row ids where the value is exactly the given value
400    Equals(ScalarValue),
401    /// Retrieve all row ids where the value matches the given full text search query
402    FullTextSearch(FullTextSearchQuery),
403    /// Retrieve all row ids where the value is null
404    IsNull(),
405    /// Retrieve all row ids where the value matches LIKE 'prefix%' pattern
406    /// This is used for both explicit LIKE expressions and starts_with() function calls
407    LikePrefix(ScalarValue),
408}
409
410impl AnyQuery for SargableQuery {
411    fn as_any(&self) -> &dyn Any {
412        self
413    }
414
415    fn format(&self, col: &str) -> String {
416        match self {
417            Self::Range(lower, upper) => match (lower, upper) {
418                (Bound::Unbounded, Bound::Unbounded) => "true".to_string(),
419                (Bound::Unbounded, Bound::Included(rhs)) => format!("{} <= {}", col, rhs),
420                (Bound::Unbounded, Bound::Excluded(rhs)) => format!("{} < {}", col, rhs),
421                (Bound::Included(lhs), Bound::Unbounded) => format!("{} >= {}", col, lhs),
422                (Bound::Included(lhs), Bound::Included(rhs)) => {
423                    format!("{} >= {} && {} <= {}", col, lhs, col, rhs)
424                }
425                (Bound::Included(lhs), Bound::Excluded(rhs)) => {
426                    format!("{} >= {} && {} < {}", col, lhs, col, rhs)
427                }
428                (Bound::Excluded(lhs), Bound::Unbounded) => format!("{} > {}", col, lhs),
429                (Bound::Excluded(lhs), Bound::Included(rhs)) => {
430                    format!("{} > {} && {} <= {}", col, lhs, col, rhs)
431                }
432                (Bound::Excluded(lhs), Bound::Excluded(rhs)) => {
433                    format!("{} > {} && {} < {}", col, lhs, col, rhs)
434                }
435            },
436            Self::IsIn(values) => {
437                format!(
438                    "{} IN [{}]",
439                    col,
440                    values
441                        .iter()
442                        .map(|val| val.to_string())
443                        .collect::<Vec<_>>()
444                        .join(",")
445                )
446            }
447            Self::FullTextSearch(query) => {
448                format!("fts({})", query.query)
449            }
450            Self::IsNull() => {
451                format!("{} IS NULL", col)
452            }
453            Self::Equals(val) => {
454                format!("{} = {}", col, val)
455            }
456            Self::LikePrefix(prefix) => {
457                format!("{} LIKE '{}%'", col, prefix)
458            }
459        }
460    }
461
462    fn to_expr(&self, col: String) -> Expr {
463        let col_expr = Expr::Column(Column::new_unqualified(col));
464        match self {
465            Self::Range(lower, upper) => match (lower, upper) {
466                (Bound::Unbounded, Bound::Unbounded) => {
467                    Expr::Literal(ScalarValue::Boolean(Some(true)), None)
468                }
469                (Bound::Unbounded, Bound::Included(rhs)) => {
470                    col_expr.lt_eq(Expr::Literal(rhs.clone(), None))
471                }
472                (Bound::Unbounded, Bound::Excluded(rhs)) => {
473                    col_expr.lt(Expr::Literal(rhs.clone(), None))
474                }
475                (Bound::Included(lhs), Bound::Unbounded) => {
476                    col_expr.gt_eq(Expr::Literal(lhs.clone(), None))
477                }
478                (Bound::Included(lhs), Bound::Included(rhs)) => col_expr.between(
479                    Expr::Literal(lhs.clone(), None),
480                    Expr::Literal(rhs.clone(), None),
481                ),
482                (Bound::Included(lhs), Bound::Excluded(rhs)) => col_expr
483                    .clone()
484                    .gt_eq(Expr::Literal(lhs.clone(), None))
485                    .and(col_expr.lt(Expr::Literal(rhs.clone(), None))),
486                (Bound::Excluded(lhs), Bound::Unbounded) => {
487                    col_expr.gt(Expr::Literal(lhs.clone(), None))
488                }
489                (Bound::Excluded(lhs), Bound::Included(rhs)) => col_expr
490                    .clone()
491                    .gt(Expr::Literal(lhs.clone(), None))
492                    .and(col_expr.lt_eq(Expr::Literal(rhs.clone(), None))),
493                (Bound::Excluded(lhs), Bound::Excluded(rhs)) => col_expr
494                    .clone()
495                    .gt(Expr::Literal(lhs.clone(), None))
496                    .and(col_expr.lt(Expr::Literal(rhs.clone(), None))),
497            },
498            Self::IsIn(values) => col_expr.in_list(
499                values
500                    .iter()
501                    .map(|val| Expr::Literal(val.clone(), None))
502                    .collect::<Vec<_>>(),
503                false,
504            ),
505            Self::FullTextSearch(query) => col_expr.like(Expr::Literal(
506                ScalarValue::Utf8(Some(query.query.to_string())),
507                None,
508            )),
509            Self::IsNull() => col_expr.is_null(),
510            Self::Equals(value) => col_expr.eq(Expr::Literal(value.clone(), None)),
511            Self::LikePrefix(prefix) => {
512                let pattern = match prefix {
513                    ScalarValue::Utf8(Some(s)) => ScalarValue::Utf8(Some(format!("{}%", s))),
514                    ScalarValue::LargeUtf8(Some(s)) => {
515                        ScalarValue::LargeUtf8(Some(format!("{}%", s)))
516                    }
517                    other => other.clone(),
518                };
519                col_expr.like(Expr::Literal(pattern, None))
520            }
521        }
522    }
523
524    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
525        match other.as_any().downcast_ref::<Self>() {
526            Some(o) => self == o,
527            None => false,
528        }
529    }
530}
531
532/// A query that a LabelListIndex can satisfy
533#[derive(Debug, Clone, PartialEq)]
534pub enum LabelListQuery {
535    /// Retrieve all row ids where every label is in the list of values for the row
536    HasAllLabels(Vec<ScalarValue>),
537    /// Retrieve all row ids where at least one of the given labels is in the list of values for the row
538    HasAnyLabel(Vec<ScalarValue>),
539}
540
541impl AnyQuery for LabelListQuery {
542    fn as_any(&self) -> &dyn Any {
543        self
544    }
545
546    fn format(&self, col: &str) -> String {
547        format!("{}", self.to_expr(col.to_string()))
548    }
549
550    fn to_expr(&self, col: String) -> Expr {
551        match self {
552            Self::HasAllLabels(labels) => {
553                let labels_arr = ScalarValue::iter_to_array(labels.iter().cloned()).unwrap();
554                let offsets_buffer =
555                    OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, labels_arr.len() as i32]));
556                let labels_list = ListArray::try_new(
557                    Arc::new(Field::new("item", labels_arr.data_type().clone(), true)),
558                    offsets_buffer,
559                    labels_arr,
560                    None,
561                )
562                .unwrap();
563                let labels_arr = Arc::new(labels_list);
564                Expr::ScalarFunction(ScalarFunction {
565                    func: Arc::new(array_has::ArrayHasAll::new().into()),
566                    args: vec![
567                        Expr::Column(Column::new_unqualified(col)),
568                        Expr::Literal(ScalarValue::List(labels_arr), None),
569                    ],
570                })
571            }
572            Self::HasAnyLabel(labels) => {
573                let labels_arr = ScalarValue::iter_to_array(labels.iter().cloned()).unwrap();
574                let offsets_buffer =
575                    OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, labels_arr.len() as i32]));
576                let labels_list = ListArray::try_new(
577                    Arc::new(Field::new("item", labels_arr.data_type().clone(), true)),
578                    offsets_buffer,
579                    labels_arr,
580                    None,
581                )
582                .unwrap();
583                let labels_arr = Arc::new(labels_list);
584                Expr::ScalarFunction(ScalarFunction {
585                    func: Arc::new(array_has::ArrayHasAny::new().into()),
586                    args: vec![
587                        Expr::Column(Column::new_unqualified(col)),
588                        Expr::Literal(ScalarValue::List(labels_arr), None),
589                    ],
590                })
591            }
592        }
593    }
594
595    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
596        match other.as_any().downcast_ref::<Self>() {
597            Some(o) => self == o,
598            None => false,
599        }
600    }
601}
602
/// A query that an NGramIndex can satisfy
#[derive(Debug, Clone, PartialEq)]
pub enum TextQuery {
    /// Retrieve all row ids where the text contains the given string
    StringContains(String),
    // TODO: Future candidates are a case-insensitive contains, partial
    // matches (e.g. LIKE 'foo%') and potentially even some regular
    // expressions
}
612
613impl AnyQuery for TextQuery {
614    fn as_any(&self) -> &dyn Any {
615        self
616    }
617
618    fn format(&self, col: &str) -> String {
619        format!("{}", self.to_expr(col.to_string()))
620    }
621
622    fn to_expr(&self, col: String) -> Expr {
623        match self {
624            Self::StringContains(substr) => Expr::ScalarFunction(ScalarFunction {
625                func: Arc::new(ContainsFunc::new().into()),
626                args: vec![
627                    Expr::Column(Column::new_unqualified(col)),
628                    Expr::Literal(ScalarValue::Utf8(Some(substr.clone())), None),
629                ],
630            }),
631        }
632    }
633
634    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
635        match other.as_any().downcast_ref::<Self>() {
636            Some(o) => self == o,
637            None => false,
638        }
639    }
640}
641
/// A query that an InvertedIndex can satisfy
#[derive(Debug, Clone, PartialEq)]
pub enum TokenQuery {
    /// Retrieve all row ids where the text contains all tokens parsed from the given string.
    /// Tokens are separated by punctuation and white space.
    TokensContains(String),
}
649
650/// A query that a BloomFilter index can satisfy
651///
652/// This is a subset of SargableQuery that only includes operations that bloom filters
653/// can efficiently handle: equals, is_null, and is_in queries.
654#[derive(Debug, Clone, PartialEq)]
655pub enum BloomFilterQuery {
656    /// Retrieve all row ids where the value is exactly the given value
657    Equals(ScalarValue),
658    /// Retrieve all row ids where the value is null
659    IsNull(),
660    /// Retrieve all row ids where the value is in the given set of values
661    IsIn(Vec<ScalarValue>),
662}
663
664impl AnyQuery for BloomFilterQuery {
665    fn as_any(&self) -> &dyn Any {
666        self
667    }
668
669    fn format(&self, col: &str) -> String {
670        match self {
671            Self::Equals(val) => {
672                format!("{} = {}", col, val)
673            }
674            Self::IsNull() => {
675                format!("{} IS NULL", col)
676            }
677            Self::IsIn(values) => {
678                format!(
679                    "{} IN [{}]",
680                    col,
681                    values
682                        .iter()
683                        .map(|val| val.to_string())
684                        .collect::<Vec<_>>()
685                        .join(",")
686                )
687            }
688        }
689    }
690
691    fn to_expr(&self, col: String) -> Expr {
692        let col_expr = Expr::Column(Column::new_unqualified(col));
693        match self {
694            Self::Equals(value) => col_expr.eq(Expr::Literal(value.clone(), None)),
695            Self::IsNull() => col_expr.is_null(),
696            Self::IsIn(values) => col_expr.in_list(
697                values
698                    .iter()
699                    .map(|val| Expr::Literal(val.clone(), None))
700                    .collect::<Vec<_>>(),
701                false,
702            ),
703        }
704    }
705
706    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
707        match other.as_any().downcast_ref::<Self>() {
708            Some(o) => self == o,
709            None => false,
710        }
711    }
712}
713
714impl AnyQuery for TokenQuery {
715    fn as_any(&self) -> &dyn Any {
716        self
717    }
718
719    fn format(&self, col: &str) -> String {
720        format!("{}", self.to_expr(col.to_string()))
721    }
722
723    fn to_expr(&self, col: String) -> Expr {
724        match self {
725            Self::TokensContains(substr) => Expr::ScalarFunction(ScalarFunction {
726                func: Arc::new(CONTAINS_TOKENS_UDF.clone()),
727                args: vec![
728                    Expr::Column(Column::new_unqualified(col)),
729                    Expr::Literal(ScalarValue::Utf8(Some(substr.clone())), None),
730                ],
731            }),
732        }
733    }
734
735    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
736        match other.as_any().downcast_ref::<Self>() {
737            Some(o) => self == o,
738            None => false,
739        }
740    }
741}
742
/// Operand of a geo relation query: a scalar value paired with a field
///
/// NOTE(review): `field` presumably describes the type of `value` — confirm.
#[cfg(feature = "geo")]
#[derive(Debug, Clone, PartialEq)]
pub struct RelationQuery {
    pub value: ScalarValue,
    pub field: Field,
}
749
/// A query that a Geo index can satisfy
#[cfg(feature = "geo")]
#[derive(Debug, Clone, PartialEq)]
pub enum GeoQuery {
    /// Intersection query against the given value (displayed as `Intersect(col value)`)
    IntersectQuery(RelationQuery),
    /// Null check (displayed as `col IS NULL`)
    IsNull,
}
757
#[cfg(feature = "geo")]
impl AnyQuery for GeoQuery {
    /// Exposes the query as `Any` so callers can downcast it.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Renders the query against column `col` as human-readable text.
    fn format(&self, col: &str) -> String {
        match self {
            Self::IntersectQuery(query) => {
                let value = &query.value;
                format!("Intersect({col} {value})")
            }
            Self::IsNull => format!("{col} IS NULL"),
        }
    }

    /// Not implemented yet.
    ///
    /// # Panics
    ///
    /// Always panics, because the conversion is still a `todo!()`.
    fn to_expr(&self, _col: String) -> Expr {
        todo!()
    }

    /// Two queries are equal only when `other` is also a `GeoQuery` and
    /// compares equal to this one.
    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
        other
            .as_any()
            .downcast_ref::<Self>()
            .is_some_and(|same_kind| self == same_kind)
    }
}
786
787/// The result of a search operation against a scalar index
788#[derive(Debug, PartialEq)]
789pub enum SearchResult {
790    /// The exact row ids that satisfy the query
791    Exact(NullableRowAddrSet),
792    /// Any row id satisfying the query will be in this set but not every
793    /// row id in this set will satisfy the query, a further recheck step
794    /// is needed
795    AtMost(NullableRowAddrSet),
796    /// All of the given row ids satisfy the query but there may be more
797    ///
798    /// No scalar index actually returns this today but it can arise from
799    /// boolean operations (e.g. NOT(AtMost(x)) == AtLeast(NOT(x)))
800    AtLeast(NullableRowAddrSet),
801}
802
803impl SearchResult {
804    pub fn exact(row_ids: impl Into<RowAddrTreeMap>) -> Self {
805        Self::Exact(NullableRowAddrSet::new(row_ids.into(), Default::default()))
806    }
807
808    pub fn at_most(row_ids: impl Into<RowAddrTreeMap>) -> Self {
809        Self::AtMost(NullableRowAddrSet::new(row_ids.into(), Default::default()))
810    }
811
812    pub fn at_least(row_ids: impl Into<RowAddrTreeMap>) -> Self {
813        Self::AtLeast(NullableRowAddrSet::new(row_ids.into(), Default::default()))
814    }
815
816    pub fn with_nulls(self, nulls: impl Into<RowAddrTreeMap>) -> Self {
817        match self {
818            Self::Exact(row_ids) => Self::Exact(row_ids.with_nulls(nulls.into())),
819            Self::AtMost(row_ids) => Self::AtMost(row_ids.with_nulls(nulls.into())),
820            Self::AtLeast(row_ids) => Self::AtLeast(row_ids.with_nulls(nulls.into())),
821        }
822    }
823
824    pub fn row_addrs(&self) -> &NullableRowAddrSet {
825        match self {
826            Self::Exact(row_addrs) => row_addrs,
827            Self::AtMost(row_addrs) => row_addrs,
828            Self::AtLeast(row_addrs) => row_addrs,
829        }
830    }
831
832    pub fn is_exact(&self) -> bool {
833        matches!(self, Self::Exact(_))
834    }
835}
836
837/// Brief information about an index that was created
838pub struct CreatedIndex {
839    /// The details of the index that was created
840    ///
841    /// These should be stored somewhere as they will be needed to
842    /// load the index later.
843    pub index_details: prost_types::Any,
844    /// The version of the index that was created
845    ///
846    /// This can be used to determine if a reader is able to load the index.
847    pub index_version: u32,
848    /// List of files and their sizes for this index
849    ///
850    /// This enables skipping HEAD calls when opening indices and provides
851    /// visibility into index storage size via describe_indices().
852    pub files: Option<Vec<IndexFile>>,
853}
854
855/// The criteria that specifies how to update an index
856pub struct UpdateCriteria {
857    /// If true, then we need to read the old data to update the index
858    ///
859    /// This should be avoided if possible but is left in for some legacy paths
860    pub requires_old_data: bool,
861    /// The criteria required for data (both old and new)
862    pub data_criteria: TrainingCriteria,
863}
864
865/// Filter used when merging existing scalar-index rows during update.
866///
867/// The caller must pick a filter mode that matches the row-id semantics of the
868/// dataset:
869/// - address-style row IDs: fragment filtering is valid
870/// - stable row IDs: use exact row-id membership instead
871#[derive(Debug, Clone)]
872pub enum OldIndexDataFilter {
873    /// Keeps track of which fragments are still valid and which are no longer valid.
874    ///
875    /// This is valid for address-style row IDs.
876    Fragments {
877        to_keep: RoaringBitmap,
878        to_remove: RoaringBitmap,
879    },
880    /// Keep old rows whose row IDs are in this exact allow-list.
881    ///
882    /// This is required for stable row IDs, where row IDs are opaque and
883    /// should not be interpreted as encoded row addresses.
884    RowIds(RowAddrTreeMap),
885}
886
887impl OldIndexDataFilter {
888    /// Build a boolean mask that keeps only row IDs selected by this filter.
889    pub fn filter_row_ids(&self, row_ids: &UInt64Array) -> BooleanArray {
890        match self {
891            Self::Fragments { to_keep, .. } => row_ids
892                .iter()
893                .map(|id| id.map(|id| to_keep.contains((id >> 32) as u32)))
894                .collect(),
895            Self::RowIds(valid_row_ids) => row_ids
896                .iter()
897                .map(|id| id.map(|id| valid_row_ids.contains(id)))
898                .collect(),
899        }
900    }
901}
902
903impl UpdateCriteria {
904    pub fn requires_old_data(data_criteria: TrainingCriteria) -> Self {
905        Self {
906            requires_old_data: true,
907            data_criteria,
908        }
909    }
910
911    pub fn only_new_data(data_criteria: TrainingCriteria) -> Self {
912        Self {
913            requires_old_data: false,
914            data_criteria,
915        }
916    }
917}
918
919/// Compute the lexicographically next prefix by incrementing the last character's code point.
920/// Returns None if no valid upper bound exists.
921///
922/// This is used for LIKE prefix queries to convert `LIKE 'foo%'` to range `[foo, fop)`.
923///
924/// # UTF-8 and Unicode Handling
925///
926/// This function operates on Unicode code points (characters), not bytes. Since UTF-8
927/// byte ordering is identical to Unicode code point ordering, incrementing a character's
928/// code point produces the correct lexicographic successor for byte-wise string comparison.
929///
930/// If incrementing the last character would overflow or land in the surrogate range
931/// (U+D800-U+DFFF), we try incrementing the previous character, and so on.
932///
933/// Examples:
934/// - `"foo"` → `Some("fop")`
935/// - `"café"` → `Some("cafê")`  (é U+00E9 → ê U+00EA)
936/// - `"abc中"` → `Some("abc丮")` (中 U+4E2D → 丮 U+4E2E)
937/// - `"cafÿ"` → `Some("cafĀ")` (ÿ U+00FF → Ā U+0100)
938pub fn compute_next_prefix(prefix: &str) -> Option<String> {
939    if prefix.is_empty() {
940        return None;
941    }
942
943    let chars: Vec<char> = prefix.chars().collect();
944
945    // Try incrementing characters from right to left
946    for i in (0..chars.len()).rev() {
947        if let Some(next_char) = next_unicode_char(chars[i]) {
948            let mut result: String = chars[..i].iter().collect();
949            result.push(next_char);
950            return Some(result);
951        }
952        // This character cannot be incremented (e.g., U+10FFFF), try previous
953    }
954
955    // All characters were at maximum value
956    None
957}
958
/// Return the Unicode scalar value that immediately follows `c`.
///
/// The surrogate range (U+D800-U+DFFF) is not valid in UTF-8, so it is jumped
/// over: the successor of U+D7FF is U+E000. Returns `None` when `c` is
/// U+10FFFF, which has no successor.
fn next_unicode_char(c: char) -> Option<char> {
    match u32::from(c).checked_add(1)? {
        // Landed on a surrogate: continue at the first code point past the range.
        0xD800..=0xDFFF => char::from_u32(0xE000),
        // `from_u32` rejects anything beyond U+10FFFF.
        successor => char::from_u32(successor),
    }
}
974
975/// A trait for a scalar index, a structure that can determine row ids that satisfy scalar queries
976#[async_trait]
977pub trait ScalarIndex: Send + Sync + std::fmt::Debug + Index + DeepSizeOf {
978    /// Search the scalar index
979    ///
980    /// Returns all row ids that satisfy the query, these row ids are not necessarily ordered
981    async fn search(
982        &self,
983        query: &dyn AnyQuery,
984        metrics: &dyn MetricsCollector,
985    ) -> Result<SearchResult>;
986
987    /// Returns true if the remap operation is supported
988    fn can_remap(&self) -> bool;
989
990    /// Remap the row ids, creating a new remapped version of this index in `dest_store`
991    async fn remap(
992        &self,
993        mapping: &HashMap<u64, Option<u64>>,
994        dest_store: &dyn IndexStore,
995    ) -> Result<CreatedIndex>;
996
997    /// Add the new data into the index, creating an updated version of the index in `dest_store`
998    ///
999    /// If `old_data_filter` is provided, old index data will be filtered before
1000    /// merge according to the chosen filter mode.
1001    async fn update(
1002        &self,
1003        new_data: SendableRecordBatchStream,
1004        dest_store: &dyn IndexStore,
1005        old_data_filter: Option<OldIndexDataFilter>,
1006    ) -> Result<CreatedIndex>;
1007
1008    /// Returns the criteria that will be used to update the index
1009    fn update_criteria(&self) -> UpdateCriteria;
1010
1011    /// Derive the index parameters from the current index
1012    ///
1013    /// This returns a ScalarIndexParams that can be used to recreate an index
1014    /// with the same configuration on another dataset.
1015    fn derive_index_params(&self) -> Result<ScalarIndexParams>;
1016}