1use std::collections::{HashMap, HashSet};
7use std::fmt::Debug;
8use std::{any::Any, ops::Bound, sync::Arc};
9
10use arrow::buffer::{OffsetBuffer, ScalarBuffer};
11use arrow_array::{ListArray, RecordBatch};
12use arrow_schema::{Field, Schema};
13use async_trait::async_trait;
14use datafusion::functions::string::contains::ContainsFunc;
15use datafusion::functions_array::array_has;
16use datafusion::physical_plan::SendableRecordBatchStream;
17use datafusion_common::{scalar::ScalarValue, Column};
18
19use datafusion_expr::expr::ScalarFunction;
20use datafusion_expr::Expr;
21use deepsize::DeepSizeOf;
22use inverted::query::{fill_fts_query_column, FtsQuery, FtsQueryNode, FtsSearchParams, MatchQuery};
23use lance_core::utils::mask::RowIdTreeMap;
24use lance_core::{Error, Result};
25use serde::Serialize;
26use snafu::location;
27
28use crate::metrics::MetricsCollector;
29use crate::scalar::registry::TrainingCriteria;
30use crate::{Index, IndexParams, IndexType};
31
32pub mod bitmap;
33pub mod btree;
34pub mod expression;
35pub mod flat;
36pub mod inverted;
37pub mod json;
38pub mod label_list;
39pub mod lance_format;
40pub mod ngram;
41pub mod registry;
42pub mod zonemap;
43
44use crate::frag_reuse::FragReuseIndex;
45pub use inverted::tokenizer::InvertedIndexParams;
46use lance_datafusion::udf::CONTAINS_TOKENS_UDF;
47
/// Index name that `ScalarIndexParams` reports through `IndexParams::index_name`.
pub const LANCE_SCALAR_INDEX: &str = "__lance_scalar_index";
49
50#[derive(Debug, Clone, PartialEq, Eq, DeepSizeOf)]
56pub enum BuiltinIndexType {
57 BTree,
58 Bitmap,
59 LabelList,
60 NGram,
61 ZoneMap,
62 Inverted,
63}
64
65impl BuiltinIndexType {
66 pub fn as_str(&self) -> &str {
67 match self {
68 Self::BTree => "btree",
69 Self::Bitmap => "bitmap",
70 Self::LabelList => "labellist",
71 Self::NGram => "ngram",
72 Self::ZoneMap => "zonemap",
73 Self::Inverted => "inverted",
74 }
75 }
76}
77
78impl TryFrom<IndexType> for BuiltinIndexType {
79 type Error = Error;
80
81 fn try_from(value: IndexType) -> Result<Self> {
82 match value {
83 IndexType::BTree => Ok(Self::BTree),
84 IndexType::Bitmap => Ok(Self::Bitmap),
85 IndexType::LabelList => Ok(Self::LabelList),
86 IndexType::NGram => Ok(Self::NGram),
87 IndexType::ZoneMap => Ok(Self::ZoneMap),
88 IndexType::Inverted => Ok(Self::Inverted),
89 _ => Err(Error::Index {
90 message: "Invalid index type".to_string(),
91 location: location!(),
92 }),
93 }
94 }
95}
96
/// Parameters describing which scalar index to build.
#[derive(Clone, Debug, PartialEq)]
pub struct ScalarIndexParams {
    /// Name of the index type, e.g. a `BuiltinIndexType::as_str` value such as `"btree"`.
    pub index_type: String,
    /// Optional type-specific parameters, stored as a JSON string (see `with_params`).
    pub params: Option<String>,
}
111
112impl Default for ScalarIndexParams {
113 fn default() -> Self {
114 Self {
115 index_type: BuiltinIndexType::BTree.as_str().to_string(),
116 params: None,
117 }
118 }
119}
120
121impl ScalarIndexParams {
122 pub fn for_builtin(index_type: BuiltinIndexType) -> Self {
124 Self {
125 index_type: index_type.as_str().to_string(),
126 params: None,
127 }
128 }
129
130 pub fn new(index_type: String) -> Self {
132 Self {
133 index_type,
134 params: None,
135 }
136 }
137
138 pub fn with_params<ParamsType: Serialize>(mut self, params: ParamsType) -> Self {
140 self.params = Some(serde_json::to_string(¶ms).unwrap());
141 self
142 }
143}
144
145impl IndexParams for ScalarIndexParams {
146 fn as_any(&self) -> &dyn std::any::Any {
147 self
148 }
149
150 fn index_name(&self) -> &str {
151 LANCE_SCALAR_INDEX
152 }
153}
154
155impl IndexParams for InvertedIndexParams {
156 fn as_any(&self) -> &dyn std::any::Any {
157 self
158 }
159
160 fn index_name(&self) -> &str {
161 "INVERTED"
162 }
163}
164
/// Sink for the record batches that make up one index file.
///
/// Instances are returned by [`IndexStore::new_index_file`].
#[async_trait]
pub trait IndexWriter: Send {
    /// Appends `batch` to the file.
    ///
    /// NOTE(review): the meaning of the returned `u64` is not visible here
    /// (presumably the position/ordinal of the written batch) — confirm against
    /// the implementations.
    async fn write_record_batch(&mut self, batch: RecordBatch) -> Result<u64>;
    /// Finishes writing the file.
    async fn finish(&mut self) -> Result<()>;
    /// Finishes writing the file, attaching `metadata` as string key/value pairs.
    async fn finish_with_metadata(&mut self, metadata: HashMap<String, String>) -> Result<()>;
}
177
/// Random-access reader over one index file.
///
/// Instances are returned by [`IndexStore::open_index_file`]. The trait requires
/// `Send + Sync`, so a reader can be shared, e.g. behind the returned `Arc`.
#[async_trait]
pub trait IndexReader: Send + Sync {
    /// Reads batch number `n`, viewing the file as consecutive batches of
    /// `batch_size` rows.
    ///
    /// NOTE(review): batching semantics inferred from the parameter names — confirm.
    async fn read_record_batch(&self, n: u64, batch_size: u64) -> Result<RecordBatch>;
    /// Reads the rows in `range`, restricted to the columns named in `projection`
    /// when it is `Some`.
    ///
    /// NOTE(review): presumably `range` is in row offsets and `None` means "all
    /// columns" — confirm against the implementations.
    async fn read_range(
        &self,
        range: std::ops::Range<usize>,
        projection: Option<&[&str]>,
    ) -> Result<RecordBatch>;
    /// Returns how many batches of `batch_size` rows the file is split into
    /// (presumably counting a final partial batch — confirm).
    async fn num_batches(&self, batch_size: u64) -> u32;
    /// Returns the number of rows stored in the file.
    fn num_rows(&self) -> usize;
    /// Returns the (Lance) schema of the file.
    fn schema(&self) -> &lance_core::datatypes::Schema;
}
199
/// A named collection of index files that can be created, opened, copied,
/// renamed and deleted.
///
/// Scalar indices write through this trait; see `ScalarIndex::remap` and
/// `ScalarIndex::update`, which take a destination store.
#[async_trait]
pub trait IndexStore: std::fmt::Debug + Send + Sync + DeepSizeOf {
    /// Returns `self` as [`Any`] so callers can downcast to a concrete store.
    fn as_any(&self) -> &dyn Any;

