lance_index/
scalar.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Scalar indices for metadata search & filtering
5
6use std::collections::{HashMap, HashSet};
7use std::fmt::Debug;
8use std::{any::Any, ops::Bound, sync::Arc};
9
10use arrow::buffer::{OffsetBuffer, ScalarBuffer};
11use arrow_array::{ListArray, RecordBatch};
12use arrow_schema::{Field, Schema};
13use async_trait::async_trait;
14use datafusion::functions::string::contains::ContainsFunc;
15use datafusion::functions_array::array_has;
16use datafusion::physical_plan::SendableRecordBatchStream;
17use datafusion_common::{scalar::ScalarValue, Column};
18
19use datafusion_expr::expr::ScalarFunction;
20use datafusion_expr::Expr;
21use deepsize::DeepSizeOf;
22use inverted::query::{fill_fts_query_column, FtsQuery, FtsQueryNode, FtsSearchParams, MatchQuery};
23use lance_core::utils::mask::RowIdTreeMap;
24use lance_core::{Error, Result};
25use snafu::location;
26
27use crate::metrics::MetricsCollector;
28use crate::{Index, IndexParams, IndexType};
29
30pub mod bitmap;
31pub mod btree;
32pub mod expression;
33pub mod flat;
34pub mod inverted;
35pub mod label_list;
36pub mod lance_format;
37pub mod ngram;
38
39use crate::frag_reuse::FragReuseIndex;
40pub use inverted::tokenizer::InvertedIndexParams;
41
/// Index name reported by [`ScalarIndexParams`] through `IndexParams::index_name`
pub const LANCE_SCALAR_INDEX: &str = "__lance_scalar_index";
43
/// The kinds of scalar index that can be named explicitly
///
/// Every variant has a same-named counterpart in [`IndexType`]; the two enums
/// convert into each other through the `TryFrom` / `From` impls in this module.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum ScalarIndexType {
    /// B-tree index; the generic `IndexType::Scalar` also converts to this variant
    BTree,
    /// Bitmap index
    Bitmap,
    /// Label list index (queried with [`LabelListQuery`])
    LabelList,
    /// N-gram index (queried with [`TextQuery`])
    NGram,
    /// Inverted index (the type reported by [`InvertedIndexParams`])
    Inverted,
}
52
53impl TryFrom<IndexType> for ScalarIndexType {
54    type Error = Error;
55
56    fn try_from(value: IndexType) -> Result<Self> {
57        match value {
58            IndexType::BTree | IndexType::Scalar => Ok(Self::BTree),
59            IndexType::Bitmap => Ok(Self::Bitmap),
60            IndexType::LabelList => Ok(Self::LabelList),
61            IndexType::NGram => Ok(Self::NGram),
62            IndexType::Inverted => Ok(Self::Inverted),
63            _ => Err(Error::InvalidInput {
64                source: format!("Index type {:?} is not a scalar index", value).into(),
65                location: location!(),
66            }),
67        }
68    }
69}
70
71impl From<ScalarIndexType> for IndexType {
72    fn from(val: ScalarIndexType) -> Self {
73        match val {
74            ScalarIndexType::BTree => Self::BTree,
75            ScalarIndexType::Bitmap => Self::Bitmap,
76            ScalarIndexType::LabelList => Self::LabelList,
77            ScalarIndexType::NGram => Self::NGram,
78            ScalarIndexType::Inverted => Self::Inverted,
79        }
80    }
81}
82
/// [`IndexParams`] implementation for scalar indices
///
/// The `Default` value leaves `force_index_type` unset, in which case
/// `IndexParams::index_type` reports `IndexType::BTree`.
#[derive(Default)]
pub struct ScalarIndexParams {
    /// If set then always use the given index type and skip auto-detection
    pub force_index_type: Option<ScalarIndexType>,
}
88
89impl ScalarIndexParams {
90    pub fn new(index_type: ScalarIndexType) -> Self {
91        Self {
92            force_index_type: Some(index_type),
93        }
94    }
95}
96
97impl IndexParams for ScalarIndexParams {
98    fn as_any(&self) -> &dyn std::any::Any {
99        self
100    }
101
102    fn index_type(&self) -> IndexType {
103        match self.force_index_type {
104            Some(ScalarIndexType::BTree) | None => IndexType::BTree,
105            Some(ScalarIndexType::Bitmap) => IndexType::Bitmap,
106            Some(ScalarIndexType::LabelList) => IndexType::LabelList,
107            Some(ScalarIndexType::Inverted) => IndexType::Inverted,
108            Some(ScalarIndexType::NGram) => IndexType::NGram,
109        }
110    }
111
112    fn index_name(&self) -> &str {
113        LANCE_SCALAR_INDEX
114    }
115}
116
117impl IndexParams for InvertedIndexParams {
118    fn as_any(&self) -> &dyn std::any::Any {
119        self
120    }
121
122    fn index_type(&self) -> IndexType {
123        IndexType::Inverted
124    }
125
126    fn index_name(&self) -> &str {
127        "INVERTED"
128    }
129}
130
/// Trait for storing an index (or parts of an index) into storage
///
/// Writers are handed out by [`IndexStore::new_index_file`].
#[async_trait]
pub trait IndexWriter: Send {
    /// Writes a record batch into the file, returning the 0-based index of the batch in the file
    ///
    /// E.g. the third call to this method returns 2
    async fn write_record_batch(&mut self, batch: RecordBatch) -> Result<u64>;
    /// Finishes writing the file and closes the file
    async fn finish(&mut self) -> Result<()>;
    /// Finishes writing the file and closes the file with the additional
    /// key/value `metadata`
    async fn finish_with_metadata(&mut self, metadata: HashMap<String, String>) -> Result<()>;
}
143
/// Trait for reading an index (or parts of an index) from storage
///
/// Readers are handed out by [`IndexStore::open_index_file`].
#[async_trait]
pub trait IndexReader: Send + Sync {
    /// Read the n-th record batch from the file
    ///
    /// NOTE(review): `batch_size` presumably determines how the file's rows are
    /// split into batches for this call -- confirm against the implementations
    async fn read_record_batch(&self, n: u64, batch_size: u64) -> Result<RecordBatch>;
    /// Read the given range of rows from the file
    ///
    /// If `projection` is `Some`, only the named columns are returned;
    /// nested columns like `Some(&["x.y"])` are not supported.
    /// If `projection` is `None`, all columns are returned.
    async fn read_range(
        &self,
        range: std::ops::Range<usize>,
        projection: Option<&[&str]>,
    ) -> Result<RecordBatch>;
    /// Return the number of batches in the file for the given `batch_size`
    async fn num_batches(&self, batch_size: u64) -> u32;
    /// Return the number of rows in the file
    fn num_rows(&self) -> usize;
    /// Return the schema of the file
    fn schema(&self) -> &lance_core::datatypes::Schema;
}
165
/// Trait abstracting I/O away from index logic
///
/// Scalar indices are currently serialized as indexable arrow record batches stored in
/// named "files".  The index store is responsible for serializing and deserializing
/// these batches into file data (e.g. as .lance files or .parquet files, etc.)
#[async_trait]
pub trait IndexStore: std::fmt::Debug + Send + Sync + DeepSizeOf {
    /// Cast the store as `Any` to allow for downcasting
    fn as_any(&self) -> &dyn Any;

