lance_index/
scalar.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Scalar indices for metadata search & filtering
5
6use std::collections::{HashMap, HashSet};
7use std::fmt::Debug;
8use std::{any::Any, ops::Bound, sync::Arc};
9
10use arrow::buffer::{OffsetBuffer, ScalarBuffer};
11use arrow_array::{ListArray, RecordBatch};
12use arrow_schema::{Field, Schema};
13use async_trait::async_trait;
14use datafusion::functions::string::contains::ContainsFunc;
15use datafusion::functions_array::array_has;
16use datafusion::physical_plan::SendableRecordBatchStream;
17use datafusion_common::{scalar::ScalarValue, Column};
18
19use datafusion_expr::expr::ScalarFunction;
20use datafusion_expr::Expr;
21use deepsize::DeepSizeOf;
22use inverted::query::{fill_fts_query_column, FtsQuery, FtsQueryNode, FtsSearchParams, MatchQuery};
23use inverted::TokenizerConfig;
24use lance_core::utils::mask::RowIdTreeMap;
25use lance_core::{Error, Result};
26use serde::{Deserialize, Serialize};
27use snafu::location;
28
29use crate::metrics::MetricsCollector;
30use crate::{Index, IndexParams, IndexType};
31
32pub mod bitmap;
33pub mod btree;
34pub mod expression;
35pub mod flat;
36pub mod inverted;
37pub mod label_list;
38pub mod lance_format;
39pub mod ngram;
40
/// Index name reported by [`ScalarIndexParams`] through `IndexParams::index_name`.
pub const LANCE_SCALAR_INDEX: &str = "__lance_scalar_index";
42
/// The kinds of scalar index that can be requested explicitly.
///
/// Every variant has a same-named counterpart in [`IndexType`]; the
/// `TryFrom<IndexType>` impl for this type performs the conversion.
#[derive(Debug, Copy, Clone)]
pub enum ScalarIndexType {
    /// BTree index; also what the generic `IndexType::Scalar` converts to
    BTree,
    /// Bitmap index
    Bitmap,
    /// Label list index
    LabelList,
    /// NGram index
    NGram,
    /// Inverted index
    Inverted,
}
51
52impl TryFrom<IndexType> for ScalarIndexType {
53    type Error = Error;
54
55    fn try_from(value: IndexType) -> Result<Self> {
56        match value {
57            IndexType::BTree | IndexType::Scalar => Ok(Self::BTree),
58            IndexType::Bitmap => Ok(Self::Bitmap),
59            IndexType::LabelList => Ok(Self::LabelList),
60            IndexType::NGram => Ok(Self::NGram),
61            IndexType::Inverted => Ok(Self::Inverted),
62            _ => Err(Error::InvalidInput {
63                source: format!("Index type {:?} is not a scalar index", value).into(),
64                location: location!(),
65            }),
66        }
67    }
68}
69
/// Parameters for building a scalar index.
///
/// The derived `Default` leaves `force_index_type` unset.
#[derive(Default)]
pub struct ScalarIndexParams {
    /// If set then always use the given index type and skip auto-detection
    pub force_index_type: Option<ScalarIndexType>,
}
75
76impl ScalarIndexParams {
77    pub fn new(index_type: ScalarIndexType) -> Self {
78        Self {
79            force_index_type: Some(index_type),
80        }
81    }
82}
83
84impl IndexParams for ScalarIndexParams {
85    fn as_any(&self) -> &dyn std::any::Any {
86        self
87    }
88
89    fn index_type(&self) -> IndexType {
90        match self.force_index_type {
91            Some(ScalarIndexType::BTree) | None => IndexType::BTree,
92            Some(ScalarIndexType::Bitmap) => IndexType::Bitmap,
93            Some(ScalarIndexType::LabelList) => IndexType::LabelList,
94            Some(ScalarIndexType::Inverted) => IndexType::Inverted,
95            Some(ScalarIndexType::NGram) => IndexType::NGram,
96        }
97    }
98
99    fn index_name(&self) -> &str {
100        LANCE_SCALAR_INDEX
101    }
102}
103
/// Parameters for building an inverted index.
///
/// The tokenizer settings are flattened into this struct when it is
/// serialized or deserialized with serde.
#[derive(Clone, Serialize, Deserialize)]
pub struct InvertedIndexParams {
    /// If true, store the position of the term in the document.
    /// This can significantly increase the size of the index.
    /// If false, only store the frequency of the term in the document.
    /// Default is true.
    pub with_position: bool,

