1use std::collections::{HashMap, HashSet};
7use std::fmt::Debug;
8use std::{any::Any, ops::Bound, sync::Arc};
9
10use arrow::buffer::{OffsetBuffer, ScalarBuffer};
11use arrow_array::{ListArray, RecordBatch};
12use arrow_schema::{Field, Schema};
13use async_trait::async_trait;
14use datafusion::functions::string::contains::ContainsFunc;
15use datafusion::functions_array::array_has;
16use datafusion::physical_plan::SendableRecordBatchStream;
17use datafusion_common::{scalar::ScalarValue, Column};
18
19use datafusion_expr::expr::ScalarFunction;
20use datafusion_expr::Expr;
21use deepsize::DeepSizeOf;
22use inverted::query::{fill_fts_query_column, FtsQuery, FtsQueryNode, FtsSearchParams, MatchQuery};
23use lance_core::cache::LanceCache;
24use lance_core::utils::mask::RowIdTreeMap;
25use lance_core::{Error, Result};
26use snafu::location;
27
28use crate::metrics::MetricsCollector;
29use crate::{Index, IndexParams, IndexType};
30
31pub mod bitmap;
32pub mod btree;
33pub mod expression;
34pub mod flat;
35pub mod inverted;
36pub mod label_list;
37pub mod lance_format;
38pub mod ngram;
39pub mod zonemap;
40
41use crate::frag_reuse::FragReuseIndex;
42pub use inverted::tokenizer::InvertedIndexParams;
43
/// Index name reported by [`ScalarIndexParams`] through `IndexParams::index_name`.
pub const LANCE_SCALAR_INDEX: &str = "__lance_scalar_index";
45
/// The kinds of scalar index.
///
/// Each variant converts to the [`IndexType`] variant of the same name (see the
/// `From<ScalarIndexType> for IndexType` impl), and `TryFrom<IndexType>` narrows back.
#[derive(Debug, Copy, Clone, PartialEq, Eq, DeepSizeOf)]
pub enum ScalarIndexType {
    BTree,
    Bitmap,
    LabelList,
    NGram,
    ZoneMap,
    Inverted,
}
55
56impl TryFrom<IndexType> for ScalarIndexType {
57 type Error = Error;
58
59 fn try_from(value: IndexType) -> Result<Self> {
60 match value {
61 IndexType::BTree | IndexType::Scalar => Ok(Self::BTree),
62 IndexType::Bitmap => Ok(Self::Bitmap),
63 IndexType::LabelList => Ok(Self::LabelList),
64 IndexType::NGram => Ok(Self::NGram),
65 IndexType::ZoneMap => Ok(Self::ZoneMap),
66 IndexType::Inverted => Ok(Self::Inverted),
67 _ => Err(Error::InvalidInput {
68 source: format!("Index type {:?} is not a scalar index", value).into(),
69 location: location!(),
70 }),
71 }
72 }
73}
74
75impl From<ScalarIndexType> for IndexType {
76 fn from(val: ScalarIndexType) -> Self {
77 match val {
78 ScalarIndexType::BTree => Self::BTree,
79 ScalarIndexType::Bitmap => Self::Bitmap,
80 ScalarIndexType::LabelList => Self::LabelList,
81 ScalarIndexType::NGram => Self::NGram,
82 ScalarIndexType::ZoneMap => Self::ZoneMap,
83 ScalarIndexType::Inverted => Self::Inverted,
84 }
85 }
86}
87
/// Parameters for building a scalar index.
#[derive(Default)]
pub struct ScalarIndexParams {
    /// The scalar index type to build. When `None` (the `Default`), these params report
    /// `IndexType::BTree` from `IndexParams::index_type`.
    pub force_index_type: Option<ScalarIndexType>,
}
93
94impl ScalarIndexParams {
95 pub fn new(index_type: ScalarIndexType) -> Self {
96 Self {
97 force_index_type: Some(index_type),
98 }
99 }
100}
101
102impl IndexParams for ScalarIndexParams {
103 fn as_any(&self) -> &dyn std::any::Any {
104 self
105 }
106
107 fn index_type(&self) -> IndexType {
108 match self.force_index_type {
109 Some(ScalarIndexType::BTree) | None => IndexType::BTree,
110 Some(ScalarIndexType::Bitmap) => IndexType::Bitmap,
111 Some(ScalarIndexType::LabelList) => IndexType::LabelList,
112 Some(ScalarIndexType::Inverted) => IndexType::Inverted,
113 Some(ScalarIndexType::NGram) => IndexType::NGram,
114 Some(ScalarIndexType::ZoneMap) => IndexType::ZoneMap,
115 }
116 }
117
118 fn index_name(&self) -> &str {
119 LANCE_SCALAR_INDEX
120 }
121}
122
123impl IndexParams for InvertedIndexParams {
124 fn as_any(&self) -> &dyn std::any::Any {
125 self
126 }
127
128 fn index_type(&self) -> IndexType {
129 IndexType::Inverted
130 }
131
132 fn index_name(&self) -> &str {
133 "INVERTED"
134 }
135}
136
/// Writer for a single index file, as returned by `IndexStore::new_index_file`.
#[async_trait]
pub trait IndexWriter: Send {
    /// Writes one record batch to the file.
    ///
    /// NOTE(review): the meaning of the returned `u64` is not visible here (presumably the
    /// batch's position/offset within the file) — confirm against implementations.
    async fn write_record_batch(&mut self, batch: RecordBatch) -> Result<u64>;
    /// Finishes writing the file (exact finalization is implementation-defined).
    async fn finish(&mut self) -> Result<()>;
    /// Like `finish`, additionally attaching the given string key/value metadata.
    async fn finish_with_metadata(&mut self, metadata: HashMap<String, String>) -> Result<()>;
}
149
/// Reader for a single index file, as returned by `IndexStore::open_index_file`.
#[async_trait]
pub trait IndexReader: Send + Sync {
    /// Reads batch number `n`, where the file is viewed as batches of `batch_size` rows.
    ///
    /// NOTE(review): indexing semantics inferred from parameter names — confirm.
    async fn read_record_batch(&self, n: u64, batch_size: u64) -> Result<RecordBatch>;
    /// Reads the rows in `range`, optionally restricted to the columns named in
    /// `projection` (presumably `None` means all columns — confirm).
    async fn read_range(
        &self,
        range: std::ops::Range<usize>,
        projection: Option<&[&str]>,
    ) -> Result<RecordBatch>;
    /// Number of batches of `batch_size` rows in the file (presumably rounded up — confirm).
    async fn num_batches(&self, batch_size: u64) -> u32;
    /// Number of rows in the file.
    fn num_rows(&self) -> usize;
    /// The Lance schema of the file's data.
    fn schema(&self) -> &lance_core::datatypes::Schema;
}
171
/// Storage for the named files that make up a scalar index.
///
/// Scalar indices receive stores when loading, remapping and updating (see `ScalarIndex`).
#[async_trait]
pub trait IndexStore: std::fmt::Debug + Send + Sync + DeepSizeOf {
    /// Upcasts to [`Any`] so callers can downcast to a concrete store type.
    fn as_any(&self) -> &dyn Any;

