lance_index/
scalar.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Scalar indices for metadata search & filtering
5
6use std::collections::HashMap;
7use std::fmt::Debug;
8use std::{any::Any, ops::Bound, sync::Arc};
9
10use arrow::buffer::{OffsetBuffer, ScalarBuffer};
11use arrow_array::{ListArray, RecordBatch};
12use arrow_schema::{Field, Schema};
13use async_trait::async_trait;
14use datafusion::functions::string::contains::ContainsFunc;
15use datafusion::functions_array::array_has;
16use datafusion::physical_plan::SendableRecordBatchStream;
17use datafusion_common::{scalar::ScalarValue, Column};
18
19use datafusion_expr::expr::ScalarFunction;
20use datafusion_expr::Expr;
21use deepsize::DeepSizeOf;
22use inverted::TokenizerConfig;
23use lance_core::utils::mask::RowIdTreeMap;
24use lance_core::{Error, Result};
25use snafu::location;
26
27use crate::{Index, IndexParams, IndexType};
28
29pub mod bitmap;
30pub mod btree;
31pub mod expression;
32pub mod flat;
33pub mod inverted;
34pub mod label_list;
35pub mod lance_format;
36pub mod ngram;
37
/// Index name that [`ScalarIndexParams`] reports through `IndexParams::index_name`
pub const LANCE_SCALAR_INDEX: &str = "__lance_scalar_index";
39
/// The kinds of scalar index that can be requested explicitly
///
/// Each variant corresponds to the [`IndexType`] variant of the same name.
#[derive(Clone, Copy, Debug)]
pub enum ScalarIndexType {
    /// Corresponds to [`IndexType::BTree`]
    BTree,
    /// Corresponds to [`IndexType::Bitmap`]
    Bitmap,
    /// Corresponds to [`IndexType::LabelList`]
    LabelList,
    /// Corresponds to [`IndexType::NGram`]
    NGram,
    /// Corresponds to [`IndexType::Inverted`]
    Inverted,
}
48
49impl TryFrom<IndexType> for ScalarIndexType {
50    type Error = Error;
51
52    fn try_from(value: IndexType) -> Result<Self> {
53        match value {
54            IndexType::BTree | IndexType::Scalar => Ok(Self::BTree),
55            IndexType::Bitmap => Ok(Self::Bitmap),
56            IndexType::LabelList => Ok(Self::LabelList),
57            IndexType::NGram => Ok(Self::NGram),
58            IndexType::Inverted => Ok(Self::Inverted),
59            _ => Err(Error::InvalidInput {
60                source: format!("Index type {:?} is not a scalar index", value).into(),
61                location: location!(),
62            }),
63        }
64    }
65}
66
67#[derive(Default)]
68pub struct ScalarIndexParams {
69    /// If set then always use the given index type and skip auto-detection
70    pub force_index_type: Option<ScalarIndexType>,
71}
72
73impl ScalarIndexParams {
74    pub fn new(index_type: ScalarIndexType) -> Self {
75        Self {
76            force_index_type: Some(index_type),
77        }
78    }
79}
80
81impl IndexParams for ScalarIndexParams {
82    fn as_any(&self) -> &dyn std::any::Any {
83        self
84    }
85
86    fn index_type(&self) -> IndexType {
87        match self.force_index_type {
88            Some(ScalarIndexType::BTree) | None => IndexType::BTree,
89            Some(ScalarIndexType::Bitmap) => IndexType::Bitmap,
90            Some(ScalarIndexType::LabelList) => IndexType::LabelList,
91            Some(ScalarIndexType::Inverted) => IndexType::Inverted,
92            Some(ScalarIndexType::NGram) => IndexType::NGram,
93        }
94    }
95
96    fn index_name(&self) -> &str {
97        LANCE_SCALAR_INDEX
98    }
99}
100
101#[derive(Clone)]
102pub struct InvertedIndexParams {
103    /// If true, store the position of the term in the document
104    /// This can significantly increase the size of the index
105    /// If false, only store the frequency of the term in the document
106    /// Default is true
107    pub with_position: bool,
108
109    pub tokenizer_config: TokenizerConfig,
110}
111
112impl Debug for InvertedIndexParams {
113    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114        f.debug_struct("InvertedIndexParams")
115            .field("with_position", &self.with_position)
116            .finish()
117    }
118}
119
120impl DeepSizeOf for InvertedIndexParams {
121    fn deep_size_of_children(&self, _: &mut deepsize::Context) -> usize {
122        0
123    }
124}
125
126impl Default for InvertedIndexParams {
127    fn default() -> Self {
128        Self {
129            with_position: true,
130            tokenizer_config: TokenizerConfig::default(),
131        }
132    }
133}
134
135impl InvertedIndexParams {
136    pub fn with_position(mut self, with_position: bool) -> Self {
137        self.with_position = with_position;
138        self
139    }
140}
141
142impl IndexParams for InvertedIndexParams {
143    fn as_any(&self) -> &dyn std::any::Any {
144        self
145    }
146
147    fn index_type(&self) -> IndexType {
148        IndexType::Inverted
149    }
150
151    fn index_name(&self) -> &str {
152        "INVERTED"
153    }
154}
155
156/// Trait for storing an index (or parts of an index) into storage
157#[async_trait]
158pub trait IndexWriter: Send {
159    /// Writes a record batch into the file, returning the 0-based index of the batch in the file
160    ///
161    /// E.g. if this is the third time this is called this method will return 2
162    async fn write_record_batch(&mut self, batch: RecordBatch) -> Result<u64>;
163    /// Finishes writing the file and closes the file
164    async fn finish(&mut self) -> Result<()>;
165    /// Finishes writing the file and closes the file with additional metadata
166    async fn finish_with_metadata(&mut self, metadata: HashMap<String, String>) -> Result<()>;
167}
168
169/// Trait for reading an index (or parts of an index) from storage
170#[async_trait]
171pub trait IndexReader: Send + Sync {
172    /// Read the n-th record batch from the file
173    async fn read_record_batch(&self, n: u64, batch_size: u64) -> Result<RecordBatch>;
174    /// Read the range of rows from the file.
175    /// If projection is Some, only return the columns in the projection,
176    /// nested columns like Some(&["x.y"]) are not supported.
177    /// If projection is None, return all columns.
178    async fn read_range(
179        &self,
180        range: std::ops::Range<usize>,
181        projection: Option<&[&str]>,
182    ) -> Result<RecordBatch>;
183    /// Return the number of batches in the file
184    async fn num_batches(&self) -> u32;
185    /// Return the number of rows in the file
186    fn num_rows(&self) -> usize;
187    /// Return the metadata of the file
188    fn schema(&self) -> &lance_core::datatypes::Schema;
189}
190
191/// Trait abstracting I/O away from index logic
192///
193/// Scalar indices are currently serialized as indexable arrow record batches stored in
194/// named "files".  The index store is responsible for serializing and deserializing
195/// these batches into file data (e.g. as .lance files or .parquet files, etc.)
196#[async_trait]
197pub trait IndexStore: std::fmt::Debug + Send + Sync + DeepSizeOf {
198    fn as_any(&self) -> &dyn Any;
199
200    /// Suggested I/O parallelism for the store
201    fn io_parallelism(&self) -> usize;
202
203    /// Create a new file and return a writer to store data in the file
204    async fn new_index_file(&self, name: &str, schema: Arc<Schema>)
205        -> Result<Box<dyn IndexWriter>>;
206
207    /// Open an existing file for retrieval
208    async fn open_index_file(&self, name: &str) -> Result<Arc<dyn IndexReader>>;
209
210    /// Copy a range of batches from an index file from this store to another
211    ///
212    /// This is often useful when remapping or updating
213    async fn copy_index_file(&self, name: &str, dest_store: &dyn IndexStore) -> Result<()>;
214}
215
216/// Different scalar indices may support different kinds of queries
217///
218/// For example, a btree index can support a wide range of queries (e.g. x > 7)
219/// while an index based on FTS only supports queries like "x LIKE 'foo'"
220///
221/// This trait is used when we need an object that can represent any kind of query
222///
223/// Note: if you are implementing this trait for a query type then you probably also
224/// need to implement the [crate::scalar::expression::ScalarQueryParser] trait to
225/// create instances of your query at parse time.
226pub trait AnyQuery: std::fmt::Debug + Any + Send + Sync {
227    /// Cast the query as Any to allow for downcasting
228    fn as_any(&self) -> &dyn Any;
229    /// Format the query as a string
230    fn format(&self, col: &str) -> String;
231    /// Convert the query to a datafusion expression
232    fn to_expr(&self, col: String) -> Expr;
233    /// Compare this query to another query
234    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool;
235    /// If true, the query results are inexact and will need rechecked
236    fn needs_recheck(&self) -> bool {
237        false
238    }
239}
240
241impl PartialEq for dyn AnyQuery {
242    fn eq(&self, other: &Self) -> bool {
243        self.dyn_eq(other)
244    }
245}
246
/// Describes a full text search
#[derive(Clone, Debug, PartialEq)]
pub struct FullTextSearchQuery {
    /// Columns to search;
    /// an empty list means every indexed column is searched
    pub columns: Vec<String>,
    /// Text of the full text search query
    pub query: String,
    /// Upper bound on the number of results to return
    pub limit: Option<i64>,
    /// Wand factor used for ranking
    /// `None` means the default value of 1.0 is used
    /// Larger values reduce recall and improve performance
    /// 1.0 gives the best performance without any recall loss
    pub wand_factor: Option<f32>,
}

