1use std::collections::{HashMap, HashSet};
7use std::fmt::Debug;
8use std::{any::Any, ops::Bound, sync::Arc};
9
10use arrow::buffer::{OffsetBuffer, ScalarBuffer};
11use arrow_array::{ListArray, RecordBatch};
12use arrow_schema::{Field, Schema};
13use async_trait::async_trait;
14use datafusion::functions::string::contains::ContainsFunc;
15use datafusion::functions_array::array_has;
16use datafusion::physical_plan::SendableRecordBatchStream;
17use datafusion_common::{scalar::ScalarValue, Column};
18
19use datafusion_expr::expr::ScalarFunction;
20use datafusion_expr::Expr;
21use deepsize::DeepSizeOf;
22use inverted::query::{fill_fts_query_column, FtsQuery, FtsQueryNode, FtsSearchParams, MatchQuery};
23use inverted::TokenizerConfig;
24use lance_core::utils::mask::RowIdTreeMap;
25use lance_core::{Error, Result};
26use serde::{Deserialize, Serialize};
27use snafu::location;
28
29use crate::metrics::MetricsCollector;
30use crate::{Index, IndexParams, IndexType};
31
32pub mod bitmap;
33pub mod btree;
34pub mod expression;
35pub mod flat;
36pub mod inverted;
37pub mod label_list;
38pub mod lance_format;
39pub mod ngram;
40
/// Index name reported by `ScalarIndexParams` through `IndexParams::index_name`.
pub const LANCE_SCALAR_INDEX: &str = "__lance_scalar_index";
42
/// The kinds of scalar index that can be requested through `ScalarIndexParams`.
///
/// Every variant has a same-named counterpart in `IndexType`; the
/// `TryFrom<IndexType>` impl in this module performs the conversion.
#[derive(Debug, Copy, Clone)]
pub enum ScalarIndexType {
    /// Counterpart of `IndexType::BTree`; `IndexType::Scalar` also converts to this.
    BTree,
    /// Counterpart of `IndexType::Bitmap`.
    Bitmap,
    /// Counterpart of `IndexType::LabelList`.
    LabelList,
    /// Counterpart of `IndexType::NGram`.
    NGram,
    /// Counterpart of `IndexType::Inverted`.
    Inverted,
}
51
52impl TryFrom<IndexType> for ScalarIndexType {
53 type Error = Error;
54
55 fn try_from(value: IndexType) -> Result<Self> {
56 match value {
57 IndexType::BTree | IndexType::Scalar => Ok(Self::BTree),
58 IndexType::Bitmap => Ok(Self::Bitmap),
59 IndexType::LabelList => Ok(Self::LabelList),
60 IndexType::NGram => Ok(Self::NGram),
61 IndexType::Inverted => Ok(Self::Inverted),
62 _ => Err(Error::InvalidInput {
63 source: format!("Index type {:?} is not a scalar index", value).into(),
64 location: location!(),
65 }),
66 }
67 }
68}
69
/// Parameters describing which scalar index to build.
#[derive(Default)]
pub struct ScalarIndexParams {
    /// Explicitly requested index type.
    ///
    /// When `None` (the `Default`), `IndexParams::index_type` reports
    /// `IndexType::BTree`.
    pub force_index_type: Option<ScalarIndexType>,
}
75
76impl ScalarIndexParams {
77 pub fn new(index_type: ScalarIndexType) -> Self {
78 Self {
79 force_index_type: Some(index_type),
80 }
81 }
82}
83
84impl IndexParams for ScalarIndexParams {
85 fn as_any(&self) -> &dyn std::any::Any {
86 self
87 }
88
89 fn index_type(&self) -> IndexType {
90 match self.force_index_type {
91 Some(ScalarIndexType::BTree) | None => IndexType::BTree,
92 Some(ScalarIndexType::Bitmap) => IndexType::Bitmap,
93 Some(ScalarIndexType::LabelList) => IndexType::LabelList,
94 Some(ScalarIndexType::Inverted) => IndexType::Inverted,
95 Some(ScalarIndexType::NGram) => IndexType::NGram,
96 }
97 }
98
99 fn index_name(&self) -> &str {
100 LANCE_SCALAR_INDEX
101 }
102}
103
/// Parameters for an inverted index (reported as `IndexType::Inverted`).
#[derive(Clone, Serialize, Deserialize)]
pub struct InvertedIndexParams {
    /// Defaults to `true`.
    ///
    /// NOTE(review): presumably controls whether token positions are stored in
    /// the index — confirm against the inverted index builder.
    pub with_position: bool,

