lance_index/
scalar.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Scalar indices for metadata search & filtering
5
6use std::collections::{HashMap, HashSet};
7use std::fmt::Debug;
8use std::{any::Any, ops::Bound, sync::Arc};
9
10use arrow::buffer::{OffsetBuffer, ScalarBuffer};
11use arrow_array::{ListArray, RecordBatch};
12use arrow_schema::{Field, Schema};
13use async_trait::async_trait;
14use datafusion::functions::string::contains::ContainsFunc;
15use datafusion::functions_array::array_has;
16use datafusion::physical_plan::SendableRecordBatchStream;
17use datafusion_common::{scalar::ScalarValue, Column};
18
19use datafusion_expr::expr::ScalarFunction;
20use datafusion_expr::Expr;
21use deepsize::DeepSizeOf;
22use inverted::query::{fill_fts_query_column, FtsQuery, FtsQueryNode, FtsSearchParams, MatchQuery};
23use lance_core::utils::mask::RowIdTreeMap;
24use lance_core::{Error, Result};
25use snafu::location;
26
27use crate::metrics::MetricsCollector;
28use crate::{Index, IndexParams, IndexType};
29
30pub mod bitmap;
31pub mod btree;
32pub mod expression;
33pub mod flat;
34pub mod inverted;
35pub mod label_list;
36pub mod lance_format;
37pub mod ngram;
38
39use crate::frag_reuse::FragReuseIndex;
40pub use inverted::tokenizer::InvertedIndexParams;
41
/// Index name reported by [`ScalarIndexParams`] through `IndexParams::index_name`.
pub const LANCE_SCALAR_INDEX: &str = "__lance_scalar_index";
43
/// The kinds of scalar index that can be requested.
///
/// Values convert to and from the broader [`IndexType`] through the
/// `TryFrom<IndexType>` and `From<ScalarIndexType>` impls in this module.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ScalarIndexType {
    /// B-tree index; this is also what `IndexType::Scalar` converts to and what
    /// [`ScalarIndexParams`] reports when no index type is forced.
    BTree,
    /// Bitmap index.
    Bitmap,
    /// Label-list index (queried with [`LabelListQuery`]).
    LabelList,
    /// N-gram index (queried with [`TextQuery`]).
    NGram,
    /// Inverted index (configured with [`InvertedIndexParams`]).
    Inverted,
}
52
53impl TryFrom<IndexType> for ScalarIndexType {
54    type Error = Error;
55
56    fn try_from(value: IndexType) -> Result<Self> {
57        match value {
58            IndexType::BTree | IndexType::Scalar => Ok(Self::BTree),
59            IndexType::Bitmap => Ok(Self::Bitmap),
60            IndexType::LabelList => Ok(Self::LabelList),
61            IndexType::NGram => Ok(Self::NGram),
62            IndexType::Inverted => Ok(Self::Inverted),
63            _ => Err(Error::InvalidInput {
64                source: format!("Index type {:?} is not a scalar index", value).into(),
65                location: location!(),
66            }),
67        }
68    }
69}
70
71impl From<ScalarIndexType> for IndexType {
72    fn from(val: ScalarIndexType) -> Self {
73        match val {
74            ScalarIndexType::BTree => Self::BTree,
75            ScalarIndexType::Bitmap => Self::Bitmap,
76            ScalarIndexType::LabelList => Self::LabelList,
77            ScalarIndexType::NGram => Self::NGram,
78            ScalarIndexType::Inverted => Self::Inverted,
79        }
80    }
81}
82
83#[derive(Default)]
84pub struct ScalarIndexParams {
85    /// If set then always use the given index type and skip auto-detection
86    pub force_index_type: Option<ScalarIndexType>,
87}
88
89impl ScalarIndexParams {
90    pub fn new(index_type: ScalarIndexType) -> Self {
91        Self {
92            force_index_type: Some(index_type),
93        }
94    }
95}
96
97impl IndexParams for ScalarIndexParams {
98    fn as_any(&self) -> &dyn std::any::Any {
99        self
100    }
101
102    fn index_type(&self) -> IndexType {
103        match self.force_index_type {
104            Some(ScalarIndexType::BTree) | None => IndexType::BTree,
105            Some(ScalarIndexType::Bitmap) => IndexType::Bitmap,
106            Some(ScalarIndexType::LabelList) => IndexType::LabelList,
107            Some(ScalarIndexType::Inverted) => IndexType::Inverted,
108            Some(ScalarIndexType::NGram) => IndexType::NGram,
109        }
110    }
111
112    fn index_name(&self) -> &str {
113        LANCE_SCALAR_INDEX
114    }
115}
116
117impl IndexParams for InvertedIndexParams {
118    fn as_any(&self) -> &dyn std::any::Any {
119        self
120    }
121
122    fn index_type(&self) -> IndexType {
123        IndexType::Inverted
124    }
125
126    fn index_name(&self) -> &str {
127        "INVERTED"
128    }
129}
130
131/// Trait for storing an index (or parts of an index) into storage
132#[async_trait]
133pub trait IndexWriter: Send {
134    /// Writes a record batch into the file, returning the 0-based index of the batch in the file
135    ///
136    /// E.g. if this is the third time this is called this method will return 2
137    async fn write_record_batch(&mut self, batch: RecordBatch) -> Result<u64>;
138    /// Finishes writing the file and closes the file
139    async fn finish(&mut self) -> Result<()>;
140    /// Finishes writing the file and closes the file with additional metadata
141    async fn finish_with_metadata(&mut self, metadata: HashMap<String, String>) -> Result<()>;
142}
143
144/// Trait for reading an index (or parts of an index) from storage
145#[async_trait]
146pub trait IndexReader: Send + Sync {
147    /// Read the n-th record batch from the file
148    async fn read_record_batch(&self, n: u64, batch_size: u64) -> Result<RecordBatch>;
149    /// Read the range of rows from the file.
150    /// If projection is Some, only return the columns in the projection,
151    /// nested columns like Some(&["x.y"]) are not supported.
152    /// If projection is None, return all columns.
153    async fn read_range(
154        &self,
155        range: std::ops::Range<usize>,
156        projection: Option<&[&str]>,
157    ) -> Result<RecordBatch>;
158    /// Return the number of batches in the file
159    async fn num_batches(&self, batch_size: u64) -> u32;
160    /// Return the number of rows in the file
161    fn num_rows(&self) -> usize;
162    /// Return the metadata of the file
163    fn schema(&self) -> &lance_core::datatypes::Schema;
164}
165
166/// Trait abstracting I/O away from index logic
167///
168/// Scalar indices are currently serialized as indexable arrow record batches stored in
169/// named "files".  The index store is responsible for serializing and deserializing
170/// these batches into file data (e.g. as .lance files or .parquet files, etc.)
171#[async_trait]
172pub trait IndexStore: std::fmt::Debug + Send + Sync + DeepSizeOf {
173    fn as_any(&self) -> &dyn Any;
174
175    /// Suggested I/O parallelism for the store
176    fn io_parallelism(&self) -> usize;
177
178    /// Create a new file and return a writer to store data in the file
179    async fn new_index_file(&self, name: &str, schema: Arc<Schema>)
180        -> Result<Box<dyn IndexWriter>>;
181
182    /// Open an existing file for retrieval
183    async fn open_index_file(&self, name: &str) -> Result<Arc<dyn IndexReader>>;
184
185    /// Copy a range of batches from an index file from this store to another
186    ///
187    /// This is often useful when remapping or updating
188    async fn copy_index_file(&self, name: &str, dest_store: &dyn IndexStore) -> Result<()>;
189
190    /// Rename an index file
191    async fn rename_index_file(&self, name: &str, new_name: &str) -> Result<()>;
192
193    /// Delete an index file (used in the tmp spill store to keep tmp size down)
194    async fn delete_index_file(&self, name: &str) -> Result<()>;
195}
196
197/// Different scalar indices may support different kinds of queries
198///
199/// For example, a btree index can support a wide range of queries (e.g. x > 7)
200/// while an index based on FTS only supports queries like "x LIKE 'foo'"
201///
202/// This trait is used when we need an object that can represent any kind of query
203///
204/// Note: if you are implementing this trait for a query type then you probably also
205/// need to implement the [crate::scalar::expression::ScalarQueryParser] trait to
206/// create instances of your query at parse time.
207pub trait AnyQuery: std::fmt::Debug + Any + Send + Sync {
208    /// Cast the query as Any to allow for downcasting
209    fn as_any(&self) -> &dyn Any;
210    /// Format the query as a string
211    fn format(&self, col: &str) -> String;
212    /// Convert the query to a datafusion expression
213    fn to_expr(&self, col: String) -> Expr;
214    /// Compare this query to another query
215    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool;
216    /// If true, the query results are inexact and will need rechecked
217    fn needs_recheck(&self) -> bool {
218        false
219    }
220}
221
222impl PartialEq for dyn AnyQuery {
223    fn eq(&self, other: &Self) -> bool {
224        self.dyn_eq(other)
225    }
226}
227/// A full text search query
228#[derive(Debug, Clone, PartialEq)]
229pub struct FullTextSearchQuery {
230    pub query: FtsQuery,
231
232    /// The maximum number of results to return
233    pub limit: Option<i64>,
234
235    /// The wand factor to use for ranking
236    /// if None, use the default value of 1.0
237    /// Increasing this value will reduce the recall and improve the performance
238    /// 1.0 is the value that would give the best performance without recall loss
239    pub wand_factor: Option<f32>,
240}
241
242impl FullTextSearchQuery {
243    /// Create a new terms query
244    pub fn new(query: String) -> Self {
245        let query = MatchQuery::new(query).into();
246        Self {
247            query,
248            limit: None,
249            wand_factor: None,
250        }
251    }
252
253    /// Create a new fuzzy query
254    pub fn new_fuzzy(term: String, max_distance: Option<u32>) -> Self {
255        let query = MatchQuery::new(term).with_fuzziness(max_distance).into();
256        Self {
257            query,
258            limit: None,
259            wand_factor: None,
260        }
261    }
262
263    /// Create a new compound query
264    pub fn new_query(query: FtsQuery) -> Self {
265        Self {
266            query,
267            limit: None,
268            wand_factor: None,
269        }
270    }
271
272    /// Set the column to search over
273    /// This is available for only MatchQuery and PhraseQuery
274    pub fn with_column(mut self, column: String) -> Result<Self> {
275        self.query = fill_fts_query_column(&self.query, &[column], true)?;
276        Ok(self)
277    }
278
279    /// Set the column to search over
280    /// This is available for only MatchQuery
281    pub fn with_columns(mut self, columns: &[String]) -> Result<Self> {
282        self.query = fill_fts_query_column(&self.query, columns, true)?;
283        Ok(self)
284    }
285
286    /// limit the number of results to return
287    /// if None, return all results
288    pub fn limit(mut self, limit: Option<i64>) -> Self {
289        self.limit = limit;
290        self
291    }
292
293    pub fn wand_factor(mut self, wand_factor: Option<f32>) -> Self {
294        self.wand_factor = wand_factor;
295        self
296    }
297
298    pub fn columns(&self) -> HashSet<String> {
299        self.query.columns()
300    }
301
302    pub fn params(&self) -> FtsSearchParams {
303        let params = FtsSearchParams::new()
304            .with_limit(self.limit.map(|limit| limit as usize))
305            .with_wand_factor(self.wand_factor.unwrap_or(1.0));
306        match self.query {
307            FtsQuery::Phrase(ref query) => params.with_phrase_slop(Some(query.slop)),
308            _ => params,
309        }
310    }
311}
312
313/// A query that a basic scalar index (e.g. btree / bitmap) can satisfy
314///
315/// This is a subset of expression operators that is often referred to as the
316/// "sargable" operators
317///
318/// Note that negation is not included.  Negation should be applied later.  For
319/// example, to invert an equality query (e.g. all rows where the value is not 7)
320/// you can grab all rows where the value = 7 and then do an inverted take (or use
321/// a block list instead of an allow list for prefiltering)
322#[derive(Debug, Clone, PartialEq)]
323pub enum SargableQuery {
324    /// Retrieve all row ids where the value is in the given [min, max) range
325    Range(Bound<ScalarValue>, Bound<ScalarValue>),
326    /// Retrieve all row ids where the value is in the given set of values
327    IsIn(Vec<ScalarValue>),
328    /// Retrieve all row ids where the value is exactly the given value
329    Equals(ScalarValue),
330    /// Retrieve all row ids where the value matches the given full text search query
331    FullTextSearch(FullTextSearchQuery),
332    /// Retrieve all row ids where the value is null
333    IsNull(),
334}
335
336impl AnyQuery for SargableQuery {
337    fn as_any(&self) -> &dyn Any {
338        self
339    }
340
341    fn format(&self, col: &str) -> String {
342        match self {
343            Self::Range(lower, upper) => match (lower, upper) {
344                (Bound::Unbounded, Bound::Unbounded) => "true".to_string(),
345                (Bound::Unbounded, Bound::Included(rhs)) => format!("{} <= {}", col, rhs),
346                (Bound::Unbounded, Bound::Excluded(rhs)) => format!("{} < {}", col, rhs),
347                (Bound::Included(lhs), Bound::Unbounded) => format!("{} >= {}", col, lhs),
348                (Bound::Included(lhs), Bound::Included(rhs)) => {
349                    format!("{} >= {} && {} <= {}", col, lhs, col, rhs)
350                }
351                (Bound::Included(lhs), Bound::Excluded(rhs)) => {
352                    format!("{} >= {} && {} < {}", col, lhs, col, rhs)
353                }
354                (Bound::Excluded(lhs), Bound::Unbounded) => format!("{} > {}", col, lhs),
355                (Bound::Excluded(lhs), Bound::Included(rhs)) => {
356                    format!("{} > {} && {} <= {}", col, lhs, col, rhs)
357                }
358                (Bound::Excluded(lhs), Bound::Excluded(rhs)) => {
359                    format!("{} > {} && {} < {}", col, lhs, col, rhs)
360                }
361            },
362            Self::IsIn(values) => {
363                format!(
364                    "{} IN [{}]",
365                    col,
366                    values
367                        .iter()
368                        .map(|val| val.to_string())
369                        .collect::<Vec<_>>()
370                        .join(",")
371                )
372            }
373            Self::FullTextSearch(query) => {
374                format!("fts({})", query.query)
375            }
376            Self::IsNull() => {
377                format!("{} IS NULL", col)
378            }
379            Self::Equals(val) => {
380                format!("{} = {}", col, val)
381            }
382        }
383    }
384
385    fn to_expr(&self, col: String) -> Expr {
386        let col_expr = Expr::Column(Column::new_unqualified(col));
387        match self {
388            Self::Range(lower, upper) => match (lower, upper) {
389                (Bound::Unbounded, Bound::Unbounded) => {
390                    Expr::Literal(ScalarValue::Boolean(Some(true)))
391                }
392                (Bound::Unbounded, Bound::Included(rhs)) => {
393                    col_expr.lt_eq(Expr::Literal(rhs.clone()))
394                }
395                (Bound::Unbounded, Bound::Excluded(rhs)) => col_expr.lt(Expr::Literal(rhs.clone())),
396                (Bound::Included(lhs), Bound::Unbounded) => {
397                    col_expr.gt_eq(Expr::Literal(lhs.clone()))
398                }
399                (Bound::Included(lhs), Bound::Included(rhs)) => {
400                    col_expr.between(Expr::Literal(lhs.clone()), Expr::Literal(rhs.clone()))
401                }
402                (Bound::Included(lhs), Bound::Excluded(rhs)) => col_expr
403                    .clone()
404                    .gt_eq(Expr::Literal(lhs.clone()))
405                    .and(col_expr.lt(Expr::Literal(rhs.clone()))),
406                (Bound::Excluded(lhs), Bound::Unbounded) => col_expr.gt(Expr::Literal(lhs.clone())),
407                (Bound::Excluded(lhs), Bound::Included(rhs)) => col_expr
408                    .clone()
409                    .gt(Expr::Literal(lhs.clone()))
410                    .and(col_expr.lt_eq(Expr::Literal(rhs.clone()))),
411                (Bound::Excluded(lhs), Bound::Excluded(rhs)) => col_expr
412                    .clone()
413                    .gt(Expr::Literal(lhs.clone()))
414                    .and(col_expr.lt(Expr::Literal(rhs.clone()))),
415            },
416            Self::IsIn(values) => col_expr.in_list(
417                values
418                    .iter()
419                    .map(|val| Expr::Literal(val.clone()))
420                    .collect::<Vec<_>>(),
421                false,
422            ),
423            Self::FullTextSearch(query) => col_expr.like(Expr::Literal(ScalarValue::Utf8(Some(
424                query.query.to_string(),
425            )))),
426            Self::IsNull() => col_expr.is_null(),
427            Self::Equals(value) => col_expr.eq(Expr::Literal(value.clone())),
428        }
429    }
430
431    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
432        match other.as_any().downcast_ref::<Self>() {
433            Some(o) => self == o,
434            None => false,
435        }
436    }
437}
438
439/// A query that a LabelListIndex can satisfy
440#[derive(Debug, Clone, PartialEq)]
441pub enum LabelListQuery {
442    /// Retrieve all row ids where every label is in the list of values for the row
443    HasAllLabels(Vec<ScalarValue>),
444    /// Retrieve all row ids where at least one of the given labels is in the list of values for the row
445    HasAnyLabel(Vec<ScalarValue>),
446}
447
448impl AnyQuery for LabelListQuery {
449    fn as_any(&self) -> &dyn Any {
450        self
451    }
452
453    fn format(&self, col: &str) -> String {
454        format!("{}", self.to_expr(col.to_string()))
455    }
456
457    fn to_expr(&self, col: String) -> Expr {
458        match self {
459            Self::HasAllLabels(labels) => {
460                let labels_arr = ScalarValue::iter_to_array(labels.iter().cloned()).unwrap();
461                let offsets_buffer =
462                    OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, labels_arr.len() as i32]));
463                let labels_list = ListArray::try_new(
464                    Arc::new(Field::new("item", labels_arr.data_type().clone(), false)),
465                    offsets_buffer,
466                    labels_arr,
467                    None,
468                )
469                .unwrap();
470                let labels_arr = Arc::new(labels_list);
471                Expr::ScalarFunction(ScalarFunction {
472                    func: Arc::new(array_has::ArrayHasAll::new().into()),
473                    args: vec![
474                        Expr::Column(Column::new_unqualified(col)),
475                        Expr::Literal(ScalarValue::List(labels_arr)),
476                    ],
477                })
478            }
479            Self::HasAnyLabel(labels) => {
480                let labels_arr = ScalarValue::iter_to_array(labels.iter().cloned()).unwrap();
481                let offsets_buffer =
482                    OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, labels_arr.len() as i32]));
483                let labels_list = ListArray::try_new(
484                    Arc::new(Field::new("item", labels_arr.data_type().clone(), false)),
485                    offsets_buffer,
486                    labels_arr,
487                    None,
488                )
489                .unwrap();
490                let labels_arr = Arc::new(labels_list);
491                Expr::ScalarFunction(ScalarFunction {
492                    func: Arc::new(array_has::ArrayHasAny::new().into()),
493                    args: vec![
494                        Expr::Column(Column::new_unqualified(col)),
495                        Expr::Literal(ScalarValue::List(labels_arr)),
496                    ],
497                })
498            }
499        }
500    }
501
502    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
503        match other.as_any().downcast_ref::<Self>() {
504            Some(o) => self == o,
505            None => false,
506        }
507    }
508}
509
/// A query that a NGramIndex can satisfy
#[derive(Clone, Debug, PartialEq)]
pub enum TextQuery {
    /// Retrieve all row ids where the text contains the given string
    StringContains(String),
    // TODO: In the future we should be able to do case-insensitive contains
    // as well as partial matches (e.g. LIKE 'foo%') and potentially even
    // some regular expressions
}
519
520impl AnyQuery for TextQuery {
521    fn as_any(&self) -> &dyn Any {
522        self
523    }
524
525    fn format(&self, col: &str) -> String {
526        format!("{}", self.to_expr(col.to_string()))
527    }
528
529    fn to_expr(&self, col: String) -> Expr {
530        match self {
531            Self::StringContains(substr) => Expr::ScalarFunction(ScalarFunction {
532                func: Arc::new(ContainsFunc::new().into()),
533                args: vec![
534                    Expr::Column(Column::new_unqualified(col)),
535                    Expr::Literal(ScalarValue::Utf8(Some(substr.clone()))),
536                ],
537            }),
538        }
539    }
540
541    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
542        match other.as_any().downcast_ref::<Self>() {
543            Some(o) => self == o,
544            None => false,
545        }
546    }
547
548    fn needs_recheck(&self) -> bool {
549        true
550    }
551}
552
553/// The result of a search operation against a scalar index
554#[derive(Debug, PartialEq)]
555pub enum SearchResult {
556    /// The exact row ids that satisfy the query
557    Exact(RowIdTreeMap),
558    /// Any row id satisfying the query will be in this set but not every
559    /// row id in this set will satisfy the query, a further recheck step
560    /// is needed
561    AtMost(RowIdTreeMap),
562    /// All of the given row ids satisfy the query but there may be more
563    ///
564    /// No scalar index actually returns this today but it can arise from
565    /// boolean operations (e.g. NOT(AtMost(x)) == AtLeast(NOT(x)))
566    AtLeast(RowIdTreeMap),
567}
568
569impl SearchResult {
570    pub fn row_ids(&self) -> &RowIdTreeMap {
571        match self {
572            Self::Exact(row_ids) => row_ids,
573            Self::AtMost(row_ids) => row_ids,
574            Self::AtLeast(row_ids) => row_ids,
575        }
576    }
577
578    pub fn is_exact(&self) -> bool {
579        matches!(self, Self::Exact(_))
580    }
581}
582
583/// A trait for a scalar index, a structure that can determine row ids that satisfy scalar queries
584#[async_trait]
585pub trait ScalarIndex: Send + Sync + std::fmt::Debug + Index + DeepSizeOf {
586    /// Search the scalar index
587    ///
588    /// Returns all row ids that satisfy the query, these row ids are not necessarily ordered
589    async fn search(
590        &self,
591        query: &dyn AnyQuery,
592        metrics: &dyn MetricsCollector,
593    ) -> Result<SearchResult>;
594
595    /// Returns true if the query can be answered exactly
596    ///
597    /// If false is returned then the query still may be answered exactly but if true is returned
598    /// then the query must be answered exactly
599    fn can_answer_exact(&self, query: &dyn AnyQuery) -> bool;
600
601    /// Load the scalar index from storage
602    async fn load(
603        store: Arc<dyn IndexStore>,
604        fri: Option<Arc<FragReuseIndex>>,
605    ) -> Result<Arc<Self>>
606    where
607        Self: Sized;
608
609    /// Remap the row ids, creating a new remapped version of this index in `dest_store`
610    async fn remap(
611        &self,
612        mapping: &HashMap<u64, Option<u64>>,
613        dest_store: &dyn IndexStore,
614    ) -> Result<()>;
615
616    /// Add the new data into the index, creating an updated version of the index in `dest_store`
617    async fn update(
618        &self,
619        new_data: SendableRecordBatchStream,
620        dest_store: &dyn IndexStore,
621    ) -> Result<()>;
622}