1use std::collections::{HashMap, HashSet};
7use std::fmt::Debug;
8use std::{any::Any, ops::Bound, sync::Arc};
9
10use arrow::buffer::{OffsetBuffer, ScalarBuffer};
11use arrow_array::{ListArray, RecordBatch};
12use arrow_schema::{Field, Schema};
13use async_trait::async_trait;
14use datafusion::functions::string::contains::ContainsFunc;
15use datafusion::functions_array::array_has;
16use datafusion::physical_plan::SendableRecordBatchStream;
17use datafusion_common::{scalar::ScalarValue, Column};
18
19use datafusion_expr::expr::ScalarFunction;
20use datafusion_expr::Expr;
21use deepsize::DeepSizeOf;
22use inverted::query::{fill_fts_query_column, FtsQuery, FtsQueryNode, FtsSearchParams, MatchQuery};
23use lance_core::cache::LanceCache;
24use lance_core::utils::mask::RowIdTreeMap;
25use lance_core::{Error, Result};
26use snafu::location;
27
28use crate::metrics::MetricsCollector;
29use crate::{Index, IndexParams, IndexType};
30
31pub mod bitmap;
32pub mod btree;
33pub mod expression;
34pub mod flat;
35pub mod inverted;
36pub mod label_list;
37pub mod lance_format;
38pub mod ngram;
39
40use crate::frag_reuse::FragReuseIndex;
41pub use inverted::tokenizer::InvertedIndexParams;
42
/// Index name reported by [`ScalarIndexParams`] through `IndexParams::index_name`.
pub const LANCE_SCALAR_INDEX: &str = "__lance_scalar_index";
44
/// The kinds of scalar index this module distinguishes.
///
/// Converts to and from the crate-wide [`IndexType`] through the `TryFrom`
/// and `From` implementations in this file.
#[derive(Debug, Copy, Clone, PartialEq, Eq, DeepSizeOf)]
pub enum ScalarIndexType {
    /// B-tree index. `IndexType::Scalar` also converts to this variant.
    BTree,
    /// Bitmap index.
    Bitmap,
    /// Label-list index.
    LabelList,
    /// N-gram index.
    NGram,
    /// Inverted index.
    Inverted,
}
53
54impl TryFrom<IndexType> for ScalarIndexType {
55 type Error = Error;
56
57 fn try_from(value: IndexType) -> Result<Self> {
58 match value {
59 IndexType::BTree | IndexType::Scalar => Ok(Self::BTree),
60 IndexType::Bitmap => Ok(Self::Bitmap),
61 IndexType::LabelList => Ok(Self::LabelList),
62 IndexType::NGram => Ok(Self::NGram),
63 IndexType::Inverted => Ok(Self::Inverted),
64 _ => Err(Error::InvalidInput {
65 source: format!("Index type {:?} is not a scalar index", value).into(),
66 location: location!(),
67 }),
68 }
69 }
70}
71
72impl From<ScalarIndexType> for IndexType {
73 fn from(val: ScalarIndexType) -> Self {
74 match val {
75 ScalarIndexType::BTree => Self::BTree,
76 ScalarIndexType::Bitmap => Self::Bitmap,
77 ScalarIndexType::LabelList => Self::LabelList,
78 ScalarIndexType::NGram => Self::NGram,
79 ScalarIndexType::Inverted => Self::Inverted,
80 }
81 }
82}
83
/// Parameters for a scalar index.
#[derive(Default)]
pub struct ScalarIndexParams {
    /// Explicitly requested index type. When `None`, the `IndexParams::index_type`
    /// implementation for this struct reports `IndexType::BTree`.
    pub force_index_type: Option<ScalarIndexType>,
}
89
90impl ScalarIndexParams {
91 pub fn new(index_type: ScalarIndexType) -> Self {
92 Self {
93 force_index_type: Some(index_type),
94 }
95 }
96}
97
98impl IndexParams for ScalarIndexParams {
99 fn as_any(&self) -> &dyn std::any::Any {
100 self
101 }
102
103 fn index_type(&self) -> IndexType {
104 match self.force_index_type {
105 Some(ScalarIndexType::BTree) | None => IndexType::BTree,
106 Some(ScalarIndexType::Bitmap) => IndexType::Bitmap,
107 Some(ScalarIndexType::LabelList) => IndexType::LabelList,
108 Some(ScalarIndexType::Inverted) => IndexType::Inverted,
109 Some(ScalarIndexType::NGram) => IndexType::NGram,
110 }
111 }
112
113 fn index_name(&self) -> &str {
114 LANCE_SCALAR_INDEX
115 }
116}
117
118impl IndexParams for InvertedIndexParams {
119 fn as_any(&self) -> &dyn std::any::Any {
120 self
121 }
122
123 fn index_type(&self) -> IndexType {
124 IndexType::Inverted
125 }
126
127 fn index_name(&self) -> &str {
128 "INVERTED"
129 }
130}
131
/// Writer side of an index file, as handed out by `IndexStore::new_index_file`.
#[async_trait]
pub trait IndexWriter: Send {
    /// Writes one record batch to the index file.
    ///
    /// NOTE(review): the meaning of the returned `u64` is not visible here
    /// (presumably an identifier/offset of the written batch) — confirm
    /// against the implementations.
    async fn write_record_batch(&mut self, batch: RecordBatch) -> Result<u64>;
    /// Finishes writing the file.
    async fn finish(&mut self) -> Result<()>;
    /// Finishes writing the file, passing along string key/value `metadata`
    /// (presumably stored with the file — TODO confirm).
    async fn finish_with_metadata(&mut self, metadata: HashMap<String, String>) -> Result<()>;
}
144
/// Reader side of an index file, as handed out by `IndexStore::open_index_file`.
#[async_trait]
pub trait IndexReader: Send + Sync {
    /// Reads a single record batch.
    ///
    /// NOTE(review): presumably `n` is the index of the batch when the file is
    /// viewed as consecutive batches of `batch_size` rows — confirm.
    async fn read_record_batch(&self, n: u64, batch_size: u64) -> Result<RecordBatch>;
    /// Reads the rows covered by `range` into one record batch.
    ///
    /// `projection` optionally names columns; presumably `None` means all
    /// columns — TODO confirm against the implementations.
    async fn read_range(
        &self,
        range: std::ops::Range<usize>,
        projection: Option<&[&str]>,
    ) -> Result<RecordBatch>;
    /// Returns a batch count for the given `batch_size`.
    async fn num_batches(&self, batch_size: u64) -> u32;
    /// Returns a row count for the file.
    fn num_rows(&self) -> usize;
    /// Returns the Lance schema associated with the file.
    fn schema(&self) -> &lance_core::datatypes::Schema;
}
166
/// Abstraction over the place where index files live.
///
/// It creates writers, opens readers, and copies, renames and deletes index
/// files by name.
#[async_trait]
pub trait IndexStore: std::fmt::Debug + Send + Sync + DeepSizeOf {
    /// Exposes `self` as `&dyn Any` so callers can downcast to the concrete store.
    fn as_any(&self) -> &dyn Any;