    /// Tokenizer settings. Because of `#[serde(flatten)]` its fields are
    /// serialized inline with this struct's own fields rather than nested
    /// under a `tokenizer_config` key.
    #[serde(flatten)]
    pub tokenizer_config: TokenizerConfig,
}
115
116impl Debug for InvertedIndexParams {
117 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
118 f.debug_struct("InvertedIndexParams")
119 .field("with_position", &self.with_position)
120 .finish()
121 }
122}
123
124impl DeepSizeOf for InvertedIndexParams {
125 fn deep_size_of_children(&self, _: &mut deepsize::Context) -> usize {
126 0
127 }
128}
129
130impl Default for InvertedIndexParams {
131 fn default() -> Self {
132 Self {
133 with_position: true,
134 tokenizer_config: TokenizerConfig::default(),
135 }
136 }
137}
138
139impl InvertedIndexParams {
140 pub fn with_position(mut self, with_position: bool) -> Self {
141 self.with_position = with_position;
142 self
143 }
144}
145
146impl IndexParams for InvertedIndexParams {
147 fn as_any(&self) -> &dyn std::any::Any {
148 self
149 }
150
151 fn index_type(&self) -> IndexType {
152 IndexType::Inverted
153 }
154
155 fn index_name(&self) -> &str {
156 "INVERTED"
157 }
158}
159
/// Write side of an index file: accepts record batches and is then finished.
#[async_trait]
pub trait IndexWriter: Send {
    /// Writes one record batch.
    ///
    /// NOTE(review): the meaning of the returned `u64` is not visible here —
    /// presumably the number/index of the batch just written; confirm against
    /// the implementations.
    async fn write_record_batch(&mut self, batch: RecordBatch) -> Result<u64>;
    /// Finishes the file without supplying extra metadata.
    async fn finish(&mut self) -> Result<()>;
    /// Finishes the file, passing along string key/value `metadata`.
    async fn finish_with_metadata(&mut self, metadata: HashMap<String, String>) -> Result<()>;
}
172
/// Read side of an index file.
#[async_trait]
pub trait IndexReader: Send + Sync {
    /// Reads a single record batch.
    ///
    /// NOTE(review): presumably `n` is the zero-based batch number when the
    /// file is divided into batches of `batch_size` rows — confirm.
    async fn read_record_batch(&self, n: u64, batch_size: u64) -> Result<RecordBatch>;
    /// Reads the rows in `range`, optionally restricted to the columns named
    /// in `projection`.
    ///
    /// NOTE(review): presumably `None` means "all columns" — confirm.
    async fn read_range(
        &self,
        range: std::ops::Range<usize>,
        projection: Option<&[&str]>,
    ) -> Result<RecordBatch>;
    /// Number of batches for the given `batch_size`.
    async fn num_batches(&self, batch_size: u64) -> u32;
    /// Number of rows.
    fn num_rows(&self) -> usize;
    /// Schema of the data, as a Lance schema.
    fn schema(&self) -> &lance_core::datatypes::Schema;
}
194
/// Abstraction over the storage holding an index's files.
///
/// Files are addressed by name; writing goes through [`IndexWriter`] and
/// reading through [`IndexReader`].
#[async_trait]
pub trait IndexStore: std::fmt::Debug + Send + Sync + DeepSizeOf {
    /// Exposes `self` as `Any` so callers can downcast to the concrete store.
    fn as_any(&self) -> &dyn Any;