    /// Suggested I/O parallelism for the store
    fn io_parallelism(&self) -> usize;

    /// Create a new file named `name` with the given `schema` and return a
    /// writer to store data in the file
    async fn new_index_file(&self, name: &str, schema: Arc<Schema>)
        -> Result<Box<dyn IndexWriter>>;

    /// Open the existing file named `name` for retrieval
    async fn open_index_file(&self, name: &str) -> Result<Arc<dyn IndexReader>>;

    /// Copy the index file named `name` from this store to `dest_store`
    ///
    /// This is often useful when remapping or updating
    async fn copy_index_file(&self, name: &str, dest_store: &dyn IndexStore) -> Result<()>;

    /// Rename the index file `name` to `new_name`
    async fn rename_index_file(&self, name: &str, new_name: &str) -> Result<()>;

    /// Delete an index file (used in the tmp spill store to keep tmp size down)
    async fn delete_index_file(&self, name: &str) -> Result<()>;
}
196
/// Different scalar indices may support different kinds of queries
///
/// For example, a btree index can support a wide range of queries (e.g. x > 7)
/// while an index based on FTS only supports queries like "x LIKE 'foo'"
///
/// This trait is used when we need an object that can represent any kind of query
///
/// Note: if you are implementing this trait for a query type then you probably also
/// need to implement the [crate::scalar::expression::ScalarQueryParser] trait to
/// create instances of your query at parse time.
pub trait AnyQuery: std::fmt::Debug + Any + Send + Sync {
    /// Cast the query as Any to allow for downcasting
    fn as_any(&self) -> &dyn Any;
    /// Format the query as a string, using `col` as the column name
    fn format(&self, col: &str) -> String;
    /// Convert the query to a datafusion expression over the column `col`
    fn to_expr(&self, col: String) -> Expr;
    /// Compare this query to another query of possibly different concrete type
    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool;
    /// If true, the query results are inexact and will need to be rechecked
    ///
    /// Defaults to `false`.
    fn needs_recheck(&self) -> bool {
        false
    }
}
221
222impl PartialEq for dyn AnyQuery {
223    fn eq(&self, other: &Self) -> bool {
224        self.dyn_eq(other)
225    }
226}
/// A full text search query
///
/// Bundles the query itself with the optional search settings that
/// [`FullTextSearchQuery::params`] turns into `FtsSearchParams`.
#[derive(Debug, Clone, PartialEq)]
pub struct FullTextSearchQuery {
    /// The query to run
    pub query: FtsQuery,