    /// Returns a parallelism figure for I/O against this store
    /// (presumably the suggested number of concurrent requests — TODO confirm).
    fn io_parallelism(&self) -> usize;

    /// Creates a new index file called `name` with the given Arrow `schema`
    /// and returns a writer for it.
    async fn new_index_file(&self, name: &str, schema: Arc<Schema>)
        -> Result<Box<dyn IndexWriter>>;

    /// Opens the index file called `name` for reading.
    async fn open_index_file(&self, name: &str) -> Result<Arc<dyn IndexReader>>;

    /// Copies the index file called `name` from this store into `dest_store`.
    async fn copy_index_file(&self, name: &str, dest_store: &dyn IndexStore) -> Result<()>;

    /// Renames the index file `name` to `new_name`.
    async fn rename_index_file(&self, name: &str, new_name: &str) -> Result<()>;

    /// Deletes the index file called `name`.
    async fn delete_index_file(&self, name: &str) -> Result<()>;
}
197
/// Type-erased query that can be handed to a `ScalarIndex`.
///
/// Implementations in this file are `SargableQuery`, `LabelListQuery` and
/// `TextQuery`.
pub trait AnyQuery: std::fmt::Debug + Any + Send + Sync {
    /// Exposes `self` as `&dyn Any` so callers can downcast to the concrete query.
    fn as_any(&self) -> &dyn Any;
    /// Renders the query as a human-readable string, using `col` as the column name.
    fn format(&self, col: &str) -> String;
    /// Converts the query into a DataFusion expression over the column `col`.
    fn to_expr(&self, col: String) -> Expr;
    /// Equality against another type-erased query; this is what `PartialEq`
    /// for `dyn AnyQuery` delegates to.
    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool;
    /// Whether results for this query need a recheck. Defaults to `false`.
    ///
    /// NOTE(review): presumably `true` means the index result may contain
    /// false positives that must be re-verified against the data — confirm.
    fn needs_recheck(&self) -> bool {
        false
    }
}
222
/// Lets `dyn AnyQuery` values be compared with `==`.
impl PartialEq for dyn AnyQuery {
    /// Delegates to `AnyQuery::dyn_eq`, so the concrete type decides equality.
    fn eq(&self, other: &Self) -> bool {
        self.dyn_eq(other)
    }
}
/// A full text search query together with its search options.
#[derive(Debug, Clone, PartialEq)]
pub struct FullTextSearchQuery {
    /// The query tree to evaluate.
    pub query: FtsQuery,