    /// Tokenizer configuration; its fields appear at the top level of the
    /// serialized form because of `#[serde(flatten)]`.
    #[serde(flatten)]
    pub tokenizer_config: TokenizerConfig,
}
115
116impl Debug for InvertedIndexParams {
117    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
118        f.debug_struct("InvertedIndexParams")
119            .field("with_position", &self.with_position)
120            .finish()
121    }
122}
123
124impl DeepSizeOf for InvertedIndexParams {
125    fn deep_size_of_children(&self, _: &mut deepsize::Context) -> usize {
126        0
127    }
128}
129
130impl Default for InvertedIndexParams {
131    fn default() -> Self {
132        Self {
133            with_position: true,
134            tokenizer_config: TokenizerConfig::default(),
135        }
136    }
137}
138
139impl InvertedIndexParams {
140    pub fn with_position(mut self, with_position: bool) -> Self {
141        self.with_position = with_position;
142        self
143    }
144}
145
impl IndexParams for InvertedIndexParams {
    /// Exposes the parameters as `Any` so callers can downcast them.
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    /// Always `IndexType::Inverted`.
    fn index_type(&self) -> IndexType {
        IndexType::Inverted
    }

    /// Always the fixed name `"INVERTED"`.
    fn index_name(&self) -> &str {
        "INVERTED"
    }
}
159
/// Trait for storing an index (or parts of an index) into storage
///
/// Writers are obtained from [`IndexStore::new_index_file`].
#[async_trait]
pub trait IndexWriter: Send {
    /// Writes a record batch into the file, returning the 0-based index of the batch in the file
    ///
    /// E.g. if this is the third time this is called this method will return 2
    async fn write_record_batch(&mut self, batch: RecordBatch) -> Result<u64>;
    /// Finishes writing the file and closes the file
    async fn finish(&mut self) -> Result<()>;
    /// Finishes writing the file and closes the file with additional metadata
    ///
    /// `metadata` is a map of string keys to string values.
    async fn finish_with_metadata(&mut self, metadata: HashMap<String, String>) -> Result<()>;
}
172
/// Trait for reading an index (or parts of an index) from storage
///
/// Readers are obtained from [`IndexStore::open_index_file`].
#[async_trait]
pub trait IndexReader: Send + Sync {
    /// Read the n-th record batch from the file
    ///
    /// NOTE(review): presumably the file is viewed as consecutive batches of
    /// `batch_size` rows each; confirm against the implementations.
    async fn read_record_batch(&self, n: u64, batch_size: u64) -> Result<RecordBatch>;
    /// Read the range of rows from the file.
    /// If projection is Some, only return the columns in the projection,
    /// nested columns like Some(&["x.y"]) are not supported.
    /// If projection is None, return all columns.
    async fn read_range(
        &self,
        range: std::ops::Range<usize>,
        projection: Option<&[&str]>,
    ) -> Result<RecordBatch>;
    /// Return the number of batches in the file for the given batch size
    async fn num_batches(&self, batch_size: u64) -> u32;
    /// Return the number of rows in the file
    fn num_rows(&self) -> usize;
    /// Return the schema of the file
    fn schema(&self) -> &lance_core::datatypes::Schema;
}
194
/// Trait abstracting I/O away from index logic
///
/// Scalar indices are currently serialized as indexable arrow record batches stored in
/// named "files".  The index store is responsible for serializing and deserializing
/// these batches into file data (e.g. as .lance files or .parquet files, etc.)
#[async_trait]
pub trait IndexStore: std::fmt::Debug + Send + Sync + DeepSizeOf {
    /// Cast the store as Any to allow for downcasting
    fn as_any(&self) -> &dyn Any;

    /// Suggested I/O parallelism for the store
    fn io_parallelism(&self) -> usize;

    /// Create a new file and return a writer to store data in the file
    ///
    /// `name` identifies the file within the store and `schema` is the arrow
    /// schema supplied for the new file.
    async fn new_index_file(&self, name: &str, schema: Arc<Schema>)
        -> Result<Box<dyn IndexWriter>>;

