Skip to main content

lance_index/
scalar.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Scalar indices for metadata search & filtering
5
6use arrow::buffer::{OffsetBuffer, ScalarBuffer};
7use arrow_array::{ListArray, RecordBatch};
8use arrow_schema::{Field, Schema};
9use async_trait::async_trait;
10use datafusion::functions::string::contains::ContainsFunc;
11use datafusion::functions_nested::array_has;
12use datafusion::physical_plan::SendableRecordBatchStream;
13use datafusion_common::{Column, scalar::ScalarValue};
14use std::collections::{HashMap, HashSet};
15use std::fmt::Debug;
16use std::{any::Any, ops::Bound, sync::Arc};
17
18use datafusion_expr::Expr;
19use datafusion_expr::expr::ScalarFunction;
20use deepsize::DeepSizeOf;
21use inverted::query::{FtsQuery, FtsQueryNode, FtsSearchParams, MatchQuery, fill_fts_query_column};
22use lance_core::utils::mask::{NullableRowAddrSet, RowAddrTreeMap};
23use lance_core::{Error, Result};
24use roaring::RoaringBitmap;
25use serde::Serialize;
26
27use crate::metrics::MetricsCollector;
28use crate::scalar::registry::TrainingCriteria;
29use crate::{Index, IndexParams, IndexType};
30
31pub mod bitmap;
32pub mod bloomfilter;
33pub mod btree;
34pub mod expression;
35pub mod inverted;
36pub mod json;
37pub mod label_list;
38pub mod lance_format;
39pub mod ngram;
40pub mod registry;
41#[cfg(feature = "geo")]
42pub mod rtree;
43pub mod zoned;
44pub mod zonemap;
45
46use crate::frag_reuse::FragReuseIndex;
47pub use inverted::tokenizer::InvertedIndexParams;
48use lance_datafusion::udf::CONTAINS_TOKENS_UDF;
49
/// Index name returned by the `IndexParams::index_name` implementation of
/// [`ScalarIndexParams`].
pub const LANCE_SCALAR_INDEX: &str = "__lance_scalar_index";
51
52/// Builtin index types supported by the Lance library
53///
54/// This is primarily for convenience to avoid a bunch of string
55/// constants and provide some auto-complete.  This type should not
56/// be used in the manifest as plugins cannot add new entries.
57#[derive(Debug, Clone, PartialEq, Eq, DeepSizeOf)]
58pub enum BuiltinIndexType {
59    BTree,
60    Bitmap,
61    LabelList,
62    NGram,
63    ZoneMap,
64    BloomFilter,
65    RTree,
66    Inverted,
67}
68
69impl BuiltinIndexType {
70    pub fn as_str(&self) -> &str {
71        match self {
72            Self::BTree => "btree",
73            Self::Bitmap => "bitmap",
74            Self::LabelList => "labellist",
75            Self::NGram => "ngram",
76            Self::ZoneMap => "zonemap",
77            Self::Inverted => "inverted",
78            Self::BloomFilter => "bloomfilter",
79            Self::RTree => "rtree",
80        }
81    }
82}
83
84impl TryFrom<IndexType> for BuiltinIndexType {
85    type Error = Error;
86
87    fn try_from(value: IndexType) -> Result<Self> {
88        match value {
89            IndexType::BTree => Ok(Self::BTree),
90            IndexType::Bitmap => Ok(Self::Bitmap),
91            IndexType::LabelList => Ok(Self::LabelList),
92            IndexType::NGram => Ok(Self::NGram),
93            IndexType::ZoneMap => Ok(Self::ZoneMap),
94            IndexType::Inverted => Ok(Self::Inverted),
95            IndexType::BloomFilter => Ok(Self::BloomFilter),
96            IndexType::RTree => Ok(Self::RTree),
97            _ => Err(Error::index("Invalid index type".to_string())),
98        }
99    }
100}
101
/// Parameters describing a scalar index to create.
#[derive(Debug, Clone, PartialEq)]
pub struct ScalarIndexParams {
    /// Name of the index type to create.
    ///
    /// Plugins may register additional index types.  Lookup of the index type
    /// is case-insensitive.
    pub index_type: String,
    /// Training parameters for the index, as a JSON string.
    ///
    /// The JSON layout is specific to the index type.  When `None`, the index
    /// type's default parameters are used.
    pub params: Option<String>,
}
114
115impl Default for ScalarIndexParams {
116    fn default() -> Self {
117        Self {
118            index_type: BuiltinIndexType::BTree.as_str().to_string(),
119            params: None,
120        }
121    }
122}
123
124impl ScalarIndexParams {
125    /// Creates a new ScalarIndexParams from one of the builtin index types
126    pub fn for_builtin(index_type: BuiltinIndexType) -> Self {
127        Self {
128            index_type: index_type.as_str().to_string(),
129            params: None,
130        }
131    }
132
133    /// Create a new ScalarIndexParams with the given index type
134    pub fn new(index_type: String) -> Self {
135        Self {
136            index_type,
137            params: None,
138        }
139    }
140
141    /// Set the parameters for the index
142    pub fn with_params<ParamsType: Serialize>(mut self, params: &ParamsType) -> Self {
143        self.params = Some(serde_json::to_string(params).unwrap());
144        self
145    }
146}
147
148impl IndexParams for ScalarIndexParams {
149    fn as_any(&self) -> &dyn std::any::Any {
150        self
151    }
152
153    fn index_name(&self) -> &str {
154        LANCE_SCALAR_INDEX
155    }
156}
157
158impl IndexParams for InvertedIndexParams {
159    fn as_any(&self) -> &dyn std::any::Any {
160        self
161    }
162
163    fn index_name(&self) -> &str {
164        "INVERTED"
165    }
166}
167
168/// Trait for storing an index (or parts of an index) into storage
169#[async_trait]
170pub trait IndexWriter: Send {
171    /// Writes a record batch into the file, returning the 0-based index of the batch in the file
172    ///
173    /// E.g. if this is the third time this is called this method will return 2
174    async fn write_record_batch(&mut self, batch: RecordBatch) -> Result<u64>;
175    /// Finishes writing the file and closes the file
176    async fn finish(&mut self) -> Result<()>;
177    /// Finishes writing the file and closes the file with additional metadata
178    async fn finish_with_metadata(&mut self, metadata: HashMap<String, String>) -> Result<()>;
179}
180
181/// Trait for reading an index (or parts of an index) from storage
182#[async_trait]
183pub trait IndexReader: Send + Sync {
184    /// Read the n-th record batch from the file
185    async fn read_record_batch(&self, n: u64, batch_size: u64) -> Result<RecordBatch>;
186    /// Read the range of rows from the file.
187    /// If projection is Some, only return the columns in the projection,
188    /// nested columns like Some(&["x.y"]) are not supported.
189    /// If projection is None, return all columns.
190    async fn read_range(
191        &self,
192        range: std::ops::Range<usize>,
193        projection: Option<&[&str]>,
194    ) -> Result<RecordBatch>;
195    /// Return the number of batches in the file
196    async fn num_batches(&self, batch_size: u64) -> u32;
197    /// Return the number of rows in the file
198    fn num_rows(&self) -> usize;
199    /// Return the metadata of the file
200    fn schema(&self) -> &lance_core::datatypes::Schema;
201}
202
203/// Trait abstracting I/O away from index logic
204///
205/// Scalar indices are currently serialized as indexable arrow record batches stored in
206/// named "files".  The index store is responsible for serializing and deserializing
207/// these batches into file data (e.g. as .lance files or .parquet files, etc.)
208#[async_trait]
209pub trait IndexStore: std::fmt::Debug + Send + Sync + DeepSizeOf {
210    fn as_any(&self) -> &dyn Any;
211
212    /// Suggested I/O parallelism for the store
213    fn io_parallelism(&self) -> usize;
214
215    /// Create a new file and return a writer to store data in the file
216    async fn new_index_file(&self, name: &str, schema: Arc<Schema>)
217    -> Result<Box<dyn IndexWriter>>;
218
219    /// Open an existing file for retrieval
220    async fn open_index_file(&self, name: &str) -> Result<Arc<dyn IndexReader>>;
221
222    /// Copy a range of batches from an index file from this store to another
223    ///
224    /// This is often useful when remapping or updating
225    async fn copy_index_file(&self, name: &str, dest_store: &dyn IndexStore) -> Result<()>;
226
227    /// Rename an index file
228    async fn rename_index_file(&self, name: &str, new_name: &str) -> Result<()>;
229
230    /// Delete an index file (used in the tmp spill store to keep tmp size down)
231    async fn delete_index_file(&self, name: &str) -> Result<()>;
232}
233
234/// Different scalar indices may support different kinds of queries
235///
236/// For example, a btree index can support a wide range of queries (e.g. x > 7)
237/// while an index based on FTS only supports queries like "x LIKE 'foo'"
238///
239/// This trait is used when we need an object that can represent any kind of query
240///
241/// Note: if you are implementing this trait for a query type then you probably also
242/// need to implement the [crate::scalar::expression::ScalarQueryParser] trait to
243/// create instances of your query at parse time.
244pub trait AnyQuery: std::fmt::Debug + Any + Send + Sync {
245    /// Cast the query as Any to allow for downcasting
246    fn as_any(&self) -> &dyn Any;
247    /// Format the query as a string for display purposes
248    fn format(&self, col: &str) -> String;
249    /// Convert the query to a datafusion expression
250    fn to_expr(&self, col: String) -> Expr;
251    /// Compare this query to another query
252    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool;
253}
254
255impl PartialEq for dyn AnyQuery {
256    fn eq(&self, other: &Self) -> bool {
257        self.dyn_eq(other)
258    }
259}
260/// A full text search query
261#[derive(Debug, Clone, PartialEq)]
262pub struct FullTextSearchQuery {
263    pub query: FtsQuery,
264
265    /// The maximum number of results to return
266    pub limit: Option<i64>,
267
268    /// The wand factor to use for ranking
269    /// if None, use the default value of 1.0
270    /// Increasing this value will reduce the recall and improve the performance
271    /// 1.0 is the value that would give the best performance without recall loss
272    pub wand_factor: Option<f32>,
273}
274
275impl FullTextSearchQuery {
276    /// Create a new terms query
277    pub fn new(query: String) -> Self {
278        let query = MatchQuery::new(query).into();
279        Self {
280            query,
281            limit: None,
282            wand_factor: None,
283        }
284    }
285
286    /// Create a new fuzzy query
287    pub fn new_fuzzy(term: String, max_distance: Option<u32>) -> Self {
288        let query = MatchQuery::new(term).with_fuzziness(max_distance).into();
289        Self {
290            query,
291            limit: None,
292            wand_factor: None,
293        }
294    }
295
296    /// Create a new compound query
297    pub fn new_query(query: FtsQuery) -> Self {
298        Self {
299            query,
300            limit: None,
301            wand_factor: None,
302        }
303    }
304
305    /// Set the column to search over
306    /// This is available for only MatchQuery and PhraseQuery
307    pub fn with_column(mut self, column: String) -> Result<Self> {
308        self.query = fill_fts_query_column(&self.query, &[column], true)?;
309        Ok(self)
310    }
311
312    /// Set the column to search over
313    /// This is available for only MatchQuery
314    pub fn with_columns(mut self, columns: &[String]) -> Result<Self> {
315        self.query = fill_fts_query_column(&self.query, columns, true)?;
316        Ok(self)
317    }
318
319    /// limit the number of results to return
320    /// if None, return all results
321    pub fn limit(mut self, limit: Option<i64>) -> Self {
322        self.limit = limit;
323        self
324    }
325
326    pub fn wand_factor(mut self, wand_factor: Option<f32>) -> Self {
327        self.wand_factor = wand_factor;
328        self
329    }
330
331    pub fn columns(&self) -> HashSet<String> {
332        self.query.columns()
333    }
334
335    pub fn params(&self) -> FtsSearchParams {
336        FtsSearchParams::new()
337            .with_limit(self.limit.map(|limit| limit as usize))
338            .with_wand_factor(self.wand_factor.unwrap_or(1.0))
339    }
340}
341
342/// A query that a basic scalar index (e.g. btree / bitmap) can satisfy
343///
344/// This is a subset of expression operators that is often referred to as the
345/// "sargable" operators
346///
347/// Note that negation is not included.  Negation should be applied later.  For
348/// example, to invert an equality query (e.g. all rows where the value is not 7)
349/// you can grab all rows where the value = 7 and then do an inverted take (or use
350/// a block list instead of an allow list for prefiltering)
351#[derive(Debug, Clone, PartialEq)]
352pub enum SargableQuery {
353    /// Retrieve all row ids where the value is in the given [min, max) range
354    Range(Bound<ScalarValue>, Bound<ScalarValue>),
355    /// Retrieve all row ids where the value is in the given set of values
356    IsIn(Vec<ScalarValue>),
357    /// Retrieve all row ids where the value is exactly the given value
358    Equals(ScalarValue),
359    /// Retrieve all row ids where the value matches the given full text search query
360    FullTextSearch(FullTextSearchQuery),
361    /// Retrieve all row ids where the value is null
362    IsNull(),
363}
364
365impl AnyQuery for SargableQuery {
366    fn as_any(&self) -> &dyn Any {
367        self
368    }
369
370    fn format(&self, col: &str) -> String {
371        match self {
372            Self::Range(lower, upper) => match (lower, upper) {
373                (Bound::Unbounded, Bound::Unbounded) => "true".to_string(),
374                (Bound::Unbounded, Bound::Included(rhs)) => format!("{} <= {}", col, rhs),
375                (Bound::Unbounded, Bound::Excluded(rhs)) => format!("{} < {}", col, rhs),
376                (Bound::Included(lhs), Bound::Unbounded) => format!("{} >= {}", col, lhs),
377                (Bound::Included(lhs), Bound::Included(rhs)) => {
378                    format!("{} >= {} && {} <= {}", col, lhs, col, rhs)
379                }
380                (Bound::Included(lhs), Bound::Excluded(rhs)) => {
381                    format!("{} >= {} && {} < {}", col, lhs, col, rhs)
382                }
383                (Bound::Excluded(lhs), Bound::Unbounded) => format!("{} > {}", col, lhs),
384                (Bound::Excluded(lhs), Bound::Included(rhs)) => {
385                    format!("{} > {} && {} <= {}", col, lhs, col, rhs)
386                }
387                (Bound::Excluded(lhs), Bound::Excluded(rhs)) => {
388                    format!("{} > {} && {} < {}", col, lhs, col, rhs)
389                }
390            },
391            Self::IsIn(values) => {
392                format!(
393                    "{} IN [{}]",
394                    col,
395                    values
396                        .iter()
397                        .map(|val| val.to_string())
398                        .collect::<Vec<_>>()
399                        .join(",")
400                )
401            }
402            Self::FullTextSearch(query) => {
403                format!("fts({})", query.query)
404            }
405            Self::IsNull() => {
406                format!("{} IS NULL", col)
407            }
408            Self::Equals(val) => {
409                format!("{} = {}", col, val)
410            }
411        }
412    }
413
414    fn to_expr(&self, col: String) -> Expr {
415        let col_expr = Expr::Column(Column::new_unqualified(col));
416        match self {
417            Self::Range(lower, upper) => match (lower, upper) {
418                (Bound::Unbounded, Bound::Unbounded) => {
419                    Expr::Literal(ScalarValue::Boolean(Some(true)), None)
420                }
421                (Bound::Unbounded, Bound::Included(rhs)) => {
422                    col_expr.lt_eq(Expr::Literal(rhs.clone(), None))
423                }
424                (Bound::Unbounded, Bound::Excluded(rhs)) => {
425                    col_expr.lt(Expr::Literal(rhs.clone(), None))
426                }
427                (Bound::Included(lhs), Bound::Unbounded) => {
428                    col_expr.gt_eq(Expr::Literal(lhs.clone(), None))
429                }
430                (Bound::Included(lhs), Bound::Included(rhs)) => col_expr.between(
431                    Expr::Literal(lhs.clone(), None),
432                    Expr::Literal(rhs.clone(), None),
433                ),
434                (Bound::Included(lhs), Bound::Excluded(rhs)) => col_expr
435                    .clone()
436                    .gt_eq(Expr::Literal(lhs.clone(), None))
437                    .and(col_expr.lt(Expr::Literal(rhs.clone(), None))),
438                (Bound::Excluded(lhs), Bound::Unbounded) => {
439                    col_expr.gt(Expr::Literal(lhs.clone(), None))
440                }
441                (Bound::Excluded(lhs), Bound::Included(rhs)) => col_expr
442                    .clone()
443                    .gt(Expr::Literal(lhs.clone(), None))
444                    .and(col_expr.lt_eq(Expr::Literal(rhs.clone(), None))),
445                (Bound::Excluded(lhs), Bound::Excluded(rhs)) => col_expr
446                    .clone()
447                    .gt(Expr::Literal(lhs.clone(), None))
448                    .and(col_expr.lt(Expr::Literal(rhs.clone(), None))),
449            },
450            Self::IsIn(values) => col_expr.in_list(
451                values
452                    .iter()
453                    .map(|val| Expr::Literal(val.clone(), None))
454                    .collect::<Vec<_>>(),
455                false,
456            ),
457            Self::FullTextSearch(query) => col_expr.like(Expr::Literal(
458                ScalarValue::Utf8(Some(query.query.to_string())),
459                None,
460            )),
461            Self::IsNull() => col_expr.is_null(),
462            Self::Equals(value) => col_expr.eq(Expr::Literal(value.clone(), None)),
463        }
464    }
465
466    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
467        match other.as_any().downcast_ref::<Self>() {
468            Some(o) => self == o,
469            None => false,
470        }
471    }
472}
473
474/// A query that a LabelListIndex can satisfy
475#[derive(Debug, Clone, PartialEq)]
476pub enum LabelListQuery {
477    /// Retrieve all row ids where every label is in the list of values for the row
478    HasAllLabels(Vec<ScalarValue>),
479    /// Retrieve all row ids where at least one of the given labels is in the list of values for the row
480    HasAnyLabel(Vec<ScalarValue>),
481}
482
483impl AnyQuery for LabelListQuery {
484    fn as_any(&self) -> &dyn Any {
485        self
486    }
487
488    fn format(&self, col: &str) -> String {
489        format!("{}", self.to_expr(col.to_string()))
490    }
491
492    fn to_expr(&self, col: String) -> Expr {
493        match self {
494            Self::HasAllLabels(labels) => {
495                let labels_arr = ScalarValue::iter_to_array(labels.iter().cloned()).unwrap();
496                let offsets_buffer =
497                    OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, labels_arr.len() as i32]));
498                let labels_list = ListArray::try_new(
499                    Arc::new(Field::new("item", labels_arr.data_type().clone(), true)),
500                    offsets_buffer,
501                    labels_arr,
502                    None,
503                )
504                .unwrap();
505                let labels_arr = Arc::new(labels_list);
506                Expr::ScalarFunction(ScalarFunction {
507                    func: Arc::new(array_has::ArrayHasAll::new().into()),
508                    args: vec![
509                        Expr::Column(Column::new_unqualified(col)),
510                        Expr::Literal(ScalarValue::List(labels_arr), None),
511                    ],
512                })
513            }
514            Self::HasAnyLabel(labels) => {
515                let labels_arr = ScalarValue::iter_to_array(labels.iter().cloned()).unwrap();
516                let offsets_buffer =
517                    OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, labels_arr.len() as i32]));
518                let labels_list = ListArray::try_new(
519                    Arc::new(Field::new("item", labels_arr.data_type().clone(), true)),
520                    offsets_buffer,
521                    labels_arr,
522                    None,
523                )
524                .unwrap();
525                let labels_arr = Arc::new(labels_list);
526                Expr::ScalarFunction(ScalarFunction {
527                    func: Arc::new(array_has::ArrayHasAny::new().into()),
528                    args: vec![
529                        Expr::Column(Column::new_unqualified(col)),
530                        Expr::Literal(ScalarValue::List(labels_arr), None),
531                    ],
532                })
533            }
534        }
535    }
536
537    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
538        match other.as_any().downcast_ref::<Self>() {
539            Some(o) => self == o,
540            None => false,
541        }
542    }
543}
544
/// A query that a NGramIndex can satisfy
#[derive(Debug, Clone, PartialEq)]
pub enum TextQuery {
    /// Retrieve all row ids where the text contains the given string
    StringContains(String),
    // TODO: In the future we should be able to do case-insensitive contains
    // as well as partial matches (e.g. LIKE 'foo%') and potentially even
    // some regular expressions
}
554
555impl AnyQuery for TextQuery {
556    fn as_any(&self) -> &dyn Any {
557        self
558    }
559
560    fn format(&self, col: &str) -> String {
561        format!("{}", self.to_expr(col.to_string()))
562    }
563
564    fn to_expr(&self, col: String) -> Expr {
565        match self {
566            Self::StringContains(substr) => Expr::ScalarFunction(ScalarFunction {
567                func: Arc::new(ContainsFunc::new().into()),
568                args: vec![
569                    Expr::Column(Column::new_unqualified(col)),
570                    Expr::Literal(ScalarValue::Utf8(Some(substr.clone())), None),
571                ],
572            }),
573        }
574    }
575
576    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
577        match other.as_any().downcast_ref::<Self>() {
578            Some(o) => self == o,
579            None => false,
580        }
581    }
582}
583
/// A query that a InvertedIndex can satisfy
#[derive(Debug, Clone, PartialEq)]
pub enum TokenQuery {
    /// Retrieve all row ids where the text contains every token parsed from the
    /// given string. Tokens are separated by punctuation and white space.
    TokensContains(String),
}
591
592/// A query that a BloomFilter index can satisfy
593///
594/// This is a subset of SargableQuery that only includes operations that bloom filters
595/// can efficiently handle: equals, is_null, and is_in queries.
596#[derive(Debug, Clone, PartialEq)]
597pub enum BloomFilterQuery {
598    /// Retrieve all row ids where the value is exactly the given value
599    Equals(ScalarValue),
600    /// Retrieve all row ids where the value is null
601    IsNull(),
602    /// Retrieve all row ids where the value is in the given set of values
603    IsIn(Vec<ScalarValue>),
604}
605
606impl AnyQuery for BloomFilterQuery {
607    fn as_any(&self) -> &dyn Any {
608        self
609    }
610
611    fn format(&self, col: &str) -> String {
612        match self {
613            Self::Equals(val) => {
614                format!("{} = {}", col, val)
615            }
616            Self::IsNull() => {
617                format!("{} IS NULL", col)
618            }
619            Self::IsIn(values) => {
620                format!(
621                    "{} IN [{}]",
622                    col,
623                    values
624                        .iter()
625                        .map(|val| val.to_string())
626                        .collect::<Vec<_>>()
627                        .join(",")
628                )
629            }
630        }
631    }
632
633    fn to_expr(&self, col: String) -> Expr {
634        let col_expr = Expr::Column(Column::new_unqualified(col));
635        match self {
636            Self::Equals(value) => col_expr.eq(Expr::Literal(value.clone(), None)),
637            Self::IsNull() => col_expr.is_null(),
638            Self::IsIn(values) => col_expr.in_list(
639                values
640                    .iter()
641                    .map(|val| Expr::Literal(val.clone(), None))
642                    .collect::<Vec<_>>(),
643                false,
644            ),
645        }
646    }
647
648    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
649        match other.as_any().downcast_ref::<Self>() {
650            Some(o) => self == o,
651            None => false,
652        }
653    }
654}
655
656impl AnyQuery for TokenQuery {
657    fn as_any(&self) -> &dyn Any {
658        self
659    }
660
661    fn format(&self, col: &str) -> String {
662        format!("{}", self.to_expr(col.to_string()))
663    }
664
665    fn to_expr(&self, col: String) -> Expr {
666        match self {
667            Self::TokensContains(substr) => Expr::ScalarFunction(ScalarFunction {
668                func: Arc::new(CONTAINS_TOKENS_UDF.clone()),
669                args: vec![
670                    Expr::Column(Column::new_unqualified(col)),
671                    Expr::Literal(ScalarValue::Utf8(Some(substr.clone())), None),
672                ],
673            }),
674        }
675    }
676
677    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
678        match other.as_any().downcast_ref::<Self>() {
679            Some(o) => self == o,
680            None => false,
681        }
682    }
683}
684
/// The payload of a [`GeoQuery::IntersectQuery`]: a scalar value together
/// with an arrow field.
// NOTE(review): presumably `field` describes the type of `value` — confirm against callers.
#[cfg(feature = "geo")]
#[derive(Debug, Clone, PartialEq)]
pub struct RelationQuery {
    pub value: ScalarValue,
    pub field: Field,
}
691
/// A query that a Geo index can satisfy
#[cfg(feature = "geo")]
#[derive(Debug, Clone, PartialEq)]
pub enum GeoQuery {
    /// An intersection query against the wrapped value
    IntersectQuery(RelationQuery),
    /// Retrieve all row ids where the value is null
    IsNull,
}
699
#[cfg(feature = "geo")]
impl AnyQuery for GeoQuery {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Renders the query as `Intersect(col value)` or `col IS NULL`.
    fn format(&self, col: &str) -> String {
        match self {
            Self::IntersectQuery(query) => format!("Intersect({col} {})", query.value),
            Self::IsNull => format!("{col} IS NULL"),
        }
    }

