lance_index/
scalar.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Scalar indices for metadata search & filtering
5
6use std::collections::{HashMap, HashSet};
7use std::fmt::Debug;
8use std::{any::Any, ops::Bound, sync::Arc};
9
10use arrow::buffer::{OffsetBuffer, ScalarBuffer};
11use arrow_array::{ListArray, RecordBatch};
12use arrow_schema::{Field, Schema};
13use async_trait::async_trait;
14use datafusion::functions::string::contains::ContainsFunc;
15use datafusion::functions_array::array_has;
16use datafusion::physical_plan::SendableRecordBatchStream;
17use datafusion_common::{scalar::ScalarValue, Column};
18
19use datafusion_expr::expr::ScalarFunction;
20use datafusion_expr::Expr;
21use deepsize::DeepSizeOf;
22use inverted::query::{fill_fts_query_column, FtsQuery, FtsQueryNode, FtsSearchParams, MatchQuery};
23use lance_core::cache::LanceCache;
24use lance_core::utils::mask::RowIdTreeMap;
25use lance_core::{Error, Result};
26use snafu::location;
27
28use crate::metrics::MetricsCollector;
29use crate::{Index, IndexParams, IndexType};
30
31pub mod bitmap;
32pub mod btree;
33pub mod expression;
34pub mod flat;
35pub mod inverted;
36pub mod label_list;
37pub mod lance_format;
38pub mod ngram;
39pub mod zonemap;
40
41use crate::frag_reuse::FragReuseIndex;
42pub use inverted::tokenizer::InvertedIndexParams;
43
/// Index name reported by `ScalarIndexParams` through `IndexParams::index_name`.
pub const LANCE_SCALAR_INDEX: &str = "__lance_scalar_index";
45
46#[derive(Debug, Copy, Clone, PartialEq, Eq, DeepSizeOf)]
47pub enum ScalarIndexType {
48    BTree,
49    Bitmap,
50    LabelList,
51    NGram,
52    ZoneMap,
53    Inverted,
54}
55
56impl TryFrom<IndexType> for ScalarIndexType {
57    type Error = Error;
58
59    fn try_from(value: IndexType) -> Result<Self> {
60        match value {
61            IndexType::BTree | IndexType::Scalar => Ok(Self::BTree),
62            IndexType::Bitmap => Ok(Self::Bitmap),
63            IndexType::LabelList => Ok(Self::LabelList),
64            IndexType::NGram => Ok(Self::NGram),
65            IndexType::ZoneMap => Ok(Self::ZoneMap),
66            IndexType::Inverted => Ok(Self::Inverted),
67            _ => Err(Error::InvalidInput {
68                source: format!("Index type {:?} is not a scalar index", value).into(),
69                location: location!(),
70            }),
71        }
72    }
73}
74
75impl From<ScalarIndexType> for IndexType {
76    fn from(val: ScalarIndexType) -> Self {
77        match val {
78            ScalarIndexType::BTree => Self::BTree,
79            ScalarIndexType::Bitmap => Self::Bitmap,
80            ScalarIndexType::LabelList => Self::LabelList,
81            ScalarIndexType::NGram => Self::NGram,
82            ScalarIndexType::ZoneMap => Self::ZoneMap,
83            ScalarIndexType::Inverted => Self::Inverted,
84        }
85    }
86}
87
88#[derive(Default)]
89pub struct ScalarIndexParams {
90    /// If set then always use the given index type and skip auto-detection
91    pub force_index_type: Option<ScalarIndexType>,
92}
93
94impl ScalarIndexParams {
95    pub fn new(index_type: ScalarIndexType) -> Self {
96        Self {
97            force_index_type: Some(index_type),
98        }
99    }
100}
101
102impl IndexParams for ScalarIndexParams {
103    fn as_any(&self) -> &dyn std::any::Any {
104        self
105    }
106
107    fn index_type(&self) -> IndexType {
108        match self.force_index_type {
109            Some(ScalarIndexType::BTree) | None => IndexType::BTree,
110            Some(ScalarIndexType::Bitmap) => IndexType::Bitmap,
111            Some(ScalarIndexType::LabelList) => IndexType::LabelList,
112            Some(ScalarIndexType::Inverted) => IndexType::Inverted,
113            Some(ScalarIndexType::NGram) => IndexType::NGram,
114            Some(ScalarIndexType::ZoneMap) => IndexType::ZoneMap,
115        }
116    }
117
118    fn index_name(&self) -> &str {
119        LANCE_SCALAR_INDEX
120    }
121}
122
123impl IndexParams for InvertedIndexParams {
124    fn as_any(&self) -> &dyn std::any::Any {
125        self
126    }
127
128    fn index_type(&self) -> IndexType {
129        IndexType::Inverted
130    }
131
132    fn index_name(&self) -> &str {
133        "INVERTED"
134    }
135}
136
137/// Trait for storing an index (or parts of an index) into storage
138#[async_trait]
139pub trait IndexWriter: Send {
140    /// Writes a record batch into the file, returning the 0-based index of the batch in the file
141    ///
142    /// E.g. if this is the third time this is called this method will return 2
143    async fn write_record_batch(&mut self, batch: RecordBatch) -> Result<u64>;
144    /// Finishes writing the file and closes the file
145    async fn finish(&mut self) -> Result<()>;
146    /// Finishes writing the file and closes the file with additional metadata
147    async fn finish_with_metadata(&mut self, metadata: HashMap<String, String>) -> Result<()>;
148}
149
150/// Trait for reading an index (or parts of an index) from storage
151#[async_trait]
152pub trait IndexReader: Send + Sync {
153    /// Read the n-th record batch from the file
154    async fn read_record_batch(&self, n: u64, batch_size: u64) -> Result<RecordBatch>;
155    /// Read the range of rows from the file.
156    /// If projection is Some, only return the columns in the projection,
157    /// nested columns like Some(&["x.y"]) are not supported.
158    /// If projection is None, return all columns.
159    async fn read_range(
160        &self,
161        range: std::ops::Range<usize>,
162        projection: Option<&[&str]>,
163    ) -> Result<RecordBatch>;
164    /// Return the number of batches in the file
165    async fn num_batches(&self, batch_size: u64) -> u32;
166    /// Return the number of rows in the file
167    fn num_rows(&self) -> usize;
168    /// Return the metadata of the file
169    fn schema(&self) -> &lance_core::datatypes::Schema;
170}
171
172/// Trait abstracting I/O away from index logic
173///
174/// Scalar indices are currently serialized as indexable arrow record batches stored in
175/// named "files".  The index store is responsible for serializing and deserializing
176/// these batches into file data (e.g. as .lance files or .parquet files, etc.)
177#[async_trait]
178pub trait IndexStore: std::fmt::Debug + Send + Sync + DeepSizeOf {
179    fn as_any(&self) -> &dyn Any;
180
181    /// Suggested I/O parallelism for the store
182    fn io_parallelism(&self) -> usize;
183
184    /// Create a new file and return a writer to store data in the file
185    async fn new_index_file(&self, name: &str, schema: Arc<Schema>)
186        -> Result<Box<dyn IndexWriter>>;
187
188    /// Open an existing file for retrieval
189    async fn open_index_file(&self, name: &str) -> Result<Arc<dyn IndexReader>>;
190
191    /// Copy a range of batches from an index file from this store to another
192    ///
193    /// This is often useful when remapping or updating
194    async fn copy_index_file(&self, name: &str, dest_store: &dyn IndexStore) -> Result<()>;
195
196    /// Rename an index file
197    async fn rename_index_file(&self, name: &str, new_name: &str) -> Result<()>;
198
199    /// Delete an index file (used in the tmp spill store to keep tmp size down)
200    async fn delete_index_file(&self, name: &str) -> Result<()>;
201}
202
203/// Different scalar indices may support different kinds of queries
204///
205/// For example, a btree index can support a wide range of queries (e.g. x > 7)
206/// while an index based on FTS only supports queries like "x LIKE 'foo'"
207///
208/// This trait is used when we need an object that can represent any kind of query
209///
210/// Note: if you are implementing this trait for a query type then you probably also
211/// need to implement the [crate::scalar::expression::ScalarQueryParser] trait to
212/// create instances of your query at parse time.
213pub trait AnyQuery: std::fmt::Debug + Any + Send + Sync {
214    /// Cast the query as Any to allow for downcasting
215    fn as_any(&self) -> &dyn Any;
216    /// Format the query as a string
217    fn format(&self, col: &str) -> String;
218    /// Convert the query to a datafusion expression
219    fn to_expr(&self, col: String) -> Expr;
220    /// Compare this query to another query
221    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool;
222}
223
224impl PartialEq for dyn AnyQuery {
225    fn eq(&self, other: &Self) -> bool {
226        self.dyn_eq(other)
227    }
228}
229/// A full text search query
230#[derive(Debug, Clone, PartialEq)]
231pub struct FullTextSearchQuery {
232    pub query: FtsQuery,
233
234    /// The maximum number of results to return
235    pub limit: Option<i64>,
236
237    /// The wand factor to use for ranking
238    /// if None, use the default value of 1.0
239    /// Increasing this value will reduce the recall and improve the performance
240    /// 1.0 is the value that would give the best performance without recall loss
241    pub wand_factor: Option<f32>,
242}
243
244impl FullTextSearchQuery {
245    /// Create a new terms query
246    pub fn new(query: String) -> Self {
247        let query = MatchQuery::new(query).into();
248        Self {
249            query,
250            limit: None,
251            wand_factor: None,
252        }
253    }
254
255    /// Create a new fuzzy query
256    pub fn new_fuzzy(term: String, max_distance: Option<u32>) -> Self {
257        let query = MatchQuery::new(term).with_fuzziness(max_distance).into();
258        Self {
259            query,
260            limit: None,
261            wand_factor: None,
262        }
263    }
264
265    /// Create a new compound query
266    pub fn new_query(query: FtsQuery) -> Self {
267        Self {
268            query,
269            limit: None,
270            wand_factor: None,
271        }
272    }
273
274    /// Set the column to search over
275    /// This is available for only MatchQuery and PhraseQuery
276    pub fn with_column(mut self, column: String) -> Result<Self> {
277        self.query = fill_fts_query_column(&self.query, &[column], true)?;
278        Ok(self)
279    }
280
281    /// Set the column to search over
282    /// This is available for only MatchQuery
283    pub fn with_columns(mut self, columns: &[String]) -> Result<Self> {
284        self.query = fill_fts_query_column(&self.query, columns, true)?;
285        Ok(self)
286    }
287
288    /// limit the number of results to return
289    /// if None, return all results
290    pub fn limit(mut self, limit: Option<i64>) -> Self {
291        self.limit = limit;
292        self
293    }
294
295    pub fn wand_factor(mut self, wand_factor: Option<f32>) -> Self {
296        self.wand_factor = wand_factor;
297        self
298    }
299
300    pub fn columns(&self) -> HashSet<String> {
301        self.query.columns()
302    }
303
304    pub fn params(&self) -> FtsSearchParams {
305        let params = FtsSearchParams::new()
306            .with_limit(self.limit.map(|limit| limit as usize))
307            .with_wand_factor(self.wand_factor.unwrap_or(1.0));
308        match self.query {
309            FtsQuery::Phrase(ref query) => params.with_phrase_slop(Some(query.slop)),
310            _ => params,
311        }
312    }
313}
314
315/// A query that a basic scalar index (e.g. btree / bitmap) can satisfy
316///
317/// This is a subset of expression operators that is often referred to as the
318/// "sargable" operators
319///
320/// Note that negation is not included.  Negation should be applied later.  For
321/// example, to invert an equality query (e.g. all rows where the value is not 7)
322/// you can grab all rows where the value = 7 and then do an inverted take (or use
323/// a block list instead of an allow list for prefiltering)
324#[derive(Debug, Clone, PartialEq)]
325pub enum SargableQuery {
326    /// Retrieve all row ids where the value is in the given [min, max) range
327    Range(Bound<ScalarValue>, Bound<ScalarValue>),
328    /// Retrieve all row ids where the value is in the given set of values
329    IsIn(Vec<ScalarValue>),
330    /// Retrieve all row ids where the value is exactly the given value
331    Equals(ScalarValue),
332    /// Retrieve all row ids where the value matches the given full text search query
333    FullTextSearch(FullTextSearchQuery),
334    /// Retrieve all row ids where the value is null
335    IsNull(),
336}
337
338impl AnyQuery for SargableQuery {
339    fn as_any(&self) -> &dyn Any {
340        self
341    }
342
343    fn format(&self, col: &str) -> String {
344        match self {
345            Self::Range(lower, upper) => match (lower, upper) {
346                (Bound::Unbounded, Bound::Unbounded) => "true".to_string(),
347                (Bound::Unbounded, Bound::Included(rhs)) => format!("{} <= {}", col, rhs),
348                (Bound::Unbounded, Bound::Excluded(rhs)) => format!("{} < {}", col, rhs),
349                (Bound::Included(lhs), Bound::Unbounded) => format!("{} >= {}", col, lhs),
350                (Bound::Included(lhs), Bound::Included(rhs)) => {
351                    format!("{} >= {} && {} <= {}", col, lhs, col, rhs)
352                }
353                (Bound::Included(lhs), Bound::Excluded(rhs)) => {
354                    format!("{} >= {} && {} < {}", col, lhs, col, rhs)
355                }
356                (Bound::Excluded(lhs), Bound::Unbounded) => format!("{} > {}", col, lhs),
357                (Bound::Excluded(lhs), Bound::Included(rhs)) => {
358                    format!("{} > {} && {} <= {}", col, lhs, col, rhs)
359                }
360                (Bound::Excluded(lhs), Bound::Excluded(rhs)) => {
361                    format!("{} > {} && {} < {}", col, lhs, col, rhs)
362                }
363            },
364            Self::IsIn(values) => {
365                format!(
366                    "{} IN [{}]",
367                    col,
368                    values
369                        .iter()
370                        .map(|val| val.to_string())
371                        .collect::<Vec<_>>()
372                        .join(",")
373                )
374            }
375            Self::FullTextSearch(query) => {
376                format!("fts({})", query.query)
377            }
378            Self::IsNull() => {
379                format!("{} IS NULL", col)
380            }
381            Self::Equals(val) => {
382                format!("{} = {}", col, val)
383            }
384        }
385    }
386
387    fn to_expr(&self, col: String) -> Expr {
388        let col_expr = Expr::Column(Column::new_unqualified(col));
389        match self {
390            Self::Range(lower, upper) => match (lower, upper) {
391                (Bound::Unbounded, Bound::Unbounded) => {
392                    Expr::Literal(ScalarValue::Boolean(Some(true)), None)
393                }
394                (Bound::Unbounded, Bound::Included(rhs)) => {
395                    col_expr.lt_eq(Expr::Literal(rhs.clone(), None))
396                }
397                (Bound::Unbounded, Bound::Excluded(rhs)) => {
398                    col_expr.lt(Expr::Literal(rhs.clone(), None))
399                }
400                (Bound::Included(lhs), Bound::Unbounded) => {
401                    col_expr.gt_eq(Expr::Literal(lhs.clone(), None))
402                }
403                (Bound::Included(lhs), Bound::Included(rhs)) => col_expr.between(
404                    Expr::Literal(lhs.clone(), None),
405                    Expr::Literal(rhs.clone(), None),
406                ),
407                (Bound::Included(lhs), Bound::Excluded(rhs)) => col_expr
408                    .clone()
409                    .gt_eq(Expr::Literal(lhs.clone(), None))
410                    .and(col_expr.lt(Expr::Literal(rhs.clone(), None))),
411                (Bound::Excluded(lhs), Bound::Unbounded) => {
412                    col_expr.gt(Expr::Literal(lhs.clone(), None))
413                }
414                (Bound::Excluded(lhs), Bound::Included(rhs)) => col_expr
415                    .clone()
416                    .gt(Expr::Literal(lhs.clone(), None))
417                    .and(col_expr.lt_eq(Expr::Literal(rhs.clone(), None))),
418                (Bound::Excluded(lhs), Bound::Excluded(rhs)) => col_expr
419                    .clone()
420                    .gt(Expr::Literal(lhs.clone(), None))
421                    .and(col_expr.lt(Expr::Literal(rhs.clone(), None))),
422            },
423            Self::IsIn(values) => col_expr.in_list(
424                values
425                    .iter()
426                    .map(|val| Expr::Literal(val.clone(), None))
427                    .collect::<Vec<_>>(),
428                false,
429            ),
430            Self::FullTextSearch(query) => col_expr.like(Expr::Literal(
431                ScalarValue::Utf8(Some(query.query.to_string())),
432                None,
433            )),
434            Self::IsNull() => col_expr.is_null(),
435            Self::Equals(value) => col_expr.eq(Expr::Literal(value.clone(), None)),
436        }
437    }
438
439    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
440        match other.as_any().downcast_ref::<Self>() {
441            Some(o) => self == o,
442            None => false,
443        }
444    }
445}
446
447/// A query that a LabelListIndex can satisfy
448#[derive(Debug, Clone, PartialEq)]
449pub enum LabelListQuery {
450    /// Retrieve all row ids where every label is in the list of values for the row
451    HasAllLabels(Vec<ScalarValue>),
452    /// Retrieve all row ids where at least one of the given labels is in the list of values for the row
453    HasAnyLabel(Vec<ScalarValue>),
454}
455
456impl AnyQuery for LabelListQuery {
457    fn as_any(&self) -> &dyn Any {
458        self
459    }
460
461    fn format(&self, col: &str) -> String {
462        format!("{}", self.to_expr(col.to_string()))
463    }
464
465    fn to_expr(&self, col: String) -> Expr {
466        match self {
467            Self::HasAllLabels(labels) => {
468                let labels_arr = ScalarValue::iter_to_array(labels.iter().cloned()).unwrap();
469                let offsets_buffer =
470                    OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, labels_arr.len() as i32]));
471                let labels_list = ListArray::try_new(
472                    Arc::new(Field::new("item", labels_arr.data_type().clone(), false)),
473                    offsets_buffer,
474                    labels_arr,
475                    None,
476                )
477                .unwrap();
478                let labels_arr = Arc::new(labels_list);
479                Expr::ScalarFunction(ScalarFunction {
480                    func: Arc::new(array_has::ArrayHasAll::new().into()),
481                    args: vec![
482                        Expr::Column(Column::new_unqualified(col)),
483                        Expr::Literal(ScalarValue::List(labels_arr), None),
484                    ],
485                })
486            }
487            Self::HasAnyLabel(labels) => {
488                let labels_arr = ScalarValue::iter_to_array(labels.iter().cloned()).unwrap();
489                let offsets_buffer =
490                    OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, labels_arr.len() as i32]));
491                let labels_list = ListArray::try_new(
492                    Arc::new(Field::new("item", labels_arr.data_type().clone(), false)),
493                    offsets_buffer,
494                    labels_arr,
495                    None,
496                )
497                .unwrap();
498                let labels_arr = Arc::new(labels_list);
499                Expr::ScalarFunction(ScalarFunction {
500                    func: Arc::new(array_has::ArrayHasAny::new().into()),
501                    args: vec![
502                        Expr::Column(Column::new_unqualified(col)),
503                        Expr::Literal(ScalarValue::List(labels_arr), None),
504                    ],
505                })
506            }
507        }
508    }
509
510    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
511        match other.as_any().downcast_ref::<Self>() {
512            Some(o) => self == o,
513            None => false,
514        }
515    }
516}
517
/// A query that an NGramIndex can satisfy
#[derive(Debug, Clone, PartialEq)]
pub enum TextQuery {
    /// Retrieve all row ids where the text contains the given string
    StringContains(String),
    // TODO: In the future we should be able to do case-insensitive contains
    // as well as partial matches (e.g. LIKE 'foo%') and potentially even
    // some regular expressions
}
527
528impl AnyQuery for TextQuery {
529    fn as_any(&self) -> &dyn Any {
530        self
531    }
532
533    fn format(&self, col: &str) -> String {
534        format!("{}", self.to_expr(col.to_string()))
535    }
536
537    fn to_expr(&self, col: String) -> Expr {
538        match self {
539            Self::StringContains(substr) => Expr::ScalarFunction(ScalarFunction {
540                func: Arc::new(ContainsFunc::new().into()),
541                args: vec![
542                    Expr::Column(Column::new_unqualified(col)),
543                    Expr::Literal(ScalarValue::Utf8(Some(substr.clone())), None),
544                ],
545            }),
546        }
547    }
548
549    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
550        match other.as_any().downcast_ref::<Self>() {
551            Some(o) => self == o,
552            None => false,
553        }
554    }
555}
556
557/// The result of a search operation against a scalar index
558#[derive(Debug, PartialEq)]
559pub enum SearchResult {
560    /// The exact row ids that satisfy the query
561    Exact(RowIdTreeMap),
562    /// Any row id satisfying the query will be in this set but not every
563    /// row id in this set will satisfy the query, a further recheck step
564    /// is needed
565    AtMost(RowIdTreeMap),
566    /// All of the given row ids satisfy the query but there may be more
567    ///
568    /// No scalar index actually returns this today but it can arise from
569    /// boolean operations (e.g. NOT(AtMost(x)) == AtLeast(NOT(x)))
570    AtLeast(RowIdTreeMap),
571}
572
573impl SearchResult {
574    pub fn row_ids(&self) -> &RowIdTreeMap {
575        match self {
576            Self::Exact(row_ids) => row_ids,
577            Self::AtMost(row_ids) => row_ids,
578            Self::AtLeast(row_ids) => row_ids,
579        }
580    }
581
582    pub fn is_exact(&self) -> bool {
583        matches!(self, Self::Exact(_))
584    }
585}
586
587/// A trait for a scalar index, a structure that can determine row ids that satisfy scalar queries
588#[async_trait]
589pub trait ScalarIndex: Send + Sync + std::fmt::Debug + Index + DeepSizeOf {
590    /// Search the scalar index
591    ///
592    /// Returns all row ids that satisfy the query, these row ids are not necessarily ordered
593    async fn search(
594        &self,
595        query: &dyn AnyQuery,
596        metrics: &dyn MetricsCollector,
597    ) -> Result<SearchResult>;
598
599    /// Load the scalar index from storage
600    async fn load(
601        store: Arc<dyn IndexStore>,
602        frag_reuse_index: Option<Arc<FragReuseIndex>>,
603        index_cache: LanceCache,
604    ) -> Result<Arc<Self>>
605    where
606        Self: Sized;
607
608    /// Returns true if the remap operation is supported
609    fn can_remap(&self) -> bool;
610
611    /// Remap the row ids, creating a new remapped version of this index in `dest_store`
612    async fn remap(
613        &self,
614        mapping: &HashMap<u64, Option<u64>>,
615        dest_store: &dyn IndexStore,
616    ) -> Result<()>;
617
618    /// Add the new data into the index, creating an updated version of the index in `dest_store`
619    async fn update(
620        &self,
621        new_data: SendableRecordBatchStream,
622        dest_store: &dyn IndexStore,
623    ) -> Result<()>;
624}