1use std::collections::{HashMap, HashSet};
7use std::fmt::Debug;
8use std::{any::Any, ops::Bound, sync::Arc};
9
10use arrow::buffer::{OffsetBuffer, ScalarBuffer};
11use arrow_array::{ListArray, RecordBatch};
12use arrow_schema::{Field, Schema};
13use async_trait::async_trait;
14use datafusion::functions::string::contains::ContainsFunc;
15use datafusion::functions_array::array_has;
16use datafusion::physical_plan::SendableRecordBatchStream;
17use datafusion_common::{scalar::ScalarValue, Column};
18
19use datafusion_expr::expr::ScalarFunction;
20use datafusion_expr::Expr;
21use deepsize::DeepSizeOf;
22use inverted::query::{fill_fts_query_column, FtsQuery, FtsQueryNode, FtsSearchParams, MatchQuery};
23use inverted::TokenizerConfig;
24use lance_core::utils::mask::RowIdTreeMap;
25use lance_core::{Error, Result};
26use snafu::location;
27
28use crate::metrics::MetricsCollector;
29use crate::{Index, IndexParams, IndexType};
30
31pub mod bitmap;
32pub mod btree;
33pub mod expression;
34pub mod flat;
35pub mod inverted;
36pub mod label_list;
37pub mod lance_format;
38pub mod ngram;
39
/// Index name reported by [`ScalarIndexParams`] through `IndexParams::index_name`.
pub const LANCE_SCALAR_INDEX: &str = "__lance_scalar_index";
41
/// The concrete kind of scalar index to build.
///
/// Obtainable from a general [`IndexType`] via `TryFrom`, where both `IndexType::BTree`
/// and `IndexType::Scalar` map to [`ScalarIndexType::BTree`].
#[derive(Debug, Copy, Clone)]
pub enum ScalarIndexType {
    /// Corresponds to `IndexType::BTree`.
    BTree,
    /// Corresponds to `IndexType::Bitmap`.
    Bitmap,
    /// Corresponds to `IndexType::LabelList`.
    LabelList,
    /// Corresponds to `IndexType::NGram`.
    NGram,
    /// Corresponds to `IndexType::Inverted`.
    Inverted,
}
50
51impl TryFrom<IndexType> for ScalarIndexType {
52 type Error = Error;
53
54 fn try_from(value: IndexType) -> Result<Self> {
55 match value {
56 IndexType::BTree | IndexType::Scalar => Ok(Self::BTree),
57 IndexType::Bitmap => Ok(Self::Bitmap),
58 IndexType::LabelList => Ok(Self::LabelList),
59 IndexType::NGram => Ok(Self::NGram),
60 IndexType::Inverted => Ok(Self::Inverted),
61 _ => Err(Error::InvalidInput {
62 source: format!("Index type {:?} is not a scalar index", value).into(),
63 location: location!(),
64 }),
65 }
66 }
67}
68
69#[derive(Default)]
70pub struct ScalarIndexParams {
71 pub force_index_type: Option<ScalarIndexType>,
73}
74
75impl ScalarIndexParams {
76 pub fn new(index_type: ScalarIndexType) -> Self {
77 Self {
78 force_index_type: Some(index_type),
79 }
80 }
81}
82
83impl IndexParams for ScalarIndexParams {
84 fn as_any(&self) -> &dyn std::any::Any {
85 self
86 }
87
88 fn index_type(&self) -> IndexType {
89 match self.force_index_type {
90 Some(ScalarIndexType::BTree) | None => IndexType::BTree,
91 Some(ScalarIndexType::Bitmap) => IndexType::Bitmap,
92 Some(ScalarIndexType::LabelList) => IndexType::LabelList,
93 Some(ScalarIndexType::Inverted) => IndexType::Inverted,
94 Some(ScalarIndexType::NGram) => IndexType::NGram,
95 }
96 }
97
98 fn index_name(&self) -> &str {
99 LANCE_SCALAR_INDEX
100 }
101}
102
/// Parameters for building an inverted index.
///
/// `Debug` and `DeepSizeOf` are implemented by hand for this type rather than derived.
#[derive(Clone)]
pub struct InvertedIndexParams {
    /// Whether the index records token positions (presumably needed for phrase-style
    /// queries — confirm). The `Default` impl sets this to `true`.
    pub with_position: bool,

