1use std::collections::{HashMap, HashSet};
7use std::fmt::Debug;
8use std::{any::Any, ops::Bound, sync::Arc};
9
10use arrow::buffer::{OffsetBuffer, ScalarBuffer};
11use arrow_array::{ListArray, RecordBatch};
12use arrow_schema::{Field, Schema};
13use async_trait::async_trait;
14use datafusion::functions::string::contains::ContainsFunc;
15use datafusion::functions_array::array_has;
16use datafusion::physical_plan::SendableRecordBatchStream;
17use datafusion_common::{scalar::ScalarValue, Column};
18
19use datafusion_expr::expr::ScalarFunction;
20use datafusion_expr::Expr;
21use deepsize::DeepSizeOf;
22use inverted::query::{fill_fts_query_column, FtsQuery, FtsQueryNode, FtsSearchParams, MatchQuery};
23use lance_core::utils::mask::RowIdTreeMap;
24use lance_core::{Error, Result};
25use snafu::location;
26
27use crate::metrics::MetricsCollector;
28use crate::{Index, IndexParams, IndexType};
29
30pub mod bitmap;
31pub mod btree;
32pub mod expression;
33pub mod flat;
34pub mod inverted;
35pub mod label_list;
36pub mod lance_format;
37pub mod ngram;
38
39pub use inverted::tokenizer::InvertedIndexParams;
40
/// Index name reported by `ScalarIndexParams::index_name`, shared by every
/// scalar index type.
pub const LANCE_SCALAR_INDEX: &str = "__lance_scalar_index";
42
/// The kinds of scalar index this module knows how to build.
///
/// A general `IndexType` can be narrowed to one of these with `TryFrom`,
/// which rejects index types that are not scalar indices.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum ScalarIndexType {
    /// B-tree index. This is also what `ScalarIndexParams` reports when no
    /// type is forced.
    BTree,
    /// Bitmap index.
    Bitmap,
    /// Label-list index; presumably the one that answers `LabelListQuery`.
    LabelList,
    /// N-gram index.
    NGram,
    /// Inverted index (used for full-text search).
    Inverted,
}
51
52impl TryFrom<IndexType> for ScalarIndexType {
53 type Error = Error;
54
55 fn try_from(value: IndexType) -> Result<Self> {
56 match value {
57 IndexType::BTree | IndexType::Scalar => Ok(Self::BTree),
58 IndexType::Bitmap => Ok(Self::Bitmap),
59 IndexType::LabelList => Ok(Self::LabelList),
60 IndexType::NGram => Ok(Self::NGram),
61 IndexType::Inverted => Ok(Self::Inverted),
62 _ => Err(Error::InvalidInput {
63 source: format!("Index type {:?} is not a scalar index", value).into(),
64 location: location!(),
65 }),
66 }
67 }
68}
69
70#[derive(Default)]
71pub struct ScalarIndexParams {
72 pub force_index_type: Option<ScalarIndexType>,
74}
75
76impl ScalarIndexParams {
77 pub fn new(index_type: ScalarIndexType) -> Self {
78 Self {
79 force_index_type: Some(index_type),
80 }
81 }
82}
83
84impl IndexParams for ScalarIndexParams {
85 fn as_any(&self) -> &dyn std::any::Any {
86 self
87 }
88
89 fn index_type(&self) -> IndexType {
90 match self.force_index_type {
91 Some(ScalarIndexType::BTree) | None => IndexType::BTree,
92 Some(ScalarIndexType::Bitmap) => IndexType::Bitmap,
93 Some(ScalarIndexType::LabelList) => IndexType::LabelList,
94 Some(ScalarIndexType::Inverted) => IndexType::Inverted,
95 Some(ScalarIndexType::NGram) => IndexType::NGram,
96 }
97 }
98
99 fn index_name(&self) -> &str {
100 LANCE_SCALAR_INDEX
101 }
102}
103
104impl IndexParams for InvertedIndexParams {
105 fn as_any(&self) -> &dyn std::any::Any {
106 self
107 }
108
109 fn index_type(&self) -> IndexType {
110 IndexType::Inverted
111 }
112
113 fn index_name(&self) -> &str {
114 "INVERTED"
115 }
116}
117
118#[async_trait]
120pub trait IndexWriter: Send {
121 async fn write_record_batch(&mut self, batch: RecordBatch) -> Result<u64>;
125 async fn finish(&mut self) -> Result<()>;
127 async fn finish_with_metadata(&mut self, metadata: HashMap<String, String>) -> Result<()>;
129}
130
131#[async_trait]
133pub trait IndexReader: Send + Sync {
134 async fn read_record_batch(&self, n: u64, batch_size: u64) -> Result<RecordBatch>;
136 async fn read_range(
141 &self,
142 range: std::ops::Range<usize>,
143 projection: Option<&[&str]>,
144 ) -> Result<RecordBatch>;
145 async fn num_batches(&self, batch_size: u64) -> u32;
147 fn num_rows(&self) -> usize;
149 fn schema(&self) -> &lance_core::datatypes::Schema;
151}
152
153#[async_trait]
159pub trait IndexStore: std::fmt::Debug + Send + Sync + DeepSizeOf {
160 fn as_any(&self) -> &dyn Any;
161
162 fn io_parallelism(&self) -> usize;
164
165 async fn new_index_file(&self, name: &str, schema: Arc<Schema>)
167 -> Result<Box<dyn IndexWriter>>;
168
169 async fn open_index_file(&self, name: &str) -> Result<Arc<dyn IndexReader>>;
171
172 async fn copy_index_file(&self, name: &str, dest_store: &dyn IndexStore) -> Result<()>;
176
177 async fn rename_index_file(&self, name: &str, new_name: &str) -> Result<()>;
179
180 async fn delete_index_file(&self, name: &str) -> Result<()>;
182}
183
184pub trait AnyQuery: std::fmt::Debug + Any + Send + Sync {
195 fn as_any(&self) -> &dyn Any;
197 fn format(&self, col: &str) -> String;
199 fn to_expr(&self, col: String) -> Expr;
201 fn dyn_eq(&self, other: &dyn AnyQuery) -> bool;
203 fn needs_recheck(&self) -> bool {
205 false
206 }
207}
208
209impl PartialEq for dyn AnyQuery {
210 fn eq(&self, other: &Self) -> bool {
211 self.dyn_eq(other)
212 }
213}
214#[derive(Debug, Clone, PartialEq)]
216pub struct FullTextSearchQuery {
217 pub query: FtsQuery,
218
219 pub limit: Option<i64>,
221
222 pub wand_factor: Option<f32>,
227}
228
229impl FullTextSearchQuery {
230 pub fn new(query: String) -> Self {
232 let query = MatchQuery::new(query).into();
233 Self {
234 query,
235 limit: None,
236 wand_factor: None,
237 }
238 }
239
240 pub fn new_fuzzy(term: String, max_distance: Option<u32>) -> Self {
242 let query = MatchQuery::new(term).with_fuzziness(max_distance).into();
243 Self {
244 query,
245 limit: None,
246 wand_factor: None,
247 }
248 }
249
250 pub fn new_query(query: FtsQuery) -> Self {
252 Self {
253 query,
254 limit: None,
255 wand_factor: None,
256 }
257 }
258
259 pub fn with_column(mut self, column: String) -> Result<Self> {
262 self.query = fill_fts_query_column(&self.query, &[column], true)?;
263 Ok(self)
264 }
265
266 pub fn with_columns(mut self, columns: &[String]) -> Result<Self> {
269 self.query = fill_fts_query_column(&self.query, columns, true)?;
270 Ok(self)
271 }
272
273 pub fn limit(mut self, limit: Option<i64>) -> Self {
276 self.limit = limit;
277 self
278 }
279
280 pub fn wand_factor(mut self, wand_factor: Option<f32>) -> Self {
281 self.wand_factor = wand_factor;
282 self
283 }
284
285 pub fn columns(&self) -> HashSet<String> {
286 self.query.columns()
287 }
288
289 pub fn params(&self) -> FtsSearchParams {
290 FtsSearchParams::new()
291 .with_limit(self.limit.map(|limit| limit as usize))
292 .with_wand_factor(self.wand_factor.unwrap_or(1.0))
293 }
294}
295
296#[derive(Debug, Clone, PartialEq)]
306pub enum SargableQuery {
307 Range(Bound<ScalarValue>, Bound<ScalarValue>),
309 IsIn(Vec<ScalarValue>),
311 Equals(ScalarValue),
313 FullTextSearch(FullTextSearchQuery),
315 IsNull(),
317}
318
319impl AnyQuery for SargableQuery {
320 fn as_any(&self) -> &dyn Any {
321 self
322 }
323
324 fn format(&self, col: &str) -> String {
325 match self {
326 Self::Range(lower, upper) => match (lower, upper) {
327 (Bound::Unbounded, Bound::Unbounded) => "true".to_string(),
328 (Bound::Unbounded, Bound::Included(rhs)) => format!("{} <= {}", col, rhs),
329 (Bound::Unbounded, Bound::Excluded(rhs)) => format!("{} < {}", col, rhs),
330 (Bound::Included(lhs), Bound::Unbounded) => format!("{} >= {}", col, lhs),
331 (Bound::Included(lhs), Bound::Included(rhs)) => {
332 format!("{} >= {} && {} <= {}", col, lhs, col, rhs)
333 }
334 (Bound::Included(lhs), Bound::Excluded(rhs)) => {
335 format!("{} >= {} && {} < {}", col, lhs, col, rhs)
336 }
337 (Bound::Excluded(lhs), Bound::Unbounded) => format!("{} > {}", col, lhs),
338 (Bound::Excluded(lhs), Bound::Included(rhs)) => {
339 format!("{} > {} && {} <= {}", col, lhs, col, rhs)
340 }
341 (Bound::Excluded(lhs), Bound::Excluded(rhs)) => {
342 format!("{} > {} && {} < {}", col, lhs, col, rhs)
343 }
344 },
345 Self::IsIn(values) => {
346 format!(
347 "{} IN [{}]",
348 col,
349 values
350 .iter()
351 .map(|val| val.to_string())
352 .collect::<Vec<_>>()
353 .join(",")
354 )
355 }
356 Self::FullTextSearch(query) => {
357 format!("fts({})", query.query)
358 }
359 Self::IsNull() => {
360 format!("{} IS NULL", col)
361 }
362 Self::Equals(val) => {
363 format!("{} = {}", col, val)
364 }
365 }
366 }
367
368 fn to_expr(&self, col: String) -> Expr {
369 let col_expr = Expr::Column(Column::new_unqualified(col));
370 match self {
371 Self::Range(lower, upper) => match (lower, upper) {
372 (Bound::Unbounded, Bound::Unbounded) => {
373 Expr::Literal(ScalarValue::Boolean(Some(true)))
374 }
375 (Bound::Unbounded, Bound::Included(rhs)) => {
376 col_expr.lt_eq(Expr::Literal(rhs.clone()))
377 }
378 (Bound::Unbounded, Bound::Excluded(rhs)) => col_expr.lt(Expr::Literal(rhs.clone())),
379 (Bound::Included(lhs), Bound::Unbounded) => {
380 col_expr.gt_eq(Expr::Literal(lhs.clone()))
381 }
382 (Bound::Included(lhs), Bound::Included(rhs)) => {
383 col_expr.between(Expr::Literal(lhs.clone()), Expr::Literal(rhs.clone()))
384 }
385 (Bound::Included(lhs), Bound::Excluded(rhs)) => col_expr
386 .clone()
387 .gt_eq(Expr::Literal(lhs.clone()))
388 .and(col_expr.lt(Expr::Literal(rhs.clone()))),
389 (Bound::Excluded(lhs), Bound::Unbounded) => col_expr.gt(Expr::Literal(lhs.clone())),
390 (Bound::Excluded(lhs), Bound::Included(rhs)) => col_expr
391 .clone()
392 .gt(Expr::Literal(lhs.clone()))
393 .and(col_expr.lt_eq(Expr::Literal(rhs.clone()))),
394 (Bound::Excluded(lhs), Bound::Excluded(rhs)) => col_expr
395 .clone()
396 .gt(Expr::Literal(lhs.clone()))
397 .and(col_expr.lt(Expr::Literal(rhs.clone()))),
398 },
399 Self::IsIn(values) => col_expr.in_list(
400 values
401 .iter()
402 .map(|val| Expr::Literal(val.clone()))
403 .collect::<Vec<_>>(),
404 false,
405 ),
406 Self::FullTextSearch(query) => col_expr.like(Expr::Literal(ScalarValue::Utf8(Some(
407 query.query.to_string(),
408 )))),
409 Self::IsNull() => col_expr.is_null(),
410 Self::Equals(value) => col_expr.eq(Expr::Literal(value.clone())),
411 }
412 }
413
414 fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
415 match other.as_any().downcast_ref::<Self>() {
416 Some(o) => self == o,
417 None => false,
418 }
419 }
420}
421
422#[derive(Debug, Clone, PartialEq)]
424pub enum LabelListQuery {
425 HasAllLabels(Vec<ScalarValue>),
427 HasAnyLabel(Vec<ScalarValue>),
429}
430
431impl AnyQuery for LabelListQuery {
432 fn as_any(&self) -> &dyn Any {
433 self
434 }
435
436 fn format(&self, col: &str) -> String {
437 format!("{}", self.to_expr(col.to_string()))
438 }
439
440 fn to_expr(&self, col: String) -> Expr {
441 match self {
442 Self::HasAllLabels(labels) => {
443 let labels_arr = ScalarValue::iter_to_array(labels.iter().cloned()).unwrap();
444 let offsets_buffer =
445 OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, labels_arr.len() as i32]));
446 let labels_list = ListArray::try_new(
447 Arc::new(Field::new("item", labels_arr.data_type().clone(), false)),
448 offsets_buffer,
449 labels_arr,
450 None,
451 )
452 .unwrap();
453 let labels_arr = Arc::new(labels_list);
454 Expr::ScalarFunction(ScalarFunction {
455 func: Arc::new(array_has::ArrayHasAll::new().into()),
456 args: vec![
457 Expr::Column(Column::new_unqualified(col)),
458 Expr::Literal(ScalarValue::List(labels_arr)),
459 ],
460 })
461 }
462 Self::HasAnyLabel(labels) => {
463 let labels_arr = ScalarValue::iter_to_array(labels.iter().cloned()).unwrap();
464 let offsets_buffer =
465 OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, labels_arr.len() as i32]));
466 let labels_list = ListArray::try_new(
467 Arc::new(Field::new("item", labels_arr.data_type().clone(), false)),
468 offsets_buffer,
469 labels_arr,
470 None,
471 )
472 .unwrap();
473 let labels_arr = Arc::new(labels_list);
474 Expr::ScalarFunction(ScalarFunction {
475 func: Arc::new(array_has::ArrayHasAny::new().into()),
476 args: vec![
477 Expr::Column(Column::new_unqualified(col)),
478 Expr::Literal(ScalarValue::List(labels_arr)),
479 ],
480 })
481 }
482 }
483 }
484
485 fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
486 match other.as_any().downcast_ref::<Self>() {
487 Some(o) => self == o,
488 None => false,
489 }
490 }
491}
492
/// A text-matching predicate on a string column.
#[derive(Debug, Clone, PartialEq)]
pub enum TextQuery {
    /// Rows whose value contains the given substring (`contains`).
    StringContains(String),
}
502
503impl AnyQuery for TextQuery {
504 fn as_any(&self) -> &dyn Any {
505 self
506 }
507
508 fn format(&self, col: &str) -> String {
509 format!("{}", self.to_expr(col.to_string()))
510 }
511
512 fn to_expr(&self, col: String) -> Expr {
513 match self {
514 Self::StringContains(substr) => Expr::ScalarFunction(ScalarFunction {
515 func: Arc::new(ContainsFunc::new().into()),
516 args: vec![
517 Expr::Column(Column::new_unqualified(col)),
518 Expr::Literal(ScalarValue::Utf8(Some(substr.clone()))),
519 ],
520 }),
521 }
522 }
523
524 fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
525 match other.as_any().downcast_ref::<Self>() {
526 Some(o) => self == o,
527 None => false,
528 }
529 }
530
531 fn needs_recheck(&self) -> bool {
532 true
533 }
534}
535
536#[derive(Debug, PartialEq)]
538pub enum SearchResult {
539 Exact(RowIdTreeMap),
541 AtMost(RowIdTreeMap),
545 AtLeast(RowIdTreeMap),
550}
551
552impl SearchResult {
553 pub fn row_ids(&self) -> &RowIdTreeMap {
554 match self {
555 Self::Exact(row_ids) => row_ids,
556 Self::AtMost(row_ids) => row_ids,
557 Self::AtLeast(row_ids) => row_ids,
558 }
559 }
560
561 pub fn is_exact(&self) -> bool {
562 matches!(self, Self::Exact(_))
563 }
564}
565
566#[async_trait]
568pub trait ScalarIndex: Send + Sync + std::fmt::Debug + Index + DeepSizeOf {
569 async fn search(
573 &self,
574 query: &dyn AnyQuery,
575 metrics: &dyn MetricsCollector,
576 ) -> Result<SearchResult>;
577
578 fn can_answer_exact(&self, query: &dyn AnyQuery) -> bool;
583
584 async fn load(store: Arc<dyn IndexStore>) -> Result<Arc<Self>>
586 where
587 Self: Sized;
588
589 async fn remap(
591 &self,
592 mapping: &HashMap<u64, Option<u64>>,
593 dest_store: &dyn IndexStore,
594 ) -> Result<()>;
595
596 async fn update(
598 &self,
599 new_data: SendableRecordBatchStream,
600 dest_store: &dyn IndexStore,
601 ) -> Result<()>;
602}