Skip to main content

lance_index/
scalar.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Scalar indices for metadata search & filtering
5
6use arrow::buffer::{OffsetBuffer, ScalarBuffer};
7use arrow_array::{ListArray, RecordBatch};
8use arrow_schema::{Field, Schema};
9use async_trait::async_trait;
10use datafusion::functions::string::contains::ContainsFunc;
11use datafusion::functions_nested::array_has;
12use datafusion::physical_plan::SendableRecordBatchStream;
13use datafusion_common::{scalar::ScalarValue, Column};
14use std::collections::{HashMap, HashSet};
15use std::fmt::Debug;
16use std::{any::Any, ops::Bound, sync::Arc};
17
18use datafusion_expr::expr::ScalarFunction;
19use datafusion_expr::Expr;
20use deepsize::DeepSizeOf;
21use futures::{future::BoxFuture, FutureExt, Stream};
22use inverted::query::{fill_fts_query_column, FtsQuery, FtsQueryNode, FtsSearchParams, MatchQuery};
23use lance_core::utils::mask::{NullableRowAddrSet, RowAddrTreeMap};
24use lance_core::{Error, Result};
25use serde::Serialize;
26use snafu::location;
27
28use crate::metrics::MetricsCollector;
29use crate::scalar::registry::TrainingCriteria;
30use crate::{Index, IndexParams, IndexType};
31
32pub mod bitmap;
33pub mod bloomfilter;
34pub mod btree;
35pub mod expression;
36pub mod inverted;
37pub mod json;
38pub mod label_list;
39pub mod lance_format;
40pub mod ngram;
41pub mod registry;
42pub mod rtree;
43pub mod zoned;
44pub mod zonemap;
45
46use crate::frag_reuse::FragReuseIndex;
47pub use inverted::tokenizer::InvertedIndexParams;
48use lance_datafusion::udf::CONTAINS_TOKENS_UDF;
49
/// Index name reported by [`ScalarIndexParams`] through `IndexParams::index_name`
pub const LANCE_SCALAR_INDEX: &str = "__lance_scalar_index";
51
52/// Builtin index types supported by the Lance library
53///
54/// This is primarily for convenience to avoid a bunch of string
55/// constants and provide some auto-complete.  This type should not
56/// be used in the manifest as plugins cannot add new entries.
57#[derive(Debug, Clone, PartialEq, Eq, DeepSizeOf)]
58pub enum BuiltinIndexType {
59    BTree,
60    Bitmap,
61    LabelList,
62    NGram,
63    ZoneMap,
64    BloomFilter,
65    RTree,
66    Inverted,
67}
68
69impl BuiltinIndexType {
70    pub fn as_str(&self) -> &str {
71        match self {
72            Self::BTree => "btree",
73            Self::Bitmap => "bitmap",
74            Self::LabelList => "labellist",
75            Self::NGram => "ngram",
76            Self::ZoneMap => "zonemap",
77            Self::Inverted => "inverted",
78            Self::BloomFilter => "bloomfilter",
79            Self::RTree => "rtree",
80        }
81    }
82}
83
84impl TryFrom<IndexType> for BuiltinIndexType {
85    type Error = Error;
86
87    fn try_from(value: IndexType) -> Result<Self> {
88        match value {
89            IndexType::BTree => Ok(Self::BTree),
90            IndexType::Bitmap => Ok(Self::Bitmap),
91            IndexType::LabelList => Ok(Self::LabelList),
92            IndexType::NGram => Ok(Self::NGram),
93            IndexType::ZoneMap => Ok(Self::ZoneMap),
94            IndexType::Inverted => Ok(Self::Inverted),
95            IndexType::BloomFilter => Ok(Self::BloomFilter),
96            IndexType::RTree => Ok(Self::RTree),
97            _ => Err(Error::Index {
98                message: "Invalid index type".to_string(),
99                location: location!(),
100            }),
101        }
102    }
103}
104
/// Parameters describing which scalar index to build and how to train it
#[derive(Debug, Clone, PartialEq)]
pub struct ScalarIndexParams {
    /// Name of the index type to create
    ///
    /// Not limited to the builtin types: plugins may register more.  The
    /// lookup of this name is case-insensitive.
    pub index_type: String,
    /// Training parameters for the index, encoded as a JSON string
    ///
    /// What the JSON contains depends on the index type.  When `None`, the
    /// index type's default parameters are used.
    pub params: Option<String>,
}
117
118impl Default for ScalarIndexParams {
119    fn default() -> Self {
120        Self {
121            index_type: BuiltinIndexType::BTree.as_str().to_string(),
122            params: None,
123        }
124    }
125}
126
127impl ScalarIndexParams {
128    /// Creates a new ScalarIndexParams from one of the builtin index types
129    pub fn for_builtin(index_type: BuiltinIndexType) -> Self {
130        Self {
131            index_type: index_type.as_str().to_string(),
132            params: None,
133        }
134    }
135
136    /// Create a new ScalarIndexParams with the given index type
137    pub fn new(index_type: String) -> Self {
138        Self {
139            index_type,
140            params: None,
141        }
142    }
143
144    /// Set the parameters for the index
145    pub fn with_params<ParamsType: Serialize>(mut self, params: &ParamsType) -> Self {
146        self.params = Some(serde_json::to_string(params).unwrap());
147        self
148    }
149}
150
151impl IndexParams for ScalarIndexParams {
152    fn as_any(&self) -> &dyn std::any::Any {
153        self
154    }
155
156    fn index_name(&self) -> &str {
157        LANCE_SCALAR_INDEX
158    }
159}
160
161impl IndexParams for InvertedIndexParams {
162    fn as_any(&self) -> &dyn std::any::Any {
163        self
164    }
165
166    fn index_name(&self) -> &str {
167        "INVERTED"
168    }
169}
170
171/// Trait for storing an index (or parts of an index) into storage
172#[async_trait]
173pub trait IndexWriter: Send {
174    /// Writes a record batch into the file, returning the 0-based index of the batch in the file
175    ///
176    /// E.g. if this is the third time this is called this method will return 2
177    async fn write_record_batch(&mut self, batch: RecordBatch) -> Result<u64>;
178    /// Finishes writing the file and closes the file
179    async fn finish(&mut self) -> Result<()>;
180    /// Finishes writing the file and closes the file with additional metadata
181    async fn finish_with_metadata(&mut self, metadata: HashMap<String, String>) -> Result<()>;
182}
183
184/// Trait for reading an index (or parts of an index) from storage
185#[async_trait]
186pub trait IndexReader: Send + Sync {
187    /// Read the n-th record batch from the file
188    async fn read_record_batch(&self, n: u64, batch_size: u64) -> Result<RecordBatch>;
189    /// Read the range of rows from the file.
190    /// If projection is Some, only return the columns in the projection,
191    /// nested columns like Some(&["x.y"]) are not supported.
192    /// If projection is None, return all columns.
193    async fn read_range(
194        &self,
195        range: std::ops::Range<usize>,
196        projection: Option<&[&str]>,
197    ) -> Result<RecordBatch>;
198    /// Return the number of batches in the file
199    async fn num_batches(&self, batch_size: u64) -> u32;
200    /// Return the number of rows in the file
201    fn num_rows(&self) -> usize;
202    /// Return the metadata of the file
203    fn schema(&self) -> &lance_core::datatypes::Schema;
204}
205
206/// A stream that reads the original training data back out of the index
207struct IndexReaderStream {
208    reader: Arc<dyn IndexReader>,
209    batch_size: u64,
210    offset: u64,
211    limit: u64,
212}
213
214impl IndexReaderStream {
215    async fn new(reader: Arc<dyn IndexReader>, batch_size: u64) -> Self {
216        let limit = reader.num_rows() as u64;
217        Self::new_with_limit(reader, batch_size, limit).await
218    }
219
220    async fn new_with_limit(reader: Arc<dyn IndexReader>, batch_size: u64, limit: u64) -> Self {
221        Self {
222            reader,
223            batch_size,
224            offset: 0,
225            limit,
226        }
227    }
228}
229
230impl Stream for IndexReaderStream {
231    type Item = BoxFuture<'static, Result<RecordBatch>>;
232
233    fn poll_next(
234        self: std::pin::Pin<&mut Self>,
235        _cx: &mut std::task::Context<'_>,
236    ) -> std::task::Poll<Option<Self::Item>> {
237        let this = self.get_mut();
238        if this.offset >= this.limit {
239            return std::task::Poll::Ready(None);
240        }
241        let read_start = this.offset;
242        let read_end = this.limit.min(this.offset + this.batch_size);
243        this.offset = read_end;
244        let reader_copy = this.reader.clone();
245
246        let read_task = async move {
247            reader_copy
248                .read_range(read_start as usize..read_end as usize, None)
249                .await
250        }
251        .boxed();
252        std::task::Poll::Ready(Some(read_task))
253    }
254}
255
256/// Trait abstracting I/O away from index logic
257///
258/// Scalar indices are currently serialized as indexable arrow record batches stored in
259/// named "files".  The index store is responsible for serializing and deserializing
260/// these batches into file data (e.g. as .lance files or .parquet files, etc.)
261#[async_trait]
262pub trait IndexStore: std::fmt::Debug + Send + Sync + DeepSizeOf {
263    fn as_any(&self) -> &dyn Any;
264
265    /// Suggested I/O parallelism for the store
266    fn io_parallelism(&self) -> usize;
267
268    /// Create a new file and return a writer to store data in the file
269    async fn new_index_file(&self, name: &str, schema: Arc<Schema>)
270        -> Result<Box<dyn IndexWriter>>;
271
272    /// Open an existing file for retrieval
273    async fn open_index_file(&self, name: &str) -> Result<Arc<dyn IndexReader>>;
274
275    /// Copy a range of batches from an index file from this store to another
276    ///
277    /// This is often useful when remapping or updating
278    async fn copy_index_file(&self, name: &str, dest_store: &dyn IndexStore) -> Result<()>;
279
280    /// Rename an index file
281    async fn rename_index_file(&self, name: &str, new_name: &str) -> Result<()>;
282
283    /// Delete an index file (used in the tmp spill store to keep tmp size down)
284    async fn delete_index_file(&self, name: &str) -> Result<()>;
285}
286
287/// Different scalar indices may support different kinds of queries
288///
289/// For example, a btree index can support a wide range of queries (e.g. x > 7)
290/// while an index based on FTS only supports queries like "x LIKE 'foo'"
291///
292/// This trait is used when we need an object that can represent any kind of query
293///
294/// Note: if you are implementing this trait for a query type then you probably also
295/// need to implement the [crate::scalar::expression::ScalarQueryParser] trait to
296/// create instances of your query at parse time.
297pub trait AnyQuery: std::fmt::Debug + Any + Send + Sync {
298    /// Cast the query as Any to allow for downcasting
299    fn as_any(&self) -> &dyn Any;
300    /// Format the query as a string for display purposes
301    fn format(&self, col: &str) -> String;
302    /// Convert the query to a datafusion expression
303    fn to_expr(&self, col: String) -> Expr;
304    /// Compare this query to another query
305    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool;
306}
307
308impl PartialEq for dyn AnyQuery {
309    fn eq(&self, other: &Self) -> bool {
310        self.dyn_eq(other)
311    }
312}
313/// A full text search query
314#[derive(Debug, Clone, PartialEq)]
315pub struct FullTextSearchQuery {
316    pub query: FtsQuery,
317
318    /// The maximum number of results to return
319    pub limit: Option<i64>,
320
321    /// The wand factor to use for ranking
322    /// if None, use the default value of 1.0
323    /// Increasing this value will reduce the recall and improve the performance
324    /// 1.0 is the value that would give the best performance without recall loss
325    pub wand_factor: Option<f32>,
326}
327
328impl FullTextSearchQuery {
329    /// Create a new terms query
330    pub fn new(query: String) -> Self {
331        let query = MatchQuery::new(query).into();
332        Self {
333            query,
334            limit: None,
335            wand_factor: None,
336        }
337    }
338
339    /// Create a new fuzzy query
340    pub fn new_fuzzy(term: String, max_distance: Option<u32>) -> Self {
341        let query = MatchQuery::new(term).with_fuzziness(max_distance).into();
342        Self {
343            query,
344            limit: None,
345            wand_factor: None,
346        }
347    }
348
349    /// Create a new compound query
350    pub fn new_query(query: FtsQuery) -> Self {
351        Self {
352            query,
353            limit: None,
354            wand_factor: None,
355        }
356    }
357
358    /// Set the column to search over
359    /// This is available for only MatchQuery and PhraseQuery
360    pub fn with_column(mut self, column: String) -> Result<Self> {
361        self.query = fill_fts_query_column(&self.query, &[column], true)?;
362        Ok(self)
363    }
364
365    /// Set the column to search over
366    /// This is available for only MatchQuery
367    pub fn with_columns(mut self, columns: &[String]) -> Result<Self> {
368        self.query = fill_fts_query_column(&self.query, columns, true)?;
369        Ok(self)
370    }
371
372    /// limit the number of results to return
373    /// if None, return all results
374    pub fn limit(mut self, limit: Option<i64>) -> Self {
375        self.limit = limit;
376        self
377    }
378
379    pub fn wand_factor(mut self, wand_factor: Option<f32>) -> Self {
380        self.wand_factor = wand_factor;
381        self
382    }
383
384    pub fn columns(&self) -> HashSet<String> {
385        self.query.columns()
386    }
387
388    pub fn params(&self) -> FtsSearchParams {
389        FtsSearchParams::new()
390            .with_limit(self.limit.map(|limit| limit as usize))
391            .with_wand_factor(self.wand_factor.unwrap_or(1.0))
392    }
393}
394
395/// A query that a basic scalar index (e.g. btree / bitmap) can satisfy
396///
397/// This is a subset of expression operators that is often referred to as the
398/// "sargable" operators
399///
400/// Note that negation is not included.  Negation should be applied later.  For
401/// example, to invert an equality query (e.g. all rows where the value is not 7)
402/// you can grab all rows where the value = 7 and then do an inverted take (or use
403/// a block list instead of an allow list for prefiltering)
404#[derive(Debug, Clone, PartialEq)]
405pub enum SargableQuery {
406    /// Retrieve all row ids where the value is in the given [min, max) range
407    Range(Bound<ScalarValue>, Bound<ScalarValue>),
408    /// Retrieve all row ids where the value is in the given set of values
409    IsIn(Vec<ScalarValue>),
410    /// Retrieve all row ids where the value is exactly the given value
411    Equals(ScalarValue),
412    /// Retrieve all row ids where the value matches the given full text search query
413    FullTextSearch(FullTextSearchQuery),
414    /// Retrieve all row ids where the value is null
415    IsNull(),
416}
417
418impl AnyQuery for SargableQuery {
419    fn as_any(&self) -> &dyn Any {
420        self
421    }
422
423    fn format(&self, col: &str) -> String {
424        match self {
425            Self::Range(lower, upper) => match (lower, upper) {
426                (Bound::Unbounded, Bound::Unbounded) => "true".to_string(),
427                (Bound::Unbounded, Bound::Included(rhs)) => format!("{} <= {}", col, rhs),
428                (Bound::Unbounded, Bound::Excluded(rhs)) => format!("{} < {}", col, rhs),
429                (Bound::Included(lhs), Bound::Unbounded) => format!("{} >= {}", col, lhs),
430                (Bound::Included(lhs), Bound::Included(rhs)) => {
431                    format!("{} >= {} && {} <= {}", col, lhs, col, rhs)
432                }
433                (Bound::Included(lhs), Bound::Excluded(rhs)) => {
434                    format!("{} >= {} && {} < {}", col, lhs, col, rhs)
435                }
436                (Bound::Excluded(lhs), Bound::Unbounded) => format!("{} > {}", col, lhs),
437                (Bound::Excluded(lhs), Bound::Included(rhs)) => {
438                    format!("{} > {} && {} <= {}", col, lhs, col, rhs)
439                }
440                (Bound::Excluded(lhs), Bound::Excluded(rhs)) => {
441                    format!("{} > {} && {} < {}", col, lhs, col, rhs)
442                }
443            },
444            Self::IsIn(values) => {
445                format!(
446                    "{} IN [{}]",
447                    col,
448                    values
449                        .iter()
450                        .map(|val| val.to_string())
451                        .collect::<Vec<_>>()
452                        .join(",")
453                )
454            }
455            Self::FullTextSearch(query) => {
456                format!("fts({})", query.query)
457            }
458            Self::IsNull() => {
459                format!("{} IS NULL", col)
460            }
461            Self::Equals(val) => {
462                format!("{} = {}", col, val)
463            }
464        }
465    }
466
467    fn to_expr(&self, col: String) -> Expr {
468        let col_expr = Expr::Column(Column::new_unqualified(col));
469        match self {
470            Self::Range(lower, upper) => match (lower, upper) {
471                (Bound::Unbounded, Bound::Unbounded) => {
472                    Expr::Literal(ScalarValue::Boolean(Some(true)), None)
473                }
474                (Bound::Unbounded, Bound::Included(rhs)) => {
475                    col_expr.lt_eq(Expr::Literal(rhs.clone(), None))
476                }
477                (Bound::Unbounded, Bound::Excluded(rhs)) => {
478                    col_expr.lt(Expr::Literal(rhs.clone(), None))
479                }
480                (Bound::Included(lhs), Bound::Unbounded) => {
481                    col_expr.gt_eq(Expr::Literal(lhs.clone(), None))
482                }
483                (Bound::Included(lhs), Bound::Included(rhs)) => col_expr.between(
484                    Expr::Literal(lhs.clone(), None),
485                    Expr::Literal(rhs.clone(), None),
486                ),
487                (Bound::Included(lhs), Bound::Excluded(rhs)) => col_expr
488                    .clone()
489                    .gt_eq(Expr::Literal(lhs.clone(), None))
490                    .and(col_expr.lt(Expr::Literal(rhs.clone(), None))),
491                (Bound::Excluded(lhs), Bound::Unbounded) => {
492                    col_expr.gt(Expr::Literal(lhs.clone(), None))
493                }
494                (Bound::Excluded(lhs), Bound::Included(rhs)) => col_expr
495                    .clone()
496                    .gt(Expr::Literal(lhs.clone(), None))
497                    .and(col_expr.lt_eq(Expr::Literal(rhs.clone(), None))),
498                (Bound::Excluded(lhs), Bound::Excluded(rhs)) => col_expr
499                    .clone()
500                    .gt(Expr::Literal(lhs.clone(), None))
501                    .and(col_expr.lt(Expr::Literal(rhs.clone(), None))),
502            },
503            Self::IsIn(values) => col_expr.in_list(
504                values
505                    .iter()
506                    .map(|val| Expr::Literal(val.clone(), None))
507                    .collect::<Vec<_>>(),
508                false,
509            ),
510            Self::FullTextSearch(query) => col_expr.like(Expr::Literal(
511                ScalarValue::Utf8(Some(query.query.to_string())),
512                None,
513            )),
514            Self::IsNull() => col_expr.is_null(),
515            Self::Equals(value) => col_expr.eq(Expr::Literal(value.clone(), None)),
516        }
517    }
518
519    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
520        match other.as_any().downcast_ref::<Self>() {
521            Some(o) => self == o,
522            None => false,
523        }
524    }
525}
526
527/// A query that a LabelListIndex can satisfy
528#[derive(Debug, Clone, PartialEq)]
529pub enum LabelListQuery {
530    /// Retrieve all row ids where every label is in the list of values for the row
531    HasAllLabels(Vec<ScalarValue>),
532    /// Retrieve all row ids where at least one of the given labels is in the list of values for the row
533    HasAnyLabel(Vec<ScalarValue>),
534}
535
536impl AnyQuery for LabelListQuery {
537    fn as_any(&self) -> &dyn Any {
538        self
539    }
540
541    fn format(&self, col: &str) -> String {
542        format!("{}", self.to_expr(col.to_string()))
543    }
544
545    fn to_expr(&self, col: String) -> Expr {
546        match self {
547            Self::HasAllLabels(labels) => {
548                let labels_arr = ScalarValue::iter_to_array(labels.iter().cloned()).unwrap();
549                let offsets_buffer =
550                    OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, labels_arr.len() as i32]));
551                let labels_list = ListArray::try_new(
552                    Arc::new(Field::new("item", labels_arr.data_type().clone(), false)),
553                    offsets_buffer,
554                    labels_arr,
555                    None,
556                )
557                .unwrap();
558                let labels_arr = Arc::new(labels_list);
559                Expr::ScalarFunction(ScalarFunction {
560                    func: Arc::new(array_has::ArrayHasAll::new().into()),
561                    args: vec![
562                        Expr::Column(Column::new_unqualified(col)),
563                        Expr::Literal(ScalarValue::List(labels_arr), None),
564                    ],
565                })
566            }
567            Self::HasAnyLabel(labels) => {
568                let labels_arr = ScalarValue::iter_to_array(labels.iter().cloned()).unwrap();
569                let offsets_buffer =
570                    OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, labels_arr.len() as i32]));
571                let labels_list = ListArray::try_new(
572                    Arc::new(Field::new("item", labels_arr.data_type().clone(), false)),
573                    offsets_buffer,
574                    labels_arr,
575                    None,
576                )
577                .unwrap();
578                let labels_arr = Arc::new(labels_list);
579                Expr::ScalarFunction(ScalarFunction {
580                    func: Arc::new(array_has::ArrayHasAny::new().into()),
581                    args: vec![
582                        Expr::Column(Column::new_unqualified(col)),
583                        Expr::Literal(ScalarValue::List(labels_arr), None),
584                    ],
585                })
586            }
587        }
588    }
589
590    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
591        match other.as_any().downcast_ref::<Self>() {
592            Some(o) => self == o,
593            None => false,
594        }
595    }
596}
597
/// The queries an NGramIndex can answer
#[derive(Debug, Clone, PartialEq)]
pub enum TextQuery {
    /// Retrieve all row ids where the text contains the given string
    StringContains(String),
    // TODO: Later on this should grow a case-insensitive contains, partial
    // matches (e.g. LIKE 'foo%') and potentially even some regular expressions
}
607
608impl AnyQuery for TextQuery {
609    fn as_any(&self) -> &dyn Any {
610        self
611    }
612
613    fn format(&self, col: &str) -> String {
614        format!("{}", self.to_expr(col.to_string()))
615    }
616
617    fn to_expr(&self, col: String) -> Expr {
618        match self {
619            Self::StringContains(substr) => Expr::ScalarFunction(ScalarFunction {
620                func: Arc::new(ContainsFunc::new().into()),
621                args: vec![
622                    Expr::Column(Column::new_unqualified(col)),
623                    Expr::Literal(ScalarValue::Utf8(Some(substr.clone())), None),
624                ],
625            }),
626        }
627    }
628
629    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
630        match other.as_any().downcast_ref::<Self>() {
631            Some(o) => self == o,
632            None => false,
633        }
634    }
635}
636
/// The queries an InvertedIndex can answer
#[derive(Debug, Clone, PartialEq)]
pub enum TokenQuery {
    /// Retrieve all row ids where the text contains every token parsed from the
    /// given string.  Tokens are split on punctuation and white space.
    TokensContains(String),
}
644
645/// A query that a BloomFilter index can satisfy
646///
647/// This is a subset of SargableQuery that only includes operations that bloom filters
648/// can efficiently handle: equals, is_null, and is_in queries.
649#[derive(Debug, Clone, PartialEq)]
650pub enum BloomFilterQuery {
651    /// Retrieve all row ids where the value is exactly the given value
652    Equals(ScalarValue),
653    /// Retrieve all row ids where the value is null
654    IsNull(),
655    /// Retrieve all row ids where the value is in the given set of values
656    IsIn(Vec<ScalarValue>),
657}
658
659impl AnyQuery for BloomFilterQuery {
660    fn as_any(&self) -> &dyn Any {
661        self
662    }
663
664    fn format(&self, col: &str) -> String {
665        match self {
666            Self::Equals(val) => {
667                format!("{} = {}", col, val)
668            }
669            Self::IsNull() => {
670                format!("{} IS NULL", col)
671            }
672            Self::IsIn(values) => {
673                format!(
674                    "{} IN [{}]",
675                    col,
676                    values
677                        .iter()
678                        .map(|val| val.to_string())
679                        .collect::<Vec<_>>()
680                        .join(",")
681                )
682            }
683        }
684    }
685
686    fn to_expr(&self, col: String) -> Expr {
687        let col_expr = Expr::Column(Column::new_unqualified(col));
688        match self {
689            Self::Equals(value) => col_expr.eq(Expr::Literal(value.clone(), None)),
690            Self::IsNull() => col_expr.is_null(),
691            Self::IsIn(values) => col_expr.in_list(
692                values
693                    .iter()
694                    .map(|val| Expr::Literal(val.clone(), None))
695                    .collect::<Vec<_>>(),
696                false,
697            ),
698        }
699    }
700
701    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
702        match other.as_any().downcast_ref::<Self>() {
703            Some(o) => self == o,
704            None => false,
705        }
706    }
707}
708
709impl AnyQuery for TokenQuery {
710    fn as_any(&self) -> &dyn Any {
711        self
712    }
713
714    fn format(&self, col: &str) -> String {
715        format!("{}", self.to_expr(col.to_string()))
716    }
717
718    fn to_expr(&self, col: String) -> Expr {
719        match self {
720            Self::TokensContains(substr) => Expr::ScalarFunction(ScalarFunction {
721                func: Arc::new(CONTAINS_TOKENS_UDF.clone()),
722                args: vec![
723                    Expr::Column(Column::new_unqualified(col)),
724                    Expr::Literal(ScalarValue::Utf8(Some(substr.clone())), None),
725                ],
726            }),
727        }
728    }
729
730    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
731        match other.as_any().downcast_ref::<Self>() {
732            Some(o) => self == o,
733            None => false,
734        }
735    }
736}
737
738#[derive(Debug, Clone, PartialEq)]
739pub struct RelationQuery {
740    pub value: ScalarValue,
741    pub field: Field,
742}
743
744/// A query that a Geo index can satisfy
745#[derive(Debug, Clone, PartialEq)]
746pub enum GeoQuery {
747    IntersectQuery(RelationQuery),
748    IsNull,
749}
750
751impl AnyQuery for GeoQuery {
752    fn as_any(&self) -> &dyn Any {
753        self
754    }
755
756    fn format(&self, col: &str) -> String {
757        match self {
758            Self::IntersectQuery(query) => {
759                format!("Intersect({} {})", col, query.value)
760            }
761            Self::IsNull => {
762                format!("{} IS NULL", col)
763            }
764        }
765    }
766
767    fn to_expr(&self, _col: String) -> Expr {
768        todo!()
769    }
770
771    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
772        match other.as_any().downcast_ref::<Self>() {
773            Some(o) => self == o,
774            None => false,
775        }
776    }
777}
778
779/// The result of a search operation against a scalar index
780#[derive(Debug, PartialEq)]
781pub enum SearchResult {
782    /// The exact row ids that satisfy the query
783    Exact(NullableRowAddrSet),
784    /// Any row id satisfying the query will be in this set but not every
785    /// row id in this set will satisfy the query, a further recheck step
786    /// is needed
787    AtMost(NullableRowAddrSet),
788    /// All of the given row ids satisfy the query but there may be more
789    ///
790    /// No scalar index actually returns this today but it can arise from
791    /// boolean operations (e.g. NOT(AtMost(x)) == AtLeast(NOT(x)))
792    AtLeast(NullableRowAddrSet),
793}
794
795impl SearchResult {
796    pub fn exact(row_ids: impl Into<RowAddrTreeMap>) -> Self {
797        Self::Exact(NullableRowAddrSet::new(row_ids.into(), Default::default()))
798    }
799
800    pub fn at_most(row_ids: impl Into<RowAddrTreeMap>) -> Self {
801        Self::AtMost(NullableRowAddrSet::new(row_ids.into(), Default::default()))
802    }
803
804    pub fn at_least(row_ids: impl Into<RowAddrTreeMap>) -> Self {
805        Self::AtLeast(NullableRowAddrSet::new(row_ids.into(), Default::default()))
806    }
807
808    pub fn with_nulls(self, nulls: impl Into<RowAddrTreeMap>) -> Self {
809        match self {
810            Self::Exact(row_ids) => Self::Exact(row_ids.with_nulls(nulls.into())),
811            Self::AtMost(row_ids) => Self::AtMost(row_ids.with_nulls(nulls.into())),
812            Self::AtLeast(row_ids) => Self::AtLeast(row_ids.with_nulls(nulls.into())),
813        }
814    }
815
816    pub fn row_addrs(&self) -> &NullableRowAddrSet {
817        match self {
818            Self::Exact(row_addrs) => row_addrs,
819            Self::AtMost(row_addrs) => row_addrs,
820            Self::AtLeast(row_addrs) => row_addrs,
821        }
822    }
823
824    pub fn is_exact(&self) -> bool {
825        matches!(self, Self::Exact(_))
826    }
827}
828
829/// Brief information about an index that was created
830pub struct CreatedIndex {
831    /// The details of the index that was created
832    ///
833    /// These should be stored somewhere as they will be needed to
834    /// load the index later.
835    pub index_details: prost_types::Any,
836    /// The version of the index that was created
837    ///
838    /// This can be used to determine if a reader is able to load the index.
839    pub index_version: u32,
840}
841
842/// The criteria that specifies how to update an index
843pub struct UpdateCriteria {
844    /// If true, then we need to read the old data to update the index
845    ///
846    /// This should be avoided if possible but is left in for some legacy paths
847    pub requires_old_data: bool,
848    /// The criteria required for data (both old and new)
849    pub data_criteria: TrainingCriteria,
850}
851
852impl UpdateCriteria {
853    pub fn requires_old_data(data_criteria: TrainingCriteria) -> Self {
854        Self {
855            requires_old_data: true,
856            data_criteria,
857        }
858    }
859
860    pub fn only_new_data(data_criteria: TrainingCriteria) -> Self {
861        Self {
862            requires_old_data: false,
863            data_criteria,
864        }
865    }
866}
867
868/// A trait for a scalar index, a structure that can determine row ids that satisfy scalar queries
869#[async_trait]
870pub trait ScalarIndex: Send + Sync + std::fmt::Debug + Index + DeepSizeOf {
871    /// Search the scalar index
872    ///
873    /// Returns all row ids that satisfy the query, these row ids are not necessarily ordered
874    async fn search(
875        &self,
876        query: &dyn AnyQuery,
877        metrics: &dyn MetricsCollector,
878    ) -> Result<SearchResult>;
879
880    /// Returns true if the remap operation is supported
881    fn can_remap(&self) -> bool;
882
883    /// Remap the row ids, creating a new remapped version of this index in `dest_store`
884    async fn remap(
885        &self,
886        mapping: &HashMap<u64, Option<u64>>,
887        dest_store: &dyn IndexStore,
888    ) -> Result<CreatedIndex>;
889
890    /// Add the new data into the index, creating an updated version of the index in `dest_store`
891    async fn update(
892        &self,
893        new_data: SendableRecordBatchStream,
894        dest_store: &dyn IndexStore,
895    ) -> Result<CreatedIndex>;
896
897    /// Returns the criteria that will be used to update the index
898    fn update_criteria(&self) -> UpdateCriteria;
899
900    /// Derive the index parameters from the current index
901    ///
902    /// This returns a ScalarIndexParams that can be used to recreate an index
903    /// with the same configuration on another dataset.
904    fn derive_index_params(&self) -> Result<ScalarIndexParams>;
905}