1use arrow::buffer::{OffsetBuffer, ScalarBuffer};
7use arrow_array::{ListArray, RecordBatch};
8use arrow_schema::{Field, Schema};
9use async_trait::async_trait;
10use datafusion::functions::string::contains::ContainsFunc;
11use datafusion::functions_nested::array_has;
12use datafusion::physical_plan::SendableRecordBatchStream;
13use datafusion_common::{Column, scalar::ScalarValue};
14use std::collections::{HashMap, HashSet};
15use std::fmt::Debug;
16use std::{any::Any, ops::Bound, sync::Arc};
17
18use datafusion_expr::Expr;
19use datafusion_expr::expr::ScalarFunction;
20use deepsize::DeepSizeOf;
21use inverted::query::{FtsQuery, FtsQueryNode, FtsSearchParams, MatchQuery, fill_fts_query_column};
22use lance_core::utils::mask::{NullableRowAddrSet, RowAddrTreeMap};
23use lance_core::{Error, Result};
24use roaring::RoaringBitmap;
25use serde::Serialize;
26
27use crate::metrics::MetricsCollector;
28use crate::scalar::registry::TrainingCriteria;
29use crate::{Index, IndexParams, IndexType};
30
31pub mod bitmap;
32pub mod bloomfilter;
33pub mod btree;
34pub mod expression;
35pub mod inverted;
36pub mod json;
37pub mod label_list;
38pub mod lance_format;
39pub mod ngram;
40pub mod registry;
41#[cfg(feature = "geo")]
42pub mod rtree;
43pub mod zoned;
44pub mod zonemap;
45
46use crate::frag_reuse::FragReuseIndex;
47pub use inverted::tokenizer::InvertedIndexParams;
48use lance_datafusion::udf::CONTAINS_TOKENS_UDF;
49
/// Index name reported by [`ScalarIndexParams`] through `IndexParams::index_name`.
pub const LANCE_SCALAR_INDEX: &str = "__lance_scalar_index";
51
52#[derive(Debug, Clone, PartialEq, Eq, DeepSizeOf)]
58pub enum BuiltinIndexType {
59 BTree,
60 Bitmap,
61 LabelList,
62 NGram,
63 ZoneMap,
64 BloomFilter,
65 RTree,
66 Inverted,
67}
68
69impl BuiltinIndexType {
70 pub fn as_str(&self) -> &str {
71 match self {
72 Self::BTree => "btree",
73 Self::Bitmap => "bitmap",
74 Self::LabelList => "labellist",
75 Self::NGram => "ngram",
76 Self::ZoneMap => "zonemap",
77 Self::Inverted => "inverted",
78 Self::BloomFilter => "bloomfilter",
79 Self::RTree => "rtree",
80 }
81 }
82}
83
84impl TryFrom<IndexType> for BuiltinIndexType {
85 type Error = Error;
86
87 fn try_from(value: IndexType) -> Result<Self> {
88 match value {
89 IndexType::BTree => Ok(Self::BTree),
90 IndexType::Bitmap => Ok(Self::Bitmap),
91 IndexType::LabelList => Ok(Self::LabelList),
92 IndexType::NGram => Ok(Self::NGram),
93 IndexType::ZoneMap => Ok(Self::ZoneMap),
94 IndexType::Inverted => Ok(Self::Inverted),
95 IndexType::BloomFilter => Ok(Self::BloomFilter),
96 IndexType::RTree => Ok(Self::RTree),
97 _ => Err(Error::index("Invalid index type".to_string())),
98 }
99 }
100}
101
/// Parameters describing which scalar index to build and how.
#[derive(Debug, Clone, PartialEq)]
pub struct ScalarIndexParams {
    /// Name of the index type. For built-in types this is the string returned
    /// by [`BuiltinIndexType::as_str`].
    pub index_type: String,
    /// Optional index-specific parameters, stored as a JSON string
    /// (see [`ScalarIndexParams::with_params`]).
    pub params: Option<String>,
}
114
115impl Default for ScalarIndexParams {
116 fn default() -> Self {
117 Self {
118 index_type: BuiltinIndexType::BTree.as_str().to_string(),
119 params: None,
120 }
121 }
122}
123
124impl ScalarIndexParams {
125 pub fn for_builtin(index_type: BuiltinIndexType) -> Self {
127 Self {
128 index_type: index_type.as_str().to_string(),
129 params: None,
130 }
131 }
132
133 pub fn new(index_type: String) -> Self {
135 Self {
136 index_type,
137 params: None,
138 }
139 }
140
141 pub fn with_params<ParamsType: Serialize>(mut self, params: &ParamsType) -> Self {
143 self.params = Some(serde_json::to_string(params).unwrap());
144 self
145 }
146}
147
148impl IndexParams for ScalarIndexParams {
149 fn as_any(&self) -> &dyn std::any::Any {
150 self
151 }
152
153 fn index_name(&self) -> &str {
154 LANCE_SCALAR_INDEX
155 }
156}
157
158impl IndexParams for InvertedIndexParams {
159 fn as_any(&self) -> &dyn std::any::Any {
160 self
161 }
162
163 fn index_name(&self) -> &str {
164 "INVERTED"
165 }
166}
167
168#[async_trait]
170pub trait IndexWriter: Send {
171 async fn write_record_batch(&mut self, batch: RecordBatch) -> Result<u64>;
175 async fn finish(&mut self) -> Result<()>;
177 async fn finish_with_metadata(&mut self, metadata: HashMap<String, String>) -> Result<()>;
179}
180
181#[async_trait]
183pub trait IndexReader: Send + Sync {
184 async fn read_record_batch(&self, n: u64, batch_size: u64) -> Result<RecordBatch>;
186 async fn read_range(
191 &self,
192 range: std::ops::Range<usize>,
193 projection: Option<&[&str]>,
194 ) -> Result<RecordBatch>;
195 async fn num_batches(&self, batch_size: u64) -> u32;
197 fn num_rows(&self) -> usize;
199 fn schema(&self) -> &lance_core::datatypes::Schema;
201}
202
203#[async_trait]
209pub trait IndexStore: std::fmt::Debug + Send + Sync + DeepSizeOf {
210 fn as_any(&self) -> &dyn Any;
211
212 fn io_parallelism(&self) -> usize;
214
215 async fn new_index_file(&self, name: &str, schema: Arc<Schema>)
217 -> Result<Box<dyn IndexWriter>>;
218
219 async fn open_index_file(&self, name: &str) -> Result<Arc<dyn IndexReader>>;
221
222 async fn copy_index_file(&self, name: &str, dest_store: &dyn IndexStore) -> Result<()>;
226
227 async fn rename_index_file(&self, name: &str, new_name: &str) -> Result<()>;
229
230 async fn delete_index_file(&self, name: &str) -> Result<()>;
232}
233
234pub trait AnyQuery: std::fmt::Debug + Any + Send + Sync {
245 fn as_any(&self) -> &dyn Any;
247 fn format(&self, col: &str) -> String;
249 fn to_expr(&self, col: String) -> Expr;
251 fn dyn_eq(&self, other: &dyn AnyQuery) -> bool;
253}
254
255impl PartialEq for dyn AnyQuery {
256 fn eq(&self, other: &Self) -> bool {
257 self.dyn_eq(other)
258 }
259}
260#[derive(Debug, Clone, PartialEq)]
262pub struct FullTextSearchQuery {
263 pub query: FtsQuery,
264
265 pub limit: Option<i64>,
267
268 pub wand_factor: Option<f32>,
273}
274
275impl FullTextSearchQuery {
276 pub fn new(query: String) -> Self {
278 let query = MatchQuery::new(query).into();
279 Self {
280 query,
281 limit: None,
282 wand_factor: None,
283 }
284 }
285
286 pub fn new_fuzzy(term: String, max_distance: Option<u32>) -> Self {
288 let query = MatchQuery::new(term).with_fuzziness(max_distance).into();
289 Self {
290 query,
291 limit: None,
292 wand_factor: None,
293 }
294 }
295
296 pub fn new_query(query: FtsQuery) -> Self {
298 Self {
299 query,
300 limit: None,
301 wand_factor: None,
302 }
303 }
304
305 pub fn with_column(mut self, column: String) -> Result<Self> {
308 self.query = fill_fts_query_column(&self.query, &[column], true)?;
309 Ok(self)
310 }
311
312 pub fn with_columns(mut self, columns: &[String]) -> Result<Self> {
315 self.query = fill_fts_query_column(&self.query, columns, true)?;
316 Ok(self)
317 }
318
319 pub fn limit(mut self, limit: Option<i64>) -> Self {
322 self.limit = limit;
323 self
324 }
325
326 pub fn wand_factor(mut self, wand_factor: Option<f32>) -> Self {
327 self.wand_factor = wand_factor;
328 self
329 }
330
331 pub fn columns(&self) -> HashSet<String> {
332 self.query.columns()
333 }
334
335 pub fn params(&self) -> FtsSearchParams {
336 FtsSearchParams::new()
337 .with_limit(self.limit.map(|limit| limit as usize))
338 .with_wand_factor(self.wand_factor.unwrap_or(1.0))
339 }
340}
341
342#[derive(Debug, Clone, PartialEq)]
352pub enum SargableQuery {
353 Range(Bound<ScalarValue>, Bound<ScalarValue>),
355 IsIn(Vec<ScalarValue>),
357 Equals(ScalarValue),
359 FullTextSearch(FullTextSearchQuery),
361 IsNull(),
363}
364
365impl AnyQuery for SargableQuery {
366 fn as_any(&self) -> &dyn Any {
367 self
368 }
369
370 fn format(&self, col: &str) -> String {
371 match self {
372 Self::Range(lower, upper) => match (lower, upper) {
373 (Bound::Unbounded, Bound::Unbounded) => "true".to_string(),
374 (Bound::Unbounded, Bound::Included(rhs)) => format!("{} <= {}", col, rhs),
375 (Bound::Unbounded, Bound::Excluded(rhs)) => format!("{} < {}", col, rhs),
376 (Bound::Included(lhs), Bound::Unbounded) => format!("{} >= {}", col, lhs),
377 (Bound::Included(lhs), Bound::Included(rhs)) => {
378 format!("{} >= {} && {} <= {}", col, lhs, col, rhs)
379 }
380 (Bound::Included(lhs), Bound::Excluded(rhs)) => {
381 format!("{} >= {} && {} < {}", col, lhs, col, rhs)
382 }
383 (Bound::Excluded(lhs), Bound::Unbounded) => format!("{} > {}", col, lhs),
384 (Bound::Excluded(lhs), Bound::Included(rhs)) => {
385 format!("{} > {} && {} <= {}", col, lhs, col, rhs)
386 }
387 (Bound::Excluded(lhs), Bound::Excluded(rhs)) => {
388 format!("{} > {} && {} < {}", col, lhs, col, rhs)
389 }
390 },
391 Self::IsIn(values) => {
392 format!(
393 "{} IN [{}]",
394 col,
395 values
396 .iter()
397 .map(|val| val.to_string())
398 .collect::<Vec<_>>()
399 .join(",")
400 )
401 }
402 Self::FullTextSearch(query) => {
403 format!("fts({})", query.query)
404 }
405 Self::IsNull() => {
406 format!("{} IS NULL", col)
407 }
408 Self::Equals(val) => {
409 format!("{} = {}", col, val)
410 }
411 }
412 }
413
414 fn to_expr(&self, col: String) -> Expr {
415 let col_expr = Expr::Column(Column::new_unqualified(col));
416 match self {
417 Self::Range(lower, upper) => match (lower, upper) {
418 (Bound::Unbounded, Bound::Unbounded) => {
419 Expr::Literal(ScalarValue::Boolean(Some(true)), None)
420 }
421 (Bound::Unbounded, Bound::Included(rhs)) => {
422 col_expr.lt_eq(Expr::Literal(rhs.clone(), None))
423 }
424 (Bound::Unbounded, Bound::Excluded(rhs)) => {
425 col_expr.lt(Expr::Literal(rhs.clone(), None))
426 }
427 (Bound::Included(lhs), Bound::Unbounded) => {
428 col_expr.gt_eq(Expr::Literal(lhs.clone(), None))
429 }
430 (Bound::Included(lhs), Bound::Included(rhs)) => col_expr.between(
431 Expr::Literal(lhs.clone(), None),
432 Expr::Literal(rhs.clone(), None),
433 ),
434 (Bound::Included(lhs), Bound::Excluded(rhs)) => col_expr
435 .clone()
436 .gt_eq(Expr::Literal(lhs.clone(), None))
437 .and(col_expr.lt(Expr::Literal(rhs.clone(), None))),
438 (Bound::Excluded(lhs), Bound::Unbounded) => {
439 col_expr.gt(Expr::Literal(lhs.clone(), None))
440 }
441 (Bound::Excluded(lhs), Bound::Included(rhs)) => col_expr
442 .clone()
443 .gt(Expr::Literal(lhs.clone(), None))
444 .and(col_expr.lt_eq(Expr::Literal(rhs.clone(), None))),
445 (Bound::Excluded(lhs), Bound::Excluded(rhs)) => col_expr
446 .clone()
447 .gt(Expr::Literal(lhs.clone(), None))
448 .and(col_expr.lt(Expr::Literal(rhs.clone(), None))),
449 },
450 Self::IsIn(values) => col_expr.in_list(
451 values
452 .iter()
453 .map(|val| Expr::Literal(val.clone(), None))
454 .collect::<Vec<_>>(),
455 false,
456 ),
457 Self::FullTextSearch(query) => col_expr.like(Expr::Literal(
458 ScalarValue::Utf8(Some(query.query.to_string())),
459 None,
460 )),
461 Self::IsNull() => col_expr.is_null(),
462 Self::Equals(value) => col_expr.eq(Expr::Literal(value.clone(), None)),
463 }
464 }
465
466 fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
467 match other.as_any().downcast_ref::<Self>() {
468 Some(o) => self == o,
469 None => false,
470 }
471 }
472}
473
474#[derive(Debug, Clone, PartialEq)]
476pub enum LabelListQuery {
477 HasAllLabels(Vec<ScalarValue>),
479 HasAnyLabel(Vec<ScalarValue>),
481}
482
483impl AnyQuery for LabelListQuery {
484 fn as_any(&self) -> &dyn Any {
485 self
486 }
487
488 fn format(&self, col: &str) -> String {
489 format!("{}", self.to_expr(col.to_string()))
490 }
491
492 fn to_expr(&self, col: String) -> Expr {
493 match self {
494 Self::HasAllLabels(labels) => {
495 let labels_arr = ScalarValue::iter_to_array(labels.iter().cloned()).unwrap();
496 let offsets_buffer =
497 OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, labels_arr.len() as i32]));
498 let labels_list = ListArray::try_new(
499 Arc::new(Field::new("item", labels_arr.data_type().clone(), true)),
500 offsets_buffer,
501 labels_arr,
502 None,
503 )
504 .unwrap();
505 let labels_arr = Arc::new(labels_list);
506 Expr::ScalarFunction(ScalarFunction {
507 func: Arc::new(array_has::ArrayHasAll::new().into()),
508 args: vec![
509 Expr::Column(Column::new_unqualified(col)),
510 Expr::Literal(ScalarValue::List(labels_arr), None),
511 ],
512 })
513 }
514 Self::HasAnyLabel(labels) => {
515 let labels_arr = ScalarValue::iter_to_array(labels.iter().cloned()).unwrap();
516 let offsets_buffer =
517 OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, labels_arr.len() as i32]));
518 let labels_list = ListArray::try_new(
519 Arc::new(Field::new("item", labels_arr.data_type().clone(), true)),
520 offsets_buffer,
521 labels_arr,
522 None,
523 )
524 .unwrap();
525 let labels_arr = Arc::new(labels_list);
526 Expr::ScalarFunction(ScalarFunction {
527 func: Arc::new(array_has::ArrayHasAny::new().into()),
528 args: vec![
529 Expr::Column(Column::new_unqualified(col)),
530 Expr::Literal(ScalarValue::List(labels_arr), None),
531 ],
532 })
533 }
534 }
535 }
536
537 fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
538 match other.as_any().downcast_ref::<Self>() {
539 Some(o) => self == o,
540 None => false,
541 }
542 }
543}
544
/// Queries over the text of a string column.
#[derive(Debug, Clone, PartialEq)]
pub enum TextQuery {
    /// Converted to a `contains` call over the column and this substring.
    StringContains(String),
}
554
555impl AnyQuery for TextQuery {
556 fn as_any(&self) -> &dyn Any {
557 self
558 }
559
560 fn format(&self, col: &str) -> String {
561 format!("{}", self.to_expr(col.to_string()))
562 }
563
564 fn to_expr(&self, col: String) -> Expr {
565 match self {
566 Self::StringContains(substr) => Expr::ScalarFunction(ScalarFunction {
567 func: Arc::new(ContainsFunc::new().into()),
568 args: vec![
569 Expr::Column(Column::new_unqualified(col)),
570 Expr::Literal(ScalarValue::Utf8(Some(substr.clone())), None),
571 ],
572 }),
573 }
574 }
575
576 fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
577 match other.as_any().downcast_ref::<Self>() {
578 Some(o) => self == o,
579 None => false,
580 }
581 }
582}
583
/// Queries over the tokens of a string column.
#[derive(Debug, Clone, PartialEq)]
pub enum TokenQuery {
    /// Converted to a `CONTAINS_TOKENS_UDF` call over the column and this
    /// string.
    TokensContains(String),
}
591
592#[derive(Debug, Clone, PartialEq)]
597pub enum BloomFilterQuery {
598 Equals(ScalarValue),
600 IsNull(),
602 IsIn(Vec<ScalarValue>),
604}
605
606impl AnyQuery for BloomFilterQuery {
607 fn as_any(&self) -> &dyn Any {
608 self
609 }
610
611 fn format(&self, col: &str) -> String {
612 match self {
613 Self::Equals(val) => {
614 format!("{} = {}", col, val)
615 }
616 Self::IsNull() => {
617 format!("{} IS NULL", col)
618 }
619 Self::IsIn(values) => {
620 format!(
621 "{} IN [{}]",
622 col,
623 values
624 .iter()
625 .map(|val| val.to_string())
626 .collect::<Vec<_>>()
627 .join(",")
628 )
629 }
630 }
631 }
632
633 fn to_expr(&self, col: String) -> Expr {
634 let col_expr = Expr::Column(Column::new_unqualified(col));
635 match self {
636 Self::Equals(value) => col_expr.eq(Expr::Literal(value.clone(), None)),
637 Self::IsNull() => col_expr.is_null(),
638 Self::IsIn(values) => col_expr.in_list(
639 values
640 .iter()
641 .map(|val| Expr::Literal(val.clone(), None))
642 .collect::<Vec<_>>(),
643 false,
644 ),
645 }
646 }
647
648 fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
649 match other.as_any().downcast_ref::<Self>() {
650 Some(o) => self == o,
651 None => false,
652 }
653 }
654}
655
656impl AnyQuery for TokenQuery {
657 fn as_any(&self) -> &dyn Any {
658 self
659 }
660
661 fn format(&self, col: &str) -> String {
662 format!("{}", self.to_expr(col.to_string()))
663 }
664
665 fn to_expr(&self, col: String) -> Expr {
666 match self {
667 Self::TokensContains(substr) => Expr::ScalarFunction(ScalarFunction {
668 func: Arc::new(CONTAINS_TOKENS_UDF.clone()),
669 args: vec![
670 Expr::Column(Column::new_unqualified(col)),
671 Expr::Literal(ScalarValue::Utf8(Some(substr.clone())), None),
672 ],
673 }),
674 }
675 }
676
677 fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
678 match other.as_any().downcast_ref::<Self>() {
679 Some(o) => self == o,
680 None => false,
681 }
682 }
683}
684
/// The operand of a spatial relation query: a value together with its field.
#[cfg(feature = "geo")]
#[derive(Debug, Clone, PartialEq)]
pub struct RelationQuery {
    /// The value to relate against.
    pub value: ScalarValue,
    /// The Arrow field accompanying `value`.
    pub field: Field,
}
691
/// Queries over a geo column.
#[cfg(feature = "geo")]
#[derive(Debug, Clone, PartialEq)]
pub enum GeoQuery {
    /// An intersection query against the given operand.
    IntersectQuery(RelationQuery),
    /// Null values.
    IsNull,
}
699
#[cfg(feature = "geo")]
impl AnyQuery for GeoQuery {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Renders the query as text, using `col` as the column name.
    fn format(&self, col: &str) -> String {
        match self {
            Self::IntersectQuery(query) => format!("Intersect({} {})", col, query.value),
            Self::IsNull => format!("{} IS NULL", col),
        }
    }

