lance_index/
scalar.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Scalar indices for metadata search & filtering
5
6use std::collections::HashMap;
7use std::fmt::Debug;
8use std::{any::Any, ops::Bound, sync::Arc};
9
10use arrow::buffer::{OffsetBuffer, ScalarBuffer};
11use arrow_array::{ListArray, RecordBatch};
12use arrow_schema::{Field, Schema};
13use async_trait::async_trait;
14use datafusion::functions_array::array_has;
15use datafusion::physical_plan::SendableRecordBatchStream;
16use datafusion_common::{scalar::ScalarValue, Column};
17
18use datafusion_expr::expr::ScalarFunction;
19use datafusion_expr::Expr;
20use deepsize::DeepSizeOf;
21use inverted::TokenizerConfig;
22use lance_core::utils::mask::RowIdTreeMap;
23use lance_core::{Error, Result};
24use snafu::location;
25
26use crate::{Index, IndexParams, IndexType};
27
28pub mod bitmap;
29pub mod btree;
30pub mod expression;
31pub mod flat;
32pub mod inverted;
33pub mod label_list;
34pub mod lance_format;
35
/// Index name reported by `ScalarIndexParams::index_name`, whichever scalar index type
/// is forced (inverted-index parameters report `"INVERTED"` instead).
pub const LANCE_SCALAR_INDEX: &str = "__lance_scalar_index";
37
/// The concrete kinds of scalar index that can be requested through [`ScalarIndexParams`].
#[derive(Debug, Copy, Clone)]
pub enum ScalarIndexType {
    /// B-tree index; also what `IndexType::Scalar` and an unforced [`ScalarIndexParams`] map to.
    BTree,
    /// Bitmap index.
    Bitmap,
    /// Index over list-valued columns, searched with a [`LabelListQuery`].
    LabelList,
    /// Inverted index, configured through [`InvertedIndexParams`].
    Inverted,
}
45
46impl TryFrom<IndexType> for ScalarIndexType {
47    type Error = Error;
48
49    fn try_from(value: IndexType) -> Result<Self> {
50        match value {
51            IndexType::BTree | IndexType::Scalar => Ok(Self::BTree),
52            IndexType::Bitmap => Ok(Self::Bitmap),
53            IndexType::LabelList => Ok(Self::LabelList),
54            IndexType::Inverted => Ok(Self::Inverted),
55            _ => Err(Error::InvalidInput {
56                source: format!("Index type {:?} is not a scalar index", value).into(),
57                location: location!(),
58            }),
59        }
60    }
61}
62
/// Parameters for building a scalar index.
///
/// The `Default` value leaves `force_index_type` unset; in that case the
/// `IndexParams::index_type` implementation reports a B-tree.
#[derive(Default)]
pub struct ScalarIndexParams {
    /// If set then always use the given index type and skip auto-detection
    pub force_index_type: Option<ScalarIndexType>,
}
68
69impl ScalarIndexParams {
70    pub fn new(index_type: ScalarIndexType) -> Self {
71        Self {
72            force_index_type: Some(index_type),
73        }
74    }
75}
76
77impl IndexParams for ScalarIndexParams {
78    fn as_any(&self) -> &dyn std::any::Any {
79        self
80    }
81
82    fn index_type(&self) -> IndexType {
83        match self.force_index_type {
84            Some(ScalarIndexType::BTree) | None => IndexType::BTree,
85            Some(ScalarIndexType::Bitmap) => IndexType::Bitmap,
86            Some(ScalarIndexType::LabelList) => IndexType::LabelList,
87            Some(ScalarIndexType::Inverted) => IndexType::Inverted,
88        }
89    }
90
91    fn index_name(&self) -> &str {
92        LANCE_SCALAR_INDEX
93    }
94}
95
/// Parameters for building an inverted index (`IndexType::Inverted`).
#[derive(Clone)]
pub struct InvertedIndexParams {
    /// If true, store the position of the term in the document.
    /// This can significantly increase the size of the index.
    /// If false, only store the frequency of the term in the document.
    /// Default is true.
    pub with_position: bool,

