// lance_index/scalar.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Scalar indices for metadata search & filtering

use std::collections::HashMap;
use std::fmt::Debug;
use std::{any::Any, ops::Bound, sync::Arc};

use arrow::buffer::{OffsetBuffer, ScalarBuffer};
use arrow_array::{ListArray, RecordBatch};
use arrow_schema::{Field, Schema};
use async_trait::async_trait;
use datafusion::functions::string::contains::ContainsFunc;
use datafusion::functions_array::array_has;
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion_common::{scalar::ScalarValue, Column};

use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::Expr;
use deepsize::DeepSizeOf;
use inverted::TokenizerConfig;
use lance_core::utils::mask::RowIdTreeMap;
use lance_core::{Error, Result};
use snafu::location;

use crate::metrics::MetricsCollector;
use crate::{Index, IndexParams, IndexType};

pub mod bitmap;
pub mod btree;
pub mod expression;
pub mod flat;
pub mod inverted;
pub mod label_list;
pub mod lance_format;
pub mod ngram;

pub const LANCE_SCALAR_INDEX: &str = "__lance_scalar_index";

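/// The concrete type of a scalar index.
///
/// A brief sketch of converting from the more general [`IndexType`]; note that the
/// legacy `IndexType::Scalar` maps to a btree index:
///
/// ```ignore
/// let kind = ScalarIndexType::try_from(IndexType::Scalar).unwrap();
/// assert!(matches!(kind, ScalarIndexType::BTree));
/// ```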
#[derive(Debug, Copy, Clone)]
pub enum ScalarIndexType {
    BTree,
    Bitmap,
    LabelList,
    NGram,
    Inverted,
}

impl TryFrom<IndexType> for ScalarIndexType {
    type Error = Error;

    fn try_from(value: IndexType) -> Result<Self> {
        match value {
            IndexType::BTree | IndexType::Scalar => Ok(Self::BTree),
            IndexType::Bitmap => Ok(Self::Bitmap),
            IndexType::LabelList => Ok(Self::LabelList),
            IndexType::NGram => Ok(Self::NGram),
            IndexType::Inverted => Ok(Self::Inverted),
            _ => Err(Error::InvalidInput {
                source: format!("Index type {:?} is not a scalar index", value).into(),
                location: location!(),
            }),
        }
    }
}

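/// Parameters for building a scalar index.
///
/// A minimal usage sketch (assuming `IndexParams` is in scope so that `index_type` is callable):
///
/// ```ignore
/// // Force a bitmap index instead of relying on auto-detection.
/// let params = ScalarIndexParams::new(ScalarIndexType::Bitmap);
/// assert!(matches!(params.index_type(), IndexType::Bitmap));
///
/// // The default leaves `force_index_type` unset and reports BTree.
/// let params = ScalarIndexParams::default();
/// assert!(matches!(params.index_type(), IndexType::BTree));
/// ```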
#[derive(Default)]
pub struct ScalarIndexParams {
    /// If set, always use the given index type and skip auto-detection
    pub force_index_type: Option<ScalarIndexType>,
}

impl ScalarIndexParams {
    pub fn new(index_type: ScalarIndexType) -> Self {
        Self {
            force_index_type: Some(index_type),
        }
    }
}

impl IndexParams for ScalarIndexParams {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn index_type(&self) -> IndexType {
        match self.force_index_type {
            Some(ScalarIndexType::BTree) | None => IndexType::BTree,
            Some(ScalarIndexType::Bitmap) => IndexType::Bitmap,
            Some(ScalarIndexType::LabelList) => IndexType::LabelList,
            Some(ScalarIndexType::Inverted) => IndexType::Inverted,
            Some(ScalarIndexType::NGram) => IndexType::NGram,
        }
    }

    fn index_name(&self) -> &str {
        LANCE_SCALAR_INDEX
    }
}

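/// Parameters for building an inverted index for full text search.
///
/// A minimal sketch of disabling positions to keep the index smaller (positions can
/// significantly increase index size):
///
/// ```ignore
/// let params = InvertedIndexParams::default().with_position(false);
/// assert!(!params.with_position);
/// ```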
#[derive(Clone)]
pub struct InvertedIndexParams {
    /// If true, store the position of the term in the document
    /// This can significantly increase the size of the index
    /// If false, only store the frequency of the term in the document
    /// Default is true
    pub with_position: bool,

    pub tokenizer_config: TokenizerConfig,
}

impl Debug for InvertedIndexParams {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("InvertedIndexParams")
            .field("with_position", &self.with_position)
            .finish()
    }
}

impl DeepSizeOf for InvertedIndexParams {
    fn deep_size_of_children(&self, _: &mut deepsize::Context) -> usize {
        0
    }
}

impl Default for InvertedIndexParams {
    fn default() -> Self {
        Self {
            with_position: true,
            tokenizer_config: TokenizerConfig::default(),
        }
    }
}

impl InvertedIndexParams {
    pub fn with_position(mut self, with_position: bool) -> Self {
        self.with_position = with_position;
        self
    }
}

impl IndexParams for InvertedIndexParams {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn index_type(&self) -> IndexType {
        IndexType::Inverted
    }

    fn index_name(&self) -> &str {
        "INVERTED"
    }
}

/// Trait for storing an index (or parts of an index) into storage
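///
/// A hedged sketch of how a writer might be driven (here `writer` is assumed to be a
/// `Box<dyn IndexWriter>` obtained from an [`IndexStore`], and `batch_0` / `batch_1`
/// are hypothetical `RecordBatch`es):
///
/// ```ignore
/// let first = writer.write_record_batch(batch_0).await?;  // returns 0
/// let second = writer.write_record_batch(batch_1).await?; // returns 1
/// assert_eq!((first, second), (0, 1));
/// writer.finish().await?;
/// ```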
#[async_trait]
pub trait IndexWriter: Send {
    /// Writes a record batch into the file, returning the 0-based index of the batch in the file
    ///
    /// E.g. if this is the third time this is called this method will return 2
    async fn write_record_batch(&mut self, batch: RecordBatch) -> Result<u64>;
    /// Finishes writing the file and closes the file
    async fn finish(&mut self) -> Result<()>;
    /// Finishes writing the file and closes the file with additional metadata
    async fn finish_with_metadata(&mut self, metadata: HashMap<String, String>) -> Result<()>;
}

/// Trait for reading an index (or parts of an index) from storage
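///
/// A hedged sketch of reading back index data (here `reader` is assumed to be an
/// `Arc<dyn IndexReader>` and `"values"` a hypothetical column name):
///
/// ```ignore
/// // Read the first batch, then re-read rows 0..100 projecting a single column.
/// let batch = reader.read_record_batch(0, 4096).await?;
/// let projected = reader.read_range(0..100, Some(&["values"])).await?;
/// assert!(projected.num_rows() <= 100);
/// ```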
#[async_trait]
pub trait IndexReader: Send + Sync {
    /// Read the n-th record batch from the file
    async fn read_record_batch(&self, n: u64, batch_size: u64) -> Result<RecordBatch>;
    /// Read the given range of rows from the file.
    /// If projection is Some, only the columns in the projection are returned;
    /// nested columns like Some(&["x.y"]) are not supported.
    /// If projection is None, all columns are returned.
    async fn read_range(
        &self,
        range: std::ops::Range<usize>,
        projection: Option<&[&str]>,
    ) -> Result<RecordBatch>;
    /// Return the number of batches in the file
    async fn num_batches(&self) -> u32;
    /// Return the number of rows in the file
    fn num_rows(&self) -> usize;
    /// Return the schema of the file
    fn schema(&self) -> &lance_core::datatypes::Schema;
}

/// Trait abstracting I/O away from index logic
///
/// Scalar indices are currently serialized as indexable arrow record batches stored in
/// named "files".  The index store is responsible for serializing and deserializing
/// these batches into file data (e.g. as .lance files or .parquet files, etc.)
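///
/// A hedged sketch of moving index data between two stores (the file name is hypothetical
/// and `source` / `dest` are assumed to implement [`IndexStore`]):
///
/// ```ignore
/// async fn back_up(source: &dyn IndexStore, dest: &dyn IndexStore) -> Result<()> {
///     // Copies the named index file into the destination store.
///     source.copy_index_file("page_data.lance", dest).await
/// }
/// ```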
#[async_trait]
pub trait IndexStore: std::fmt::Debug + Send + Sync + DeepSizeOf {
    fn as_any(&self) -> &dyn Any;

    /// Suggested I/O parallelism for the store
    fn io_parallelism(&self) -> usize;

    /// Create a new file and return a writer to store data in the file
    async fn new_index_file(&self, name: &str, schema: Arc<Schema>)
        -> Result<Box<dyn IndexWriter>>;

    /// Open an existing file for retrieval
    async fn open_index_file(&self, name: &str) -> Result<Arc<dyn IndexReader>>;

    /// Copy an index file from this store to another
    ///
    /// This is often useful when remapping or updating an index
    async fn copy_index_file(&self, name: &str, dest_store: &dyn IndexStore) -> Result<()>;

    /// Rename an index file
    async fn rename_index_file(&self, name: &str, new_name: &str) -> Result<()>;

    /// Delete an index file (used in the tmp spill store to keep tmp size down)
    async fn delete_index_file(&self, name: &str) -> Result<()>;
}

/// Different scalar indices may support different kinds of queries
///
/// For example, a btree index can support a wide range of queries (e.g. x > 7)
/// while an index based on FTS only supports queries like "x LIKE 'foo'"
///
/// This trait is used when we need an object that can represent any kind of query
///
/// Note: if you are implementing this trait for a query type then you probably also
/// need to implement the [crate::scalar::expression::ScalarQueryParser] trait to
/// create instances of your query at parse time.
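///
/// A small sketch of treating different query types uniformly through this trait:
///
/// ```ignore
/// let query: Box<dyn AnyQuery> = Box::new(SargableQuery::Equals(ScalarValue::Int32(Some(7))));
/// assert_eq!(query.format("x"), "x = 7");
/// ```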
pub trait AnyQuery: std::fmt::Debug + Any + Send + Sync {
    /// Cast the query as Any to allow for downcasting
    fn as_any(&self) -> &dyn Any;
    /// Format the query as a string
    fn format(&self, col: &str) -> String;
    /// Convert the query to a datafusion expression
    fn to_expr(&self, col: String) -> Expr;
    /// Compare this query to another query
    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool;
    /// If true, the query results are inexact and will need to be rechecked
    fn needs_recheck(&self) -> bool {
        false
    }
}

impl PartialEq for dyn AnyQuery {
    fn eq(&self, other: &Self) -> bool {
        self.dyn_eq(other)
    }
}

/// A full text search query
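///
/// A hedged sketch of building a query with the builder methods below (`title` and `body`
/// are hypothetical indexed columns):
///
/// ```ignore
/// let query = FullTextSearchQuery::new("lance scalar index".to_string())
///     .columns(Some(vec!["title".to_string(), "body".to_string()]))
///     .limit(Some(10))
///     .wand_factor(Some(1.0));
/// assert_eq!(query.limit, Some(10));
/// ```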
#[derive(Debug, Clone, PartialEq)]
pub struct FullTextSearchQuery {
    /// The columns to search;
    /// if empty, search all indexed columns
    pub columns: Vec<String>,
    /// The full text search query
    pub query: String,
    /// The maximum number of results to return
    pub limit: Option<i64>,
    /// The WAND factor to use for ranking;
    /// if None, the default value of 1.0 is used.
    /// Increasing this value improves performance at the cost of recall;
    /// 1.0 gives the best performance achievable without any loss of recall
    pub wand_factor: Option<f32>,
}

impl FullTextSearchQuery {
    pub fn new(query: String) -> Self {
        Self {
            query,
            limit: None,
            columns: vec![],
            wand_factor: None,
        }
    }

    pub fn columns(mut self, columns: Option<Vec<String>>) -> Self {
        if let Some(columns) = columns {
            self.columns = columns;
        }
        self
    }

    pub fn limit(mut self, limit: Option<i64>) -> Self {
        self.limit = limit;
        self
    }

    pub fn wand_factor(mut self, wand_factor: Option<f32>) -> Self {
        self.wand_factor = wand_factor;
        self
    }
}

/// A query that a basic scalar index (e.g. btree / bitmap) can satisfy
///
/// This is a subset of expression operators that is often referred to as the
/// "sargable" operators
///
/// Note that negation is not included.  Negation should be applied later.  For
/// example, to invert an equality query (e.g. all rows where the value is not 7)
/// you can grab all rows where the value = 7 and then do an inverted take (or use
/// a block list instead of an allow list for prefiltering)
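///
/// A brief sketch of building a range query and rendering it (uses only types already
/// imported by this module):
///
/// ```ignore
/// // All rows where 0 <= x < 100
/// let query = SargableQuery::Range(
///     Bound::Included(ScalarValue::Int32(Some(0))),
///     Bound::Excluded(ScalarValue::Int32(Some(100))),
/// );
/// assert_eq!(query.format("x"), "x >= 0 && x < 100");
/// ```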
#[derive(Debug, Clone, PartialEq)]
pub enum SargableQuery {
    /// Retrieve all row ids where the value falls within the given range
    /// (each bound may be inclusive, exclusive, or unbounded)
    Range(Bound<ScalarValue>, Bound<ScalarValue>),
    /// Retrieve all row ids where the value is in the given set of values
    IsIn(Vec<ScalarValue>),
    /// Retrieve all row ids where the value is exactly the given value
    Equals(ScalarValue),
    /// Retrieve all row ids where the value matches the given full text search query
    FullTextSearch(FullTextSearchQuery),
    /// Retrieve all row ids where the value is null
    IsNull(),
}

impl AnyQuery for SargableQuery {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn format(&self, col: &str) -> String {
        match self {
            Self::Range(lower, upper) => match (lower, upper) {
                (Bound::Unbounded, Bound::Unbounded) => "true".to_string(),
                (Bound::Unbounded, Bound::Included(rhs)) => format!("{} <= {}", col, rhs),
                (Bound::Unbounded, Bound::Excluded(rhs)) => format!("{} < {}", col, rhs),
                (Bound::Included(lhs), Bound::Unbounded) => format!("{} >= {}", col, lhs),
                (Bound::Included(lhs), Bound::Included(rhs)) => {
                    format!("{} >= {} && {} <= {}", col, lhs, col, rhs)
                }
                (Bound::Included(lhs), Bound::Excluded(rhs)) => {
                    format!("{} >= {} && {} < {}", col, lhs, col, rhs)
                }
                (Bound::Excluded(lhs), Bound::Unbounded) => format!("{} > {}", col, lhs),
                (Bound::Excluded(lhs), Bound::Included(rhs)) => {
                    format!("{} > {} && {} <= {}", col, lhs, col, rhs)
                }
                (Bound::Excluded(lhs), Bound::Excluded(rhs)) => {
                    format!("{} > {} && {} < {}", col, lhs, col, rhs)
                }
            },
            Self::IsIn(values) => {
                format!(
                    "{} IN [{}]",
                    col,
                    values
                        .iter()
                        .map(|val| val.to_string())
                        .collect::<Vec<_>>()
                        .join(",")
                )
            }
            Self::FullTextSearch(query) => {
                format!("fts({})", query.query)
            }
            Self::IsNull() => {
                format!("{} IS NULL", col)
            }
            Self::Equals(val) => {
                format!("{} = {}", col, val)
            }
        }
    }

    fn to_expr(&self, col: String) -> Expr {
        let col_expr = Expr::Column(Column::new_unqualified(col));
        match self {
            Self::Range(lower, upper) => match (lower, upper) {
                (Bound::Unbounded, Bound::Unbounded) => {
                    Expr::Literal(ScalarValue::Boolean(Some(true)))
                }
                (Bound::Unbounded, Bound::Included(rhs)) => {
                    col_expr.lt_eq(Expr::Literal(rhs.clone()))
                }
                (Bound::Unbounded, Bound::Excluded(rhs)) => col_expr.lt(Expr::Literal(rhs.clone())),
                (Bound::Included(lhs), Bound::Unbounded) => {
                    col_expr.gt_eq(Expr::Literal(lhs.clone()))
                }
                (Bound::Included(lhs), Bound::Included(rhs)) => {
                    col_expr.between(Expr::Literal(lhs.clone()), Expr::Literal(rhs.clone()))
                }
                (Bound::Included(lhs), Bound::Excluded(rhs)) => col_expr
                    .clone()
                    .gt_eq(Expr::Literal(lhs.clone()))
                    .and(col_expr.lt(Expr::Literal(rhs.clone()))),
                (Bound::Excluded(lhs), Bound::Unbounded) => col_expr.gt(Expr::Literal(lhs.clone())),
                (Bound::Excluded(lhs), Bound::Included(rhs)) => col_expr
                    .clone()
                    .gt(Expr::Literal(lhs.clone()))
                    .and(col_expr.lt_eq(Expr::Literal(rhs.clone()))),
                (Bound::Excluded(lhs), Bound::Excluded(rhs)) => col_expr
                    .clone()
                    .gt(Expr::Literal(lhs.clone()))
                    .and(col_expr.lt(Expr::Literal(rhs.clone()))),
            },
            Self::IsIn(values) => col_expr.in_list(
                values
                    .iter()
                    .map(|val| Expr::Literal(val.clone()))
                    .collect::<Vec<_>>(),
                false,
            ),
            Self::FullTextSearch(query) => {
                col_expr.like(Expr::Literal(ScalarValue::Utf8(Some(query.query.clone()))))
            }
            Self::IsNull() => col_expr.is_null(),
            Self::Equals(value) => col_expr.eq(Expr::Literal(value.clone())),
        }
    }

    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
        match other.as_any().downcast_ref::<Self>() {
            Some(o) => self == o,
            None => false,
        }
    }
}

/// A query that a LabelListIndex can satisfy
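///
/// A brief sketch (`tags` is a hypothetical list column):
///
/// ```ignore
/// // Rows whose `tags` list contains both "red" and "blue"
/// let query = LabelListQuery::HasAllLabels(vec![
///     ScalarValue::Utf8(Some("red".to_string())),
///     ScalarValue::Utf8(Some("blue".to_string())),
/// ]);
/// let expr = query.to_expr("tags".to_string());
/// ```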
#[derive(Debug, Clone, PartialEq)]
pub enum LabelListQuery {
    /// Retrieve all row ids where every label is in the list of values for the row
    HasAllLabels(Vec<ScalarValue>),
    /// Retrieve all row ids where at least one of the given labels is in the list of values for the row
    HasAnyLabel(Vec<ScalarValue>),
}

impl AnyQuery for LabelListQuery {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn format(&self, col: &str) -> String {
        self.to_expr(col.to_string()).to_string()
    }

    fn to_expr(&self, col: String) -> Expr {
        match self {
            Self::HasAllLabels(labels) => {
                let labels_arr = ScalarValue::iter_to_array(labels.iter().cloned()).unwrap();
                let offsets_buffer =
                    OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, labels_arr.len() as i32]));
                let labels_list = ListArray::try_new(
                    Arc::new(Field::new("item", labels_arr.data_type().clone(), false)),
                    offsets_buffer,
                    labels_arr,
                    None,
                )
                .unwrap();
                let labels_arr = Arc::new(labels_list);
                Expr::ScalarFunction(ScalarFunction {
                    func: Arc::new(array_has::ArrayHasAll::new().into()),
                    args: vec![
                        Expr::Column(Column::new_unqualified(col)),
                        Expr::Literal(ScalarValue::List(labels_arr)),
                    ],
                })
            }
            Self::HasAnyLabel(labels) => {
                let labels_arr = ScalarValue::iter_to_array(labels.iter().cloned()).unwrap();
                let offsets_buffer =
                    OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, labels_arr.len() as i32]));
                let labels_list = ListArray::try_new(
                    Arc::new(Field::new("item", labels_arr.data_type().clone(), false)),
                    offsets_buffer,
                    labels_arr,
                    None,
                )
                .unwrap();
                let labels_arr = Arc::new(labels_list);
                Expr::ScalarFunction(ScalarFunction {
                    func: Arc::new(array_has::ArrayHasAny::new().into()),
                    args: vec![
                        Expr::Column(Column::new_unqualified(col)),
                        Expr::Literal(ScalarValue::List(labels_arr)),
                    ],
                })
            }
        }
    }

    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
        match other.as_any().downcast_ref::<Self>() {
            Some(o) => self == o,
            None => false,
        }
    }
}

/// A query that an NGramIndex can satisfy
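///
/// A brief sketch:
///
/// ```ignore
/// let query = TextQuery::StringContains("foo".to_string());
/// // NGram results are inexact, so callers must recheck the matches.
/// assert!(query.needs_recheck());
/// ```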
#[derive(Debug, Clone, PartialEq)]
pub enum TextQuery {
    /// Retrieve all row ids where the text contains the given string
    StringContains(String),
    // TODO: In the future we should be able to do case-insensitive contains
    // as well as partial matches (e.g. LIKE 'foo%') and potentially even
    // some regular expressions
}

impl AnyQuery for TextQuery {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn format(&self, col: &str) -> String {
        self.to_expr(col.to_string()).to_string()
    }

    fn to_expr(&self, col: String) -> Expr {
        match self {
            Self::StringContains(substr) => Expr::ScalarFunction(ScalarFunction {
                func: Arc::new(ContainsFunc::new().into()),
                args: vec![
                    Expr::Column(Column::new_unqualified(col)),
                    Expr::Literal(ScalarValue::Utf8(Some(substr.clone()))),
                ],
            }),
        }
    }

    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
        match other.as_any().downcast_ref::<Self>() {
            Some(o) => self == o,
            None => false,
        }
    }

    fn needs_recheck(&self) -> bool {
        true
    }
}

/// The result of a search operation against a scalar index
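///
/// A hedged sketch of how a caller might interpret each variant (here `result` is an
/// already-obtained `SearchResult`; the recheck itself is performed elsewhere, e.g. by the
/// surrounding query plan):
///
/// ```ignore
/// match result {
///     SearchResult::Exact(row_ids) => { /* use row_ids directly as an allow list */ }
///     SearchResult::AtMost(row_ids) => { /* treat row_ids as candidates and recheck them */ }
///     SearchResult::AtLeast(row_ids) => { /* keep row_ids and scan the rest for more matches */ }
/// }
/// ```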
#[derive(Debug, PartialEq)]
pub enum SearchResult {
    /// The exact row ids that satisfy the query
    Exact(RowIdTreeMap),
    /// Any row id satisfying the query will be in this set, but not every
    /// row id in this set will satisfy the query; a further recheck step
    /// is needed
    AtMost(RowIdTreeMap),
    /// All of the given row ids satisfy the query but there may be more
    ///
    /// No scalar index actually returns this today but it can arise from
    /// boolean operations (e.g. NOT(AtMost(x)) == AtLeast(NOT(x)))
    AtLeast(RowIdTreeMap),
}

impl SearchResult {
    pub fn row_ids(&self) -> &RowIdTreeMap {
        match self {
            Self::Exact(row_ids) => row_ids,
            Self::AtMost(row_ids) => row_ids,
            Self::AtLeast(row_ids) => row_ids,
        }
    }

    pub fn is_exact(&self) -> bool {
        matches!(self, Self::Exact(_))
    }
}

/// A trait for a scalar index, a structure that can determine row ids that satisfy scalar queries
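///
/// A hedged sketch of running a query against a loaded index (`index` and `metrics` are
/// assumed to already exist):
///
/// ```ignore
/// let query = SargableQuery::Equals(ScalarValue::Int32(Some(7)));
/// let result = index.search(&query, metrics).await?;
/// if !result.is_exact() {
///     // inexact results must be rechecked against the original predicate
/// }
/// ```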
#[async_trait]
pub trait ScalarIndex: Send + Sync + std::fmt::Debug + Index + DeepSizeOf {
    /// Search the scalar index
    ///
    /// Returns all row ids that satisfy the query; these row ids are not necessarily ordered
    async fn search(
        &self,
        query: &dyn AnyQuery,
        metrics: &dyn MetricsCollector,
    ) -> Result<SearchResult>;

    /// Returns true if the query can be answered exactly
    ///
    /// If this returns false the query may still be answered exactly, but if it returns true
    /// then the query must be answered exactly
    fn can_answer_exact(&self, query: &dyn AnyQuery) -> bool;

    /// Load the scalar index from storage
    async fn load(store: Arc<dyn IndexStore>) -> Result<Arc<Self>>
    where
        Self: Sized;

    /// Remap the row ids, creating a new remapped version of this index in `dest_store`
    async fn remap(
        &self,
        mapping: &HashMap<u64, Option<u64>>,
        dest_store: &dyn IndexStore,
    ) -> Result<()>;

    /// Add the new data into the index, creating an updated version of the index in `dest_store`
    async fn update(
        &self,
        new_data: SendableRecordBatchStream,
        dest_store: &dyn IndexStore,
    ) -> Result<()>;
}