    /// Tokenizer settings used when building the index.
    pub tokenizer_config: TokenizerConfig,
}
113
114impl Debug for InvertedIndexParams {
115 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
116 f.debug_struct("InvertedIndexParams")
117 .field("with_position", &self.with_position)
118 .finish()
119 }
120}
121
122impl DeepSizeOf for InvertedIndexParams {
123 fn deep_size_of_children(&self, _: &mut deepsize::Context) -> usize {
124 0
125 }
126}
127
128impl Default for InvertedIndexParams {
129 fn default() -> Self {
130 Self {
131 with_position: true,
132 tokenizer_config: TokenizerConfig::default(),
133 }
134 }
135}
136
137impl InvertedIndexParams {
138 pub fn with_position(mut self, with_position: bool) -> Self {
139 self.with_position = with_position;
140 self
141 }
142}
143
144impl IndexParams for InvertedIndexParams {
145 fn as_any(&self) -> &dyn std::any::Any {
146 self
147 }
148
149 fn index_type(&self) -> IndexType {
150 IndexType::Inverted
151 }
152
153 fn index_name(&self) -> &str {
154 "INVERTED"
155 }
156}
157
/// Writer for one index file, as returned by `IndexStore::new_index_file`.
#[async_trait]
pub trait IndexWriter: Send {
    /// Appends `batch` to the file.
    ///
    /// NOTE(review): the meaning of the returned `u64` is not visible here (presumably the
    /// batch's position within the file) — confirm against implementations.
    async fn write_record_batch(&mut self, batch: RecordBatch) -> Result<u64>;
    /// Finalizes the file without extra metadata.
    async fn finish(&mut self) -> Result<()>;
    /// Finalizes the file, attaching the given key/value `metadata`.
    async fn finish_with_metadata(&mut self, metadata: HashMap<String, String>) -> Result<()>;
}
170
/// Reader for one index file, as returned by `IndexStore::open_index_file`.
#[async_trait]
pub trait IndexReader: Send + Sync {
    /// Reads batch number `n` when the file is split into batches of `batch_size` rows.
    // NOTE(review): batching semantics inferred from parameter names — confirm.
    async fn read_record_batch(&self, n: u64, batch_size: u64) -> Result<RecordBatch>;
    /// Reads the rows in `range` (presumably row offsets within the file — confirm),
    /// optionally restricted to the columns named in `projection`.
    async fn read_range(
        &self,
        range: std::ops::Range<usize>,
        projection: Option<&[&str]>,
    ) -> Result<RecordBatch>;
    /// Number of batches of `batch_size` rows in the file.
    async fn num_batches(&self, batch_size: u64) -> u32;
    /// Total number of rows in the file.
    fn num_rows(&self) -> usize;
    /// Schema of the file's data.
    fn schema(&self) -> &lance_core::datatypes::Schema;
}
192
/// Storage in which a scalar index keeps its files, each addressed by name.
#[async_trait]
pub trait IndexStore: std::fmt::Debug + Send + Sync + DeepSizeOf {
    /// Upcasts to `&dyn Any` for downcasting to the concrete store type.
    fn as_any(&self) -> &dyn Any;

    /// Degree of I/O parallelism for this store.
    // NOTE(review): semantics inferred from the name — confirm how callers use it.
    fn io_parallelism(&self) -> usize;

    /// Creates the index file `name` with data schema `schema` and returns a writer for it.
    async fn new_index_file(&self, name: &str, schema: Arc<Schema>)
        -> Result<Box<dyn IndexWriter>>;

    /// Opens the index file `name` for reading.
    async fn open_index_file(&self, name: &str) -> Result<Arc<dyn IndexReader>>;

    /// Copies the index file `name` from this store into `dest_store`.
    async fn copy_index_file(&self, name: &str, dest_store: &dyn IndexStore) -> Result<()>;

    /// Renames the index file `name` to `new_name`.
    async fn rename_index_file(&self, name: &str, new_name: &str) -> Result<()>;

    /// Deletes the index file `name`.
    async fn delete_index_file(&self, name: &str) -> Result<()>;
}
223
/// A query that a scalar index may be asked to answer.
///
/// Queries of different concrete types are compared through `dyn_eq`, and any query can
/// be turned back into a DataFusion [`Expr`] over a named column.
pub trait AnyQuery: std::fmt::Debug + Any + Send + Sync {
    /// Upcasts to `&dyn Any` so callers can downcast to the concrete query type.
    fn as_any(&self) -> &dyn Any;
    /// Renders this query, applied to column `col`, as a string.
    fn format(&self, col: &str) -> String;
    /// Converts this query into a DataFusion expression over column `col`.
    fn to_expr(&self, col: String) -> Expr;
    /// Compares against another query that may have a different concrete type.
    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool;
    /// Whether index results for this query must be re-checked against the data
    /// (presumably because they may contain false positives — confirm). Defaults to `false`.
    fn needs_recheck(&self) -> bool {
        false
    }
}
248
249impl PartialEq for dyn AnyQuery {
250 fn eq(&self, other: &Self) -> bool {
251 self.dyn_eq(other)
252 }
253}
/// A full-text search request: the query tree plus optional search tuning.
#[derive(Debug, Clone, PartialEq)]
pub struct FullTextSearchQuery {
    /// The full-text query to run.
    pub query: FtsQuery,