    /// Optional limit; `params()` forwards it to `FtsSearchParams::with_limit`.
    pub limit: Option<i64>,

    /// Optional WAND factor; `params()` falls back to `1.0` when this is `None`.
    pub wand_factor: Option<f32>,
}
242
243impl FullTextSearchQuery {
244 pub fn new(query: String) -> Self {
246 let query = MatchQuery::new(query).into();
247 Self {
248 query,
249 limit: None,
250 wand_factor: None,
251 }
252 }
253
254 pub fn new_fuzzy(term: String, max_distance: Option<u32>) -> Self {
256 let query = MatchQuery::new(term).with_fuzziness(max_distance).into();
257 Self {
258 query,
259 limit: None,
260 wand_factor: None,
261 }
262 }
263
264 pub fn new_query(query: FtsQuery) -> Self {
266 Self {
267 query,
268 limit: None,
269 wand_factor: None,
270 }
271 }
272
273 pub fn with_column(mut self, column: String) -> Result<Self> {
276 self.query = fill_fts_query_column(&self.query, &[column], true)?;
277 Ok(self)
278 }
279
280 pub fn with_columns(mut self, columns: &[String]) -> Result<Self> {
283 self.query = fill_fts_query_column(&self.query, columns, true)?;
284 Ok(self)
285 }
286
287 pub fn limit(mut self, limit: Option<i64>) -> Self {
290 self.limit = limit;
291 self
292 }
293
294 pub fn wand_factor(mut self, wand_factor: Option<f32>) -> Self {
295 self.wand_factor = wand_factor;
296 self
297 }
298
299 pub fn columns(&self) -> HashSet<String> {
300 self.query.columns()
301 }
302
303 pub fn params(&self) -> FtsSearchParams {
304 let params = FtsSearchParams::new()
305 .with_limit(self.limit.map(|limit| limit as usize))
306 .with_wand_factor(self.wand_factor.unwrap_or(1.0));
307 match self.query {
308 FtsQuery::Phrase(ref query) => params.with_phrase_slop(Some(query.slop)),
309 _ => params,
310 }
311 }
312}
313
/// Queries expressed as simple predicates on a single column.
///
/// How each variant is rendered as text and as a DataFusion expression is
/// defined by the `AnyQuery` implementation for this type.
#[derive(Debug, Clone, PartialEq)]
pub enum SargableQuery {
    /// Range predicate with a lower and an upper bound (in that order); either
    /// bound may be inclusive, exclusive or unbounded.
    Range(Bound<ScalarValue>, Bound<ScalarValue>),
    /// The column value is one of the listed values.
    IsIn(Vec<ScalarValue>),
    /// The column value equals the given value.
    Equals(ScalarValue),
    /// A full text search.
    FullTextSearch(FullTextSearchQuery),
    /// The column value is null.
    IsNull(),
}
336
337impl AnyQuery for SargableQuery {
338 fn as_any(&self) -> &dyn Any {
339 self
340 }
341
342 fn format(&self, col: &str) -> String {
343 match self {
344 Self::Range(lower, upper) => match (lower, upper) {
345 (Bound::Unbounded, Bound::Unbounded) => "true".to_string(),
346 (Bound::Unbounded, Bound::Included(rhs)) => format!("{} <= {}", col, rhs),
347 (Bound::Unbounded, Bound::Excluded(rhs)) => format!("{} < {}", col, rhs),
348 (Bound::Included(lhs), Bound::Unbounded) => format!("{} >= {}", col, lhs),
349 (Bound::Included(lhs), Bound::Included(rhs)) => {
350 format!("{} >= {} && {} <= {}", col, lhs, col, rhs)
351 }
352 (Bound::Included(lhs), Bound::Excluded(rhs)) => {
353 format!("{} >= {} && {} < {}", col, lhs, col, rhs)
354 }
355 (Bound::Excluded(lhs), Bound::Unbounded) => format!("{} > {}", col, lhs),
356 (Bound::Excluded(lhs), Bound::Included(rhs)) => {
357 format!("{} > {} && {} <= {}", col, lhs, col, rhs)
358 }
359 (Bound::Excluded(lhs), Bound::Excluded(rhs)) => {
360 format!("{} > {} && {} < {}", col, lhs, col, rhs)
361 }
362 },
363 Self::IsIn(values) => {
364 format!(
365 "{} IN [{}]",
366 col,
367 values
368 .iter()
369 .map(|val| val.to_string())
370 .collect::<Vec<_>>()
371 .join(",")
372 )
373 }
374 Self::FullTextSearch(query) => {
375 format!("fts({})", query.query)
376 }
377 Self::IsNull() => {
378 format!("{} IS NULL", col)
379 }
380 Self::Equals(val) => {
381 format!("{} = {}", col, val)
382 }
383 }
384 }
385
386 fn to_expr(&self, col: String) -> Expr {
387 let col_expr = Expr::Column(Column::new_unqualified(col));
388 match self {
389 Self::Range(lower, upper) => match (lower, upper) {
390 (Bound::Unbounded, Bound::Unbounded) => {
391 Expr::Literal(ScalarValue::Boolean(Some(true)), None)
392 }
393 (Bound::Unbounded, Bound::Included(rhs)) => {
394 col_expr.lt_eq(Expr::Literal(rhs.clone(), None))
395 }
396 (Bound::Unbounded, Bound::Excluded(rhs)) => {
397 col_expr.lt(Expr::Literal(rhs.clone(), None))
398 }
399 (Bound::Included(lhs), Bound::Unbounded) => {
400 col_expr.gt_eq(Expr::Literal(lhs.clone(), None))
401 }
402 (Bound::Included(lhs), Bound::Included(rhs)) => col_expr.between(
403 Expr::Literal(lhs.clone(), None),
404 Expr::Literal(rhs.clone(), None),
405 ),
406 (Bound::Included(lhs), Bound::Excluded(rhs)) => col_expr
407 .clone()
408 .gt_eq(Expr::Literal(lhs.clone(), None))
409 .and(col_expr.lt(Expr::Literal(rhs.clone(), None))),
410 (Bound::Excluded(lhs), Bound::Unbounded) => {
411 col_expr.gt(Expr::Literal(lhs.clone(), None))
412 }
413 (Bound::Excluded(lhs), Bound::Included(rhs)) => col_expr
414 .clone()
415 .gt(Expr::Literal(lhs.clone(), None))
416 .and(col_expr.lt_eq(Expr::Literal(rhs.clone(), None))),
417 (Bound::Excluded(lhs), Bound::Excluded(rhs)) => col_expr
418 .clone()
419 .gt(Expr::Literal(lhs.clone(), None))
420 .and(col_expr.lt(Expr::Literal(rhs.clone(), None))),
421 },
422 Self::IsIn(values) => col_expr.in_list(
423 values
424 .iter()
425 .map(|val| Expr::Literal(val.clone(), None))
426 .collect::<Vec<_>>(),
427 false,
428 ),
429 Self::FullTextSearch(query) => col_expr.like(Expr::Literal(
430 ScalarValue::Utf8(Some(query.query.to_string())),
431 None,
432 )),
433 Self::IsNull() => col_expr.is_null(),
434 Self::Equals(value) => col_expr.eq(Expr::Literal(value.clone(), None)),
435 }
436 }
437
438 fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
439 match other.as_any().downcast_ref::<Self>() {
440 Some(o) => self == o,
441 None => false,
442 }
443 }
444}
445
/// Queries against a column holding lists of labels.
#[derive(Debug, Clone, PartialEq)]
pub enum LabelListQuery {
    /// Converted by `to_expr` into an `ArrayHasAll` call with these labels.
    HasAllLabels(Vec<ScalarValue>),
    /// Converted by `to_expr` into an `ArrayHasAny` call with these labels.
    HasAnyLabel(Vec<ScalarValue>),
}
454
455impl AnyQuery for LabelListQuery {
456 fn as_any(&self) -> &dyn Any {
457 self
458 }
459
460 fn format(&self, col: &str) -> String {
461 format!("{}", self.to_expr(col.to_string()))
462 }
463
464 fn to_expr(&self, col: String) -> Expr {
465 match self {
466 Self::HasAllLabels(labels) => {
467 let labels_arr = ScalarValue::iter_to_array(labels.iter().cloned()).unwrap();
468 let offsets_buffer =
469 OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, labels_arr.len() as i32]));
470 let labels_list = ListArray::try_new(
471 Arc::new(Field::new("item", labels_arr.data_type().clone(), false)),
472 offsets_buffer,
473 labels_arr,
474 None,
475 )
476 .unwrap();
477 let labels_arr = Arc::new(labels_list);
478 Expr::ScalarFunction(ScalarFunction {
479 func: Arc::new(array_has::ArrayHasAll::new().into()),
480 args: vec![
481 Expr::Column(Column::new_unqualified(col)),
482 Expr::Literal(ScalarValue::List(labels_arr), None),
483 ],
484 })
485 }
486 Self::HasAnyLabel(labels) => {
487 let labels_arr = ScalarValue::iter_to_array(labels.iter().cloned()).unwrap();
488 let offsets_buffer =
489 OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, labels_arr.len() as i32]));
490 let labels_list = ListArray::try_new(
491 Arc::new(Field::new("item", labels_arr.data_type().clone(), false)),
492 offsets_buffer,
493 labels_arr,
494 None,
495 )
496 .unwrap();
497 let labels_arr = Arc::new(labels_list);
498 Expr::ScalarFunction(ScalarFunction {
499 func: Arc::new(array_has::ArrayHasAny::new().into()),
500 args: vec![
501 Expr::Column(Column::new_unqualified(col)),
502 Expr::Literal(ScalarValue::List(labels_arr), None),
503 ],
504 })
505 }
506 }
507 }
508
509 fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
510 match other.as_any().downcast_ref::<Self>() {
511 Some(o) => self == o,
512 None => false,
513 }
514 }
515}
516
/// Queries against text columns.
#[derive(Debug, Clone, PartialEq)]
pub enum TextQuery {
    /// Substring query; `to_expr` turns it into a `ContainsFunc` call with
    /// this string as the second argument.
    StringContains(String),
}
526
527impl AnyQuery for TextQuery {
528 fn as_any(&self) -> &dyn Any {
529 self
530 }
531
532 fn format(&self, col: &str) -> String {
533 format!("{}", self.to_expr(col.to_string()))
534 }
535
536 fn to_expr(&self, col: String) -> Expr {
537 match self {
538 Self::StringContains(substr) => Expr::ScalarFunction(ScalarFunction {
539 func: Arc::new(ContainsFunc::new().into()),
540 args: vec![
541 Expr::Column(Column::new_unqualified(col)),
542 Expr::Literal(ScalarValue::Utf8(Some(substr.clone())), None),
543 ],
544 }),
545 }
546 }
547
548 fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
549 match other.as_any().downcast_ref::<Self>() {
550 Some(o) => self == o,
551 None => false,
552 }
553 }
554
555 fn needs_recheck(&self) -> bool {
556 true
557 }
558}
559
/// Outcome of a scalar index search: a set of row ids tagged with how
/// precisely that set answers the query.
///
/// NOTE(review): the variant semantics below are inferred from the names —
/// confirm against the index implementations.
#[derive(Debug, PartialEq)]
pub enum SearchResult {
    /// Presumably exactly the rows matching the query.
    Exact(RowIdTreeMap),
    /// Presumably a superset of the matching rows (some listed rows may not match).
    AtMost(RowIdTreeMap),
    /// Presumably a subset of the matching rows (other rows may match too).
    AtLeast(RowIdTreeMap),
}
575
576impl SearchResult {
577 pub fn row_ids(&self) -> &RowIdTreeMap {
578 match self {
579 Self::Exact(row_ids) => row_ids,
580 Self::AtMost(row_ids) => row_ids,
581 Self::AtLeast(row_ids) => row_ids,
582 }
583 }
584
585 pub fn is_exact(&self) -> bool {
586 matches!(self, Self::Exact(_))
587 }
588}
589
/// A scalar index: answers `AnyQuery` searches and can be loaded, remapped
/// and updated through an `IndexStore`.
#[async_trait]
pub trait ScalarIndex: Send + Sync + std::fmt::Debug + Index + DeepSizeOf {
    /// Runs `query` against the index and returns the matching row ids as a
    /// `SearchResult`. `metrics` is handed in for the implementation to
    /// report to.
    async fn search(
        &self,
        query: &dyn AnyQuery,
        metrics: &dyn MetricsCollector,
    ) -> Result<SearchResult>;