impl FullTextSearchQuery {
    /// Creates a query for `query` with no column restriction, limit, or wand factor
    pub fn new(query: String) -> Self {
        Self {
            columns: Vec::new(),
            query,
            limit: None,
            wand_factor: None,
        }
    }

    /// Replaces the searched columns when `columns` is `Some`; `None` keeps the current ones
    pub fn columns(self, columns: Option<Vec<String>>) -> Self {
        match columns {
            Some(columns) => Self { columns, ..self },
            None => self,
        }
    }

    /// Sets the result limit (`None` clears it)
    pub fn limit(self, limit: Option<i64>) -> Self {
        Self { limit, ..self }
    }

    /// Sets the wand factor (`None` clears it)
    pub fn wand_factor(self, wand_factor: Option<f32>) -> Self {
        Self {
            wand_factor,
            ..self
        }
    }
}
291
292/// A query that a basic scalar index (e.g. btree / bitmap) can satisfy
293///
294/// This is a subset of expression operators that is often referred to as the
295/// "sargable" operators
296///
297/// Note that negation is not included.  Negation should be applied later.  For
298/// example, to invert an equality query (e.g. all rows where the value is not 7)
299/// you can grab all rows where the value = 7 and then do an inverted take (or use
300/// a block list instead of an allow list for prefiltering)
301#[derive(Debug, Clone, PartialEq)]
302pub enum SargableQuery {
303    /// Retrieve all row ids where the value is in the given [min, max) range
304    Range(Bound<ScalarValue>, Bound<ScalarValue>),
305    /// Retrieve all row ids where the value is in the given set of values
306    IsIn(Vec<ScalarValue>),
307    /// Retrieve all row ids where the value is exactly the given value
308    Equals(ScalarValue),
309    /// Retrieve all row ids where the value matches the given full text search query
310    FullTextSearch(FullTextSearchQuery),
311    /// Retrieve all row ids where the value is null
312    IsNull(),
313}
314
315impl AnyQuery for SargableQuery {
316    fn as_any(&self) -> &dyn Any {
317        self
318    }
319
320    fn format(&self, col: &str) -> String {
321        match self {
322            Self::Range(lower, upper) => match (lower, upper) {
323                (Bound::Unbounded, Bound::Unbounded) => "true".to_string(),
324                (Bound::Unbounded, Bound::Included(rhs)) => format!("{} <= {}", col, rhs),
325                (Bound::Unbounded, Bound::Excluded(rhs)) => format!("{} < {}", col, rhs),
326                (Bound::Included(lhs), Bound::Unbounded) => format!("{} >= {}", col, lhs),
327                (Bound::Included(lhs), Bound::Included(rhs)) => {
328                    format!("{} >= {} && {} <= {}", col, lhs, col, rhs)
329                }
330                (Bound::Included(lhs), Bound::Excluded(rhs)) => {
331                    format!("{} >= {} && {} < {}", col, lhs, col, rhs)
332                }
333                (Bound::Excluded(lhs), Bound::Unbounded) => format!("{} > {}", col, lhs),
334                (Bound::Excluded(lhs), Bound::Included(rhs)) => {
335                    format!("{} > {} && {} <= {}", col, lhs, col, rhs)
336                }
337                (Bound::Excluded(lhs), Bound::Excluded(rhs)) => {
338                    format!("{} > {} && {} < {}", col, lhs, col, rhs)
339                }
340            },
341            Self::IsIn(values) => {
342                format!(
343                    "{} IN [{}]",
344                    col,
345                    values
346                        .iter()
347                        .map(|val| val.to_string())
348                        .collect::<Vec<_>>()
349                        .join(",")
350                )
351            }
352            Self::FullTextSearch(query) => {
353                format!("fts({})", query.query)
354            }
355            Self::IsNull() => {
356                format!("{} IS NULL", col)
357            }
358            Self::Equals(val) => {
359                format!("{} = {}", col, val)
360            }
361        }
362    }
363
364    fn to_expr(&self, col: String) -> Expr {
365        let col_expr = Expr::Column(Column::new_unqualified(col));
366        match self {
367            Self::Range(lower, upper) => match (lower, upper) {
368                (Bound::Unbounded, Bound::Unbounded) => {
369                    Expr::Literal(ScalarValue::Boolean(Some(true)))
370                }
371                (Bound::Unbounded, Bound::Included(rhs)) => {
372                    col_expr.lt_eq(Expr::Literal(rhs.clone()))
373                }
374                (Bound::Unbounded, Bound::Excluded(rhs)) => col_expr.lt(Expr::Literal(rhs.clone())),
375                (Bound::Included(lhs), Bound::Unbounded) => {
376                    col_expr.gt_eq(Expr::Literal(lhs.clone()))
377                }
378                (Bound::Included(lhs), Bound::Included(rhs)) => {
379                    col_expr.between(Expr::Literal(lhs.clone()), Expr::Literal(rhs.clone()))
380                }
381                (Bound::Included(lhs), Bound::Excluded(rhs)) => col_expr
382                    .clone()
383                    .gt_eq(Expr::Literal(lhs.clone()))
384                    .and(col_expr.lt(Expr::Literal(rhs.clone()))),
385                (Bound::Excluded(lhs), Bound::Unbounded) => col_expr.gt(Expr::Literal(lhs.clone())),
386                (Bound::Excluded(lhs), Bound::Included(rhs)) => col_expr
387                    .clone()
388                    .gt(Expr::Literal(lhs.clone()))
389                    .and(col_expr.lt_eq(Expr::Literal(rhs.clone()))),
390                (Bound::Excluded(lhs), Bound::Excluded(rhs)) => col_expr
391                    .clone()
392                    .gt(Expr::Literal(lhs.clone()))
393                    .and(col_expr.lt(Expr::Literal(rhs.clone()))),
394            },
395            Self::IsIn(values) => col_expr.in_list(
396                values
397                    .iter()
398                    .map(|val| Expr::Literal(val.clone()))
399                    .collect::<Vec<_>>(),
400                false,
401            ),
402            Self::FullTextSearch(query) => {
403                col_expr.like(Expr::Literal(ScalarValue::Utf8(Some(query.query.clone()))))
404            }
405            Self::IsNull() => col_expr.is_null(),
406            Self::Equals(value) => col_expr.eq(Expr::Literal(value.clone())),
407        }
408    }
409
410    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
411        match other.as_any().downcast_ref::<Self>() {
412            Some(o) => self == o,
413            None => false,
414        }
415    }
416}
417
418/// A query that a LabelListIndex can satisfy
419#[derive(Debug, Clone, PartialEq)]
420pub enum LabelListQuery {
421    /// Retrieve all row ids where every label is in the list of values for the row
422    HasAllLabels(Vec<ScalarValue>),
423    /// Retrieve all row ids where at least one of the given labels is in the list of values for the row
424    HasAnyLabel(Vec<ScalarValue>),
425}
426
427impl AnyQuery for LabelListQuery {
428    fn as_any(&self) -> &dyn Any {
429        self
430    }
431
432    fn format(&self, col: &str) -> String {
433        format!("{}", self.to_expr(col.to_string()))
434    }
435
436    fn to_expr(&self, col: String) -> Expr {
437        match self {
438            Self::HasAllLabels(labels) => {
439                let labels_arr = ScalarValue::iter_to_array(labels.iter().cloned()).unwrap();
440                let offsets_buffer =
441                    OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, labels_arr.len() as i32]));
442                let labels_list = ListArray::try_new(
443                    Arc::new(Field::new("item", labels_arr.data_type().clone(), false)),
444                    offsets_buffer,
445                    labels_arr,
446                    None,
447                )
448                .unwrap();
449                let labels_arr = Arc::new(labels_list);
450                Expr::ScalarFunction(ScalarFunction {
451                    func: Arc::new(array_has::ArrayHasAll::new().into()),
452                    args: vec![
453                        Expr::Column(Column::new_unqualified(col)),
454                        Expr::Literal(ScalarValue::List(labels_arr)),
455                    ],
456                })
457            }
458            Self::HasAnyLabel(labels) => {
459                let labels_arr = ScalarValue::iter_to_array(labels.iter().cloned()).unwrap();
460                let offsets_buffer =
461                    OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, labels_arr.len() as i32]));
462                let labels_list = ListArray::try_new(
463                    Arc::new(Field::new("item", labels_arr.data_type().clone(), false)),
464                    offsets_buffer,
465                    labels_arr,
466                    None,
467                )
468                .unwrap();
469                let labels_arr = Arc::new(labels_list);
470                Expr::ScalarFunction(ScalarFunction {
471                    func: Arc::new(array_has::ArrayHasAny::new().into()),
472                    args: vec![
473                        Expr::Column(Column::new_unqualified(col)),
474                        Expr::Literal(ScalarValue::List(labels_arr)),
475                    ],
476                })
477            }
478        }
479    }
480
481    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
482        match other.as_any().downcast_ref::<Self>() {
483            Some(o) => self == o,
484            None => false,
485        }
486    }
487}
488
/// The queries a NGramIndex is able to satisfy
#[derive(Clone, Debug, PartialEq)]
pub enum TextQuery {
    /// Retrieve all row ids whose text contains the given string
    StringContains(String),
    // TODO: In the future we should be able to support case-insensitive contains
    // as well as partial matches (e.g. LIKE 'foo%') and potentially even
    // some regular expressions
}
498
499impl AnyQuery for TextQuery {
500    fn as_any(&self) -> &dyn Any {
501        self
502    }
503
504    fn format(&self, col: &str) -> String {
505        format!("{}", self.to_expr(col.to_string()))
506    }
507
508    fn to_expr(&self, col: String) -> Expr {
509        match self {
510            Self::StringContains(substr) => Expr::ScalarFunction(ScalarFunction {
511                func: Arc::new(ContainsFunc::new().into()),
512                args: vec![
513                    Expr::Column(Column::new_unqualified(col)),
514                    Expr::Literal(ScalarValue::Utf8(Some(substr.clone()))),
515                ],
516            }),
517        }
518    }
519
520    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
521        match other.as_any().downcast_ref::<Self>() {
522            Some(o) => self == o,
523            None => false,
524        }
525    }
526
527    fn needs_recheck(&self) -> bool {
528        true
529    }
530}
531
532/// The result of a search operation against a scalar index
533#[derive(Debug)]
534pub enum SearchResult {
535    /// The exact row ids that satisfy the query
536    Exact(RowIdTreeMap),
537    /// Any row id satisfying the query will be in this set but not every
538    /// row id in this set will satisfy the query, a further recheck step
539    /// is needed
540    AtMost(RowIdTreeMap),
541    /// All of the given row ids satisfy the query but there may be more
542    ///
543    /// No scalar index actually returns this today but it can arise from
544    /// boolean operations (e.g. NOT(AtMost(x)) == AtLeast(NOT(x)))
545    AtLeast(RowIdTreeMap),
546}
547
548impl SearchResult {
549    pub fn row_ids(&self) -> &RowIdTreeMap {
550        match self {
551            Self::Exact(row_ids) => row_ids,
552            Self::AtMost(row_ids) => row_ids,
553            Self::AtLeast(row_ids) => row_ids,
554        }
555    }
556
557    pub fn is_exact(&self) -> bool {
558        matches!(self, Self::Exact(_))
559    }
560}
561
562/// A trait for a scalar index, a structure that can determine row ids that satisfy scalar queries
563#[async_trait]
564pub trait ScalarIndex: Send + Sync + std::fmt::Debug + Index + DeepSizeOf {
565    /// Search the scalar index
566    ///
567    /// Returns all row ids that satisfy the query, these row ids are not necessarily ordered
568    async fn search(&self, query: &dyn AnyQuery) -> Result<SearchResult>;
569
570    /// Returns true if the query can be answered exactly
571    ///
572    /// If false is returned then the query still may be answered exactly but if true is returned
573    /// then the query must be answered exactly
574    fn can_answer_exact(&self, query: &dyn AnyQuery) -> bool;
575
576    /// Load the scalar index from storage
577    async fn load(store: Arc<dyn IndexStore>) -> Result<Arc<Self>>
578    where
579        Self: Sized;
580
581    /// Remap the row ids, creating a new remapped version of this index in `dest_store`
582    async fn remap(
583        &self,
584        mapping: &HashMap<u64, Option<u64>>,
585        dest_store: &dyn IndexStore,
586    ) -> Result<()>;
587
588    /// Add the new data into the index, creating an updated version of the index in `dest_store`
589    async fn update(
590        &self,
591        new_data: SendableRecordBatchStream,
592        dest_store: &dyn IndexStore,
593    ) -> Result<()>;
594}