    /// Returns the I/O parallelism this store suggests callers use.
    ///
    /// NOTE(review): presumably a concurrency hint for issuing reads — confirm.
    fn io_parallelism(&self) -> usize;

    /// Creates a new index file called `name` with the given arrow `schema` and
    /// returns a writer for it.
    async fn new_index_file(&self, name: &str, schema: Arc<Schema>)
        -> Result<Box<dyn IndexWriter>>;

    /// Opens the existing index file called `name` for reading.
    async fn open_index_file(&self, name: &str) -> Result<Arc<dyn IndexReader>>;

    /// Copies the index file called `name` from this store into `dest_store`.
    async fn copy_index_file(&self, name: &str, dest_store: &dyn IndexStore) -> Result<()>;

    /// Renames the index file `name` to `new_name` within this store.
    async fn rename_index_file(&self, name: &str, new_name: &str) -> Result<()>;

    /// Deletes the index file called `name` from this store.
    async fn delete_index_file(&self, name: &str) -> Result<()>;
}
230
/// Object-safe query type accepted by `ScalarIndex::search`.
///
/// Concrete query types in this module (`SargableQuery`, `LabelListQuery`,
/// `TextQuery`, `TokenQuery`) implement it. Each index downcasts via `as_any`
/// to the query type it understands.
pub trait AnyQuery: std::fmt::Debug + Any + Send + Sync {
    /// Returns `self` as [`Any`] so the concrete query type can be recovered.
    fn as_any(&self) -> &dyn Any;
    /// Renders the query as human-readable text against column `col`.
    fn format(&self, col: &str) -> String;
    /// Converts the query into an equivalent DataFusion [`Expr`] over column `col`.
    fn to_expr(&self, col: String) -> Expr;
    /// Object-safe equality, used by `PartialEq for dyn AnyQuery`.
    ///
    /// The implementations in this module return `false` when `other` is a
    /// different concrete type.
    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool;
}
251
252impl PartialEq for dyn AnyQuery {
253 fn eq(&self, other: &Self) -> bool {
254 self.dyn_eq(other)
255 }
256}
257#[derive(Debug, Clone, PartialEq)]
259pub struct FullTextSearchQuery {
260 pub query: FtsQuery,
261
262 pub limit: Option<i64>,
264
265 pub wand_factor: Option<f32>,
270}
271
272impl FullTextSearchQuery {
273 pub fn new(query: String) -> Self {
275 let query = MatchQuery::new(query).into();
276 Self {
277 query,
278 limit: None,
279 wand_factor: None,
280 }
281 }
282
283 pub fn new_fuzzy(term: String, max_distance: Option<u32>) -> Self {
285 let query = MatchQuery::new(term).with_fuzziness(max_distance).into();
286 Self {
287 query,
288 limit: None,
289 wand_factor: None,
290 }
291 }
292
293 pub fn new_query(query: FtsQuery) -> Self {
295 Self {
296 query,
297 limit: None,
298 wand_factor: None,
299 }
300 }
301
302 pub fn with_column(mut self, column: String) -> Result<Self> {
305 self.query = fill_fts_query_column(&self.query, &[column], true)?;
306 Ok(self)
307 }
308
309 pub fn with_columns(mut self, columns: &[String]) -> Result<Self> {
312 self.query = fill_fts_query_column(&self.query, columns, true)?;
313 Ok(self)
314 }
315
316 pub fn limit(mut self, limit: Option<i64>) -> Self {
319 self.limit = limit;
320 self
321 }
322
323 pub fn wand_factor(mut self, wand_factor: Option<f32>) -> Self {
324 self.wand_factor = wand_factor;
325 self
326 }
327
328 pub fn columns(&self) -> HashSet<String> {
329 self.query.columns()
330 }
331
332 pub fn params(&self) -> FtsSearchParams {
333 let params = FtsSearchParams::new()
334 .with_limit(self.limit.map(|limit| limit as usize))
335 .with_wand_factor(self.wand_factor.unwrap_or(1.0));
336 match self.query {
337 FtsQuery::Phrase(ref query) => params.with_phrase_slop(Some(query.slop)),
338 _ => params,
339 }
340 }
341}
342
343#[derive(Debug, Clone, PartialEq)]
353pub enum SargableQuery {
354 Range(Bound<ScalarValue>, Bound<ScalarValue>),
356 IsIn(Vec<ScalarValue>),
358 Equals(ScalarValue),
360 FullTextSearch(FullTextSearchQuery),
362 IsNull(),
364}
365
366impl AnyQuery for SargableQuery {
367 fn as_any(&self) -> &dyn Any {
368 self
369 }
370
371 fn format(&self, col: &str) -> String {
372 match self {
373 Self::Range(lower, upper) => match (lower, upper) {
374 (Bound::Unbounded, Bound::Unbounded) => "true".to_string(),
375 (Bound::Unbounded, Bound::Included(rhs)) => format!("{} <= {}", col, rhs),
376 (Bound::Unbounded, Bound::Excluded(rhs)) => format!("{} < {}", col, rhs),
377 (Bound::Included(lhs), Bound::Unbounded) => format!("{} >= {}", col, lhs),
378 (Bound::Included(lhs), Bound::Included(rhs)) => {
379 format!("{} >= {} && {} <= {}", col, lhs, col, rhs)
380 }
381 (Bound::Included(lhs), Bound::Excluded(rhs)) => {
382 format!("{} >= {} && {} < {}", col, lhs, col, rhs)
383 }
384 (Bound::Excluded(lhs), Bound::Unbounded) => format!("{} > {}", col, lhs),
385 (Bound::Excluded(lhs), Bound::Included(rhs)) => {
386 format!("{} > {} && {} <= {}", col, lhs, col, rhs)
387 }
388 (Bound::Excluded(lhs), Bound::Excluded(rhs)) => {
389 format!("{} > {} && {} < {}", col, lhs, col, rhs)
390 }
391 },
392 Self::IsIn(values) => {
393 format!(
394 "{} IN [{}]",
395 col,
396 values
397 .iter()
398 .map(|val| val.to_string())
399 .collect::<Vec<_>>()
400 .join(",")
401 )
402 }
403 Self::FullTextSearch(query) => {
404 format!("fts({})", query.query)
405 }
406 Self::IsNull() => {
407 format!("{} IS NULL", col)
408 }
409 Self::Equals(val) => {
410 format!("{} = {}", col, val)
411 }
412 }
413 }
414
415 fn to_expr(&self, col: String) -> Expr {
416 let col_expr = Expr::Column(Column::new_unqualified(col));
417 match self {
418 Self::Range(lower, upper) => match (lower, upper) {
419 (Bound::Unbounded, Bound::Unbounded) => {
420 Expr::Literal(ScalarValue::Boolean(Some(true)), None)
421 }
422 (Bound::Unbounded, Bound::Included(rhs)) => {
423 col_expr.lt_eq(Expr::Literal(rhs.clone(), None))
424 }
425 (Bound::Unbounded, Bound::Excluded(rhs)) => {
426 col_expr.lt(Expr::Literal(rhs.clone(), None))
427 }
428 (Bound::Included(lhs), Bound::Unbounded) => {
429 col_expr.gt_eq(Expr::Literal(lhs.clone(), None))
430 }
431 (Bound::Included(lhs), Bound::Included(rhs)) => col_expr.between(
432 Expr::Literal(lhs.clone(), None),
433 Expr::Literal(rhs.clone(), None),
434 ),
435 (Bound::Included(lhs), Bound::Excluded(rhs)) => col_expr
436 .clone()
437 .gt_eq(Expr::Literal(lhs.clone(), None))
438 .and(col_expr.lt(Expr::Literal(rhs.clone(), None))),
439 (Bound::Excluded(lhs), Bound::Unbounded) => {
440 col_expr.gt(Expr::Literal(lhs.clone(), None))
441 }
442 (Bound::Excluded(lhs), Bound::Included(rhs)) => col_expr
443 .clone()
444 .gt(Expr::Literal(lhs.clone(), None))
445 .and(col_expr.lt_eq(Expr::Literal(rhs.clone(), None))),
446 (Bound::Excluded(lhs), Bound::Excluded(rhs)) => col_expr
447 .clone()
448 .gt(Expr::Literal(lhs.clone(), None))
449 .and(col_expr.lt(Expr::Literal(rhs.clone(), None))),
450 },
451 Self::IsIn(values) => col_expr.in_list(
452 values
453 .iter()
454 .map(|val| Expr::Literal(val.clone(), None))
455 .collect::<Vec<_>>(),
456 false,
457 ),
458 Self::FullTextSearch(query) => col_expr.like(Expr::Literal(
459 ScalarValue::Utf8(Some(query.query.to_string())),
460 None,
461 )),
462 Self::IsNull() => col_expr.is_null(),
463 Self::Equals(value) => col_expr.eq(Expr::Literal(value.clone(), None)),
464 }
465 }
466
467 fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
468 match other.as_any().downcast_ref::<Self>() {
469 Some(o) => self == o,
470 None => false,
471 }
472 }
473}
474
475#[derive(Debug, Clone, PartialEq)]
477pub enum LabelListQuery {
478 HasAllLabels(Vec<ScalarValue>),
480 HasAnyLabel(Vec<ScalarValue>),
482}
483
484impl AnyQuery for LabelListQuery {
485 fn as_any(&self) -> &dyn Any {
486 self
487 }
488
489 fn format(&self, col: &str) -> String {
490 format!("{}", self.to_expr(col.to_string()))
491 }
492
493 fn to_expr(&self, col: String) -> Expr {
494 match self {
495 Self::HasAllLabels(labels) => {
496 let labels_arr = ScalarValue::iter_to_array(labels.iter().cloned()).unwrap();
497 let offsets_buffer =
498 OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, labels_arr.len() as i32]));
499 let labels_list = ListArray::try_new(
500 Arc::new(Field::new("item", labels_arr.data_type().clone(), false)),
501 offsets_buffer,
502 labels_arr,
503 None,
504 )
505 .unwrap();
506 let labels_arr = Arc::new(labels_list);
507 Expr::ScalarFunction(ScalarFunction {
508 func: Arc::new(array_has::ArrayHasAll::new().into()),
509 args: vec![
510 Expr::Column(Column::new_unqualified(col)),
511 Expr::Literal(ScalarValue::List(labels_arr), None),
512 ],
513 })
514 }
515 Self::HasAnyLabel(labels) => {
516 let labels_arr = ScalarValue::iter_to_array(labels.iter().cloned()).unwrap();
517 let offsets_buffer =
518 OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, labels_arr.len() as i32]));
519 let labels_list = ListArray::try_new(
520 Arc::new(Field::new("item", labels_arr.data_type().clone(), false)),
521 offsets_buffer,
522 labels_arr,
523 None,
524 )
525 .unwrap();
526 let labels_arr = Arc::new(labels_list);
527 Expr::ScalarFunction(ScalarFunction {
528 func: Arc::new(array_has::ArrayHasAny::new().into()),
529 args: vec![
530 Expr::Column(Column::new_unqualified(col)),
531 Expr::Literal(ScalarValue::List(labels_arr), None),
532 ],
533 })
534 }
535 }
536 }
537
538 fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
539 match other.as_any().downcast_ref::<Self>() {
540 Some(o) => self == o,
541 None => false,
542 }
543 }
544}
545
/// Substring queries over string columns.
#[derive(Clone, Debug, PartialEq)]
pub enum TextQuery {
    /// Values containing the given substring; `to_expr` maps this onto
    /// DataFusion's `contains` string function.
    StringContains(String),
}
555
556impl AnyQuery for TextQuery {
557 fn as_any(&self) -> &dyn Any {
558 self
559 }
560
561 fn format(&self, col: &str) -> String {
562 format!("{}", self.to_expr(col.to_string()))
563 }
564
565 fn to_expr(&self, col: String) -> Expr {
566 match self {
567 Self::StringContains(substr) => Expr::ScalarFunction(ScalarFunction {
568 func: Arc::new(ContainsFunc::new().into()),
569 args: vec![
570 Expr::Column(Column::new_unqualified(col)),
571 Expr::Literal(ScalarValue::Utf8(Some(substr.clone())), None),
572 ],
573 }),
574 }
575 }
576
577 fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
578 match other.as_any().downcast_ref::<Self>() {
579 Some(o) => self == o,
580 None => false,
581 }
582 }
583}
584
/// Token-based queries over text columns.
#[derive(Clone, Debug, PartialEq)]
pub enum TokenQuery {
    /// Evaluated through Lance's `contains_tokens` UDF (`CONTAINS_TOKENS_UDF`)
    /// with the given string as its second argument.
    TokensContains(String),
}
592
593impl AnyQuery for TokenQuery {
594 fn as_any(&self) -> &dyn Any {
595 self
596 }
597
598 fn format(&self, col: &str) -> String {
599 format!("{}", self.to_expr(col.to_string()))
600 }
601
602 fn to_expr(&self, col: String) -> Expr {
603 match self {
604 Self::TokensContains(substr) => Expr::ScalarFunction(ScalarFunction {
605 func: Arc::new(CONTAINS_TOKENS_UDF.clone()),
606 args: vec![
607 Expr::Column(Column::new_unqualified(col)),
608 Expr::Literal(ScalarValue::Utf8(Some(substr.clone())), None),
609 ],
610 }),
611 }
612 }
613
614 fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
615 match other.as_any().downcast_ref::<Self>() {
616 Some(o) => self == o,
617 None => false,
618 }
619 }
620}
621
622#[derive(Debug, PartialEq)]
624pub enum SearchResult {
625 Exact(RowIdTreeMap),
627 AtMost(RowIdTreeMap),
631 AtLeast(RowIdTreeMap),
636}
637
638impl SearchResult {
639 pub fn row_ids(&self) -> &RowIdTreeMap {
640 match self {
641 Self::Exact(row_ids) => row_ids,
642 Self::AtMost(row_ids) => row_ids,
643 Self::AtLeast(row_ids) => row_ids,
644 }
645 }
646
647 pub fn is_exact(&self) -> bool {
648 matches!(self, Self::Exact(_))
649 }
650}
651
652pub struct CreatedIndex {
654 pub index_details: prost_types::Any,
659 pub index_version: u32,
663}
664
665pub struct UpdateCriteria {
667 pub requires_old_data: bool,
671 pub data_criteria: TrainingCriteria,
673}
674
675impl UpdateCriteria {
676 pub fn requires_old_data(data_criteria: TrainingCriteria) -> Self {
677 Self {
678 requires_old_data: true,
679 data_criteria,
680 }
681 }
682
683 pub fn only_new_data(data_criteria: TrainingCriteria) -> Self {
684 Self {
685 requires_old_data: false,
686 data_criteria,
687 }
688 }
689}
690
/// A scalar index that can be searched, remapped and updated.
#[async_trait]
pub trait ScalarIndex: Send + Sync + std::fmt::Debug + Index + DeepSizeOf {
    /// Searches the index for rows matching `query`, reporting into `metrics`.
    ///
    /// The [`SearchResult`] variant tells the caller whether the returned row ids
    /// are exact. Implementations presumably downcast `query` (via
    /// `AnyQuery::as_any`) to the query type they support — confirm per index.
    async fn search(
        &self,
        query: &dyn AnyQuery,
        metrics: &dyn MetricsCollector,
    ) -> Result<SearchResult>;

    /// Whether this index supports `remap`.
    fn can_remap(&self) -> bool;

    /// Writes a copy of this index into `dest_store` with row ids rewritten
    /// through `mapping`.
    ///
    /// NOTE(review): presumably keys are old row ids, and a `None` value marks a
    /// row that no longer exists — confirm.
    async fn remap(
        &self,
        mapping: &HashMap<u64, Option<u64>>,
        dest_store: &dyn IndexStore,
    ) -> Result<CreatedIndex>;

    /// Writes an updated index into `dest_store` that incorporates `new_data`.
    async fn update(
        &self,
        new_data: SendableRecordBatchStream,
        dest_store: &dyn IndexStore,
    ) -> Result<CreatedIndex>;

    /// Describes which data `update` needs (see [`UpdateCriteria`]).
    fn update_criteria(&self) -> UpdateCriteria;
}