    /// Not implemented yet.
    ///
    /// # Panics
    ///
    /// Always panics via `todo!()`.
    fn to_expr(&self, _col: String) -> Expr {
        todo!()
    }

    /// Equal only when `other` is also a `GeoQuery` and compares equal.
    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
        other
            .as_any()
            .downcast_ref::<Self>()
            .is_some_and(|o| self == o)
    }
}
728
729/// The result of a search operation against a scalar index
730#[derive(Debug, PartialEq)]
731pub enum SearchResult {
732    /// The exact row ids that satisfy the query
733    Exact(NullableRowAddrSet),
734    /// Any row id satisfying the query will be in this set but not every
735    /// row id in this set will satisfy the query, a further recheck step
736    /// is needed
737    AtMost(NullableRowAddrSet),
738    /// All of the given row ids satisfy the query but there may be more
739    ///
740    /// No scalar index actually returns this today but it can arise from
741    /// boolean operations (e.g. NOT(AtMost(x)) == AtLeast(NOT(x)))
742    AtLeast(NullableRowAddrSet),
743}
744
745impl SearchResult {
746    pub fn exact(row_ids: impl Into<RowAddrTreeMap>) -> Self {
747        Self::Exact(NullableRowAddrSet::new(row_ids.into(), Default::default()))
748    }
749
750    pub fn at_most(row_ids: impl Into<RowAddrTreeMap>) -> Self {
751        Self::AtMost(NullableRowAddrSet::new(row_ids.into(), Default::default()))
752    }
753
754    pub fn at_least(row_ids: impl Into<RowAddrTreeMap>) -> Self {
755        Self::AtLeast(NullableRowAddrSet::new(row_ids.into(), Default::default()))
756    }
757
758    pub fn with_nulls(self, nulls: impl Into<RowAddrTreeMap>) -> Self {
759        match self {
760            Self::Exact(row_ids) => Self::Exact(row_ids.with_nulls(nulls.into())),
761            Self::AtMost(row_ids) => Self::AtMost(row_ids.with_nulls(nulls.into())),
762            Self::AtLeast(row_ids) => Self::AtLeast(row_ids.with_nulls(nulls.into())),
763        }
764    }
765
766    pub fn row_addrs(&self) -> &NullableRowAddrSet {
767        match self {
768            Self::Exact(row_addrs) => row_addrs,
769            Self::AtMost(row_addrs) => row_addrs,
770            Self::AtLeast(row_addrs) => row_addrs,
771        }
772    }
773
774    pub fn is_exact(&self) -> bool {
775        matches!(self, Self::Exact(_))
776    }
777}
778
779/// Brief information about an index that was created
780pub struct CreatedIndex {
781    /// The details of the index that was created
782    ///
783    /// These should be stored somewhere as they will be needed to
784    /// load the index later.
785    pub index_details: prost_types::Any,
786    /// The version of the index that was created
787    ///
788    /// This can be used to determine if a reader is able to load the index.
789    pub index_version: u32,
790}
791
792/// The criteria that specifies how to update an index
793pub struct UpdateCriteria {
794    /// If true, then we need to read the old data to update the index
795    ///
796    /// This should be avoided if possible but is left in for some legacy paths
797    pub requires_old_data: bool,
798    /// The criteria required for data (both old and new)
799    pub data_criteria: TrainingCriteria,
800}
801
802impl UpdateCriteria {
803    pub fn requires_old_data(data_criteria: TrainingCriteria) -> Self {
804        Self {
805            requires_old_data: true,
806            data_criteria,
807        }
808    }
809
810    pub fn only_new_data(data_criteria: TrainingCriteria) -> Self {
811        Self {
812            requires_old_data: false,
813            data_criteria,
814        }
815    }
816}
817
818/// A trait for a scalar index, a structure that can determine row ids that satisfy scalar queries
819#[async_trait]
820pub trait ScalarIndex: Send + Sync + std::fmt::Debug + Index + DeepSizeOf {
821    /// Search the scalar index
822    ///
823    /// Returns all row ids that satisfy the query, these row ids are not necessarily ordered
824    async fn search(
825        &self,
826        query: &dyn AnyQuery,
827        metrics: &dyn MetricsCollector,
828    ) -> Result<SearchResult>;
829
830    /// Returns true if the remap operation is supported
831    fn can_remap(&self) -> bool;
832
833    /// Remap the row ids, creating a new remapped version of this index in `dest_store`
834    async fn remap(
835        &self,
836        mapping: &HashMap<u64, Option<u64>>,
837        dest_store: &dyn IndexStore,
838    ) -> Result<CreatedIndex>;
839
840    /// Add the new data into the index, creating an updated version of the index in `dest_store`
841    ///
842    /// If `valid_old_fragments` is provided, old index data for fragments not in the bitmap
843    /// will be filtered out during the merge.
844    async fn update(
845        &self,
846        new_data: SendableRecordBatchStream,
847        dest_store: &dyn IndexStore,
848        valid_old_fragments: Option<&RoaringBitmap>,
849    ) -> Result<CreatedIndex>;
850
851    /// Returns the criteria that will be used to update the index
852    fn update_criteria(&self) -> UpdateCriteria;
853
854    /// Derive the index parameters from the current index
855    ///
856    /// This returns a ScalarIndexParams that can be used to recreate an index
857    /// with the same configuration on another dataset.
858    fn derive_index_params(&self) -> Result<ScalarIndexParams>;
859}