    /// The maximum number of results to return
    pub limit: Option<i64>,

    /// The wand factor to use for ranking
    /// if None, use the default value of 1.0
    /// Increasing this value will reduce the recall and improve the performance
    /// 1.0 is the value that would give the best performance without recall loss
    pub wand_factor: Option<f32>,
}
241
242impl FullTextSearchQuery {
243    /// Create a new terms query
244    pub fn new(query: String) -> Self {
245        let query = MatchQuery::new(query).into();
246        Self {
247            query,
248            limit: None,
249            wand_factor: None,
250        }
251    }
252
253    /// Create a new fuzzy query
254    pub fn new_fuzzy(term: String, max_distance: Option<u32>) -> Self {
255        let query = MatchQuery::new(term).with_fuzziness(max_distance).into();
256        Self {
257            query,
258            limit: None,
259            wand_factor: None,
260        }
261    }
262
263    /// Create a new compound query
264    pub fn new_query(query: FtsQuery) -> Self {
265        Self {
266            query,
267            limit: None,
268            wand_factor: None,
269        }
270    }
271
272    /// Set the column to search over
273    /// This is available for only MatchQuery and PhraseQuery
274    pub fn with_column(mut self, column: String) -> Result<Self> {
275        self.query = fill_fts_query_column(&self.query, &[column], true)?;
276        Ok(self)
277    }
278
279    /// Set the column to search over
280    /// This is available for only MatchQuery
281    pub fn with_columns(mut self, columns: &[String]) -> Result<Self> {
282        self.query = fill_fts_query_column(&self.query, columns, true)?;
283        Ok(self)
284    }
285
286    /// limit the number of results to return
287    /// if None, return all results
288    pub fn limit(mut self, limit: Option<i64>) -> Self {
289        self.limit = limit;
290        self
291    }
292
293    pub fn wand_factor(mut self, wand_factor: Option<f32>) -> Self {
294        self.wand_factor = wand_factor;
295        self
296    }
297
298    pub fn columns(&self) -> HashSet<String> {
299        self.query.columns()
300    }
301
302    pub fn params(&self) -> FtsSearchParams {
303        let params = FtsSearchParams::new()
304            .with_limit(self.limit.map(|limit| limit as usize))
305            .with_wand_factor(self.wand_factor.unwrap_or(1.0));
306        match self.query {
307            FtsQuery::Phrase(ref query) => params.with_phrase_slop(Some(query.slop)),
308            _ => params,
309        }
310    }
311}
312
/// A query that a basic scalar index (e.g. btree / bitmap) can satisfy
///
/// This is a subset of expression operators that is often referred to as the
/// "sargable" operators
///
/// Note that negation is not included.  Negation should be applied later.  For
/// example, to invert an equality query (e.g. all rows where the value is not 7)
/// you can grab all rows where the value = 7 and then do an inverted take (or use
/// a block list instead of an allow list for prefiltering)
#[derive(Debug, Clone, PartialEq)]
pub enum SargableQuery {
    /// Retrieve all row ids where the value lies between the lower (first) and
    /// upper (second) bound; each bound may be inclusive, exclusive or unbounded
    Range(Bound<ScalarValue>, Bound<ScalarValue>),
    /// Retrieve all row ids where the value is in the given set of values
    IsIn(Vec<ScalarValue>),
    /// Retrieve all row ids where the value is exactly the given value
    Equals(ScalarValue),
    /// Retrieve all row ids where the value matches the given full text search query
    FullTextSearch(FullTextSearchQuery),
    /// Retrieve all row ids where the value is null
    IsNull(),
}
335
336impl AnyQuery for SargableQuery {
337    fn as_any(&self) -> &dyn Any {
338        self
339    }
340
341    fn format(&self, col: &str) -> String {
342        match self {
343            Self::Range(lower, upper) => match (lower, upper) {
344                (Bound::Unbounded, Bound::Unbounded) => "true".to_string(),
345                (Bound::Unbounded, Bound::Included(rhs)) => format!("{} <= {}", col, rhs),
346                (Bound::Unbounded, Bound::Excluded(rhs)) => format!("{} < {}", col, rhs),
347                (Bound::Included(lhs), Bound::Unbounded) => format!("{} >= {}", col, lhs),
348                (Bound::Included(lhs), Bound::Included(rhs)) => {
349                    format!("{} >= {} && {} <= {}", col, lhs, col, rhs)
350                }
351                (Bound::Included(lhs), Bound::Excluded(rhs)) => {
352                    format!("{} >= {} && {} < {}", col, lhs, col, rhs)
353                }
354                (Bound::Excluded(lhs), Bound::Unbounded) => format!("{} > {}", col, lhs),
355                (Bound::Excluded(lhs), Bound::Included(rhs)) => {
356                    format!("{} > {} && {} <= {}", col, lhs, col, rhs)
357                }
358                (Bound::Excluded(lhs), Bound::Excluded(rhs)) => {
359                    format!("{} > {} && {} < {}", col, lhs, col, rhs)
360                }
361            },
362            Self::IsIn(values) => {
363                format!(
364                    "{} IN [{}]",
365                    col,
366                    values
367                        .iter()
368                        .map(|val| val.to_string())
369                        .collect::<Vec<_>>()
370                        .join(",")
371                )
372            }
373            Self::FullTextSearch(query) => {
374                format!("fts({})", query.query)
375            }
376            Self::IsNull() => {
377                format!("{} IS NULL", col)
378            }
379            Self::Equals(val) => {
380                format!("{} = {}", col, val)
381            }
382        }
383    }
384
385    fn to_expr(&self, col: String) -> Expr {
386        let col_expr = Expr::Column(Column::new_unqualified(col));
387        match self {
388            Self::Range(lower, upper) => match (lower, upper) {
389                (Bound::Unbounded, Bound::Unbounded) => {
390                    Expr::Literal(ScalarValue::Boolean(Some(true)), None)
391                }
392                (Bound::Unbounded, Bound::Included(rhs)) => {
393                    col_expr.lt_eq(Expr::Literal(rhs.clone(), None))
394                }
395                (Bound::Unbounded, Bound::Excluded(rhs)) => {
396                    col_expr.lt(Expr::Literal(rhs.clone(), None))
397                }
398                (Bound::Included(lhs), Bound::Unbounded) => {
399                    col_expr.gt_eq(Expr::Literal(lhs.clone(), None))
400                }
401                (Bound::Included(lhs), Bound::Included(rhs)) => col_expr.between(
402                    Expr::Literal(lhs.clone(), None),
403                    Expr::Literal(rhs.clone(), None),
404                ),
405                (Bound::Included(lhs), Bound::Excluded(rhs)) => col_expr
406                    .clone()
407                    .gt_eq(Expr::Literal(lhs.clone(), None))
408                    .and(col_expr.lt(Expr::Literal(rhs.clone(), None))),
409                (Bound::Excluded(lhs), Bound::Unbounded) => {
410                    col_expr.gt(Expr::Literal(lhs.clone(), None))
411                }
412                (Bound::Excluded(lhs), Bound::Included(rhs)) => col_expr
413                    .clone()
414                    .gt(Expr::Literal(lhs.clone(), None))
415                    .and(col_expr.lt_eq(Expr::Literal(rhs.clone(), None))),
416                (Bound::Excluded(lhs), Bound::Excluded(rhs)) => col_expr
417                    .clone()
418                    .gt(Expr::Literal(lhs.clone(), None))
419                    .and(col_expr.lt(Expr::Literal(rhs.clone(), None))),
420            },
421            Self::IsIn(values) => col_expr.in_list(
422                values
423                    .iter()
424                    .map(|val| Expr::Literal(val.clone(), None))
425                    .collect::<Vec<_>>(),
426                false,
427            ),
428            Self::FullTextSearch(query) => col_expr.like(Expr::Literal(
429                ScalarValue::Utf8(Some(query.query.to_string())),
430                None,
431            )),
432            Self::IsNull() => col_expr.is_null(),
433            Self::Equals(value) => col_expr.eq(Expr::Literal(value.clone(), None)),
434        }
435    }
436
437    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
438        match other.as_any().downcast_ref::<Self>() {
439            Some(o) => self == o,
440            None => false,
441        }
442    }
443}
444
/// A query that a LabelListIndex can satisfy
///
/// Both variants carry the labels to look for in a list-valued column.
#[derive(Debug, Clone, PartialEq)]
pub enum LabelListQuery {
    /// Retrieve all row ids where every label is in the list of values for the row
    HasAllLabels(Vec<ScalarValue>),
    /// Retrieve all row ids where at least one of the given labels is in the list of values for the row
    HasAnyLabel(Vec<ScalarValue>),
}
453
454impl AnyQuery for LabelListQuery {
455    fn as_any(&self) -> &dyn Any {
456        self
457    }
458
459    fn format(&self, col: &str) -> String {
460        format!("{}", self.to_expr(col.to_string()))
461    }
462
463    fn to_expr(&self, col: String) -> Expr {
464        match self {
465            Self::HasAllLabels(labels) => {
466                let labels_arr = ScalarValue::iter_to_array(labels.iter().cloned()).unwrap();
467                let offsets_buffer =
468                    OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, labels_arr.len() as i32]));
469                let labels_list = ListArray::try_new(
470                    Arc::new(Field::new("item", labels_arr.data_type().clone(), false)),
471                    offsets_buffer,
472                    labels_arr,
473                    None,
474                )
475                .unwrap();
476                let labels_arr = Arc::new(labels_list);
477                Expr::ScalarFunction(ScalarFunction {
478                    func: Arc::new(array_has::ArrayHasAll::new().into()),
479                    args: vec![
480                        Expr::Column(Column::new_unqualified(col)),
481                        Expr::Literal(ScalarValue::List(labels_arr), None),
482                    ],
483                })
484            }
485            Self::HasAnyLabel(labels) => {
486                let labels_arr = ScalarValue::iter_to_array(labels.iter().cloned()).unwrap();
487                let offsets_buffer =
488                    OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, labels_arr.len() as i32]));
489                let labels_list = ListArray::try_new(
490                    Arc::new(Field::new("item", labels_arr.data_type().clone(), false)),
491                    offsets_buffer,
492                    labels_arr,
493                    None,
494                )
495                .unwrap();
496                let labels_arr = Arc::new(labels_list);
497                Expr::ScalarFunction(ScalarFunction {
498                    func: Arc::new(array_has::ArrayHasAny::new().into()),
499                    args: vec![
500                        Expr::Column(Column::new_unqualified(col)),
501                        Expr::Literal(ScalarValue::List(labels_arr), None),
502                    ],
503                })
504            }
505        }
506    }
507
508    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
509        match other.as_any().downcast_ref::<Self>() {
510            Some(o) => self == o,
511            None => false,
512        }
513    }
514}
515
/// A query that a NGramIndex can satisfy
///
/// Results of these queries always need a recheck (see the `needs_recheck`
/// implementation for this type).
#[derive(Debug, Clone, PartialEq)]
pub enum TextQuery {
    /// Retrieve all row ids where the text contains the given string
    StringContains(String),
    // TODO: In the future we should be able to do string-insensitive contains
    // as well as partial matches (e.g. LIKE 'foo%') and potentially even
    // some regular expressions
}
525
526impl AnyQuery for TextQuery {
527    fn as_any(&self) -> &dyn Any {
528        self
529    }
530
531    fn format(&self, col: &str) -> String {
532        format!("{}", self.to_expr(col.to_string()))
533    }
534
535    fn to_expr(&self, col: String) -> Expr {
536        match self {
537            Self::StringContains(substr) => Expr::ScalarFunction(ScalarFunction {
538                func: Arc::new(ContainsFunc::new().into()),
539                args: vec![
540                    Expr::Column(Column::new_unqualified(col)),
541                    Expr::Literal(ScalarValue::Utf8(Some(substr.clone())), None),
542                ],
543            }),
544        }
545    }
546
547    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
548        match other.as_any().downcast_ref::<Self>() {
549            Some(o) => self == o,
550            None => false,
551        }
552    }
553
554    fn needs_recheck(&self) -> bool {
555        true
556    }
557}
558
/// The result of a search operation against a scalar index
///
/// Every variant wraps a set of row ids; the variant says how that set
/// relates to the true set of matching rows.
#[derive(Debug, PartialEq)]
pub enum SearchResult {
    /// The exact row ids that satisfy the query
    Exact(RowIdTreeMap),
    /// Any row id satisfying the query will be in this set but not every
    /// row id in this set will satisfy the query, a further recheck step
    /// is needed
    AtMost(RowIdTreeMap),
    /// All of the given row ids satisfy the query but there may be more
    ///
    /// No scalar index actually returns this today but it can arise from
    /// boolean operations (e.g. NOT(AtMost(x)) == AtLeast(NOT(x)))
    AtLeast(RowIdTreeMap),
}
574
575impl SearchResult {
576    pub fn row_ids(&self) -> &RowIdTreeMap {
577        match self {
578            Self::Exact(row_ids) => row_ids,
579            Self::AtMost(row_ids) => row_ids,
580            Self::AtLeast(row_ids) => row_ids,
581        }
582    }
583
584    pub fn is_exact(&self) -> bool {
585        matches!(self, Self::Exact(_))
586    }
587}
588
/// A trait for a scalar index, a structure that can determine row ids that satisfy scalar queries
#[async_trait]
pub trait ScalarIndex: Send + Sync + std::fmt::Debug + Index + DeepSizeOf {
    /// Search the scalar index
    ///
    /// Returns all row ids that satisfy the query, these row ids are not necessarily ordered.
    /// The [`SearchResult`] variant tells the caller whether the returned set is exact.
    async fn search(
        &self,
        query: &dyn AnyQuery,
        metrics: &dyn MetricsCollector,
    ) -> Result<SearchResult>;

    /// Returns true if the query can be answered exactly
    ///
    /// If false is returned then the query still may be answered exactly but if true is returned
    /// then the query must be answered exactly
    fn can_answer_exact(&self, query: &dyn AnyQuery) -> bool;

    /// Load the scalar index from storage
    ///
    /// `fri` is an optional fragment reuse index.
    /// NOTE(review): how implementations use `fri` is not visible here -- confirm
    async fn load(
        store: Arc<dyn IndexStore>,
        fri: Option<Arc<FragReuseIndex>>,
    ) -> Result<Arc<Self>>
    where
        Self: Sized;

    /// Remap the row ids, creating a new remapped version of this index in `dest_store`
    ///
    /// NOTE(review): `mapping` presumably goes from old row id to new row id, with
    /// `None` marking a row that no longer exists -- confirm against implementations
    async fn remap(
        &self,
        mapping: &HashMap<u64, Option<u64>>,
        dest_store: &dyn IndexStore,
    ) -> Result<()>;

    /// Add the new data into the index, creating an updated version of the index in `dest_store`
    async fn update(
        &self,
        new_data: SendableRecordBatchStream,
        dest_store: &dyn IndexStore,
    ) -> Result<()>;
}