lance_index/
scalar.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Scalar indices for metadata search & filtering
5
6use std::collections::{HashMap, HashSet};
7use std::fmt::Debug;
8use std::{any::Any, ops::Bound, sync::Arc};
9
10use arrow::buffer::{OffsetBuffer, ScalarBuffer};
11use arrow_array::{ListArray, RecordBatch};
12use arrow_schema::{Field, Schema};
13use async_trait::async_trait;
14use datafusion::functions::string::contains::ContainsFunc;
15use datafusion::functions_array::array_has;
16use datafusion::physical_plan::SendableRecordBatchStream;
17use datafusion_common::{scalar::ScalarValue, Column};
18
19use datafusion_expr::expr::ScalarFunction;
20use datafusion_expr::Expr;
21use deepsize::DeepSizeOf;
22use inverted::query::{fill_fts_query_column, FtsQuery, FtsQueryNode, FtsSearchParams, MatchQuery};
23use inverted::TokenizerConfig;
24use lance_core::utils::mask::RowIdTreeMap;
25use lance_core::{Error, Result};
26use snafu::location;
27
28use crate::metrics::MetricsCollector;
29use crate::{Index, IndexParams, IndexType};
30
31pub mod bitmap;
32pub mod btree;
33pub mod expression;
34pub mod flat;
35pub mod inverted;
36pub mod label_list;
37pub mod lance_format;
38pub mod ngram;
39
/// Index name reported by [`ScalarIndexParams`] through `IndexParams::index_name`.
pub const LANCE_SCALAR_INDEX: &str = "__lance_scalar_index";
41
/// The kinds of scalar index that can be requested explicitly.
///
/// Every variant has a counterpart in `IndexType`; the `TryFrom<IndexType>`
/// implementation for this enum defines the mapping.
///
/// The enum is a plain, field-less tag, so it is `Copy` and can be compared
/// with `==`.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum ScalarIndexType {
    /// Index implemented in the `btree` module
    BTree,
    /// Index implemented in the `bitmap` module
    Bitmap,
    /// Index implemented in the `label_list` module
    LabelList,
    /// Index implemented in the `ngram` module
    NGram,
    /// Index implemented in the `inverted` module
    Inverted,
}
50
51impl TryFrom<IndexType> for ScalarIndexType {
52    type Error = Error;
53
54    fn try_from(value: IndexType) -> Result<Self> {
55        match value {
56            IndexType::BTree | IndexType::Scalar => Ok(Self::BTree),
57            IndexType::Bitmap => Ok(Self::Bitmap),
58            IndexType::LabelList => Ok(Self::LabelList),
59            IndexType::NGram => Ok(Self::NGram),
60            IndexType::Inverted => Ok(Self::Inverted),
61            _ => Err(Error::InvalidInput {
62                source: format!("Index type {:?} is not a scalar index", value).into(),
63                location: location!(),
64            }),
65        }
66    }
67}
68
69#[derive(Default)]
70pub struct ScalarIndexParams {
71    /// If set then always use the given index type and skip auto-detection
72    pub force_index_type: Option<ScalarIndexType>,
73}
74
75impl ScalarIndexParams {
76    pub fn new(index_type: ScalarIndexType) -> Self {
77        Self {
78            force_index_type: Some(index_type),
79        }
80    }
81}
82
83impl IndexParams for ScalarIndexParams {
84    fn as_any(&self) -> &dyn std::any::Any {
85        self
86    }
87
88    fn index_type(&self) -> IndexType {
89        match self.force_index_type {
90            Some(ScalarIndexType::BTree) | None => IndexType::BTree,
91            Some(ScalarIndexType::Bitmap) => IndexType::Bitmap,
92            Some(ScalarIndexType::LabelList) => IndexType::LabelList,
93            Some(ScalarIndexType::Inverted) => IndexType::Inverted,
94            Some(ScalarIndexType::NGram) => IndexType::NGram,
95        }
96    }
97
98    fn index_name(&self) -> &str {
99        LANCE_SCALAR_INDEX
100    }
101}
102
103#[derive(Clone)]
104pub struct InvertedIndexParams {
105    /// If true, store the position of the term in the document
106    /// This can significantly increase the size of the index
107    /// If false, only store the frequency of the term in the document
108    /// Default is true
109    pub with_position: bool,
110
111    pub tokenizer_config: TokenizerConfig,
112}
113
114impl Debug for InvertedIndexParams {
115    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
116        f.debug_struct("InvertedIndexParams")
117            .field("with_position", &self.with_position)
118            .finish()
119    }
120}
121
122impl DeepSizeOf for InvertedIndexParams {
123    fn deep_size_of_children(&self, _: &mut deepsize::Context) -> usize {
124        0
125    }
126}
127
128impl Default for InvertedIndexParams {
129    fn default() -> Self {
130        Self {
131            with_position: true,
132            tokenizer_config: TokenizerConfig::default(),
133        }
134    }
135}
136
137impl InvertedIndexParams {
138    pub fn with_position(mut self, with_position: bool) -> Self {
139        self.with_position = with_position;
140        self
141    }
142}
143
144impl IndexParams for InvertedIndexParams {
145    fn as_any(&self) -> &dyn std::any::Any {
146        self
147    }
148
149    fn index_type(&self) -> IndexType {
150        IndexType::Inverted
151    }
152
153    fn index_name(&self) -> &str {
154        "INVERTED"
155    }
156}
157
158/// Trait for storing an index (or parts of an index) into storage
159#[async_trait]
160pub trait IndexWriter: Send {
161    /// Writes a record batch into the file, returning the 0-based index of the batch in the file
162    ///
163    /// E.g. if this is the third time this is called this method will return 2
164    async fn write_record_batch(&mut self, batch: RecordBatch) -> Result<u64>;
165    /// Finishes writing the file and closes the file
166    async fn finish(&mut self) -> Result<()>;
167    /// Finishes writing the file and closes the file with additional metadata
168    async fn finish_with_metadata(&mut self, metadata: HashMap<String, String>) -> Result<()>;
169}
170
171/// Trait for reading an index (or parts of an index) from storage
172#[async_trait]
173pub trait IndexReader: Send + Sync {
174    /// Read the n-th record batch from the file
175    async fn read_record_batch(&self, n: u64, batch_size: u64) -> Result<RecordBatch>;
176    /// Read the range of rows from the file.
177    /// If projection is Some, only return the columns in the projection,
178    /// nested columns like Some(&["x.y"]) are not supported.
179    /// If projection is None, return all columns.
180    async fn read_range(
181        &self,
182        range: std::ops::Range<usize>,
183        projection: Option<&[&str]>,
184    ) -> Result<RecordBatch>;
185    /// Return the number of batches in the file
186    async fn num_batches(&self, batch_size: u64) -> u32;
187    /// Return the number of rows in the file
188    fn num_rows(&self) -> usize;
189    /// Return the metadata of the file
190    fn schema(&self) -> &lance_core::datatypes::Schema;
191}
192
193/// Trait abstracting I/O away from index logic
194///
195/// Scalar indices are currently serialized as indexable arrow record batches stored in
196/// named "files".  The index store is responsible for serializing and deserializing
197/// these batches into file data (e.g. as .lance files or .parquet files, etc.)
198#[async_trait]
199pub trait IndexStore: std::fmt::Debug + Send + Sync + DeepSizeOf {
200    fn as_any(&self) -> &dyn Any;
201
202    /// Suggested I/O parallelism for the store
203    fn io_parallelism(&self) -> usize;
204
205    /// Create a new file and return a writer to store data in the file
206    async fn new_index_file(&self, name: &str, schema: Arc<Schema>)
207        -> Result<Box<dyn IndexWriter>>;
208
209    /// Open an existing file for retrieval
210    async fn open_index_file(&self, name: &str) -> Result<Arc<dyn IndexReader>>;
211
212    /// Copy a range of batches from an index file from this store to another
213    ///
214    /// This is often useful when remapping or updating
215    async fn copy_index_file(&self, name: &str, dest_store: &dyn IndexStore) -> Result<()>;
216
217    /// Rename an index file
218    async fn rename_index_file(&self, name: &str, new_name: &str) -> Result<()>;
219
220    /// Delete an index file (used in the tmp spill store to keep tmp size down)
221    async fn delete_index_file(&self, name: &str) -> Result<()>;
222}
223
224/// Different scalar indices may support different kinds of queries
225///
226/// For example, a btree index can support a wide range of queries (e.g. x > 7)
227/// while an index based on FTS only supports queries like "x LIKE 'foo'"
228///
229/// This trait is used when we need an object that can represent any kind of query
230///
231/// Note: if you are implementing this trait for a query type then you probably also
232/// need to implement the [crate::scalar::expression::ScalarQueryParser] trait to
233/// create instances of your query at parse time.
234pub trait AnyQuery: std::fmt::Debug + Any + Send + Sync {
235    /// Cast the query as Any to allow for downcasting
236    fn as_any(&self) -> &dyn Any;
237    /// Format the query as a string
238    fn format(&self, col: &str) -> String;
239    /// Convert the query to a datafusion expression
240    fn to_expr(&self, col: String) -> Expr;
241    /// Compare this query to another query
242    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool;
243    /// If true, the query results are inexact and will need rechecked
244    fn needs_recheck(&self) -> bool {
245        false
246    }
247}
248
249impl PartialEq for dyn AnyQuery {
250    fn eq(&self, other: &Self) -> bool {
251        self.dyn_eq(other)
252    }
253}
254/// A full text search query
255#[derive(Debug, Clone, PartialEq)]
256pub struct FullTextSearchQuery {
257    pub query: FtsQuery,
258
259    /// The maximum number of results to return
260    pub limit: Option<i64>,
261
262    /// The wand factor to use for ranking
263    /// if None, use the default value of 1.0
264    /// Increasing this value will reduce the recall and improve the performance
265    /// 1.0 is the value that would give the best performance without recall loss
266    pub wand_factor: Option<f32>,
267}
268
269impl FullTextSearchQuery {
270    /// Create a new terms query
271    pub fn new(query: String) -> Self {
272        let query = MatchQuery::new(query).into();
273        Self {
274            query,
275            limit: None,
276            wand_factor: None,
277        }
278    }
279
280    /// Create a new fuzzy query
281    pub fn new_fuzzy(term: String, max_distance: Option<u32>) -> Self {
282        let query = MatchQuery::new(term).with_fuzziness(max_distance).into();
283        Self {
284            query,
285            limit: None,
286            wand_factor: None,
287        }
288    }
289
290    /// Create a new compound query
291    pub fn new_query(query: FtsQuery) -> Self {
292        Self {
293            query,
294            limit: None,
295            wand_factor: None,
296        }
297    }
298
299    /// Set the column to search over
300    /// This is available for only MatchQuery and PhraseQuery
301    pub fn with_column(mut self, column: String) -> Result<Self> {
302        self.query = fill_fts_query_column(&self.query, &[column], true)?;
303        Ok(self)
304    }
305
306    /// Set the column to search over
307    /// This is available for only MatchQuery
308    pub fn with_columns(mut self, columns: &[String]) -> Result<Self> {
309        self.query = fill_fts_query_column(&self.query, columns, true)?;
310        Ok(self)
311    }
312
313    /// limit the number of results to return
314    /// if None, return all results
315    pub fn limit(mut self, limit: Option<i64>) -> Self {
316        self.limit = limit;
317        self
318    }
319
320    pub fn wand_factor(mut self, wand_factor: Option<f32>) -> Self {
321        self.wand_factor = wand_factor;
322        self
323    }
324
325    pub fn columns(&self) -> HashSet<String> {
326        self.query.columns()
327    }
328
329    pub fn params(&self) -> FtsSearchParams {
330        FtsSearchParams {
331            limit: self.limit.map(|limit| limit as usize),
332            wand_factor: self.wand_factor.unwrap_or(1.0),
333        }
334    }
335}
336
337/// A query that a basic scalar index (e.g. btree / bitmap) can satisfy
338///
339/// This is a subset of expression operators that is often referred to as the
340/// "sargable" operators
341///
342/// Note that negation is not included.  Negation should be applied later.  For
343/// example, to invert an equality query (e.g. all rows where the value is not 7)
344/// you can grab all rows where the value = 7 and then do an inverted take (or use
345/// a block list instead of an allow list for prefiltering)
346#[derive(Debug, Clone, PartialEq)]
347pub enum SargableQuery {
348    /// Retrieve all row ids where the value is in the given [min, max) range
349    Range(Bound<ScalarValue>, Bound<ScalarValue>),
350    /// Retrieve all row ids where the value is in the given set of values
351    IsIn(Vec<ScalarValue>),
352    /// Retrieve all row ids where the value is exactly the given value
353    Equals(ScalarValue),
354    /// Retrieve all row ids where the value matches the given full text search query
355    FullTextSearch(FullTextSearchQuery),
356    /// Retrieve all row ids where the value is null
357    IsNull(),
358}
359
360impl AnyQuery for SargableQuery {
361    fn as_any(&self) -> &dyn Any {
362        self
363    }
364
365    fn format(&self, col: &str) -> String {
366        match self {
367            Self::Range(lower, upper) => match (lower, upper) {
368                (Bound::Unbounded, Bound::Unbounded) => "true".to_string(),
369                (Bound::Unbounded, Bound::Included(rhs)) => format!("{} <= {}", col, rhs),
370                (Bound::Unbounded, Bound::Excluded(rhs)) => format!("{} < {}", col, rhs),
371                (Bound::Included(lhs), Bound::Unbounded) => format!("{} >= {}", col, lhs),
372                (Bound::Included(lhs), Bound::Included(rhs)) => {
373                    format!("{} >= {} && {} <= {}", col, lhs, col, rhs)
374                }
375                (Bound::Included(lhs), Bound::Excluded(rhs)) => {
376                    format!("{} >= {} && {} < {}", col, lhs, col, rhs)
377                }
378                (Bound::Excluded(lhs), Bound::Unbounded) => format!("{} > {}", col, lhs),
379                (Bound::Excluded(lhs), Bound::Included(rhs)) => {
380                    format!("{} > {} && {} <= {}", col, lhs, col, rhs)
381                }
382                (Bound::Excluded(lhs), Bound::Excluded(rhs)) => {
383                    format!("{} > {} && {} < {}", col, lhs, col, rhs)
384                }
385            },
386            Self::IsIn(values) => {
387                format!(
388                    "{} IN [{}]",
389                    col,
390                    values
391                        .iter()
392                        .map(|val| val.to_string())
393                        .collect::<Vec<_>>()
394                        .join(",")
395                )
396            }
397            Self::FullTextSearch(query) => {
398                format!("fts({})", query.query)
399            }
400            Self::IsNull() => {
401                format!("{} IS NULL", col)
402            }
403            Self::Equals(val) => {
404                format!("{} = {}", col, val)
405            }
406        }
407    }
408
409    fn to_expr(&self, col: String) -> Expr {
410        let col_expr = Expr::Column(Column::new_unqualified(col));
411        match self {
412            Self::Range(lower, upper) => match (lower, upper) {
413                (Bound::Unbounded, Bound::Unbounded) => {
414                    Expr::Literal(ScalarValue::Boolean(Some(true)))
415                }
416                (Bound::Unbounded, Bound::Included(rhs)) => {
417                    col_expr.lt_eq(Expr::Literal(rhs.clone()))
418                }
419                (Bound::Unbounded, Bound::Excluded(rhs)) => col_expr.lt(Expr::Literal(rhs.clone())),
420                (Bound::Included(lhs), Bound::Unbounded) => {
421                    col_expr.gt_eq(Expr::Literal(lhs.clone()))
422                }
423                (Bound::Included(lhs), Bound::Included(rhs)) => {
424                    col_expr.between(Expr::Literal(lhs.clone()), Expr::Literal(rhs.clone()))
425                }
426                (Bound::Included(lhs), Bound::Excluded(rhs)) => col_expr
427                    .clone()
428                    .gt_eq(Expr::Literal(lhs.clone()))
429                    .and(col_expr.lt(Expr::Literal(rhs.clone()))),
430                (Bound::Excluded(lhs), Bound::Unbounded) => col_expr.gt(Expr::Literal(lhs.clone())),
431                (Bound::Excluded(lhs), Bound::Included(rhs)) => col_expr
432                    .clone()
433                    .gt(Expr::Literal(lhs.clone()))
434                    .and(col_expr.lt_eq(Expr::Literal(rhs.clone()))),
435                (Bound::Excluded(lhs), Bound::Excluded(rhs)) => col_expr
436                    .clone()
437                    .gt(Expr::Literal(lhs.clone()))
438                    .and(col_expr.lt(Expr::Literal(rhs.clone()))),
439            },
440            Self::IsIn(values) => col_expr.in_list(
441                values
442                    .iter()
443                    .map(|val| Expr::Literal(val.clone()))
444                    .collect::<Vec<_>>(),
445                false,
446            ),
447            Self::FullTextSearch(query) => col_expr.like(Expr::Literal(ScalarValue::Utf8(Some(
448                query.query.to_string(),
449            )))),
450            Self::IsNull() => col_expr.is_null(),
451            Self::Equals(value) => col_expr.eq(Expr::Literal(value.clone())),
452        }
453    }
454
455    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
456        match other.as_any().downcast_ref::<Self>() {
457            Some(o) => self == o,
458            None => false,
459        }
460    }
461}
462
463/// A query that a LabelListIndex can satisfy
464#[derive(Debug, Clone, PartialEq)]
465pub enum LabelListQuery {
466    /// Retrieve all row ids where every label is in the list of values for the row
467    HasAllLabels(Vec<ScalarValue>),
468    /// Retrieve all row ids where at least one of the given labels is in the list of values for the row
469    HasAnyLabel(Vec<ScalarValue>),
470}
471
472impl AnyQuery for LabelListQuery {
473    fn as_any(&self) -> &dyn Any {
474        self
475    }
476
477    fn format(&self, col: &str) -> String {
478        format!("{}", self.to_expr(col.to_string()))
479    }
480
481    fn to_expr(&self, col: String) -> Expr {
482        match self {
483            Self::HasAllLabels(labels) => {
484                let labels_arr = ScalarValue::iter_to_array(labels.iter().cloned()).unwrap();
485                let offsets_buffer =
486                    OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, labels_arr.len() as i32]));
487                let labels_list = ListArray::try_new(
488                    Arc::new(Field::new("item", labels_arr.data_type().clone(), false)),
489                    offsets_buffer,
490                    labels_arr,
491                    None,
492                )
493                .unwrap();
494                let labels_arr = Arc::new(labels_list);
495                Expr::ScalarFunction(ScalarFunction {
496                    func: Arc::new(array_has::ArrayHasAll::new().into()),
497                    args: vec![
498                        Expr::Column(Column::new_unqualified(col)),
499                        Expr::Literal(ScalarValue::List(labels_arr)),
500                    ],
501                })
502            }
503            Self::HasAnyLabel(labels) => {
504                let labels_arr = ScalarValue::iter_to_array(labels.iter().cloned()).unwrap();
505                let offsets_buffer =
506                    OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, labels_arr.len() as i32]));
507                let labels_list = ListArray::try_new(
508                    Arc::new(Field::new("item", labels_arr.data_type().clone(), false)),
509                    offsets_buffer,
510                    labels_arr,
511                    None,
512                )
513                .unwrap();
514                let labels_arr = Arc::new(labels_list);
515                Expr::ScalarFunction(ScalarFunction {
516                    func: Arc::new(array_has::ArrayHasAny::new().into()),
517                    args: vec![
518                        Expr::Column(Column::new_unqualified(col)),
519                        Expr::Literal(ScalarValue::List(labels_arr)),
520                    ],
521                })
522            }
523        }
524    }
525
526    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
527        match other.as_any().downcast_ref::<Self>() {
528            Some(o) => self == o,
529            None => false,
530        }
531    }
532}
533
/// A query that a NGramIndex can satisfy
#[derive(Clone, Debug, PartialEq)]
pub enum TextQuery {
    /// Retrieve all row ids whose text contains the given string
    StringContains(String),
    // TODO: In the future we should be able to do case-insensitive contains
    // as well as partial matches (e.g. LIKE 'foo%') and potentially even
    // some regular expressions
}
543
544impl AnyQuery for TextQuery {
545    fn as_any(&self) -> &dyn Any {
546        self
547    }
548
549    fn format(&self, col: &str) -> String {
550        format!("{}", self.to_expr(col.to_string()))
551    }
552
553    fn to_expr(&self, col: String) -> Expr {
554        match self {
555            Self::StringContains(substr) => Expr::ScalarFunction(ScalarFunction {
556                func: Arc::new(ContainsFunc::new().into()),
557                args: vec![
558                    Expr::Column(Column::new_unqualified(col)),
559                    Expr::Literal(ScalarValue::Utf8(Some(substr.clone()))),
560                ],
561            }),
562        }
563    }
564
565    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
566        match other.as_any().downcast_ref::<Self>() {
567            Some(o) => self == o,
568            None => false,
569        }
570    }
571
572    fn needs_recheck(&self) -> bool {
573        true
574    }
575}
576
577/// The result of a search operation against a scalar index
578#[derive(Debug, PartialEq)]
579pub enum SearchResult {
580    /// The exact row ids that satisfy the query
581    Exact(RowIdTreeMap),
582    /// Any row id satisfying the query will be in this set but not every
583    /// row id in this set will satisfy the query, a further recheck step
584    /// is needed
585    AtMost(RowIdTreeMap),
586    /// All of the given row ids satisfy the query but there may be more
587    ///
588    /// No scalar index actually returns this today but it can arise from
589    /// boolean operations (e.g. NOT(AtMost(x)) == AtLeast(NOT(x)))
590    AtLeast(RowIdTreeMap),
591}
592
593impl SearchResult {
594    pub fn row_ids(&self) -> &RowIdTreeMap {
595        match self {
596            Self::Exact(row_ids) => row_ids,
597            Self::AtMost(row_ids) => row_ids,
598            Self::AtLeast(row_ids) => row_ids,
599        }
600    }
601
602    pub fn is_exact(&self) -> bool {
603        matches!(self, Self::Exact(_))
604    }
605}
606
607/// A trait for a scalar index, a structure that can determine row ids that satisfy scalar queries
608#[async_trait]
609pub trait ScalarIndex: Send + Sync + std::fmt::Debug + Index + DeepSizeOf {
610    /// Search the scalar index
611    ///
612    /// Returns all row ids that satisfy the query, these row ids are not necessarily ordered
613    async fn search(
614        &self,
615        query: &dyn AnyQuery,
616        metrics: &dyn MetricsCollector,
617    ) -> Result<SearchResult>;
618
619    /// Returns true if the query can be answered exactly
620    ///
621    /// If false is returned then the query still may be answered exactly but if true is returned
622    /// then the query must be answered exactly
623    fn can_answer_exact(&self, query: &dyn AnyQuery) -> bool;
624
625    /// Load the scalar index from storage
626    async fn load(store: Arc<dyn IndexStore>) -> Result<Arc<Self>>
627    where
628        Self: Sized;
629
630    /// Remap the row ids, creating a new remapped version of this index in `dest_store`
631    async fn remap(
632        &self,
633        mapping: &HashMap<u64, Option<u64>>,
634        dest_store: &dyn IndexStore,
635    ) -> Result<()>;
636
637    /// Add the new data into the index, creating an updated version of the index in `dest_store`
638    async fn update(
639        &self,
640        new_data: SendableRecordBatchStream,
641        dest_store: &dyn IndexStore,
642    ) -> Result<()>;
643}