lance_index/scalar.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Scalar indices for metadata search & filtering

use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::{any::Any, ops::Bound, sync::Arc};

use arrow::buffer::{OffsetBuffer, ScalarBuffer};
use arrow_array::{ListArray, RecordBatch};
use arrow_schema::{Field, Schema};
use async_trait::async_trait;
use datafusion::functions::string::contains::ContainsFunc;
use datafusion::functions_array::array_has;
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion_common::{scalar::ScalarValue, Column};

use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::Expr;
use deepsize::DeepSizeOf;
use inverted::query::{fill_fts_query_column, FtsQuery, FtsQueryNode, FtsSearchParams, MatchQuery};
use lance_core::utils::mask::RowIdTreeMap;
use lance_core::{Error, Result};
use snafu::location;

use crate::metrics::MetricsCollector;
use crate::{Index, IndexParams, IndexType};

pub mod bitmap;
pub mod btree;
pub mod expression;
pub mod flat;
pub mod inverted;
pub mod label_list;
pub mod lance_format;
pub mod ngram;

pub use inverted::tokenizer::InvertedIndexParams;

pub const LANCE_SCALAR_INDEX: &str = "__lance_scalar_index";

#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum ScalarIndexType {
    BTree,
    Bitmap,
    LabelList,
    NGram,
    Inverted,
}

impl TryFrom<IndexType> for ScalarIndexType {
    type Error = Error;

    fn try_from(value: IndexType) -> Result<Self> {
        match value {
            IndexType::BTree | IndexType::Scalar => Ok(Self::BTree),
            IndexType::Bitmap => Ok(Self::Bitmap),
            IndexType::LabelList => Ok(Self::LabelList),
            IndexType::NGram => Ok(Self::NGram),
            IndexType::Inverted => Ok(Self::Inverted),
            _ => Err(Error::InvalidInput {
                source: format!("Index type {:?} is not a scalar index", value).into(),
                location: location!(),
            }),
        }
    }
}
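
// A minimal usage sketch of the conversion above: scalar-capable index types map onto
// their `ScalarIndexType` variant while any other index type is rejected.  The function
// name below is illustrative only, not part of the original API.
#[allow(dead_code)]
fn example_scalar_type_from_index_type(index_type: IndexType) -> Option<ScalarIndexType> {
    // `IndexType::BTree` and `IndexType::Scalar` both map to `ScalarIndexType::BTree`
    // (see the match above); non-scalar types yield `None` here.
    ScalarIndexType::try_from(index_type).ok()
}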

#[derive(Default)]
pub struct ScalarIndexParams {
    /// If set then always use the given index type and skip auto-detection
    pub force_index_type: Option<ScalarIndexType>,
}

impl ScalarIndexParams {
    pub fn new(index_type: ScalarIndexType) -> Self {
        Self {
            force_index_type: Some(index_type),
        }
    }
}

impl IndexParams for ScalarIndexParams {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn index_type(&self) -> IndexType {
        match self.force_index_type {
            Some(ScalarIndexType::BTree) | None => IndexType::BTree,
            Some(ScalarIndexType::Bitmap) => IndexType::Bitmap,
            Some(ScalarIndexType::LabelList) => IndexType::LabelList,
            Some(ScalarIndexType::Inverted) => IndexType::Inverted,
            Some(ScalarIndexType::NGram) => IndexType::NGram,
        }
    }

    fn index_name(&self) -> &str {
        LANCE_SCALAR_INDEX
    }
}
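
// A minimal sketch of how the params above are typically constructed: either rely on
// `Default` (leave the index type to auto-detection) or force a specific type.  The
// function name is illustrative only.
#[allow(dead_code)]
fn example_scalar_index_params() -> (ScalarIndexParams, ScalarIndexParams) {
    // Auto-detection: `force_index_type` stays `None`, and `index_type()` reports BTree.
    let auto = ScalarIndexParams::default();
    // Force a bitmap index instead of whatever auto-detection would pick.
    let forced = ScalarIndexParams::new(ScalarIndexType::Bitmap);
    (auto, forced)
}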

impl IndexParams for InvertedIndexParams {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn index_type(&self) -> IndexType {
        IndexType::Inverted
    }

    fn index_name(&self) -> &str {
        "INVERTED"
    }
}

/// Trait for storing an index (or parts of an index) into storage
#[async_trait]
pub trait IndexWriter: Send {
    /// Writes a record batch into the file, returning the 0-based index of the batch in the file
    ///
    /// E.g. if this is the third time this is called, this method will return 2
    async fn write_record_batch(&mut self, batch: RecordBatch) -> Result<u64>;
    /// Finishes writing the file and closes the file
    async fn finish(&mut self) -> Result<()>;
    /// Finishes writing the file and closes the file with additional metadata
    async fn finish_with_metadata(&mut self, metadata: HashMap<String, String>) -> Result<()>;
}

/// Trait for reading an index (or parts of an index) from storage
#[async_trait]
pub trait IndexReader: Send + Sync {
    /// Read the n-th record batch from the file
    async fn read_record_batch(&self, n: u64, batch_size: u64) -> Result<RecordBatch>;
    /// Read the given range of rows from the file.
    ///
    /// If `projection` is `Some`, only the columns in the projection are returned;
    /// nested columns like `Some(&["x.y"])` are not supported.
    /// If `projection` is `None`, all columns are returned.
    async fn read_range(
        &self,
        range: std::ops::Range<usize>,
        projection: Option<&[&str]>,
    ) -> Result<RecordBatch>;
    /// Return the number of batches in the file
    async fn num_batches(&self, batch_size: u64) -> u32;
    /// Return the number of rows in the file
    fn num_rows(&self) -> usize;
    /// Return the schema of the file
    fn schema(&self) -> &lance_core::datatypes::Schema;
}
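
// A minimal sketch of reading from an `IndexReader`: pull a row range with a column
// projection.  The reader argument and the "values" column name are hypothetical.
#[allow(dead_code)]
async fn example_read_rows(reader: &dyn IndexReader) -> Result<RecordBatch> {
    // Materialize only the first 1024 rows of the (hypothetical) "values" column;
    // passing `None` instead would return every column.
    reader.read_range(0..1024, Some(&["values"])).await
}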

/// Trait abstracting I/O away from index logic
///
/// Scalar indices are currently serialized as indexable arrow record batches stored in
/// named "files".  The index store is responsible for serializing and deserializing
/// these batches into file data (e.g. as .lance files or .parquet files)
#[async_trait]
pub trait IndexStore: std::fmt::Debug + Send + Sync + DeepSizeOf {
    fn as_any(&self) -> &dyn Any;

    /// Suggested I/O parallelism for the store
    fn io_parallelism(&self) -> usize;

    /// Create a new file and return a writer to store data in the file
    async fn new_index_file(&self, name: &str, schema: Arc<Schema>)
        -> Result<Box<dyn IndexWriter>>;

    /// Open an existing file for retrieval
    async fn open_index_file(&self, name: &str) -> Result<Arc<dyn IndexReader>>;

    /// Copy an index file from this store to another store
    ///
    /// This is often useful when remapping or updating
    async fn copy_index_file(&self, name: &str, dest_store: &dyn IndexStore) -> Result<()>;

    /// Rename an index file
    async fn rename_index_file(&self, name: &str, new_name: &str) -> Result<()>;

    /// Delete an index file (used in the tmp spill store to keep tmp size down)
    async fn delete_index_file(&self, name: &str) -> Result<()>;
}
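
// A minimal sketch of the write-then-read round trip the store above enables.  The file
// name "example.lance", the schema, and the batch are hypothetical inputs; a real caller
// would use a concrete `IndexStore` implementation, e.g. one backed by the
// `lance_format` module.
#[allow(dead_code)]
async fn example_store_round_trip(
    store: &dyn IndexStore,
    schema: Arc<Schema>,
    batch: RecordBatch,
) -> Result<RecordBatch> {
    // Serialize one batch into a named file and remember its 0-based position.
    let mut writer = store.new_index_file("example.lance", schema).await?;
    let batch_index = writer.write_record_batch(batch).await?;
    writer.finish().await?;

    // Re-open the same file and read the batch back.
    let reader = store.open_index_file("example.lance").await?;
    reader.read_record_batch(batch_index, 4096).await
}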

/// Different scalar indices may support different kinds of queries
///
/// For example, a btree index can support a wide range of queries (e.g. x > 7)
/// while an index based on FTS only supports queries like "x LIKE 'foo'"
///
/// This trait is used when we need an object that can represent any kind of query
///
/// Note: if you are implementing this trait for a query type then you probably also
/// need to implement the [crate::scalar::expression::ScalarQueryParser] trait to
/// create instances of your query at parse time.
pub trait AnyQuery: std::fmt::Debug + Any + Send + Sync {
    /// Cast the query as Any to allow for downcasting
    fn as_any(&self) -> &dyn Any;
    /// Format the query as a string
    fn format(&self, col: &str) -> String;
    /// Convert the query to a datafusion expression
    fn to_expr(&self, col: String) -> Expr;
    /// Compare this query to another query
    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool;
    /// If true, the query results are inexact and will need to be rechecked
    fn needs_recheck(&self) -> bool {
        false
    }
}

impl PartialEq for dyn AnyQuery {
    fn eq(&self, other: &Self) -> bool {
        self.dyn_eq(other)
    }
}

/// A full text search query
#[derive(Debug, Clone, PartialEq)]
pub struct FullTextSearchQuery {
    pub query: FtsQuery,

    /// The maximum number of results to return
    pub limit: Option<i64>,

    /// The WAND factor to use for ranking
    ///
    /// If None, the default value of 1.0 is used.  1.0 gives full recall;
    /// increasing the value trades recall for better performance.
    pub wand_factor: Option<f32>,
}

impl FullTextSearchQuery {
    /// Create a new terms query
    pub fn new(query: String) -> Self {
        let query = MatchQuery::new(query).into();
        Self {
            query,
            limit: None,
            wand_factor: None,
        }
    }

    /// Create a new fuzzy query
    pub fn new_fuzzy(term: String, max_distance: Option<u32>) -> Self {
        let query = MatchQuery::new(term).with_fuzziness(max_distance).into();
        Self {
            query,
            limit: None,
            wand_factor: None,
        }
    }

    /// Create a new compound query
    pub fn new_query(query: FtsQuery) -> Self {
        Self {
            query,
            limit: None,
            wand_factor: None,
        }
    }

    /// Set the column to search over
    ///
    /// This is only available for MatchQuery and PhraseQuery
    pub fn with_column(mut self, column: String) -> Result<Self> {
        self.query = fill_fts_query_column(&self.query, &[column], true)?;
        Ok(self)
    }

    /// Set the columns to search over
    ///
    /// This is only available for MatchQuery
    pub fn with_columns(mut self, columns: &[String]) -> Result<Self> {
        self.query = fill_fts_query_column(&self.query, columns, true)?;
        Ok(self)
    }

    /// Limit the number of results to return
    ///
    /// If None, all results are returned
    pub fn limit(mut self, limit: Option<i64>) -> Self {
        self.limit = limit;
        self
    }

    pub fn wand_factor(mut self, wand_factor: Option<f32>) -> Self {
        self.wand_factor = wand_factor;
        self
    }

    pub fn columns(&self) -> HashSet<String> {
        self.query.columns()
    }

    pub fn params(&self) -> FtsSearchParams {
        FtsSearchParams::new()
            .with_limit(self.limit.map(|limit| limit as usize))
            .with_wand_factor(self.wand_factor.unwrap_or(1.0))
    }
}
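
// A minimal sketch of building a full text search query with the builder API above: a
// match query scoped to one column with a result limit.  The "title" column and the
// search term are hypothetical.
#[allow(dead_code)]
fn example_fts_query() -> Result<FullTextSearchQuery> {
    Ok(FullTextSearchQuery::new("old man sea".to_owned())
        // Scope the match to a single (hypothetical) column; this can fail for query
        // types that do not accept a column, hence the `?`.
        .with_column("title".to_owned())?
        // Keep only the top 10 results when converted into `FtsSearchParams`.
        .limit(Some(10)))
}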

/// A query that a basic scalar index (e.g. btree / bitmap) can satisfy
///
/// This is a subset of expression operators that is often referred to as the
/// "sargable" operators
///
/// Note that negation is not included.  Negation should be applied later.  For
/// example, to invert an equality query (e.g. all rows where the value is not 7)
/// you can grab all rows where the value = 7 and then do an inverted take (or use
/// a block list instead of an allow list for prefiltering)
#[derive(Debug, Clone, PartialEq)]
pub enum SargableQuery {
    /// Retrieve all row ids where the value falls within the given range
    /// (each bound may be inclusive, exclusive, or unbounded)
    Range(Bound<ScalarValue>, Bound<ScalarValue>),
    /// Retrieve all row ids where the value is in the given set of values
    IsIn(Vec<ScalarValue>),
    /// Retrieve all row ids where the value is exactly the given value
    Equals(ScalarValue),
    /// Retrieve all row ids where the value matches the given full text search query
    FullTextSearch(FullTextSearchQuery),
    /// Retrieve all row ids where the value is null
    IsNull(),
}

impl AnyQuery for SargableQuery {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn format(&self, col: &str) -> String {
        match self {
            Self::Range(lower, upper) => match (lower, upper) {
                (Bound::Unbounded, Bound::Unbounded) => "true".to_string(),
                (Bound::Unbounded, Bound::Included(rhs)) => format!("{} <= {}", col, rhs),
                (Bound::Unbounded, Bound::Excluded(rhs)) => format!("{} < {}", col, rhs),
                (Bound::Included(lhs), Bound::Unbounded) => format!("{} >= {}", col, lhs),
                (Bound::Included(lhs), Bound::Included(rhs)) => {
                    format!("{} >= {} && {} <= {}", col, lhs, col, rhs)
                }
                (Bound::Included(lhs), Bound::Excluded(rhs)) => {
                    format!("{} >= {} && {} < {}", col, lhs, col, rhs)
                }
                (Bound::Excluded(lhs), Bound::Unbounded) => format!("{} > {}", col, lhs),
                (Bound::Excluded(lhs), Bound::Included(rhs)) => {
                    format!("{} > {} && {} <= {}", col, lhs, col, rhs)
                }
                (Bound::Excluded(lhs), Bound::Excluded(rhs)) => {
                    format!("{} > {} && {} < {}", col, lhs, col, rhs)
                }
            },
            Self::IsIn(values) => {
                format!(
                    "{} IN [{}]",
                    col,
                    values
                        .iter()
                        .map(|val| val.to_string())
                        .collect::<Vec<_>>()
                        .join(",")
                )
            }
            Self::FullTextSearch(query) => {
                format!("fts({})", query.query)
            }
            Self::IsNull() => {
                format!("{} IS NULL", col)
            }
            Self::Equals(val) => {
                format!("{} = {}", col, val)
            }
        }
    }

    fn to_expr(&self, col: String) -> Expr {
        let col_expr = Expr::Column(Column::new_unqualified(col));
        match self {
            Self::Range(lower, upper) => match (lower, upper) {
                (Bound::Unbounded, Bound::Unbounded) => {
                    Expr::Literal(ScalarValue::Boolean(Some(true)))
                }
                (Bound::Unbounded, Bound::Included(rhs)) => {
                    col_expr.lt_eq(Expr::Literal(rhs.clone()))
                }
                (Bound::Unbounded, Bound::Excluded(rhs)) => col_expr.lt(Expr::Literal(rhs.clone())),
                (Bound::Included(lhs), Bound::Unbounded) => {
                    col_expr.gt_eq(Expr::Literal(lhs.clone()))
                }
                (Bound::Included(lhs), Bound::Included(rhs)) => {
                    col_expr.between(Expr::Literal(lhs.clone()), Expr::Literal(rhs.clone()))
                }
                (Bound::Included(lhs), Bound::Excluded(rhs)) => col_expr
                    .clone()
                    .gt_eq(Expr::Literal(lhs.clone()))
                    .and(col_expr.lt(Expr::Literal(rhs.clone()))),
                (Bound::Excluded(lhs), Bound::Unbounded) => col_expr.gt(Expr::Literal(lhs.clone())),
                (Bound::Excluded(lhs), Bound::Included(rhs)) => col_expr
                    .clone()
                    .gt(Expr::Literal(lhs.clone()))
                    .and(col_expr.lt_eq(Expr::Literal(rhs.clone()))),
                (Bound::Excluded(lhs), Bound::Excluded(rhs)) => col_expr
                    .clone()
                    .gt(Expr::Literal(lhs.clone()))
                    .and(col_expr.lt(Expr::Literal(rhs.clone()))),
            },
            Self::IsIn(values) => col_expr.in_list(
                values
                    .iter()
                    .map(|val| Expr::Literal(val.clone()))
                    .collect::<Vec<_>>(),
                false,
            ),
            Self::FullTextSearch(query) => col_expr.like(Expr::Literal(ScalarValue::Utf8(Some(
                query.query.to_string(),
            )))),
            Self::IsNull() => col_expr.is_null(),
            Self::Equals(value) => col_expr.eq(Expr::Literal(value.clone())),
        }
    }

    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
        match other.as_any().downcast_ref::<Self>() {
            Some(o) => self == o,
            None => false,
        }
    }
}
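
// A minimal sketch of a sargable range query and the string it renders to.  The column
// name "score" and the bounds are hypothetical.
#[allow(dead_code)]
fn example_range_query() -> (SargableQuery, String) {
    // All rows where 0 <= score < 100: inclusive lower bound, exclusive upper bound.
    let query = SargableQuery::Range(
        Bound::Included(ScalarValue::Int32(Some(0))),
        Bound::Excluded(ScalarValue::Int32(Some(100))),
    );
    // `format` renders this as something like "score >= 0 && score < 100".
    let rendered = query.format("score");
    (query, rendered)
}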

/// A query that a LabelListIndex can satisfy
#[derive(Debug, Clone, PartialEq)]
pub enum LabelListQuery {
    /// Retrieve all row ids where every label is in the list of values for the row
    HasAllLabels(Vec<ScalarValue>),
    /// Retrieve all row ids where at least one of the given labels is in the list of values for the row
    HasAnyLabel(Vec<ScalarValue>),
}

impl AnyQuery for LabelListQuery {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn format(&self, col: &str) -> String {
        format!("{}", self.to_expr(col.to_string()))
    }

    fn to_expr(&self, col: String) -> Expr {
        match self {
            Self::HasAllLabels(labels) => {
                let labels_arr = ScalarValue::iter_to_array(labels.iter().cloned()).unwrap();
                let offsets_buffer =
                    OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, labels_arr.len() as i32]));
                let labels_list = ListArray::try_new(
                    Arc::new(Field::new("item", labels_arr.data_type().clone(), false)),
                    offsets_buffer,
                    labels_arr,
                    None,
                )
                .unwrap();
                let labels_arr = Arc::new(labels_list);
                Expr::ScalarFunction(ScalarFunction {
                    func: Arc::new(array_has::ArrayHasAll::new().into()),
                    args: vec![
                        Expr::Column(Column::new_unqualified(col)),
                        Expr::Literal(ScalarValue::List(labels_arr)),
                    ],
                })
            }
            Self::HasAnyLabel(labels) => {
                let labels_arr = ScalarValue::iter_to_array(labels.iter().cloned()).unwrap();
                let offsets_buffer =
                    OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, labels_arr.len() as i32]));
                let labels_list = ListArray::try_new(
                    Arc::new(Field::new("item", labels_arr.data_type().clone(), false)),
                    offsets_buffer,
                    labels_arr,
                    None,
                )
                .unwrap();
                let labels_arr = Arc::new(labels_list);
                Expr::ScalarFunction(ScalarFunction {
                    func: Arc::new(array_has::ArrayHasAny::new().into()),
                    args: vec![
                        Expr::Column(Column::new_unqualified(col)),
                        Expr::Literal(ScalarValue::List(labels_arr)),
                    ],
                })
            }
        }
    }

    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
        match other.as_any().downcast_ref::<Self>() {
            Some(o) => self == o,
            None => false,
        }
    }
}
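
// A minimal sketch of a label-list query: match rows whose list column contains any of
// the given labels.  The label values are hypothetical.
#[allow(dead_code)]
fn example_label_query() -> LabelListQuery {
    // `HasAllLabels` would instead require every label to be present in the row's list.
    LabelListQuery::HasAnyLabel(vec![
        ScalarValue::Utf8(Some("cat".to_owned())),
        ScalarValue::Utf8(Some("dog".to_owned())),
    ])
}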

/// A query that a NGramIndex can satisfy
#[derive(Debug, Clone, PartialEq)]
pub enum TextQuery {
    /// Retrieve all row ids where the text contains the given string
    StringContains(String),
    // TODO: In the future we should be able to do case-insensitive contains
    // as well as partial matches (e.g. LIKE 'foo%') and potentially even
    // some regular expressions
}

impl AnyQuery for TextQuery {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn format(&self, col: &str) -> String {
        format!("{}", self.to_expr(col.to_string()))
    }

    fn to_expr(&self, col: String) -> Expr {
        match self {
            Self::StringContains(substr) => Expr::ScalarFunction(ScalarFunction {
                func: Arc::new(ContainsFunc::new().into()),
                args: vec![
                    Expr::Column(Column::new_unqualified(col)),
                    Expr::Literal(ScalarValue::Utf8(Some(substr.clone()))),
                ],
            }),
        }
    }

    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
        match other.as_any().downcast_ref::<Self>() {
            Some(o) => self == o,
            None => false,
        }
    }

    fn needs_recheck(&self) -> bool {
        true
    }
}
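
// A minimal sketch of an ngram text query.  Because the index only narrows the candidate
// set, `needs_recheck` is true and the `contains` predicate must still be re-applied to
// the candidate rows.  The search term is hypothetical.
#[allow(dead_code)]
fn example_text_query() -> (TextQuery, bool) {
    let query = TextQuery::StringContains("lance".to_owned());
    let recheck = query.needs_recheck();
    (query, recheck)
}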

/// The result of a search operation against a scalar index
#[derive(Debug, PartialEq)]
pub enum SearchResult {
    /// The exact row ids that satisfy the query
    Exact(RowIdTreeMap),
    /// Any row id satisfying the query will be in this set but not every
    /// row id in this set will satisfy the query; a further recheck step
    /// is needed
    AtMost(RowIdTreeMap),
    /// All of the given row ids satisfy the query but there may be more
    ///
    /// No scalar index actually returns this today but it can arise from
    /// boolean operations (e.g. NOT(AtMost(x)) == AtLeast(NOT(x)))
    AtLeast(RowIdTreeMap),
}

impl SearchResult {
    pub fn row_ids(&self) -> &RowIdTreeMap {
        match self {
            Self::Exact(row_ids) => row_ids,
            Self::AtMost(row_ids) => row_ids,
            Self::AtLeast(row_ids) => row_ids,
        }
    }

    pub fn is_exact(&self) -> bool {
        matches!(self, Self::Exact(_))
    }
}
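
// A minimal sketch of interpreting a `SearchResult`: only `Exact` can be used directly;
// `AtMost` holds candidates that must be rechecked and `AtLeast` may be missing rows.
// The function name is illustrative only.
#[allow(dead_code)]
fn example_needs_more_work(result: &SearchResult) -> bool {
    match result {
        SearchResult::Exact(_) => false,
        SearchResult::AtMost(_) | SearchResult::AtLeast(_) => true,
    }
}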

/// A trait for a scalar index, a structure that can determine row ids that satisfy scalar queries
#[async_trait]
pub trait ScalarIndex: Send + Sync + std::fmt::Debug + Index + DeepSizeOf {
    /// Search the scalar index
    ///
    /// Returns all row ids that satisfy the query; these row ids are not necessarily ordered
    async fn search(
        &self,
        query: &dyn AnyQuery,
        metrics: &dyn MetricsCollector,
    ) -> Result<SearchResult>;

    /// Returns true if the query can be answered exactly
    ///
    /// If false is returned then the query may still be answered exactly, but if true is
    /// returned then the query must be answered exactly
    fn can_answer_exact(&self, query: &dyn AnyQuery) -> bool;

    /// Load the scalar index from storage
    async fn load(store: Arc<dyn IndexStore>) -> Result<Arc<Self>>
    where
        Self: Sized;

    /// Remap the row ids, creating a new remapped version of this index in `dest_store`
    async fn remap(
        &self,
        mapping: &HashMap<u64, Option<u64>>,
        dest_store: &dyn IndexStore,
    ) -> Result<()>;

    /// Add the new data into the index, creating an updated version of the index in `dest_store`
    async fn update(
        &self,
        new_data: SendableRecordBatchStream,
        dest_store: &dyn IndexStore,
    ) -> Result<()>;
}
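
// A minimal sketch of driving a scalar index through the trait above: run an equality
// query and return the matching row ids along with an "is exact" flag.  The index, the
// metrics collector, and the literal value 7 are all hypothetical inputs.
#[allow(dead_code)]
async fn example_search_equals(
    index: &dyn ScalarIndex,
    metrics: &dyn MetricsCollector,
) -> Result<(RowIdTreeMap, bool)> {
    let query = SargableQuery::Equals(ScalarValue::Int64(Some(7)));
    // When the result is not exact the caller must recheck the predicate on these rows.
    match index.search(&query, metrics).await? {
        SearchResult::Exact(row_ids) => Ok((row_ids, true)),
        SearchResult::AtMost(row_ids) | SearchResult::AtLeast(row_ids) => Ok((row_ids, false)),
    }
}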