    /// I/O parallelism of the store.
    ///
    /// NOTE(review): presumably the suggested number of concurrent I/O
    /// requests — confirm against the implementations.
    fn io_parallelism(&self) -> usize;

    /// Creates a new index file called `name` with the given Arrow `schema`
    /// and returns a writer for it.
    async fn new_index_file(&self, name: &str, schema: Arc<Schema>)
        -> Result<Box<dyn IndexWriter>>;

    /// Opens the existing index file called `name` for reading.
    async fn open_index_file(&self, name: &str) -> Result<Arc<dyn IndexReader>>;

    /// Copies the index file called `name` into `dest_store`.
    async fn copy_index_file(&self, name: &str, dest_store: &dyn IndexStore) -> Result<()>;

    /// Renames the index file `name` to `new_name`.
    async fn rename_index_file(&self, name: &str, new_name: &str) -> Result<()>;

    /// Deletes the index file called `name`.
    async fn delete_index_file(&self, name: &str) -> Result<()>;
}
225
/// Object-safe interface shared by all scalar index queries.
///
/// `ScalarIndex::search` takes queries as `&dyn AnyQuery`; implementations in
/// this module include `SargableQuery`, `LabelListQuery` and `TextQuery`.
pub trait AnyQuery: std::fmt::Debug + Any + Send + Sync {
    /// Exposes `self` as `Any` so the query can be downcast to its concrete type.
    fn as_any(&self) -> &dyn Any;
    /// Renders the query as a human-readable string for the column `col`.
    fn format(&self, col: &str) -> String;
    /// Converts the query into a DataFusion expression over the column `col`.
    fn to_expr(&self, col: String) -> Expr;
    /// Dynamic equality check; backs the `PartialEq` impl for `dyn AnyQuery`.
    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool;
    /// Defaults to `false`; implementations may override it.
    ///
    /// NOTE(review): presumably `true` means index results must be re-verified
    /// against the actual data — confirm with the callers.
    fn needs_recheck(&self) -> bool {
        false
    }
}
250
251impl PartialEq for dyn AnyQuery {
252 fn eq(&self, other: &Self) -> bool {
253 self.dyn_eq(other)
254 }
255}
/// A full text search request: the query itself plus search tuning options.
#[derive(Debug, Clone, PartialEq)]
pub struct FullTextSearchQuery {
    /// The full text search query to run.
    pub query: FtsQuery,

    /// Optional limit; `None` leaves the limit unset in the search parameters.
    ///
    /// NOTE(review): presumably the maximum number of results — confirm.
    pub limit: Option<i64>,