    /// Whether the index can answer `query` exactly.
    ///
    /// NOTE(review): presumably this corresponds to `search` returning
    /// `SearchResult::Exact` — confirm against the implementations.
    fn can_answer_exact(&self, query: &dyn AnyQuery) -> bool;

    /// Loads the index from `store`.
    ///
    /// `frag_reuse_index` and `index_cache` are passed through to the
    /// implementation; their exact use is not visible from this trait.
    async fn load(
        store: Arc<dyn IndexStore>,
        frag_reuse_index: Option<Arc<FragReuseIndex>>,
        index_cache: LanceCache,
    ) -> Result<Arc<Self>>
    where
        Self: Sized;

    /// Remaps the index according to `mapping` and writes the result to
    /// `dest_store`.
    ///
    /// NOTE(review): presumably keys are old row ids and values the new row
    /// id, with `None` meaning the row was removed — confirm.
    async fn remap(
        &self,
        mapping: &HashMap<u64, Option<u64>>,
        dest_store: &dyn IndexStore,
    ) -> Result<()>;

    /// Combines the index with the batches in `new_data` and writes the
    /// result to `dest_store`.
    async fn update(
        &self,
        new_data: SendableRecordBatchStream,
        dest_store: &dyn IndexStore,
    ) -> Result<()>;
}