1use arrow::buffer::{OffsetBuffer, ScalarBuffer};
7use arrow_array::{ListArray, RecordBatch};
8use arrow_schema::{Field, Schema};
9use async_trait::async_trait;
10use datafusion::functions::string::contains::ContainsFunc;
11use datafusion::functions_nested::array_has;
12use datafusion::physical_plan::SendableRecordBatchStream;
13use datafusion_common::{scalar::ScalarValue, Column};
14use std::collections::{HashMap, HashSet};
15use std::fmt::Debug;
16use std::{any::Any, ops::Bound, sync::Arc};
17
18use datafusion_expr::expr::ScalarFunction;
19use datafusion_expr::Expr;
20use deepsize::DeepSizeOf;
21use inverted::query::{fill_fts_query_column, FtsQuery, FtsQueryNode, FtsSearchParams, MatchQuery};
22use lance_core::utils::mask::RowIdTreeMap;
23use lance_core::{Error, Result};
24use serde::Serialize;
25use snafu::location;
26
27use crate::metrics::MetricsCollector;
28use crate::scalar::registry::TrainingCriteria;
29use crate::{Index, IndexParams, IndexType};
30
31pub mod bitmap;
32pub mod bloomfilter;
33pub mod btree;
34pub mod expression;
35pub mod flat;
36pub mod inverted;
37pub mod json;
38pub mod label_list;
39pub mod lance_format;
40pub mod ngram;
41pub mod registry;
42pub mod zonemap;
43
44use crate::frag_reuse::FragReuseIndex;
45pub use inverted::tokenizer::InvertedIndexParams;
46use lance_datafusion::udf::CONTAINS_TOKENS_UDF;
47
/// Index name reported by [`ScalarIndexParams`] through `IndexParams::index_name`.
pub const LANCE_SCALAR_INDEX: &str = "__lance_scalar_index";
49
50#[derive(Debug, Clone, PartialEq, Eq, DeepSizeOf)]
56pub enum BuiltinIndexType {
57 BTree,
58 Bitmap,
59 LabelList,
60 NGram,
61 ZoneMap,
62 BloomFilter,
63 Inverted,
64}
65
66impl BuiltinIndexType {
67 pub fn as_str(&self) -> &str {
68 match self {
69 Self::BTree => "btree",
70 Self::Bitmap => "bitmap",
71 Self::LabelList => "labellist",
72 Self::NGram => "ngram",
73 Self::ZoneMap => "zonemap",
74 Self::Inverted => "inverted",
75 Self::BloomFilter => "bloomfilter",
76 }
77 }
78}
79
80impl TryFrom<IndexType> for BuiltinIndexType {
81 type Error = Error;
82
83 fn try_from(value: IndexType) -> Result<Self> {
84 match value {
85 IndexType::BTree => Ok(Self::BTree),
86 IndexType::Bitmap => Ok(Self::Bitmap),
87 IndexType::LabelList => Ok(Self::LabelList),
88 IndexType::NGram => Ok(Self::NGram),
89 IndexType::ZoneMap => Ok(Self::ZoneMap),
90 IndexType::Inverted => Ok(Self::Inverted),
91 IndexType::BloomFilter => Ok(Self::BloomFilter),
92 _ => Err(Error::Index {
93 message: "Invalid index type".to_string(),
94 location: location!(),
95 }),
96 }
97 }
98}
99
/// Parameters for building a scalar index.
///
/// They name the index type to build and may carry a JSON-encoded blob of
/// type-specific settings.
#[derive(Debug, Clone, PartialEq)]
pub struct ScalarIndexParams {
    /// Name of the index type, e.g. one of the [`BuiltinIndexType::as_str`] names.
    pub index_type: String,
    /// Type-specific settings serialized as JSON (see `ScalarIndexParams::with_params`),
    /// or `None` when no settings were supplied.
    pub params: Option<String>,
}
112
113impl Default for ScalarIndexParams {
114 fn default() -> Self {
115 Self {
116 index_type: BuiltinIndexType::BTree.as_str().to_string(),
117 params: None,
118 }
119 }
120}
121
122impl ScalarIndexParams {
123 pub fn for_builtin(index_type: BuiltinIndexType) -> Self {
125 Self {
126 index_type: index_type.as_str().to_string(),
127 params: None,
128 }
129 }
130
131 pub fn new(index_type: String) -> Self {
133 Self {
134 index_type,
135 params: None,
136 }
137 }
138
139 pub fn with_params<ParamsType: Serialize>(mut self, params: &ParamsType) -> Self {
141 self.params = Some(serde_json::to_string(params).unwrap());
142 self
143 }
144}
145
146impl IndexParams for ScalarIndexParams {
147 fn as_any(&self) -> &dyn std::any::Any {
148 self
149 }
150
151 fn index_name(&self) -> &str {
152 LANCE_SCALAR_INDEX
153 }
154}
155
156impl IndexParams for InvertedIndexParams {
157 fn as_any(&self) -> &dyn std::any::Any {
158 self
159 }
160
161 fn index_name(&self) -> &str {
162 "INVERTED"
163 }
164}
165
166#[async_trait]
168pub trait IndexWriter: Send {
169 async fn write_record_batch(&mut self, batch: RecordBatch) -> Result<u64>;
173 async fn finish(&mut self) -> Result<()>;
175 async fn finish_with_metadata(&mut self, metadata: HashMap<String, String>) -> Result<()>;
177}
178
179#[async_trait]
181pub trait IndexReader: Send + Sync {
182 async fn read_record_batch(&self, n: u64, batch_size: u64) -> Result<RecordBatch>;
184 async fn read_range(
189 &self,
190 range: std::ops::Range<usize>,
191 projection: Option<&[&str]>,
192 ) -> Result<RecordBatch>;
193 async fn num_batches(&self, batch_size: u64) -> u32;
195 fn num_rows(&self) -> usize;
197 fn schema(&self) -> &lance_core::datatypes::Schema;
199}
200
201#[async_trait]
207pub trait IndexStore: std::fmt::Debug + Send + Sync + DeepSizeOf {
208 fn as_any(&self) -> &dyn Any;
209
210 fn io_parallelism(&self) -> usize;
212
213 async fn new_index_file(&self, name: &str, schema: Arc<Schema>)
215 -> Result<Box<dyn IndexWriter>>;
216
217 async fn open_index_file(&self, name: &str) -> Result<Arc<dyn IndexReader>>;
219
220 async fn copy_index_file(&self, name: &str, dest_store: &dyn IndexStore) -> Result<()>;
224
225 async fn rename_index_file(&self, name: &str, new_name: &str) -> Result<()>;
227
228 async fn delete_index_file(&self, name: &str) -> Result<()>;
230}
231
232pub trait AnyQuery: std::fmt::Debug + Any + Send + Sync {
243 fn as_any(&self) -> &dyn Any;
245 fn format(&self, col: &str) -> String;
247 fn to_expr(&self, col: String) -> Expr;
249 fn dyn_eq(&self, other: &dyn AnyQuery) -> bool;
251}
252
253impl PartialEq for dyn AnyQuery {
254 fn eq(&self, other: &Self) -> bool {
255 self.dyn_eq(other)
256 }
257}
258#[derive(Debug, Clone, PartialEq)]
260pub struct FullTextSearchQuery {
261 pub query: FtsQuery,
262
263 pub limit: Option<i64>,
265
266 pub wand_factor: Option<f32>,
271}
272
273impl FullTextSearchQuery {
274 pub fn new(query: String) -> Self {
276 let query = MatchQuery::new(query).into();
277 Self {
278 query,
279 limit: None,
280 wand_factor: None,
281 }
282 }
283
284 pub fn new_fuzzy(term: String, max_distance: Option<u32>) -> Self {
286 let query = MatchQuery::new(term).with_fuzziness(max_distance).into();
287 Self {
288 query,
289 limit: None,
290 wand_factor: None,
291 }
292 }
293
294 pub fn new_query(query: FtsQuery) -> Self {
296 Self {
297 query,
298 limit: None,
299 wand_factor: None,
300 }
301 }
302
303 pub fn with_column(mut self, column: String) -> Result<Self> {
306 self.query = fill_fts_query_column(&self.query, &[column], true)?;
307 Ok(self)
308 }
309
310 pub fn with_columns(mut self, columns: &[String]) -> Result<Self> {
313 self.query = fill_fts_query_column(&self.query, columns, true)?;
314 Ok(self)
315 }
316
317 pub fn limit(mut self, limit: Option<i64>) -> Self {
320 self.limit = limit;
321 self
322 }
323
324 pub fn wand_factor(mut self, wand_factor: Option<f32>) -> Self {
325 self.wand_factor = wand_factor;
326 self
327 }
328
329 pub fn columns(&self) -> HashSet<String> {
330 self.query.columns()
331 }
332
333 pub fn params(&self) -> FtsSearchParams {
334 FtsSearchParams::new()
335 .with_limit(self.limit.map(|limit| limit as usize))
336 .with_wand_factor(self.wand_factor.unwrap_or(1.0))
337 }
338}
339
340#[derive(Debug, Clone, PartialEq)]
350pub enum SargableQuery {
351 Range(Bound<ScalarValue>, Bound<ScalarValue>),
353 IsIn(Vec<ScalarValue>),
355 Equals(ScalarValue),
357 FullTextSearch(FullTextSearchQuery),
359 IsNull(),
361}
362
363impl AnyQuery for SargableQuery {
364 fn as_any(&self) -> &dyn Any {
365 self
366 }
367
368 fn format(&self, col: &str) -> String {
369 match self {
370 Self::Range(lower, upper) => match (lower, upper) {
371 (Bound::Unbounded, Bound::Unbounded) => "true".to_string(),
372 (Bound::Unbounded, Bound::Included(rhs)) => format!("{} <= {}", col, rhs),
373 (Bound::Unbounded, Bound::Excluded(rhs)) => format!("{} < {}", col, rhs),
374 (Bound::Included(lhs), Bound::Unbounded) => format!("{} >= {}", col, lhs),
375 (Bound::Included(lhs), Bound::Included(rhs)) => {
376 format!("{} >= {} && {} <= {}", col, lhs, col, rhs)
377 }
378 (Bound::Included(lhs), Bound::Excluded(rhs)) => {
379 format!("{} >= {} && {} < {}", col, lhs, col, rhs)
380 }
381 (Bound::Excluded(lhs), Bound::Unbounded) => format!("{} > {}", col, lhs),
382 (Bound::Excluded(lhs), Bound::Included(rhs)) => {
383 format!("{} > {} && {} <= {}", col, lhs, col, rhs)
384 }
385 (Bound::Excluded(lhs), Bound::Excluded(rhs)) => {
386 format!("{} > {} && {} < {}", col, lhs, col, rhs)
387 }
388 },
389 Self::IsIn(values) => {
390 format!(
391 "{} IN [{}]",
392 col,
393 values
394 .iter()
395 .map(|val| val.to_string())
396 .collect::<Vec<_>>()
397 .join(",")
398 )
399 }
400 Self::FullTextSearch(query) => {
401 format!("fts({})", query.query)
402 }
403 Self::IsNull() => {
404 format!("{} IS NULL", col)
405 }
406 Self::Equals(val) => {
407 format!("{} = {}", col, val)
408 }
409 }
410 }
411
412 fn to_expr(&self, col: String) -> Expr {
413 let col_expr = Expr::Column(Column::new_unqualified(col));
414 match self {
415 Self::Range(lower, upper) => match (lower, upper) {
416 (Bound::Unbounded, Bound::Unbounded) => {
417 Expr::Literal(ScalarValue::Boolean(Some(true)), None)
418 }
419 (Bound::Unbounded, Bound::Included(rhs)) => {
420 col_expr.lt_eq(Expr::Literal(rhs.clone(), None))
421 }
422 (Bound::Unbounded, Bound::Excluded(rhs)) => {
423 col_expr.lt(Expr::Literal(rhs.clone(), None))
424 }
425 (Bound::Included(lhs), Bound::Unbounded) => {
426 col_expr.gt_eq(Expr::Literal(lhs.clone(), None))
427 }
428 (Bound::Included(lhs), Bound::Included(rhs)) => col_expr.between(
429 Expr::Literal(lhs.clone(), None),
430 Expr::Literal(rhs.clone(), None),
431 ),
432 (Bound::Included(lhs), Bound::Excluded(rhs)) => col_expr
433 .clone()
434 .gt_eq(Expr::Literal(lhs.clone(), None))
435 .and(col_expr.lt(Expr::Literal(rhs.clone(), None))),
436 (Bound::Excluded(lhs), Bound::Unbounded) => {
437 col_expr.gt(Expr::Literal(lhs.clone(), None))
438 }
439 (Bound::Excluded(lhs), Bound::Included(rhs)) => col_expr
440 .clone()
441 .gt(Expr::Literal(lhs.clone(), None))
442 .and(col_expr.lt_eq(Expr::Literal(rhs.clone(), None))),
443 (Bound::Excluded(lhs), Bound::Excluded(rhs)) => col_expr
444 .clone()
445 .gt(Expr::Literal(lhs.clone(), None))
446 .and(col_expr.lt(Expr::Literal(rhs.clone(), None))),
447 },
448 Self::IsIn(values) => col_expr.in_list(
449 values
450 .iter()
451 .map(|val| Expr::Literal(val.clone(), None))
452 .collect::<Vec<_>>(),
453 false,
454 ),
455 Self::FullTextSearch(query) => col_expr.like(Expr::Literal(
456 ScalarValue::Utf8(Some(query.query.to_string())),
457 None,
458 )),
459 Self::IsNull() => col_expr.is_null(),
460 Self::Equals(value) => col_expr.eq(Expr::Literal(value.clone(), None)),
461 }
462 }
463
464 fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
465 match other.as_any().downcast_ref::<Self>() {
466 Some(o) => self == o,
467 None => false,
468 }
469 }
470}
471
472#[derive(Debug, Clone, PartialEq)]
474pub enum LabelListQuery {
475 HasAllLabels(Vec<ScalarValue>),
477 HasAnyLabel(Vec<ScalarValue>),
479}
480
481impl AnyQuery for LabelListQuery {
482 fn as_any(&self) -> &dyn Any {
483 self
484 }
485
486 fn format(&self, col: &str) -> String {
487 format!("{}", self.to_expr(col.to_string()))
488 }
489
490 fn to_expr(&self, col: String) -> Expr {
491 match self {
492 Self::HasAllLabels(labels) => {
493 let labels_arr = ScalarValue::iter_to_array(labels.iter().cloned()).unwrap();
494 let offsets_buffer =
495 OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, labels_arr.len() as i32]));
496 let labels_list = ListArray::try_new(
497 Arc::new(Field::new("item", labels_arr.data_type().clone(), false)),
498 offsets_buffer,
499 labels_arr,
500 None,
501 )
502 .unwrap();
503 let labels_arr = Arc::new(labels_list);
504 Expr::ScalarFunction(ScalarFunction {
505 func: Arc::new(array_has::ArrayHasAll::new().into()),
506 args: vec![
507 Expr::Column(Column::new_unqualified(col)),
508 Expr::Literal(ScalarValue::List(labels_arr), None),
509 ],
510 })
511 }
512 Self::HasAnyLabel(labels) => {
513 let labels_arr = ScalarValue::iter_to_array(labels.iter().cloned()).unwrap();
514 let offsets_buffer =
515 OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, labels_arr.len() as i32]));
516 let labels_list = ListArray::try_new(
517 Arc::new(Field::new("item", labels_arr.data_type().clone(), false)),
518 offsets_buffer,
519 labels_arr,
520 None,
521 )
522 .unwrap();
523 let labels_arr = Arc::new(labels_list);
524 Expr::ScalarFunction(ScalarFunction {
525 func: Arc::new(array_has::ArrayHasAny::new().into()),
526 args: vec![
527 Expr::Column(Column::new_unqualified(col)),
528 Expr::Literal(ScalarValue::List(labels_arr), None),
529 ],
530 })
531 }
532 }
533 }
534
535 fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
536 match other.as_any().downcast_ref::<Self>() {
537 Some(o) => self == o,
538 None => false,
539 }
540 }
541}
542
/// Substring queries over a string column.
#[derive(Debug, Clone, PartialEq)]
pub enum TextQuery {
    /// Rows whose value contains the given substring (DataFusion `contains`).
    StringContains(String),
}
552
553impl AnyQuery for TextQuery {
554 fn as_any(&self) -> &dyn Any {
555 self
556 }
557
558 fn format(&self, col: &str) -> String {
559 format!("{}", self.to_expr(col.to_string()))
560 }
561
562 fn to_expr(&self, col: String) -> Expr {
563 match self {
564 Self::StringContains(substr) => Expr::ScalarFunction(ScalarFunction {
565 func: Arc::new(ContainsFunc::new().into()),
566 args: vec![
567 Expr::Column(Column::new_unqualified(col)),
568 Expr::Literal(ScalarValue::Utf8(Some(substr.clone())), None),
569 ],
570 }),
571 }
572 }
573
574 fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
575 match other.as_any().downcast_ref::<Self>() {
576 Some(o) => self == o,
577 None => false,
578 }
579 }
580}
581
/// Token-level queries, expressed through the `CONTAINS_TOKENS_UDF` function.
#[derive(Debug, Clone, PartialEq)]
pub enum TokenQuery {
    /// Rows matching `CONTAINS_TOKENS_UDF(col, '<string>')`.
    TokensContains(String),
}
589
590#[derive(Debug, Clone, PartialEq)]
595pub enum BloomFilterQuery {
596 Equals(ScalarValue),
598 IsNull(),
600 IsIn(Vec<ScalarValue>),
602}
603
604impl AnyQuery for BloomFilterQuery {
605 fn as_any(&self) -> &dyn Any {
606 self
607 }
608
609 fn format(&self, col: &str) -> String {
610 match self {
611 Self::Equals(val) => {
612 format!("{} = {}", col, val)
613 }
614 Self::IsNull() => {
615 format!("{} IS NULL", col)
616 }
617 Self::IsIn(values) => {
618 format!(
619 "{} IN [{}]",
620 col,
621 values
622 .iter()
623 .map(|val| val.to_string())
624 .collect::<Vec<_>>()
625 .join(",")
626 )
627 }
628 }
629 }
630
631 fn to_expr(&self, col: String) -> Expr {
632 let col_expr = Expr::Column(Column::new_unqualified(col));
633 match self {
634 Self::Equals(value) => col_expr.eq(Expr::Literal(value.clone(), None)),
635 Self::IsNull() => col_expr.is_null(),
636 Self::IsIn(values) => col_expr.in_list(
637 values
638 .iter()
639 .map(|val| Expr::Literal(val.clone(), None))
640 .collect::<Vec<_>>(),
641 false,
642 ),
643 }
644 }
645
646 fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
647 match other.as_any().downcast_ref::<Self>() {
648 Some(o) => self == o,
649 None => false,
650 }
651 }
652}
653
654impl AnyQuery for TokenQuery {
655 fn as_any(&self) -> &dyn Any {
656 self
657 }
658
659 fn format(&self, col: &str) -> String {
660 format!("{}", self.to_expr(col.to_string()))
661 }
662
663 fn to_expr(&self, col: String) -> Expr {
664 match self {
665 Self::TokensContains(substr) => Expr::ScalarFunction(ScalarFunction {
666 func: Arc::new(CONTAINS_TOKENS_UDF.clone()),
667 args: vec![
668 Expr::Column(Column::new_unqualified(col)),
669 Expr::Literal(ScalarValue::Utf8(Some(substr.clone())), None),
670 ],
671 }),
672 }
673 }
674
675 fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
676 match other.as_any().downcast_ref::<Self>() {
677 Some(o) => self == o,
678 None => false,
679 }
680 }
681}
682
683#[derive(Debug, PartialEq)]
685pub enum SearchResult {
686 Exact(RowIdTreeMap),
688 AtMost(RowIdTreeMap),
692 AtLeast(RowIdTreeMap),
697}
698
699impl SearchResult {
700 pub fn row_ids(&self) -> &RowIdTreeMap {
701 match self {
702 Self::Exact(row_ids) => row_ids,
703 Self::AtMost(row_ids) => row_ids,
704 Self::AtLeast(row_ids) => row_ids,
705 }
706 }
707
708 pub fn is_exact(&self) -> bool {
709 matches!(self, Self::Exact(_))
710 }
711}
712
713pub struct CreatedIndex {
715 pub index_details: prost_types::Any,
720 pub index_version: u32,
724}
725
726pub struct UpdateCriteria {
728 pub requires_old_data: bool,
732 pub data_criteria: TrainingCriteria,
734}
735
736impl UpdateCriteria {
737 pub fn requires_old_data(data_criteria: TrainingCriteria) -> Self {
738 Self {
739 requires_old_data: true,
740 data_criteria,
741 }
742 }
743
744 pub fn only_new_data(data_criteria: TrainingCriteria) -> Self {
745 Self {
746 requires_old_data: false,
747 data_criteria,
748 }
749 }
750}
751
752#[async_trait]
754pub trait ScalarIndex: Send + Sync + std::fmt::Debug + Index + DeepSizeOf {
755 async fn search(
759 &self,
760 query: &dyn AnyQuery,
761 metrics: &dyn MetricsCollector,
762 ) -> Result<SearchResult>;
763
764 fn can_remap(&self) -> bool;
766
767 async fn remap(
769 &self,
770 mapping: &HashMap<u64, Option<u64>>,
771 dest_store: &dyn IndexStore,
772 ) -> Result<CreatedIndex>;
773
774 async fn update(
776 &self,
777 new_data: SendableRecordBatchStream,
778 dest_store: &dyn IndexStore,
779 ) -> Result<CreatedIndex>;
780
781 fn update_criteria(&self) -> UpdateCriteria;
783
784 fn derive_index_params(&self) -> Result<ScalarIndexParams>;
789}