    /// Degree of I/O parallelism suggested by this store.
    ///
    /// NOTE(review): exact semantics are implementation-defined — confirm.
    fn io_parallelism(&self) -> usize;

    /// Creates a new file called `name` with the given Arrow `schema`, returning a writer.
    async fn new_index_file(&self, name: &str, schema: Arc<Schema>)
        -> Result<Box<dyn IndexWriter>>;

    /// Opens the file called `name` for reading.
    async fn open_index_file(&self, name: &str) -> Result<Arc<dyn IndexReader>>;

    /// Copies the file called `name` from this store into `dest_store`.
    async fn copy_index_file(&self, name: &str, dest_store: &dyn IndexStore) -> Result<()>;

    /// Renames the file called `name` to `new_name` within this store.
    async fn rename_index_file(&self, name: &str, new_name: &str) -> Result<()>;

    /// Deletes the file called `name` from this store.
    async fn delete_index_file(&self, name: &str) -> Result<()>;
}
202
/// A query a scalar index can evaluate; passed to `ScalarIndex::search` as `&dyn AnyQuery`.
///
/// Implemented by, among others, [`SargableQuery`], [`LabelListQuery`] and [`TextQuery`].
pub trait AnyQuery: std::fmt::Debug + Any + Send + Sync {
    /// Upcasts to [`Any`] so the concrete query type can be recovered by downcasting.
    fn as_any(&self) -> &dyn Any;
    /// Renders a human-readable description of this query applied to column `col`.
    fn format(&self, col: &str) -> String;
    /// Converts this query into a DataFusion [`Expr`] over column `col`.
    fn to_expr(&self, col: String) -> Expr;
    /// Type-erased equality. The implementations here return `true` only when `other`
    /// downcasts to the same concrete type and compares equal.
    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool;
}
223
224impl PartialEq for dyn AnyQuery {
225 fn eq(&self, other: &Self) -> bool {
226 self.dyn_eq(other)
227 }
228}
/// A full-text search query together with its search-time options.
#[derive(Debug, Clone, PartialEq)]
pub struct FullTextSearchQuery {
    /// The query tree to evaluate.
    pub query: FtsQuery,