    /// Optional WAND factor; `params()` substitutes `1.0` when this is `None`.
    pub wand_factor: Option<f32>,
}
270
271impl FullTextSearchQuery {
272 pub fn new(query: String) -> Self {
274 let query = MatchQuery::new(query).into();
275 Self {
276 query,
277 limit: None,
278 wand_factor: None,
279 }
280 }
281
282 pub fn new_fuzzy(term: String, max_distance: Option<u32>) -> Self {
284 let query = MatchQuery::new(term).with_fuzziness(max_distance).into();
285 Self {
286 query,
287 limit: None,
288 wand_factor: None,
289 }
290 }
291
292 pub fn new_query(query: FtsQuery) -> Self {
294 Self {
295 query,
296 limit: None,
297 wand_factor: None,
298 }
299 }
300
301 pub fn with_column(mut self, column: String) -> Result<Self> {
304 self.query = fill_fts_query_column(&self.query, &[column], true)?;
305 Ok(self)
306 }
307
308 pub fn with_columns(mut self, columns: &[String]) -> Result<Self> {
311 self.query = fill_fts_query_column(&self.query, columns, true)?;
312 Ok(self)
313 }
314
315 pub fn limit(mut self, limit: Option<i64>) -> Self {
318 self.limit = limit;
319 self
320 }
321
322 pub fn wand_factor(mut self, wand_factor: Option<f32>) -> Self {
323 self.wand_factor = wand_factor;
324 self
325 }
326
327 pub fn columns(&self) -> HashSet<String> {
328 self.query.columns()
329 }
330
331 pub fn params(&self) -> FtsSearchParams {
332 FtsSearchParams {
333 limit: self.limit.map(|limit| limit as usize),
334 wand_factor: self.wand_factor.unwrap_or(1.0),
335 }
336 }
337}
338
/// Value-comparison queries that can be expressed against a single column.
#[derive(Debug, Clone, PartialEq)]
pub enum SargableQuery {
    /// Values between a lower and an upper bound; either bound may be
    /// inclusive, exclusive or unbounded.
    Range(Bound<ScalarValue>, Bound<ScalarValue>),
    /// Values equal to any member of the list.
    IsIn(Vec<ScalarValue>),
    /// Values equal to the given value.
    Equals(ScalarValue),
    /// A full text search.
    FullTextSearch(FullTextSearchQuery),
    /// Values that are null.
    IsNull(),
}
361
362impl AnyQuery for SargableQuery {
363 fn as_any(&self) -> &dyn Any {
364 self
365 }
366
367 fn format(&self, col: &str) -> String {
368 match self {
369 Self::Range(lower, upper) => match (lower, upper) {
370 (Bound::Unbounded, Bound::Unbounded) => "true".to_string(),
371 (Bound::Unbounded, Bound::Included(rhs)) => format!("{} <= {}", col, rhs),
372 (Bound::Unbounded, Bound::Excluded(rhs)) => format!("{} < {}", col, rhs),
373 (Bound::Included(lhs), Bound::Unbounded) => format!("{} >= {}", col, lhs),
374 (Bound::Included(lhs), Bound::Included(rhs)) => {
375 format!("{} >= {} && {} <= {}", col, lhs, col, rhs)
376 }
377 (Bound::Included(lhs), Bound::Excluded(rhs)) => {
378 format!("{} >= {} && {} < {}", col, lhs, col, rhs)
379 }
380 (Bound::Excluded(lhs), Bound::Unbounded) => format!("{} > {}", col, lhs),
381 (Bound::Excluded(lhs), Bound::Included(rhs)) => {
382 format!("{} > {} && {} <= {}", col, lhs, col, rhs)
383 }
384 (Bound::Excluded(lhs), Bound::Excluded(rhs)) => {
385 format!("{} > {} && {} < {}", col, lhs, col, rhs)
386 }
387 },
388 Self::IsIn(values) => {
389 format!(
390 "{} IN [{}]",
391 col,
392 values
393 .iter()
394 .map(|val| val.to_string())
395 .collect::<Vec<_>>()
396 .join(",")
397 )
398 }
399 Self::FullTextSearch(query) => {
400 format!("fts({})", query.query)
401 }
402 Self::IsNull() => {
403 format!("{} IS NULL", col)
404 }
405 Self::Equals(val) => {
406 format!("{} = {}", col, val)
407 }
408 }
409 }
410
411 fn to_expr(&self, col: String) -> Expr {
412 let col_expr = Expr::Column(Column::new_unqualified(col));
413 match self {
414 Self::Range(lower, upper) => match (lower, upper) {
415 (Bound::Unbounded, Bound::Unbounded) => {
416 Expr::Literal(ScalarValue::Boolean(Some(true)))
417 }
418 (Bound::Unbounded, Bound::Included(rhs)) => {
419 col_expr.lt_eq(Expr::Literal(rhs.clone()))
420 }
421 (Bound::Unbounded, Bound::Excluded(rhs)) => col_expr.lt(Expr::Literal(rhs.clone())),
422 (Bound::Included(lhs), Bound::Unbounded) => {
423 col_expr.gt_eq(Expr::Literal(lhs.clone()))
424 }
425 (Bound::Included(lhs), Bound::Included(rhs)) => {
426 col_expr.between(Expr::Literal(lhs.clone()), Expr::Literal(rhs.clone()))
427 }
428 (Bound::Included(lhs), Bound::Excluded(rhs)) => col_expr
429 .clone()
430 .gt_eq(Expr::Literal(lhs.clone()))
431 .and(col_expr.lt(Expr::Literal(rhs.clone()))),
432 (Bound::Excluded(lhs), Bound::Unbounded) => col_expr.gt(Expr::Literal(lhs.clone())),
433 (Bound::Excluded(lhs), Bound::Included(rhs)) => col_expr
434 .clone()
435 .gt(Expr::Literal(lhs.clone()))
436 .and(col_expr.lt_eq(Expr::Literal(rhs.clone()))),
437 (Bound::Excluded(lhs), Bound::Excluded(rhs)) => col_expr
438 .clone()
439 .gt(Expr::Literal(lhs.clone()))
440 .and(col_expr.lt(Expr::Literal(rhs.clone()))),
441 },
442 Self::IsIn(values) => col_expr.in_list(
443 values
444 .iter()
445 .map(|val| Expr::Literal(val.clone()))
446 .collect::<Vec<_>>(),
447 false,
448 ),
449 Self::FullTextSearch(query) => col_expr.like(Expr::Literal(ScalarValue::Utf8(Some(
450 query.query.to_string(),
451 )))),
452 Self::IsNull() => col_expr.is_null(),
453 Self::Equals(value) => col_expr.eq(Expr::Literal(value.clone())),
454 }
455 }
456
457 fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
458 match other.as_any().downcast_ref::<Self>() {
459 Some(o) => self == o,
460 None => false,
461 }
462 }
463}
464
/// Queries over a list-of-labels column.
#[derive(Debug, Clone, PartialEq)]
pub enum LabelListQuery {
    /// Converted to an `array_has_all` call: the column must contain every
    /// listed label.
    HasAllLabels(Vec<ScalarValue>),
    /// Converted to an `array_has_any` call: the column must contain at least
    /// one listed label.
    HasAnyLabel(Vec<ScalarValue>),
}
473
474impl AnyQuery for LabelListQuery {
475 fn as_any(&self) -> &dyn Any {
476 self
477 }
478
479 fn format(&self, col: &str) -> String {
480 format!("{}", self.to_expr(col.to_string()))
481 }
482
483 fn to_expr(&self, col: String) -> Expr {
484 match self {
485 Self::HasAllLabels(labels) => {
486 let labels_arr = ScalarValue::iter_to_array(labels.iter().cloned()).unwrap();
487 let offsets_buffer =
488 OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, labels_arr.len() as i32]));
489 let labels_list = ListArray::try_new(
490 Arc::new(Field::new("item", labels_arr.data_type().clone(), false)),
491 offsets_buffer,
492 labels_arr,
493 None,
494 )
495 .unwrap();
496 let labels_arr = Arc::new(labels_list);
497 Expr::ScalarFunction(ScalarFunction {
498 func: Arc::new(array_has::ArrayHasAll::new().into()),
499 args: vec![
500 Expr::Column(Column::new_unqualified(col)),
501 Expr::Literal(ScalarValue::List(labels_arr)),
502 ],
503 })
504 }
505 Self::HasAnyLabel(labels) => {
506 let labels_arr = ScalarValue::iter_to_array(labels.iter().cloned()).unwrap();
507 let offsets_buffer =
508 OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, labels_arr.len() as i32]));
509 let labels_list = ListArray::try_new(
510 Arc::new(Field::new("item", labels_arr.data_type().clone(), false)),
511 offsets_buffer,
512 labels_arr,
513 None,
514 )
515 .unwrap();
516 let labels_arr = Arc::new(labels_list);
517 Expr::ScalarFunction(ScalarFunction {
518 func: Arc::new(array_has::ArrayHasAny::new().into()),
519 args: vec![
520 Expr::Column(Column::new_unqualified(col)),
521 Expr::Literal(ScalarValue::List(labels_arr)),
522 ],
523 })
524 }
525 }
526 }
527
528 fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
529 match other.as_any().downcast_ref::<Self>() {
530 Some(o) => self == o,
531 None => false,
532 }
533 }
534}
535
/// Text queries; these report `needs_recheck() == true`.
#[derive(Debug, Clone, PartialEq)]
pub enum TextQuery {
    /// Converted to a `contains` function call with the given string as the
    /// substring argument.
    StringContains(String),
}
545
546impl AnyQuery for TextQuery {
547 fn as_any(&self) -> &dyn Any {
548 self
549 }
550
551 fn format(&self, col: &str) -> String {
552 format!("{}", self.to_expr(col.to_string()))
553 }
554
555 fn to_expr(&self, col: String) -> Expr {
556 match self {
557 Self::StringContains(substr) => Expr::ScalarFunction(ScalarFunction {
558 func: Arc::new(ContainsFunc::new().into()),
559 args: vec![
560 Expr::Column(Column::new_unqualified(col)),
561 Expr::Literal(ScalarValue::Utf8(Some(substr.clone()))),
562 ],
563 }),
564 }
565 }
566
567 fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
568 match other.as_any().downcast_ref::<Self>() {
569 Some(o) => self == o,
570 None => false,
571 }
572 }
573
574 fn needs_recheck(&self) -> bool {
575 true
576 }
577}
578
/// Result of a scalar index search: a set of row ids tagged with how precisely
/// that set answers the query.
///
/// NOTE(review): the variant semantics below are inferred from the variant
/// names only — confirm against the index implementations.
#[derive(Debug, PartialEq)]
pub enum SearchResult {
    /// Presumably: the row ids are exactly the rows matching the query.
    Exact(RowIdTreeMap),
    /// Presumably: the matching rows are a subset of these row ids.
    AtMost(RowIdTreeMap),
    /// Presumably: the matching rows are a superset of these row ids.
    AtLeast(RowIdTreeMap),
}
594
595impl SearchResult {
596 pub fn row_ids(&self) -> &RowIdTreeMap {
597 match self {
598 Self::Exact(row_ids) => row_ids,
599 Self::AtMost(row_ids) => row_ids,
600 Self::AtLeast(row_ids) => row_ids,
601 }
602 }
603
604 pub fn is_exact(&self) -> bool {
605 matches!(self, Self::Exact(_))
606 }
607}
608
/// Interface implemented by every scalar index.
#[async_trait]
pub trait ScalarIndex: Send + Sync + std::fmt::Debug + Index + DeepSizeOf {
    /// Runs `query` against the index and returns the resulting row ids as a
    /// [`SearchResult`]. `metrics` is handed to the implementation.
    ///
    /// NOTE(review): presumably `metrics` records I/O and comparison
    /// statistics — confirm.
    async fn search(
        &self,
        query: &dyn AnyQuery,
        metrics: &dyn MetricsCollector,
    ) -> Result<SearchResult>;

    /// Whether the index can answer `query` exactly.
    ///
    /// NOTE(review): presumably `true` implies `search` would return
    /// `SearchResult::Exact` for this query — confirm.
    fn can_answer_exact(&self, query: &dyn AnyQuery) -> bool;

    /// Loads the index from `store`. Only callable on concrete (sized) types.
    async fn load(store: Arc<dyn IndexStore>) -> Result<Arc<Self>>
    where
        Self: Sized;

    /// Remaps the index using `mapping` and writes the result to `dest_store`.
    ///
    /// NOTE(review): presumably keys are old row ids and values are the new
    /// row id, with `None` marking a deleted row — confirm.
    async fn remap(
        &self,
        mapping: &HashMap<u64, Option<u64>>,
        dest_store: &dyn IndexStore,
    ) -> Result<()>;

    /// Updates the index with the rows from `new_data`, writing the result to
    /// `dest_store`.
    async fn update(
        &self,
        new_data: SendableRecordBatchStream,
        dest_store: &dyn IndexStore,
    ) -> Result<()>;
}