    /// Not implemented yet.
    ///
    /// # Panics
    ///
    /// Always panics via `todo!()`.
    fn to_expr(&self, _col: String) -> Expr {
        todo!()
    }

    /// Equal only to another `GeoQuery` that compares equal.
    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
        matches!(other.as_any().downcast_ref::<Self>(), Some(o) if self == o)
    }
}
728
729#[derive(Debug, PartialEq)]
731pub enum SearchResult {
732 Exact(NullableRowAddrSet),
734 AtMost(NullableRowAddrSet),
738 AtLeast(NullableRowAddrSet),
743}
744
745impl SearchResult {
746 pub fn exact(row_ids: impl Into<RowAddrTreeMap>) -> Self {
747 Self::Exact(NullableRowAddrSet::new(row_ids.into(), Default::default()))
748 }
749
750 pub fn at_most(row_ids: impl Into<RowAddrTreeMap>) -> Self {
751 Self::AtMost(NullableRowAddrSet::new(row_ids.into(), Default::default()))
752 }
753
754 pub fn at_least(row_ids: impl Into<RowAddrTreeMap>) -> Self {
755 Self::AtLeast(NullableRowAddrSet::new(row_ids.into(), Default::default()))
756 }
757
758 pub fn with_nulls(self, nulls: impl Into<RowAddrTreeMap>) -> Self {
759 match self {
760 Self::Exact(row_ids) => Self::Exact(row_ids.with_nulls(nulls.into())),
761 Self::AtMost(row_ids) => Self::AtMost(row_ids.with_nulls(nulls.into())),
762 Self::AtLeast(row_ids) => Self::AtLeast(row_ids.with_nulls(nulls.into())),
763 }
764 }
765
766 pub fn row_addrs(&self) -> &NullableRowAddrSet {
767 match self {
768 Self::Exact(row_addrs) => row_addrs,
769 Self::AtMost(row_addrs) => row_addrs,
770 Self::AtLeast(row_addrs) => row_addrs,
771 }
772 }
773
774 pub fn is_exact(&self) -> bool {
775 matches!(self, Self::Exact(_))
776 }
777}
778
779pub struct CreatedIndex {
781 pub index_details: prost_types::Any,
786 pub index_version: u32,
790}
791
792pub struct UpdateCriteria {
794 pub requires_old_data: bool,
798 pub data_criteria: TrainingCriteria,
800}
801
802impl UpdateCriteria {
803 pub fn requires_old_data(data_criteria: TrainingCriteria) -> Self {
804 Self {
805 requires_old_data: true,
806 data_criteria,
807 }
808 }
809
810 pub fn only_new_data(data_criteria: TrainingCriteria) -> Self {
811 Self {
812 requires_old_data: false,
813 data_criteria,
814 }
815 }
816}
817
818#[async_trait]
820pub trait ScalarIndex: Send + Sync + std::fmt::Debug + Index + DeepSizeOf {
821 async fn search(
825 &self,
826 query: &dyn AnyQuery,
827 metrics: &dyn MetricsCollector,
828 ) -> Result<SearchResult>;
829
830 fn can_remap(&self) -> bool;
832
833 async fn remap(
835 &self,
836 mapping: &HashMap<u64, Option<u64>>,
837 dest_store: &dyn IndexStore,
838 ) -> Result<CreatedIndex>;
839
840 async fn update(
845 &self,
846 new_data: SendableRecordBatchStream,
847 dest_store: &dyn IndexStore,
848 valid_old_fragments: Option<&RoaringBitmap>,
849 ) -> Result<CreatedIndex>;
850
851 fn update_criteria(&self) -> UpdateCriteria;
853
854 fn derive_index_params(&self) -> Result<ScalarIndexParams>;
859}