    /// Tokenizer configuration for the index; defaults to `TokenizerConfig::default()`.
    pub tokenizer_config: TokenizerConfig,
}
106
107impl Debug for InvertedIndexParams {
108    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
109        f.debug_struct("InvertedIndexParams")
110            .field("with_position", &self.with_position)
111            .finish()
112    }
113}
114
115impl DeepSizeOf for InvertedIndexParams {
116    fn deep_size_of_children(&self, _: &mut deepsize::Context) -> usize {
117        0
118    }
119}
120
121impl Default for InvertedIndexParams {
122    fn default() -> Self {
123        Self {
124            with_position: true,
125            tokenizer_config: TokenizerConfig::default(),
126        }
127    }
128}
129
130impl InvertedIndexParams {
131    pub fn with_position(mut self, with_position: bool) -> Self {
132        self.with_position = with_position;
133        self
134    }
135}
136
137impl IndexParams for InvertedIndexParams {
138    fn as_any(&self) -> &dyn std::any::Any {
139        self
140    }
141
142    fn index_type(&self) -> IndexType {
143        IndexType::Inverted
144    }
145
146    fn index_name(&self) -> &str {
147        "INVERTED"
148    }
149}
150
/// Trait for storing an index (or parts of an index) into storage
///
/// Writers are handed out by [`IndexStore::new_index_file`].
#[async_trait]
pub trait IndexWriter: Send {
    /// Writes a record batch into the file, returning the 0-based index of the batch in the file
    ///
    /// E.g. if this is the third time this is called this method will return 2
    async fn write_record_batch(&mut self, batch: RecordBatch) -> Result<u64>;
    /// Finishes writing the file and closes the file
    async fn finish(&mut self) -> Result<()>;
    /// Finishes writing the file and closes the file, attaching `metadata` as additional
    /// key/value metadata
    async fn finish_with_metadata(&mut self, metadata: HashMap<String, String>) -> Result<()>;
}
163
/// Trait for reading an index (or parts of an index) from storage
///
/// Readers are handed out by [`IndexStore::open_index_file`].
#[async_trait]
pub trait IndexReader: Send + Sync {
    /// Read the n-th record batch from the file
    ///
    /// NOTE(review): `n` presumably matches the 0-based index returned by
    /// `IndexWriter::write_record_batch`; how `batch_size` relates to the stored batch
    /// boundaries is implementation-defined — confirm against implementations.
    async fn read_record_batch(&self, n: u64, batch_size: u64) -> Result<RecordBatch>;
    /// Read the range of rows from the file.
    /// If projection is Some, only return the columns in the projection,
    /// nested columns like Some(&["x.y"]) are not supported.
    /// If projection is None, return all columns.
    async fn read_range(
        &self,
        range: std::ops::Range<usize>,
        projection: Option<&[&str]>,
    ) -> Result<RecordBatch>;
    /// Return the number of batches in the file
    async fn num_batches(&self) -> u32;
    /// Return the number of rows in the file
    fn num_rows(&self) -> usize;
    /// Return the schema of the file
    fn schema(&self) -> &lance_core::datatypes::Schema;
}
185
/// Trait abstracting I/O away from index logic
///
/// Scalar indices are currently serialized as indexable arrow record batches stored in
/// named "files".  The index store is responsible for serializing and deserializing
/// these batches into file data (e.g. as .lance files or .parquet files, etc.)
#[async_trait]
pub trait IndexStore: std::fmt::Debug + Send + Sync + DeepSizeOf {
    /// Returns `self` as [`Any`] so callers can downcast to a concrete store type
    fn as_any(&self) -> &dyn Any;

    /// Suggested I/O parallelism for the store
    fn io_parallelism(&self) -> usize;

    /// Create a new file and return a writer to store data in the file
    async fn new_index_file(&self, name: &str, schema: Arc<Schema>)
        -> Result<Box<dyn IndexWriter>>;

    /// Open an existing file for retrieval
    async fn open_index_file(&self, name: &str) -> Result<Arc<dyn IndexReader>>;