    /// Open an existing file for retrieval
    async fn open_index_file(&self, name: &str) -> Result<Arc<dyn IndexReader>>;

    /// Copy an index file from this store to another
    ///
    /// This is often useful when remapping or updating
    ///
    /// NOTE(review): the signature takes no batch range, so this presumably
    /// copies the whole file named `name` into `dest_store`; confirm against
    /// the implementations.
    async fn copy_index_file(&self, name: &str, dest_store: &dyn IndexStore) -> Result<()>;

    /// Rename an index file from `name` to `new_name`
    async fn rename_index_file(&self, name: &str, new_name: &str) -> Result<()>;

    /// Delete an index file (used in the tmp spill store to keep tmp size down)
    async fn delete_index_file(&self, name: &str) -> Result<()>;
}
225
/// Different scalar indices may support different kinds of queries
///
/// For example, a btree index can support a wide range of queries (e.g. x > 7)
/// while an index based on FTS only supports queries like "x LIKE 'foo'"
///
/// This trait is used when we need an object that can represent any kind of query
///
/// Note: if you are implementing this trait for a query type then you probably also
/// need to implement the [crate::scalar::expression::ScalarQueryParser] trait to
/// create instances of your query at parse time.
pub trait AnyQuery: std::fmt::Debug + Any + Send + Sync {
    /// Cast the query as Any to allow for downcasting
    fn as_any(&self) -> &dyn Any;
    /// Format the query as a string, using `col` as the name of the queried column
    fn format(&self, col: &str) -> String;
    /// Convert the query to a datafusion expression over the column named `col`
    fn to_expr(&self, col: String) -> Expr;
    /// Compare this query to another query
    ///
    /// This backs the `PartialEq` impl for `dyn AnyQuery`.
    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool;
    /// If true, the query results are inexact and will need to be rechecked
    ///
    /// The default implementation returns false.
    fn needs_recheck(&self) -> bool {
        false
    }
}
250
251impl PartialEq for dyn AnyQuery {
252    fn eq(&self, other: &Self) -> bool {
253        self.dyn_eq(other)
254    }
255}
/// A full text search query
///
/// Bundles the query itself with optional search settings.
#[derive(Debug, Clone, PartialEq)]
pub struct FullTextSearchQuery {
    /// The query to run
    pub query: FtsQuery,

    /// The maximum number of results to return
    /// if None, return all results
    pub limit: Option<i64>,