    /// Optional result limit (presumably the maximum number of hits); `params()` forwards
    /// it to `FtsSearchParams::with_limit`.
    pub limit: Option<i64>,

    /// Optional WAND factor; `params()` substitutes `1.0` when this is `None`.
    pub wand_factor: Option<f32>,
}
243
244impl FullTextSearchQuery {
245 pub fn new(query: String) -> Self {
247 let query = MatchQuery::new(query).into();
248 Self {
249 query,
250 limit: None,
251 wand_factor: None,
252 }
253 }
254
255 pub fn new_fuzzy(term: String, max_distance: Option<u32>) -> Self {
257 let query = MatchQuery::new(term).with_fuzziness(max_distance).into();
258 Self {
259 query,
260 limit: None,
261 wand_factor: None,
262 }
263 }
264
265 pub fn new_query(query: FtsQuery) -> Self {
267 Self {
268 query,
269 limit: None,
270 wand_factor: None,
271 }
272 }
273
274 pub fn with_column(mut self, column: String) -> Result<Self> {
277 self.query = fill_fts_query_column(&self.query, &[column], true)?;
278 Ok(self)
279 }
280
281 pub fn with_columns(mut self, columns: &[String]) -> Result<Self> {
284 self.query = fill_fts_query_column(&self.query, columns, true)?;
285 Ok(self)
286 }
287
288 pub fn limit(mut self, limit: Option<i64>) -> Self {
291 self.limit = limit;
292 self
293 }
294
295 pub fn wand_factor(mut self, wand_factor: Option<f32>) -> Self {
296 self.wand_factor = wand_factor;
297 self
298 }
299
300 pub fn columns(&self) -> HashSet<String> {
301 self.query.columns()
302 }
303
304 pub fn params(&self) -> FtsSearchParams {
305 let params = FtsSearchParams::new()
306 .with_limit(self.limit.map(|limit| limit as usize))
307 .with_wand_factor(self.wand_factor.unwrap_or(1.0));
308 match self.query {
309 FtsQuery::Phrase(ref query) => params.with_phrase_slop(Some(query.slop)),
310 _ => params,
311 }
312 }
313}
314
/// Single-column predicates (ranges, membership, equality, null checks) plus full-text search.
#[derive(Debug, Clone, PartialEq)]
pub enum SargableQuery {
    /// `lower .. upper` on the column.
    ///
    /// Each end is independently inclusive, exclusive or unbounded. Both ends unbounded
    /// matches everything (formatted as `true`).
    Range(Bound<ScalarValue>, Bound<ScalarValue>),
    /// The column equals one of the listed values.
    IsIn(Vec<ScalarValue>),
    /// The column equals the given value.
    Equals(ScalarValue),
    /// A full-text search query.
    FullTextSearch(FullTextSearchQuery),
    /// The column is null.
    IsNull(),
}
337
338impl AnyQuery for SargableQuery {
339 fn as_any(&self) -> &dyn Any {
340 self
341 }
342
343 fn format(&self, col: &str) -> String {
344 match self {
345 Self::Range(lower, upper) => match (lower, upper) {
346 (Bound::Unbounded, Bound::Unbounded) => "true".to_string(),
347 (Bound::Unbounded, Bound::Included(rhs)) => format!("{} <= {}", col, rhs),
348 (Bound::Unbounded, Bound::Excluded(rhs)) => format!("{} < {}", col, rhs),
349 (Bound::Included(lhs), Bound::Unbounded) => format!("{} >= {}", col, lhs),
350 (Bound::Included(lhs), Bound::Included(rhs)) => {
351 format!("{} >= {} && {} <= {}", col, lhs, col, rhs)
352 }
353 (Bound::Included(lhs), Bound::Excluded(rhs)) => {
354 format!("{} >= {} && {} < {}", col, lhs, col, rhs)
355 }
356 (Bound::Excluded(lhs), Bound::Unbounded) => format!("{} > {}", col, lhs),
357 (Bound::Excluded(lhs), Bound::Included(rhs)) => {
358 format!("{} > {} && {} <= {}", col, lhs, col, rhs)
359 }
360 (Bound::Excluded(lhs), Bound::Excluded(rhs)) => {
361 format!("{} > {} && {} < {}", col, lhs, col, rhs)
362 }
363 },
364 Self::IsIn(values) => {
365 format!(
366 "{} IN [{}]",
367 col,
368 values
369 .iter()
370 .map(|val| val.to_string())
371 .collect::<Vec<_>>()
372 .join(",")
373 )
374 }
375 Self::FullTextSearch(query) => {
376 format!("fts({})", query.query)
377 }
378 Self::IsNull() => {
379 format!("{} IS NULL", col)
380 }
381 Self::Equals(val) => {
382 format!("{} = {}", col, val)
383 }
384 }
385 }
386
387 fn to_expr(&self, col: String) -> Expr {
388 let col_expr = Expr::Column(Column::new_unqualified(col));
389 match self {
390 Self::Range(lower, upper) => match (lower, upper) {
391 (Bound::Unbounded, Bound::Unbounded) => {
392 Expr::Literal(ScalarValue::Boolean(Some(true)), None)
393 }
394 (Bound::Unbounded, Bound::Included(rhs)) => {
395 col_expr.lt_eq(Expr::Literal(rhs.clone(), None))
396 }
397 (Bound::Unbounded, Bound::Excluded(rhs)) => {
398 col_expr.lt(Expr::Literal(rhs.clone(), None))
399 }
400 (Bound::Included(lhs), Bound::Unbounded) => {
401 col_expr.gt_eq(Expr::Literal(lhs.clone(), None))
402 }
403 (Bound::Included(lhs), Bound::Included(rhs)) => col_expr.between(
404 Expr::Literal(lhs.clone(), None),
405 Expr::Literal(rhs.clone(), None),
406 ),
407 (Bound::Included(lhs), Bound::Excluded(rhs)) => col_expr
408 .clone()
409 .gt_eq(Expr::Literal(lhs.clone(), None))
410 .and(col_expr.lt(Expr::Literal(rhs.clone(), None))),
411 (Bound::Excluded(lhs), Bound::Unbounded) => {
412 col_expr.gt(Expr::Literal(lhs.clone(), None))
413 }
414 (Bound::Excluded(lhs), Bound::Included(rhs)) => col_expr
415 .clone()
416 .gt(Expr::Literal(lhs.clone(), None))
417 .and(col_expr.lt_eq(Expr::Literal(rhs.clone(), None))),
418 (Bound::Excluded(lhs), Bound::Excluded(rhs)) => col_expr
419 .clone()
420 .gt(Expr::Literal(lhs.clone(), None))
421 .and(col_expr.lt(Expr::Literal(rhs.clone(), None))),
422 },
423 Self::IsIn(values) => col_expr.in_list(
424 values
425 .iter()
426 .map(|val| Expr::Literal(val.clone(), None))
427 .collect::<Vec<_>>(),
428 false,
429 ),
430 Self::FullTextSearch(query) => col_expr.like(Expr::Literal(
431 ScalarValue::Utf8(Some(query.query.to_string())),
432 None,
433 )),
434 Self::IsNull() => col_expr.is_null(),
435 Self::Equals(value) => col_expr.eq(Expr::Literal(value.clone(), None)),
436 }
437 }
438
439 fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
440 match other.as_any().downcast_ref::<Self>() {
441 Some(o) => self == o,
442 None => false,
443 }
444 }
445}
446
/// A query over a list-valued ("label list") column.
#[derive(Debug, Clone, PartialEq)]
pub enum LabelListQuery {
    /// Rows whose list contains all of the given labels (DataFusion `array_has_all`).
    HasAllLabels(Vec<ScalarValue>),
    /// Rows whose list contains at least one of the given labels (DataFusion `array_has_any`).
    HasAnyLabel(Vec<ScalarValue>),
}
455
456impl AnyQuery for LabelListQuery {
457 fn as_any(&self) -> &dyn Any {
458 self
459 }
460
461 fn format(&self, col: &str) -> String {
462 format!("{}", self.to_expr(col.to_string()))
463 }
464
465 fn to_expr(&self, col: String) -> Expr {
466 match self {
467 Self::HasAllLabels(labels) => {
468 let labels_arr = ScalarValue::iter_to_array(labels.iter().cloned()).unwrap();
469 let offsets_buffer =
470 OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, labels_arr.len() as i32]));
471 let labels_list = ListArray::try_new(
472 Arc::new(Field::new("item", labels_arr.data_type().clone(), false)),
473 offsets_buffer,
474 labels_arr,
475 None,
476 )
477 .unwrap();
478 let labels_arr = Arc::new(labels_list);
479 Expr::ScalarFunction(ScalarFunction {
480 func: Arc::new(array_has::ArrayHasAll::new().into()),
481 args: vec![
482 Expr::Column(Column::new_unqualified(col)),
483 Expr::Literal(ScalarValue::List(labels_arr), None),
484 ],
485 })
486 }
487 Self::HasAnyLabel(labels) => {
488 let labels_arr = ScalarValue::iter_to_array(labels.iter().cloned()).unwrap();
489 let offsets_buffer =
490 OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, labels_arr.len() as i32]));
491 let labels_list = ListArray::try_new(
492 Arc::new(Field::new("item", labels_arr.data_type().clone(), false)),
493 offsets_buffer,
494 labels_arr,
495 None,
496 )
497 .unwrap();
498 let labels_arr = Arc::new(labels_list);
499 Expr::ScalarFunction(ScalarFunction {
500 func: Arc::new(array_has::ArrayHasAny::new().into()),
501 args: vec![
502 Expr::Column(Column::new_unqualified(col)),
503 Expr::Literal(ScalarValue::List(labels_arr), None),
504 ],
505 })
506 }
507 }
508 }
509
510 fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
511 match other.as_any().downcast_ref::<Self>() {
512 Some(o) => self == o,
513 None => false,
514 }
515 }
516}
517
/// A query over a string column.
#[derive(Debug, Clone, PartialEq)]
pub enum TextQuery {
    /// Rows whose value contains the given substring (DataFusion `contains`).
    StringContains(String),
}
527
528impl AnyQuery for TextQuery {
529 fn as_any(&self) -> &dyn Any {
530 self
531 }
532
533 fn format(&self, col: &str) -> String {
534 format!("{}", self.to_expr(col.to_string()))
535 }
536
537 fn to_expr(&self, col: String) -> Expr {
538 match self {
539 Self::StringContains(substr) => Expr::ScalarFunction(ScalarFunction {
540 func: Arc::new(ContainsFunc::new().into()),
541 args: vec![
542 Expr::Column(Column::new_unqualified(col)),
543 Expr::Literal(ScalarValue::Utf8(Some(substr.clone())), None),
544 ],
545 }),
546 }
547 }
548
549 fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
550 match other.as_any().downcast_ref::<Self>() {
551 Some(o) => self == o,
552 None => false,
553 }
554 }
555}
556
/// The outcome of a scalar index search: a set of row ids plus how precise that set is.
#[derive(Debug, PartialEq)]
pub enum SearchResult {
    /// The row ids are exactly the matching rows.
    Exact(RowIdTreeMap),
    /// The matching rows are at most these row ids: a superset that may include rows that do
    /// not match (presumably these need rechecking by the caller — confirm).
    AtMost(RowIdTreeMap),
    /// The matching rows include at least these row ids: a subset, so other rows may also
    /// match.
    AtLeast(RowIdTreeMap),
}
572
573impl SearchResult {
574 pub fn row_ids(&self) -> &RowIdTreeMap {
575 match self {
576 Self::Exact(row_ids) => row_ids,
577 Self::AtMost(row_ids) => row_ids,
578 Self::AtLeast(row_ids) => row_ids,
579 }
580 }
581
582 pub fn is_exact(&self) -> bool {
583 matches!(self, Self::Exact(_))
584 }
585}
586
/// A scalar index that answers [`AnyQuery`] queries with a [`SearchResult`] of row ids.
#[async_trait]
pub trait ScalarIndex: Send + Sync + std::fmt::Debug + Index + DeepSizeOf {
    /// Searches the index for rows matching `query`, reporting to `metrics`.
    ///
    /// The returned [`SearchResult`] states whether the row ids are exact or only a bound.
    async fn search(
        &self,
        query: &dyn AnyQuery,
        metrics: &dyn MetricsCollector,
    ) -> Result<SearchResult>;

    /// Loads the index from the files in `store`.
    ///
    /// NOTE(review): `frag_reuse_index` is optional and `index_cache` is presumably used to
    /// cache loaded data — confirm against implementations.
    async fn load(
        store: Arc<dyn IndexStore>,
        frag_reuse_index: Option<Arc<FragReuseIndex>>,
        index_cache: LanceCache,
    ) -> Result<Arc<Self>>
    where
        // Excludes this constructor from the vtable so `dyn ScalarIndex` remains usable.
        Self: Sized;

    /// Whether this index supports `remap`.
    fn can_remap(&self) -> bool;

    /// Writes a remapped copy of the index into `dest_store`.
    ///
    /// NOTE(review): `mapping` presumably maps old row ids to new ones, with `None` marking a
    /// removed row — confirm.
    async fn remap(
        &self,
        mapping: &HashMap<u64, Option<u64>>,
        dest_store: &dyn IndexStore,
    ) -> Result<()>;

    /// Writes an index that incorporates the rows streamed in `new_data` into `dest_store`
    /// (presumably combining them with this index's existing contents — confirm).
    async fn update(
        &self,
        new_data: SendableRecordBatchStream,
        dest_store: &dyn IndexStore,
    ) -> Result<()>;
}