    /// Copy the index file `name` from this store into `dest_store`
    ///
    /// This is often useful when remapping or updating
    async fn copy_index_file(&self, name: &str, dest_store: &dyn IndexStore) -> Result<()>;
}
210
/// Different scalar indices may support different kinds of queries
///
/// For example, a btree index can support a wide range of queries (e.g. x > 7)
/// while an index based on FTS only supports queries like "x LIKE 'foo'"
///
/// This trait is used when we need an object that can represent any kind of query
///
/// Note: if you are implementing this trait for a query type then you probably also
/// need to implement the [crate::scalar::expression::ScalarQueryParser] trait to
/// create instances of your query at parse time.
pub trait AnyQuery: std::fmt::Debug + Any + Send + Sync {
    /// Cast the query as Any to allow for downcasting (e.g. inside `dyn_eq`)
    fn as_any(&self) -> &dyn Any;
    /// Format the query as a human-readable predicate on column `col`
    fn format(&self, col: &str) -> String;
    /// Convert the query to a datafusion expression over column `col`
    fn to_expr(&self, col: String) -> Expr;
    /// Compare this query to another query; queries of different concrete types are
    /// expected to compare unequal
    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool;
}
231
232impl PartialEq for dyn AnyQuery {
233    fn eq(&self, other: &Self) -> bool {
234        self.dyn_eq(other)
235    }
236}
237
/// A full text search query
#[derive(Debug, Clone, PartialEq)]
pub struct FullTextSearchQuery {
    /// The columns to search;
    /// if empty, search all indexed columns
    pub columns: Vec<String>,
    /// The full text search query
    pub query: String,
    /// The maximum number of results to return (`None` means no limit is set)
    pub limit: Option<i64>,
    /// The wand factor to use for ranking
    /// if None, use the default value of 1.0
    /// Increasing this value will reduce the recall and improve the performance
    /// 1.0 is the value that would give the best performance without recall loss
    pub wand_factor: Option<f32>,
}
254
255impl FullTextSearchQuery {
256    pub fn new(query: String) -> Self {
257        Self {
258            query,
259            limit: None,
260            columns: vec![],
261            wand_factor: None,
262        }
263    }
264
265    pub fn columns(mut self, columns: Option<Vec<String>>) -> Self {
266        if let Some(columns) = columns {
267            self.columns = columns;
268        }
269        self
270    }
271
272    pub fn limit(mut self, limit: Option<i64>) -> Self {
273        self.limit = limit;
274        self
275    }
276
277    pub fn wand_factor(mut self, wand_factor: Option<f32>) -> Self {
278        self.wand_factor = wand_factor;
279        self
280    }
281}
282
/// A query that a basic scalar index (e.g. btree / bitmap) can satisfy
///
/// This is a subset of expression operators that is often referred to as the
/// "sargable" operators
///
/// Note that negation is not included.  Negation should be applied later.  For
/// example, to invert an equality query (e.g. all rows where the value is not 7)
/// you can grab all rows where the value = 7 and then do an inverted take (or use
/// a block list instead of an allow list for prefiltering)
#[derive(Debug, Clone, PartialEq)]
pub enum SargableQuery {
    /// Retrieve all row ids where the value lies between the given lower and upper bounds;
    /// each bound may be inclusive, exclusive, or unbounded
    Range(Bound<ScalarValue>, Bound<ScalarValue>),
    /// Retrieve all row ids where the value is in the given set of values
    IsIn(Vec<ScalarValue>),
    /// Retrieve all row ids where the value is exactly the given value
    Equals(ScalarValue),
    /// Retrieve all row ids where the value matches the given full text search query
    FullTextSearch(FullTextSearchQuery),
    /// Retrieve all row ids where the value is null
    IsNull(),
}
305
306impl AnyQuery for SargableQuery {
307    fn as_any(&self) -> &dyn Any {
308        self
309    }
310
311    fn format(&self, col: &str) -> String {
312        match self {
313            Self::Range(lower, upper) => match (lower, upper) {
314                (Bound::Unbounded, Bound::Unbounded) => "true".to_string(),
315                (Bound::Unbounded, Bound::Included(rhs)) => format!("{} <= {}", col, rhs),
316                (Bound::Unbounded, Bound::Excluded(rhs)) => format!("{} < {}", col, rhs),
317                (Bound::Included(lhs), Bound::Unbounded) => format!("{} >= {}", col, lhs),
318                (Bound::Included(lhs), Bound::Included(rhs)) => {
319                    format!("{} >= {} && {} <= {}", col, lhs, col, rhs)
320                }
321                (Bound::Included(lhs), Bound::Excluded(rhs)) => {
322                    format!("{} >= {} && {} < {}", col, lhs, col, rhs)
323                }
324                (Bound::Excluded(lhs), Bound::Unbounded) => format!("{} > {}", col, lhs),
325                (Bound::Excluded(lhs), Bound::Included(rhs)) => {
326                    format!("{} > {} && {} <= {}", col, lhs, col, rhs)
327                }
328                (Bound::Excluded(lhs), Bound::Excluded(rhs)) => {
329                    format!("{} > {} && {} < {}", col, lhs, col, rhs)
330                }
331            },
332            Self::IsIn(values) => {
333                format!(
334                    "{} IN [{}]",
335                    col,
336                    values
337                        .iter()
338                        .map(|val| val.to_string())
339                        .collect::<Vec<_>>()
340                        .join(",")
341                )
342            }
343            Self::FullTextSearch(query) => {
344                format!("fts({})", query.query)
345            }
346            Self::IsNull() => {
347                format!("{} IS NULL", col)
348            }
349            Self::Equals(val) => {
350                format!("{} = {}", col, val)
351            }
352        }
353    }
354
355    fn to_expr(&self, col: String) -> Expr {
356        let col_expr = Expr::Column(Column::new_unqualified(col));
357        match self {
358            Self::Range(lower, upper) => match (lower, upper) {
359                (Bound::Unbounded, Bound::Unbounded) => {
360                    Expr::Literal(ScalarValue::Boolean(Some(true)))
361                }
362                (Bound::Unbounded, Bound::Included(rhs)) => {
363                    col_expr.lt_eq(Expr::Literal(rhs.clone()))
364                }
365                (Bound::Unbounded, Bound::Excluded(rhs)) => col_expr.lt(Expr::Literal(rhs.clone())),
366                (Bound::Included(lhs), Bound::Unbounded) => {
367                    col_expr.gt_eq(Expr::Literal(lhs.clone()))
368                }
369                (Bound::Included(lhs), Bound::Included(rhs)) => {
370                    col_expr.between(Expr::Literal(lhs.clone()), Expr::Literal(rhs.clone()))
371                }
372                (Bound::Included(lhs), Bound::Excluded(rhs)) => col_expr
373                    .clone()
374                    .gt_eq(Expr::Literal(lhs.clone()))
375                    .and(col_expr.lt(Expr::Literal(rhs.clone()))),
376                (Bound::Excluded(lhs), Bound::Unbounded) => col_expr.gt(Expr::Literal(lhs.clone())),
377                (Bound::Excluded(lhs), Bound::Included(rhs)) => col_expr
378                    .clone()
379                    .gt(Expr::Literal(lhs.clone()))
380                    .and(col_expr.lt_eq(Expr::Literal(rhs.clone()))),
381                (Bound::Excluded(lhs), Bound::Excluded(rhs)) => col_expr
382                    .clone()
383                    .gt(Expr::Literal(lhs.clone()))
384                    .and(col_expr.lt(Expr::Literal(rhs.clone()))),
385            },
386            Self::IsIn(values) => col_expr.in_list(
387                values
388                    .iter()
389                    .map(|val| Expr::Literal(val.clone()))
390                    .collect::<Vec<_>>(),
391                false,
392            ),
393            Self::FullTextSearch(query) => {
394                col_expr.like(Expr::Literal(ScalarValue::Utf8(Some(query.query.clone()))))
395            }
396            Self::IsNull() => col_expr.is_null(),
397            Self::Equals(value) => col_expr.eq(Expr::Literal(value.clone())),
398        }
399    }
400
401    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
402        match other.as_any().downcast_ref::<Self>() {
403            Some(o) => self == o,
404            None => false,
405        }
406    }
407}
408
/// A query that a LabelListIndex can satisfy
///
/// Converted to a DataFusion `array_has_all` / `array_has_any` call by its `AnyQuery::to_expr`.
#[derive(Debug, Clone, PartialEq)]
pub enum LabelListQuery {
    /// Retrieve all row ids where every label is in the list of values for the row
    HasAllLabels(Vec<ScalarValue>),
    /// Retrieve all row ids where at least one of the given labels is in the list of values for the row
    HasAnyLabel(Vec<ScalarValue>),
}
417
418impl AnyQuery for LabelListQuery {
419    fn as_any(&self) -> &dyn Any {
420        self
421    }
422
423    fn format(&self, col: &str) -> String {
424        format!("{}", self.to_expr(col.to_string()))
425    }
426
427    fn to_expr(&self, col: String) -> Expr {
428        match self {
429            Self::HasAllLabels(labels) => {
430                let labels_arr = ScalarValue::iter_to_array(labels.iter().cloned()).unwrap();
431                let offsets_buffer =
432                    OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, labels_arr.len() as i32]));
433                let labels_list = ListArray::try_new(
434                    Arc::new(Field::new("item", labels_arr.data_type().clone(), false)),
435                    offsets_buffer,
436                    labels_arr,
437                    None,
438                )
439                .unwrap();
440                let labels_arr = Arc::new(labels_list);
441                Expr::ScalarFunction(ScalarFunction {
442                    func: Arc::new(array_has::ArrayHasAll::new().into()),
443                    args: vec![
444                        Expr::Column(Column::new_unqualified(col)),
445                        Expr::Literal(ScalarValue::List(labels_arr)),
446                    ],
447                })
448            }
449            Self::HasAnyLabel(labels) => {
450                let labels_arr = ScalarValue::iter_to_array(labels.iter().cloned()).unwrap();
451                let offsets_buffer =
452                    OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, labels_arr.len() as i32]));
453                let labels_list = ListArray::try_new(
454                    Arc::new(Field::new("item", labels_arr.data_type().clone(), false)),
455                    offsets_buffer,
456                    labels_arr,
457                    None,
458                )
459                .unwrap();
460                let labels_arr = Arc::new(labels_list);
461                Expr::ScalarFunction(ScalarFunction {
462                    func: Arc::new(array_has::ArrayHasAny::new().into()),
463                    args: vec![
464                        Expr::Column(Column::new_unqualified(col)),
465                        Expr::Literal(ScalarValue::List(labels_arr)),
466                    ],
467                })
468            }
469        }
470    }
471
472    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
473        match other.as_any().downcast_ref::<Self>() {
474            Some(o) => self == o,
475            None => false,
476        }
477    }
478}
479
/// A trait for a scalar index, a structure that can determine row ids that satisfy scalar queries
#[async_trait]
pub trait ScalarIndex: Send + Sync + std::fmt::Debug + Index + DeepSizeOf {
    /// Search the scalar index
    ///
    /// Returns all row ids that satisfy the query, these row ids are not necessarily ordered
    async fn search(&self, query: &dyn AnyQuery) -> Result<RowIdTreeMap>;

    /// Load the scalar index from storage
    ///
    /// The `Self: Sized` bound keeps this constructor out of the vtable so that
    /// `dyn ScalarIndex` remains usable as a trait object.
    async fn load(store: Arc<dyn IndexStore>) -> Result<Arc<Self>>
    where
        Self: Sized;

    /// Remap the row ids, creating a new remapped version of this index in `dest_store`
    ///
    /// NOTE(review): `mapping` presumably maps old row id -> new row id, with `None` meaning
    /// the row no longer exists — confirm against implementations.
    async fn remap(
        &self,
        mapping: &HashMap<u64, Option<u64>>,
        dest_store: &dyn IndexStore,
    ) -> Result<()>;

    /// Add the new data into the index, creating an updated version of the index in `dest_store`
    async fn update(
        &self,
        new_data: SendableRecordBatchStream,
        dest_store: &dyn IndexStore,
    ) -> Result<()>;
}