lance_index/
scalar.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Scalar indices for metadata search & filtering
5
6use std::collections::{HashMap, HashSet};
7use std::fmt::Debug;
8use std::{any::Any, ops::Bound, sync::Arc};
9
10use arrow::buffer::{OffsetBuffer, ScalarBuffer};
11use arrow_array::{ListArray, RecordBatch};
12use arrow_schema::{Field, Schema};
13use async_trait::async_trait;
14use datafusion::functions::string::contains::ContainsFunc;
15use datafusion::functions_array::array_has;
16use datafusion::physical_plan::SendableRecordBatchStream;
17use datafusion_common::{scalar::ScalarValue, Column};
18
19use datafusion_expr::expr::ScalarFunction;
20use datafusion_expr::Expr;
21use deepsize::DeepSizeOf;
22use inverted::query::{fill_fts_query_column, FtsQuery, FtsQueryNode, FtsSearchParams, MatchQuery};
23use lance_core::cache::LanceCache;
24use lance_core::utils::mask::RowIdTreeMap;
25use lance_core::{Error, Result};
26use snafu::location;
27
28use crate::metrics::MetricsCollector;
29use crate::{Index, IndexParams, IndexType};
30
31pub mod bitmap;
32pub mod btree;
33pub mod expression;
34pub mod flat;
35pub mod inverted;
36pub mod label_list;
37pub mod lance_format;
38pub mod ngram;
39
40use crate::frag_reuse::FragReuseIndex;
41pub use inverted::tokenizer::InvertedIndexParams;
42
/// Index name reported by [`ScalarIndexParams`] through `IndexParams::index_name`.
pub const LANCE_SCALAR_INDEX: &str = "__lance_scalar_index";
44
45#[derive(Debug, Copy, Clone, PartialEq, Eq, DeepSizeOf)]
46pub enum ScalarIndexType {
47    BTree,
48    Bitmap,
49    LabelList,
50    NGram,
51    Inverted,
52}
53
54impl TryFrom<IndexType> for ScalarIndexType {
55    type Error = Error;
56
57    fn try_from(value: IndexType) -> Result<Self> {
58        match value {
59            IndexType::BTree | IndexType::Scalar => Ok(Self::BTree),
60            IndexType::Bitmap => Ok(Self::Bitmap),
61            IndexType::LabelList => Ok(Self::LabelList),
62            IndexType::NGram => Ok(Self::NGram),
63            IndexType::Inverted => Ok(Self::Inverted),
64            _ => Err(Error::InvalidInput {
65                source: format!("Index type {:?} is not a scalar index", value).into(),
66                location: location!(),
67            }),
68        }
69    }
70}
71
72impl From<ScalarIndexType> for IndexType {
73    fn from(val: ScalarIndexType) -> Self {
74        match val {
75            ScalarIndexType::BTree => Self::BTree,
76            ScalarIndexType::Bitmap => Self::Bitmap,
77            ScalarIndexType::LabelList => Self::LabelList,
78            ScalarIndexType::NGram => Self::NGram,
79            ScalarIndexType::Inverted => Self::Inverted,
80        }
81    }
82}
83
84#[derive(Default)]
85pub struct ScalarIndexParams {
86    /// If set then always use the given index type and skip auto-detection
87    pub force_index_type: Option<ScalarIndexType>,
88}
89
90impl ScalarIndexParams {
91    pub fn new(index_type: ScalarIndexType) -> Self {
92        Self {
93            force_index_type: Some(index_type),
94        }
95    }
96}
97
98impl IndexParams for ScalarIndexParams {
99    fn as_any(&self) -> &dyn std::any::Any {
100        self
101    }
102
103    fn index_type(&self) -> IndexType {
104        match self.force_index_type {
105            Some(ScalarIndexType::BTree) | None => IndexType::BTree,
106            Some(ScalarIndexType::Bitmap) => IndexType::Bitmap,
107            Some(ScalarIndexType::LabelList) => IndexType::LabelList,
108            Some(ScalarIndexType::Inverted) => IndexType::Inverted,
109            Some(ScalarIndexType::NGram) => IndexType::NGram,
110        }
111    }
112
113    fn index_name(&self) -> &str {
114        LANCE_SCALAR_INDEX
115    }
116}
117
118impl IndexParams for InvertedIndexParams {
119    fn as_any(&self) -> &dyn std::any::Any {
120        self
121    }
122
123    fn index_type(&self) -> IndexType {
124        IndexType::Inverted
125    }
126
127    fn index_name(&self) -> &str {
128        "INVERTED"
129    }
130}
131
132/// Trait for storing an index (or parts of an index) into storage
133#[async_trait]
134pub trait IndexWriter: Send {
135    /// Writes a record batch into the file, returning the 0-based index of the batch in the file
136    ///
137    /// E.g. if this is the third time this is called this method will return 2
138    async fn write_record_batch(&mut self, batch: RecordBatch) -> Result<u64>;
139    /// Finishes writing the file and closes the file
140    async fn finish(&mut self) -> Result<()>;
141    /// Finishes writing the file and closes the file with additional metadata
142    async fn finish_with_metadata(&mut self, metadata: HashMap<String, String>) -> Result<()>;
143}
144
145/// Trait for reading an index (or parts of an index) from storage
146#[async_trait]
147pub trait IndexReader: Send + Sync {
148    /// Read the n-th record batch from the file
149    async fn read_record_batch(&self, n: u64, batch_size: u64) -> Result<RecordBatch>;
150    /// Read the range of rows from the file.
151    /// If projection is Some, only return the columns in the projection,
152    /// nested columns like Some(&["x.y"]) are not supported.
153    /// If projection is None, return all columns.
154    async fn read_range(
155        &self,
156        range: std::ops::Range<usize>,
157        projection: Option<&[&str]>,
158    ) -> Result<RecordBatch>;
159    /// Return the number of batches in the file
160    async fn num_batches(&self, batch_size: u64) -> u32;
161    /// Return the number of rows in the file
162    fn num_rows(&self) -> usize;
163    /// Return the metadata of the file
164    fn schema(&self) -> &lance_core::datatypes::Schema;
165}
166
167/// Trait abstracting I/O away from index logic
168///
169/// Scalar indices are currently serialized as indexable arrow record batches stored in
170/// named "files".  The index store is responsible for serializing and deserializing
171/// these batches into file data (e.g. as .lance files or .parquet files, etc.)
172#[async_trait]
173pub trait IndexStore: std::fmt::Debug + Send + Sync + DeepSizeOf {
174    fn as_any(&self) -> &dyn Any;
175
176    /// Suggested I/O parallelism for the store
177    fn io_parallelism(&self) -> usize;
178
179    /// Create a new file and return a writer to store data in the file
180    async fn new_index_file(&self, name: &str, schema: Arc<Schema>)
181        -> Result<Box<dyn IndexWriter>>;
182
183    /// Open an existing file for retrieval
184    async fn open_index_file(&self, name: &str) -> Result<Arc<dyn IndexReader>>;
185
186    /// Copy a range of batches from an index file from this store to another
187    ///
188    /// This is often useful when remapping or updating
189    async fn copy_index_file(&self, name: &str, dest_store: &dyn IndexStore) -> Result<()>;
190
191    /// Rename an index file
192    async fn rename_index_file(&self, name: &str, new_name: &str) -> Result<()>;
193
194    /// Delete an index file (used in the tmp spill store to keep tmp size down)
195    async fn delete_index_file(&self, name: &str) -> Result<()>;
196}
197
198/// Different scalar indices may support different kinds of queries
199///
200/// For example, a btree index can support a wide range of queries (e.g. x > 7)
201/// while an index based on FTS only supports queries like "x LIKE 'foo'"
202///
203/// This trait is used when we need an object that can represent any kind of query
204///
205/// Note: if you are implementing this trait for a query type then you probably also
206/// need to implement the [crate::scalar::expression::ScalarQueryParser] trait to
207/// create instances of your query at parse time.
208pub trait AnyQuery: std::fmt::Debug + Any + Send + Sync {
209    /// Cast the query as Any to allow for downcasting
210    fn as_any(&self) -> &dyn Any;
211    /// Format the query as a string
212    fn format(&self, col: &str) -> String;
213    /// Convert the query to a datafusion expression
214    fn to_expr(&self, col: String) -> Expr;
215    /// Compare this query to another query
216    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool;
217    /// If true, the query results are inexact and will need rechecked
218    fn needs_recheck(&self) -> bool {
219        false
220    }
221}
222
223impl PartialEq for dyn AnyQuery {
224    fn eq(&self, other: &Self) -> bool {
225        self.dyn_eq(other)
226    }
227}
228/// A full text search query
229#[derive(Debug, Clone, PartialEq)]
230pub struct FullTextSearchQuery {
231    pub query: FtsQuery,
232
233    /// The maximum number of results to return
234    pub limit: Option<i64>,
235
236    /// The wand factor to use for ranking
237    /// if None, use the default value of 1.0
238    /// Increasing this value will reduce the recall and improve the performance
239    /// 1.0 is the value that would give the best performance without recall loss
240    pub wand_factor: Option<f32>,
241}
242
243impl FullTextSearchQuery {
244    /// Create a new terms query
245    pub fn new(query: String) -> Self {
246        let query = MatchQuery::new(query).into();
247        Self {
248            query,
249            limit: None,
250            wand_factor: None,
251        }
252    }
253
254    /// Create a new fuzzy query
255    pub fn new_fuzzy(term: String, max_distance: Option<u32>) -> Self {
256        let query = MatchQuery::new(term).with_fuzziness(max_distance).into();
257        Self {
258            query,
259            limit: None,
260            wand_factor: None,
261        }
262    }
263
264    /// Create a new compound query
265    pub fn new_query(query: FtsQuery) -> Self {
266        Self {
267            query,
268            limit: None,
269            wand_factor: None,
270        }
271    }
272
273    /// Set the column to search over
274    /// This is available for only MatchQuery and PhraseQuery
275    pub fn with_column(mut self, column: String) -> Result<Self> {
276        self.query = fill_fts_query_column(&self.query, &[column], true)?;
277        Ok(self)
278    }
279
280    /// Set the column to search over
281    /// This is available for only MatchQuery
282    pub fn with_columns(mut self, columns: &[String]) -> Result<Self> {
283        self.query = fill_fts_query_column(&self.query, columns, true)?;
284        Ok(self)
285    }
286
287    /// limit the number of results to return
288    /// if None, return all results
289    pub fn limit(mut self, limit: Option<i64>) -> Self {
290        self.limit = limit;
291        self
292    }
293
294    pub fn wand_factor(mut self, wand_factor: Option<f32>) -> Self {
295        self.wand_factor = wand_factor;
296        self
297    }
298
299    pub fn columns(&self) -> HashSet<String> {
300        self.query.columns()
301    }
302
303    pub fn params(&self) -> FtsSearchParams {
304        let params = FtsSearchParams::new()
305            .with_limit(self.limit.map(|limit| limit as usize))
306            .with_wand_factor(self.wand_factor.unwrap_or(1.0));
307        match self.query {
308            FtsQuery::Phrase(ref query) => params.with_phrase_slop(Some(query.slop)),
309            _ => params,
310        }
311    }
312}
313
314/// A query that a basic scalar index (e.g. btree / bitmap) can satisfy
315///
316/// This is a subset of expression operators that is often referred to as the
317/// "sargable" operators
318///
319/// Note that negation is not included.  Negation should be applied later.  For
320/// example, to invert an equality query (e.g. all rows where the value is not 7)
321/// you can grab all rows where the value = 7 and then do an inverted take (or use
322/// a block list instead of an allow list for prefiltering)
323#[derive(Debug, Clone, PartialEq)]
324pub enum SargableQuery {
325    /// Retrieve all row ids where the value is in the given [min, max) range
326    Range(Bound<ScalarValue>, Bound<ScalarValue>),
327    /// Retrieve all row ids where the value is in the given set of values
328    IsIn(Vec<ScalarValue>),
329    /// Retrieve all row ids where the value is exactly the given value
330    Equals(ScalarValue),
331    /// Retrieve all row ids where the value matches the given full text search query
332    FullTextSearch(FullTextSearchQuery),
333    /// Retrieve all row ids where the value is null
334    IsNull(),
335}
336
337impl AnyQuery for SargableQuery {
338    fn as_any(&self) -> &dyn Any {
339        self
340    }
341
342    fn format(&self, col: &str) -> String {
343        match self {
344            Self::Range(lower, upper) => match (lower, upper) {
345                (Bound::Unbounded, Bound::Unbounded) => "true".to_string(),
346                (Bound::Unbounded, Bound::Included(rhs)) => format!("{} <= {}", col, rhs),
347                (Bound::Unbounded, Bound::Excluded(rhs)) => format!("{} < {}", col, rhs),
348                (Bound::Included(lhs), Bound::Unbounded) => format!("{} >= {}", col, lhs),
349                (Bound::Included(lhs), Bound::Included(rhs)) => {
350                    format!("{} >= {} && {} <= {}", col, lhs, col, rhs)
351                }
352                (Bound::Included(lhs), Bound::Excluded(rhs)) => {
353                    format!("{} >= {} && {} < {}", col, lhs, col, rhs)
354                }
355                (Bound::Excluded(lhs), Bound::Unbounded) => format!("{} > {}", col, lhs),
356                (Bound::Excluded(lhs), Bound::Included(rhs)) => {
357                    format!("{} > {} && {} <= {}", col, lhs, col, rhs)
358                }
359                (Bound::Excluded(lhs), Bound::Excluded(rhs)) => {
360                    format!("{} > {} && {} < {}", col, lhs, col, rhs)
361                }
362            },
363            Self::IsIn(values) => {
364                format!(
365                    "{} IN [{}]",
366                    col,
367                    values
368                        .iter()
369                        .map(|val| val.to_string())
370                        .collect::<Vec<_>>()
371                        .join(",")
372                )
373            }
374            Self::FullTextSearch(query) => {
375                format!("fts({})", query.query)
376            }
377            Self::IsNull() => {
378                format!("{} IS NULL", col)
379            }
380            Self::Equals(val) => {
381                format!("{} = {}", col, val)
382            }
383        }
384    }
385
386    fn to_expr(&self, col: String) -> Expr {
387        let col_expr = Expr::Column(Column::new_unqualified(col));
388        match self {
389            Self::Range(lower, upper) => match (lower, upper) {
390                (Bound::Unbounded, Bound::Unbounded) => {
391                    Expr::Literal(ScalarValue::Boolean(Some(true)), None)
392                }
393                (Bound::Unbounded, Bound::Included(rhs)) => {
394                    col_expr.lt_eq(Expr::Literal(rhs.clone(), None))
395                }
396                (Bound::Unbounded, Bound::Excluded(rhs)) => {
397                    col_expr.lt(Expr::Literal(rhs.clone(), None))
398                }
399                (Bound::Included(lhs), Bound::Unbounded) => {
400                    col_expr.gt_eq(Expr::Literal(lhs.clone(), None))
401                }
402                (Bound::Included(lhs), Bound::Included(rhs)) => col_expr.between(
403                    Expr::Literal(lhs.clone(), None),
404                    Expr::Literal(rhs.clone(), None),
405                ),
406                (Bound::Included(lhs), Bound::Excluded(rhs)) => col_expr
407                    .clone()
408                    .gt_eq(Expr::Literal(lhs.clone(), None))
409                    .and(col_expr.lt(Expr::Literal(rhs.clone(), None))),
410                (Bound::Excluded(lhs), Bound::Unbounded) => {
411                    col_expr.gt(Expr::Literal(lhs.clone(), None))
412                }
413                (Bound::Excluded(lhs), Bound::Included(rhs)) => col_expr
414                    .clone()
415                    .gt(Expr::Literal(lhs.clone(), None))
416                    .and(col_expr.lt_eq(Expr::Literal(rhs.clone(), None))),
417                (Bound::Excluded(lhs), Bound::Excluded(rhs)) => col_expr
418                    .clone()
419                    .gt(Expr::Literal(lhs.clone(), None))
420                    .and(col_expr.lt(Expr::Literal(rhs.clone(), None))),
421            },
422            Self::IsIn(values) => col_expr.in_list(
423                values
424                    .iter()
425                    .map(|val| Expr::Literal(val.clone(), None))
426                    .collect::<Vec<_>>(),
427                false,
428            ),
429            Self::FullTextSearch(query) => col_expr.like(Expr::Literal(
430                ScalarValue::Utf8(Some(query.query.to_string())),
431                None,
432            )),
433            Self::IsNull() => col_expr.is_null(),
434            Self::Equals(value) => col_expr.eq(Expr::Literal(value.clone(), None)),
435        }
436    }
437
438    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
439        match other.as_any().downcast_ref::<Self>() {
440            Some(o) => self == o,
441            None => false,
442        }
443    }
444}
445
446/// A query that a LabelListIndex can satisfy
447#[derive(Debug, Clone, PartialEq)]
448pub enum LabelListQuery {
449    /// Retrieve all row ids where every label is in the list of values for the row
450    HasAllLabels(Vec<ScalarValue>),
451    /// Retrieve all row ids where at least one of the given labels is in the list of values for the row
452    HasAnyLabel(Vec<ScalarValue>),
453}
454
455impl AnyQuery for LabelListQuery {
456    fn as_any(&self) -> &dyn Any {
457        self
458    }
459
460    fn format(&self, col: &str) -> String {
461        format!("{}", self.to_expr(col.to_string()))
462    }
463
464    fn to_expr(&self, col: String) -> Expr {
465        match self {
466            Self::HasAllLabels(labels) => {
467                let labels_arr = ScalarValue::iter_to_array(labels.iter().cloned()).unwrap();
468                let offsets_buffer =
469                    OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, labels_arr.len() as i32]));
470                let labels_list = ListArray::try_new(
471                    Arc::new(Field::new("item", labels_arr.data_type().clone(), false)),
472                    offsets_buffer,
473                    labels_arr,
474                    None,
475                )
476                .unwrap();
477                let labels_arr = Arc::new(labels_list);
478                Expr::ScalarFunction(ScalarFunction {
479                    func: Arc::new(array_has::ArrayHasAll::new().into()),
480                    args: vec![
481                        Expr::Column(Column::new_unqualified(col)),
482                        Expr::Literal(ScalarValue::List(labels_arr), None),
483                    ],
484                })
485            }
486            Self::HasAnyLabel(labels) => {
487                let labels_arr = ScalarValue::iter_to_array(labels.iter().cloned()).unwrap();
488                let offsets_buffer =
489                    OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, labels_arr.len() as i32]));
490                let labels_list = ListArray::try_new(
491                    Arc::new(Field::new("item", labels_arr.data_type().clone(), false)),
492                    offsets_buffer,
493                    labels_arr,
494                    None,
495                )
496                .unwrap();
497                let labels_arr = Arc::new(labels_list);
498                Expr::ScalarFunction(ScalarFunction {
499                    func: Arc::new(array_has::ArrayHasAny::new().into()),
500                    args: vec![
501                        Expr::Column(Column::new_unqualified(col)),
502                        Expr::Literal(ScalarValue::List(labels_arr), None),
503                    ],
504                })
505            }
506        }
507    }
508
509    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
510        match other.as_any().downcast_ref::<Self>() {
511            Some(o) => self == o,
512            None => false,
513        }
514    }
515}
516
/// A query that a NGramIndex can satisfy
#[derive(Debug, Clone, PartialEq)]
pub enum TextQuery {
    /// Match every row whose text contains the given string
    StringContains(String),
    // TODO: In the future we should be able to do case-insensitive contains
    // as well as partial matches (e.g. LIKE 'foo%') and potentially even
    // some regular expressions
}
526
527impl AnyQuery for TextQuery {
528    fn as_any(&self) -> &dyn Any {
529        self
530    }
531
532    fn format(&self, col: &str) -> String {
533        format!("{}", self.to_expr(col.to_string()))
534    }
535
536    fn to_expr(&self, col: String) -> Expr {
537        match self {
538            Self::StringContains(substr) => Expr::ScalarFunction(ScalarFunction {
539                func: Arc::new(ContainsFunc::new().into()),
540                args: vec![
541                    Expr::Column(Column::new_unqualified(col)),
542                    Expr::Literal(ScalarValue::Utf8(Some(substr.clone())), None),
543                ],
544            }),
545        }
546    }
547
548    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
549        match other.as_any().downcast_ref::<Self>() {
550            Some(o) => self == o,
551            None => false,
552        }
553    }
554
555    fn needs_recheck(&self) -> bool {
556        true
557    }
558}
559
560/// The result of a search operation against a scalar index
561#[derive(Debug, PartialEq)]
562pub enum SearchResult {
563    /// The exact row ids that satisfy the query
564    Exact(RowIdTreeMap),
565    /// Any row id satisfying the query will be in this set but not every
566    /// row id in this set will satisfy the query, a further recheck step
567    /// is needed
568    AtMost(RowIdTreeMap),
569    /// All of the given row ids satisfy the query but there may be more
570    ///
571    /// No scalar index actually returns this today but it can arise from
572    /// boolean operations (e.g. NOT(AtMost(x)) == AtLeast(NOT(x)))
573    AtLeast(RowIdTreeMap),
574}
575
576impl SearchResult {
577    pub fn row_ids(&self) -> &RowIdTreeMap {
578        match self {
579            Self::Exact(row_ids) => row_ids,
580            Self::AtMost(row_ids) => row_ids,
581            Self::AtLeast(row_ids) => row_ids,
582        }
583    }
584
585    pub fn is_exact(&self) -> bool {
586        matches!(self, Self::Exact(_))
587    }
588}
589
590/// A trait for a scalar index, a structure that can determine row ids that satisfy scalar queries
591#[async_trait]
592pub trait ScalarIndex: Send + Sync + std::fmt::Debug + Index + DeepSizeOf {
593    /// Search the scalar index
594    ///
595    /// Returns all row ids that satisfy the query, these row ids are not necessarily ordered
596    async fn search(
597        &self,
598        query: &dyn AnyQuery,
599        metrics: &dyn MetricsCollector,
600    ) -> Result<SearchResult>;
601
602    /// Returns true if the query can be answered exactly
603    ///
604    /// If false is returned then the query still may be answered exactly but if true is returned
605    /// then the query must be answered exactly
606    fn can_answer_exact(&self, query: &dyn AnyQuery) -> bool;
607
608    /// Load the scalar index from storage
609    async fn load(
610        store: Arc<dyn IndexStore>,
611        frag_reuse_index: Option<Arc<FragReuseIndex>>,
612        index_cache: LanceCache,
613    ) -> Result<Arc<Self>>
614    where
615        Self: Sized;
616
617    /// Remap the row ids, creating a new remapped version of this index in `dest_store`
618    async fn remap(
619        &self,
620        mapping: &HashMap<u64, Option<u64>>,
621        dest_store: &dyn IndexStore,
622    ) -> Result<()>;
623
624    /// Add the new data into the index, creating an updated version of the index in `dest_store`
625    async fn update(
626        &self,
627        new_data: SendableRecordBatchStream,
628        dest_store: &dyn IndexStore,
629    ) -> Result<()>;
630}