    /// Result limit forwarded to [`FtsSearchParams`]; `None` means no explicit limit.
    pub limit: Option<i64>,

    /// WAND factor forwarded to [`FtsSearchParams`]; `params()` uses `1.0` when unset.
    pub wand_factor: Option<f32>,
}
268
269impl FullTextSearchQuery {
270 pub fn new(query: String) -> Self {
272 let query = MatchQuery::new(query).into();
273 Self {
274 query,
275 limit: None,
276 wand_factor: None,
277 }
278 }
279
280 pub fn new_fuzzy(term: String, max_distance: Option<u32>) -> Self {
282 let query = MatchQuery::new(term).with_fuzziness(max_distance).into();
283 Self {
284 query,
285 limit: None,
286 wand_factor: None,
287 }
288 }
289
290 pub fn new_query(query: FtsQuery) -> Self {
292 Self {
293 query,
294 limit: None,
295 wand_factor: None,
296 }
297 }
298
299 pub fn with_column(mut self, column: String) -> Result<Self> {
302 self.query = fill_fts_query_column(&self.query, &[column], true)?;
303 Ok(self)
304 }
305
306 pub fn with_columns(mut self, columns: &[String]) -> Result<Self> {
309 self.query = fill_fts_query_column(&self.query, columns, true)?;
310 Ok(self)
311 }
312
313 pub fn limit(mut self, limit: Option<i64>) -> Self {
316 self.limit = limit;
317 self
318 }
319
320 pub fn wand_factor(mut self, wand_factor: Option<f32>) -> Self {
321 self.wand_factor = wand_factor;
322 self
323 }
324
325 pub fn columns(&self) -> HashSet<String> {
326 self.query.columns()
327 }
328
329 pub fn params(&self) -> FtsSearchParams {
330 FtsSearchParams {
331 limit: self.limit.map(|limit| limit as usize),
332 wand_factor: self.wand_factor.unwrap_or(1.0),
333 }
334 }
335}
336
/// A query over a single column: a range, set membership, equality, a null check, or a
/// full-text search.
#[derive(Debug, Clone, PartialEq)]
pub enum SargableQuery {
    /// Values between a lower and an upper bound; each bound may be inclusive, exclusive,
    /// or unbounded.
    Range(Bound<ScalarValue>, Bound<ScalarValue>),
    /// Values equal to any of the listed values.
    IsIn(Vec<ScalarValue>),
    /// Values equal to the given value.
    Equals(ScalarValue),
    /// A full-text search.
    FullTextSearch(FullTextSearchQuery),
    /// Null values.
    IsNull(),
}
359
360impl AnyQuery for SargableQuery {
361 fn as_any(&self) -> &dyn Any {
362 self
363 }
364
365 fn format(&self, col: &str) -> String {
366 match self {
367 Self::Range(lower, upper) => match (lower, upper) {
368 (Bound::Unbounded, Bound::Unbounded) => "true".to_string(),
369 (Bound::Unbounded, Bound::Included(rhs)) => format!("{} <= {}", col, rhs),
370 (Bound::Unbounded, Bound::Excluded(rhs)) => format!("{} < {}", col, rhs),
371 (Bound::Included(lhs), Bound::Unbounded) => format!("{} >= {}", col, lhs),
372 (Bound::Included(lhs), Bound::Included(rhs)) => {
373 format!("{} >= {} && {} <= {}", col, lhs, col, rhs)
374 }
375 (Bound::Included(lhs), Bound::Excluded(rhs)) => {
376 format!("{} >= {} && {} < {}", col, lhs, col, rhs)
377 }
378 (Bound::Excluded(lhs), Bound::Unbounded) => format!("{} > {}", col, lhs),
379 (Bound::Excluded(lhs), Bound::Included(rhs)) => {
380 format!("{} > {} && {} <= {}", col, lhs, col, rhs)
381 }
382 (Bound::Excluded(lhs), Bound::Excluded(rhs)) => {
383 format!("{} > {} && {} < {}", col, lhs, col, rhs)
384 }
385 },
386 Self::IsIn(values) => {
387 format!(
388 "{} IN [{}]",
389 col,
390 values
391 .iter()
392 .map(|val| val.to_string())
393 .collect::<Vec<_>>()
394 .join(",")
395 )
396 }
397 Self::FullTextSearch(query) => {
398 format!("fts({})", query.query)
399 }
400 Self::IsNull() => {
401 format!("{} IS NULL", col)
402 }
403 Self::Equals(val) => {
404 format!("{} = {}", col, val)
405 }
406 }
407 }
408
409 fn to_expr(&self, col: String) -> Expr {
410 let col_expr = Expr::Column(Column::new_unqualified(col));
411 match self {
412 Self::Range(lower, upper) => match (lower, upper) {
413 (Bound::Unbounded, Bound::Unbounded) => {
414 Expr::Literal(ScalarValue::Boolean(Some(true)))
415 }
416 (Bound::Unbounded, Bound::Included(rhs)) => {
417 col_expr.lt_eq(Expr::Literal(rhs.clone()))
418 }
419 (Bound::Unbounded, Bound::Excluded(rhs)) => col_expr.lt(Expr::Literal(rhs.clone())),
420 (Bound::Included(lhs), Bound::Unbounded) => {
421 col_expr.gt_eq(Expr::Literal(lhs.clone()))
422 }
423 (Bound::Included(lhs), Bound::Included(rhs)) => {
424 col_expr.between(Expr::Literal(lhs.clone()), Expr::Literal(rhs.clone()))
425 }
426 (Bound::Included(lhs), Bound::Excluded(rhs)) => col_expr
427 .clone()
428 .gt_eq(Expr::Literal(lhs.clone()))
429 .and(col_expr.lt(Expr::Literal(rhs.clone()))),
430 (Bound::Excluded(lhs), Bound::Unbounded) => col_expr.gt(Expr::Literal(lhs.clone())),
431 (Bound::Excluded(lhs), Bound::Included(rhs)) => col_expr
432 .clone()
433 .gt(Expr::Literal(lhs.clone()))
434 .and(col_expr.lt_eq(Expr::Literal(rhs.clone()))),
435 (Bound::Excluded(lhs), Bound::Excluded(rhs)) => col_expr
436 .clone()
437 .gt(Expr::Literal(lhs.clone()))
438 .and(col_expr.lt(Expr::Literal(rhs.clone()))),
439 },
440 Self::IsIn(values) => col_expr.in_list(
441 values
442 .iter()
443 .map(|val| Expr::Literal(val.clone()))
444 .collect::<Vec<_>>(),
445 false,
446 ),
447 Self::FullTextSearch(query) => col_expr.like(Expr::Literal(ScalarValue::Utf8(Some(
448 query.query.to_string(),
449 )))),
450 Self::IsNull() => col_expr.is_null(),
451 Self::Equals(value) => col_expr.eq(Expr::Literal(value.clone())),
452 }
453 }
454
455 fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
456 match other.as_any().downcast_ref::<Self>() {
457 Some(o) => self == o,
458 None => false,
459 }
460 }
461}
462
/// A query over a list-valued column, expressed in terms of its labels.
#[derive(Debug, Clone, PartialEq)]
pub enum LabelListQuery {
    /// Lists containing every one of the given labels (rendered as `array_has_all`).
    HasAllLabels(Vec<ScalarValue>),
    /// Lists containing at least one of the given labels (rendered as `array_has_any`).
    HasAnyLabel(Vec<ScalarValue>),
}
471
472impl AnyQuery for LabelListQuery {
473 fn as_any(&self) -> &dyn Any {
474 self
475 }
476
477 fn format(&self, col: &str) -> String {
478 format!("{}", self.to_expr(col.to_string()))
479 }
480
481 fn to_expr(&self, col: String) -> Expr {
482 match self {
483 Self::HasAllLabels(labels) => {
484 let labels_arr = ScalarValue::iter_to_array(labels.iter().cloned()).unwrap();
485 let offsets_buffer =
486 OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, labels_arr.len() as i32]));
487 let labels_list = ListArray::try_new(
488 Arc::new(Field::new("item", labels_arr.data_type().clone(), false)),
489 offsets_buffer,
490 labels_arr,
491 None,
492 )
493 .unwrap();
494 let labels_arr = Arc::new(labels_list);
495 Expr::ScalarFunction(ScalarFunction {
496 func: Arc::new(array_has::ArrayHasAll::new().into()),
497 args: vec![
498 Expr::Column(Column::new_unqualified(col)),
499 Expr::Literal(ScalarValue::List(labels_arr)),
500 ],
501 })
502 }
503 Self::HasAnyLabel(labels) => {
504 let labels_arr = ScalarValue::iter_to_array(labels.iter().cloned()).unwrap();
505 let offsets_buffer =
506 OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, labels_arr.len() as i32]));
507 let labels_list = ListArray::try_new(
508 Arc::new(Field::new("item", labels_arr.data_type().clone(), false)),
509 offsets_buffer,
510 labels_arr,
511 None,
512 )
513 .unwrap();
514 let labels_arr = Arc::new(labels_list);
515 Expr::ScalarFunction(ScalarFunction {
516 func: Arc::new(array_has::ArrayHasAny::new().into()),
517 args: vec![
518 Expr::Column(Column::new_unqualified(col)),
519 Expr::Literal(ScalarValue::List(labels_arr)),
520 ],
521 })
522 }
523 }
524 }
525
526 fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
527 match other.as_any().downcast_ref::<Self>() {
528 Some(o) => self == o,
529 None => false,
530 }
531 }
532}
533
/// A substring query over a text column.
#[derive(Debug, Clone, PartialEq)]
pub enum TextQuery {
    /// Values containing the given substring (rendered as DataFusion's `contains`).
    StringContains(String),
}
543
544impl AnyQuery for TextQuery {
545 fn as_any(&self) -> &dyn Any {
546 self
547 }
548
549 fn format(&self, col: &str) -> String {
550 format!("{}", self.to_expr(col.to_string()))
551 }
552
553 fn to_expr(&self, col: String) -> Expr {
554 match self {
555 Self::StringContains(substr) => Expr::ScalarFunction(ScalarFunction {
556 func: Arc::new(ContainsFunc::new().into()),
557 args: vec![
558 Expr::Column(Column::new_unqualified(col)),
559 Expr::Literal(ScalarValue::Utf8(Some(substr.clone()))),
560 ],
561 }),
562 }
563 }
564
565 fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
566 match other.as_any().downcast_ref::<Self>() {
567 Some(o) => self == o,
568 None => false,
569 }
570 }
571
572 fn needs_recheck(&self) -> bool {
573 true
574 }
575}
576
/// Row ids returned by a scalar index search, tagged with how precise they are.
#[derive(Debug, PartialEq)]
pub enum SearchResult {
    /// Exactly the matching rows.
    Exact(RowIdTreeMap),
    /// Per the variant name, the matches are at most these rows (a superset that may
    /// include non-matching rows — confirm).
    AtMost(RowIdTreeMap),
    /// Per the variant name, the matches are at least these rows (other matching rows may
    /// be missing — confirm).
    AtLeast(RowIdTreeMap),
}
592
593impl SearchResult {
594 pub fn row_ids(&self) -> &RowIdTreeMap {
595 match self {
596 Self::Exact(row_ids) => row_ids,
597 Self::AtMost(row_ids) => row_ids,
598 Self::AtLeast(row_ids) => row_ids,
599 }
600 }
601
602 pub fn is_exact(&self) -> bool {
603 matches!(self, Self::Exact(_))
604 }
605}
606
/// A scalar index that answers [`AnyQuery`] queries with row ids.
#[async_trait]
pub trait ScalarIndex: Send + Sync + std::fmt::Debug + Index + DeepSizeOf {
    /// Searches the index for rows matching `query`, reporting metrics through `metrics`.
    async fn search(
        &self,
        query: &dyn AnyQuery,
        metrics: &dyn MetricsCollector,
    ) -> Result<SearchResult>;

    /// Whether this index can answer `query` exactly.
    // NOTE(review): presumably tied to `search` returning `SearchResult::Exact` — confirm.
    fn can_answer_exact(&self, query: &dyn AnyQuery) -> bool;

    /// Loads the index from `store`.
    async fn load(store: Arc<dyn IndexStore>) -> Result<Arc<Self>>
    where
        Self: Sized;

    /// Writes a remapped copy of the index into `dest_store` using `mapping`.
    // NOTE(review): presumably old row id -> new row id, with `None` meaning the row was
    // removed — confirm against implementations.
    async fn remap(
        &self,
        mapping: &HashMap<u64, Option<u64>>,
        dest_store: &dyn IndexStore,
    ) -> Result<()>;

    /// Writes an index that incorporates `new_data` into `dest_store`.
    async fn update(
        &self,
        new_data: SendableRecordBatchStream,
        dest_store: &dyn IndexStore,
    ) -> Result<()>;
}