    /// The wand factor to use for ranking
    /// if None, use the default value of 1.0
    /// Increasing this value will reduce the recall and improve the performance
    /// 1.0 is the value that would give the best performance without recall loss
    pub wand_factor: Option<f32>,
}
270
271impl FullTextSearchQuery {
272    /// Create a new terms query
273    pub fn new(query: String) -> Self {
274        let query = MatchQuery::new(query).into();
275        Self {
276            query,
277            limit: None,
278            wand_factor: None,
279        }
280    }
281
282    /// Create a new fuzzy query
283    pub fn new_fuzzy(term: String, max_distance: Option<u32>) -> Self {
284        let query = MatchQuery::new(term).with_fuzziness(max_distance).into();
285        Self {
286            query,
287            limit: None,
288            wand_factor: None,
289        }
290    }
291
292    /// Create a new compound query
293    pub fn new_query(query: FtsQuery) -> Self {
294        Self {
295            query,
296            limit: None,
297            wand_factor: None,
298        }
299    }
300
301    /// Set the column to search over
302    /// This is available for only MatchQuery and PhraseQuery
303    pub fn with_column(mut self, column: String) -> Result<Self> {
304        self.query = fill_fts_query_column(&self.query, &[column], true)?;
305        Ok(self)
306    }
307
308    /// Set the column to search over
309    /// This is available for only MatchQuery
310    pub fn with_columns(mut self, columns: &[String]) -> Result<Self> {
311        self.query = fill_fts_query_column(&self.query, columns, true)?;
312        Ok(self)
313    }
314
315    /// limit the number of results to return
316    /// if None, return all results
317    pub fn limit(mut self, limit: Option<i64>) -> Self {
318        self.limit = limit;
319        self
320    }
321
322    pub fn wand_factor(mut self, wand_factor: Option<f32>) -> Self {
323        self.wand_factor = wand_factor;
324        self
325    }
326
327    pub fn columns(&self) -> HashSet<String> {
328        self.query.columns()
329    }
330
331    pub fn params(&self) -> FtsSearchParams {
332        FtsSearchParams {
333            limit: self.limit.map(|limit| limit as usize),
334            wand_factor: self.wand_factor.unwrap_or(1.0),
335        }
336    }
337}
338
/// A query that a basic scalar index (e.g. btree / bitmap) can satisfy
///
/// This is a subset of expression operators that is often referred to as the
/// "sargable" operators
///
/// Note that negation is not included.  Negation should be applied later.  For
/// example, to invert an equality query (e.g. all rows where the value is not 7)
/// you can grab all rows where the value = 7 and then do an inverted take (or use
/// a block list instead of an allow list for prefiltering)
#[derive(Debug, Clone, PartialEq)]
pub enum SargableQuery {
    /// Retrieve all row ids where the value lies between the given lower and
    /// upper bounds (each bound may be inclusive, exclusive, or unbounded)
    Range(Bound<ScalarValue>, Bound<ScalarValue>),
    /// Retrieve all row ids where the value is in the given set of values
    IsIn(Vec<ScalarValue>),
    /// Retrieve all row ids where the value is exactly the given value
    Equals(ScalarValue),
    /// Retrieve all row ids where the value matches the given full text search query
    FullTextSearch(FullTextSearchQuery),
    /// Retrieve all row ids where the value is null
    IsNull(),
}
361
362impl AnyQuery for SargableQuery {
363    fn as_any(&self) -> &dyn Any {
364        self
365    }
366
367    fn format(&self, col: &str) -> String {
368        match self {
369            Self::Range(lower, upper) => match (lower, upper) {
370                (Bound::Unbounded, Bound::Unbounded) => "true".to_string(),
371                (Bound::Unbounded, Bound::Included(rhs)) => format!("{} <= {}", col, rhs),
372                (Bound::Unbounded, Bound::Excluded(rhs)) => format!("{} < {}", col, rhs),
373                (Bound::Included(lhs), Bound::Unbounded) => format!("{} >= {}", col, lhs),
374                (Bound::Included(lhs), Bound::Included(rhs)) => {
375                    format!("{} >= {} && {} <= {}", col, lhs, col, rhs)
376                }
377                (Bound::Included(lhs), Bound::Excluded(rhs)) => {
378                    format!("{} >= {} && {} < {}", col, lhs, col, rhs)
379                }
380                (Bound::Excluded(lhs), Bound::Unbounded) => format!("{} > {}", col, lhs),
381                (Bound::Excluded(lhs), Bound::Included(rhs)) => {
382                    format!("{} > {} && {} <= {}", col, lhs, col, rhs)
383                }
384                (Bound::Excluded(lhs), Bound::Excluded(rhs)) => {
385                    format!("{} > {} && {} < {}", col, lhs, col, rhs)
386                }
387            },
388            Self::IsIn(values) => {
389                format!(
390                    "{} IN [{}]",
391                    col,
392                    values
393                        .iter()
394                        .map(|val| val.to_string())
395                        .collect::<Vec<_>>()
396                        .join(",")
397                )
398            }
399            Self::FullTextSearch(query) => {
400                format!("fts({})", query.query)
401            }
402            Self::IsNull() => {
403                format!("{} IS NULL", col)
404            }
405            Self::Equals(val) => {
406                format!("{} = {}", col, val)
407            }
408        }
409    }
410
411    fn to_expr(&self, col: String) -> Expr {
412        let col_expr = Expr::Column(Column::new_unqualified(col));
413        match self {
414            Self::Range(lower, upper) => match (lower, upper) {
415                (Bound::Unbounded, Bound::Unbounded) => {
416                    Expr::Literal(ScalarValue::Boolean(Some(true)))
417                }
418                (Bound::Unbounded, Bound::Included(rhs)) => {
419                    col_expr.lt_eq(Expr::Literal(rhs.clone()))
420                }
421                (Bound::Unbounded, Bound::Excluded(rhs)) => col_expr.lt(Expr::Literal(rhs.clone())),
422                (Bound::Included(lhs), Bound::Unbounded) => {
423                    col_expr.gt_eq(Expr::Literal(lhs.clone()))
424                }
425                (Bound::Included(lhs), Bound::Included(rhs)) => {
426                    col_expr.between(Expr::Literal(lhs.clone()), Expr::Literal(rhs.clone()))
427                }
428                (Bound::Included(lhs), Bound::Excluded(rhs)) => col_expr
429                    .clone()
430                    .gt_eq(Expr::Literal(lhs.clone()))
431                    .and(col_expr.lt(Expr::Literal(rhs.clone()))),
432                (Bound::Excluded(lhs), Bound::Unbounded) => col_expr.gt(Expr::Literal(lhs.clone())),
433                (Bound::Excluded(lhs), Bound::Included(rhs)) => col_expr
434                    .clone()
435                    .gt(Expr::Literal(lhs.clone()))
436                    .and(col_expr.lt_eq(Expr::Literal(rhs.clone()))),
437                (Bound::Excluded(lhs), Bound::Excluded(rhs)) => col_expr
438                    .clone()
439                    .gt(Expr::Literal(lhs.clone()))
440                    .and(col_expr.lt(Expr::Literal(rhs.clone()))),
441            },
442            Self::IsIn(values) => col_expr.in_list(
443                values
444                    .iter()
445                    .map(|val| Expr::Literal(val.clone()))
446                    .collect::<Vec<_>>(),
447                false,
448            ),
449            Self::FullTextSearch(query) => col_expr.like(Expr::Literal(ScalarValue::Utf8(Some(
450                query.query.to_string(),
451            )))),
452            Self::IsNull() => col_expr.is_null(),
453            Self::Equals(value) => col_expr.eq(Expr::Literal(value.clone())),
454        }
455    }
456
457    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
458        match other.as_any().downcast_ref::<Self>() {
459            Some(o) => self == o,
460            None => false,
461        }
462    }
463}
464
/// A query that a LabelListIndex can satisfy
///
/// Both variants carry the labels to look for in a list-valued column.
#[derive(Debug, Clone, PartialEq)]
pub enum LabelListQuery {
    /// Retrieve all row ids where every label is in the list of values for the row
    HasAllLabels(Vec<ScalarValue>),
    /// Retrieve all row ids where at least one of the given labels is in the list of values for the row
    HasAnyLabel(Vec<ScalarValue>),
}
473
474impl AnyQuery for LabelListQuery {
475    fn as_any(&self) -> &dyn Any {
476        self
477    }
478
479    fn format(&self, col: &str) -> String {
480        format!("{}", self.to_expr(col.to_string()))
481    }
482
483    fn to_expr(&self, col: String) -> Expr {
484        match self {
485            Self::HasAllLabels(labels) => {
486                let labels_arr = ScalarValue::iter_to_array(labels.iter().cloned()).unwrap();
487                let offsets_buffer =
488                    OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, labels_arr.len() as i32]));
489                let labels_list = ListArray::try_new(
490                    Arc::new(Field::new("item", labels_arr.data_type().clone(), false)),
491                    offsets_buffer,
492                    labels_arr,
493                    None,
494                )
495                .unwrap();
496                let labels_arr = Arc::new(labels_list);
497                Expr::ScalarFunction(ScalarFunction {
498                    func: Arc::new(array_has::ArrayHasAll::new().into()),
499                    args: vec![
500                        Expr::Column(Column::new_unqualified(col)),
501                        Expr::Literal(ScalarValue::List(labels_arr)),
502                    ],
503                })
504            }
505            Self::HasAnyLabel(labels) => {
506                let labels_arr = ScalarValue::iter_to_array(labels.iter().cloned()).unwrap();
507                let offsets_buffer =
508                    OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, labels_arr.len() as i32]));
509                let labels_list = ListArray::try_new(
510                    Arc::new(Field::new("item", labels_arr.data_type().clone(), false)),
511                    offsets_buffer,
512                    labels_arr,
513                    None,
514                )
515                .unwrap();
516                let labels_arr = Arc::new(labels_list);
517                Expr::ScalarFunction(ScalarFunction {
518                    func: Arc::new(array_has::ArrayHasAny::new().into()),
519                    args: vec![
520                        Expr::Column(Column::new_unqualified(col)),
521                        Expr::Literal(ScalarValue::List(labels_arr)),
522                    ],
523                })
524            }
525        }
526    }
527
528    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
529        match other.as_any().downcast_ref::<Self>() {
530            Some(o) => self == o,
531            None => false,
532        }
533    }
534}
535
/// A query that a NGramIndex can satisfy
///
/// Queries of this type report `needs_recheck() == true`, i.e. their results
/// are inexact and must be rechecked.
#[derive(Debug, Clone, PartialEq)]
pub enum TextQuery {
    /// Retrieve all row ids where the text contains the given string
    StringContains(String),
    // TODO: In the future we should be able to do string-insensitive contains
    // as well as partial matches (e.g. LIKE 'foo%') and potentially even
    // some regular expressions
}
545
546impl AnyQuery for TextQuery {
547    fn as_any(&self) -> &dyn Any {
548        self
549    }
550
551    fn format(&self, col: &str) -> String {
552        format!("{}", self.to_expr(col.to_string()))
553    }
554
555    fn to_expr(&self, col: String) -> Expr {
556        match self {
557            Self::StringContains(substr) => Expr::ScalarFunction(ScalarFunction {
558                func: Arc::new(ContainsFunc::new().into()),
559                args: vec![
560                    Expr::Column(Column::new_unqualified(col)),
561                    Expr::Literal(ScalarValue::Utf8(Some(substr.clone()))),
562                ],
563            }),
564        }
565    }
566
567    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
568        match other.as_any().downcast_ref::<Self>() {
569            Some(o) => self == o,
570            None => false,
571        }
572    }
573
574    fn needs_recheck(&self) -> bool {
575        true
576    }
577}
578
/// The result of a search operation against a scalar index
///
/// Every variant carries a set of row ids; the variant says how that set
/// relates to the true answer of the query.
#[derive(Debug, PartialEq)]
pub enum SearchResult {
    /// The exact row ids that satisfy the query
    Exact(RowIdTreeMap),
    /// Any row id satisfying the query will be in this set but not every
    /// row id in this set will satisfy the query, a further recheck step
    /// is needed
    AtMost(RowIdTreeMap),
    /// All of the given row ids satisfy the query but there may be more
    ///
    /// No scalar index actually returns this today but it can arise from
    /// boolean operations (e.g. NOT(AtMost(x)) == AtLeast(NOT(x)))
    AtLeast(RowIdTreeMap),
}
594
595impl SearchResult {
596    pub fn row_ids(&self) -> &RowIdTreeMap {
597        match self {
598            Self::Exact(row_ids) => row_ids,
599            Self::AtMost(row_ids) => row_ids,
600            Self::AtLeast(row_ids) => row_ids,
601        }
602    }
603
604    pub fn is_exact(&self) -> bool {
605        matches!(self, Self::Exact(_))
606    }
607}
608
/// A trait for a scalar index, a structure that can determine row ids that satisfy scalar queries
#[async_trait]
pub trait ScalarIndex: Send + Sync + std::fmt::Debug + Index + DeepSizeOf {
    /// Search the scalar index
    ///
    /// Returns all row ids that satisfy the query, these row ids are not necessarily ordered
    ///
    /// The returned [`SearchResult`] variant states whether the row ids are
    /// exact or only a bound on the true answer.
    async fn search(
        &self,
        query: &dyn AnyQuery,
        metrics: &dyn MetricsCollector,
    ) -> Result<SearchResult>;

    /// Returns true if the query can be answered exactly
    ///
    /// A return value of true is a guarantee: the query must then be answered
    /// exactly.  A return value of false is not: the query may still end up
    /// being answered exactly.
    fn can_answer_exact(&self, query: &dyn AnyQuery) -> bool;

    /// Load the scalar index from storage
    async fn load(store: Arc<dyn IndexStore>) -> Result<Arc<Self>>
    where
        Self: Sized;

    /// Remap the row ids, creating a new remapped version of this index in `dest_store`
    ///
    /// NOTE(review): `mapping` presumably maps an old row id to its new row
    /// id, with `None` meaning the row no longer exists; confirm with callers.
    async fn remap(
        &self,
        mapping: &HashMap<u64, Option<u64>>,
        dest_store: &dyn IndexStore,
    ) -> Result<()>;

    /// Add the new data into the index, creating an updated version of the index in `dest_store`
    async fn update(
        &self,
        new_data: SendableRecordBatchStream,
        dest_store: &dyn IndexStore,
    ) -> Result<()>;
}