1use std::collections::HashMap;
7use std::fmt::Debug;
8use std::{any::Any, ops::Bound, sync::Arc};
9
10use arrow::buffer::{OffsetBuffer, ScalarBuffer};
11use arrow_array::{ListArray, RecordBatch};
12use arrow_schema::{Field, Schema};
13use async_trait::async_trait;
14use datafusion::functions::string::contains::ContainsFunc;
15use datafusion::functions_array::array_has;
16use datafusion::physical_plan::SendableRecordBatchStream;
17use datafusion_common::{scalar::ScalarValue, Column};
18
19use datafusion_expr::expr::ScalarFunction;
20use datafusion_expr::Expr;
21use deepsize::DeepSizeOf;
22use inverted::TokenizerConfig;
23use lance_core::utils::mask::RowIdTreeMap;
24use lance_core::{Error, Result};
25use snafu::location;
26
27use crate::{Index, IndexParams, IndexType};
28
29pub mod bitmap;
30pub mod btree;
31pub mod expression;
32pub mod flat;
33pub mod inverted;
34pub mod label_list;
35pub mod lance_format;
36pub mod ngram;
37
/// Index name reported by [`ScalarIndexParams`] through `IndexParams::index_name`.
pub const LANCE_SCALAR_INDEX: &str = "__lance_scalar_index";
39
/// The kinds of scalar index this module knows how to build.
///
/// Produced from a generic `IndexType` via `TryFrom`, and mapped back by
/// `ScalarIndexParams::index_type`.
#[derive(Clone, Copy, Debug)]
pub enum ScalarIndexType {
    /// B-tree index (also selected for the generic `IndexType::Scalar`).
    BTree,
    /// Bitmap index.
    Bitmap,
    /// Label-list index.
    LabelList,
    /// N-gram index.
    NGram,
    /// Inverted index.
    Inverted,
}
48
49impl TryFrom<IndexType> for ScalarIndexType {
50 type Error = Error;
51
52 fn try_from(value: IndexType) -> Result<Self> {
53 match value {
54 IndexType::BTree | IndexType::Scalar => Ok(Self::BTree),
55 IndexType::Bitmap => Ok(Self::Bitmap),
56 IndexType::LabelList => Ok(Self::LabelList),
57 IndexType::NGram => Ok(Self::NGram),
58 IndexType::Inverted => Ok(Self::Inverted),
59 _ => Err(Error::InvalidInput {
60 source: format!("Index type {:?} is not a scalar index", value).into(),
61 location: location!(),
62 }),
63 }
64 }
65}
66
67#[derive(Default)]
68pub struct ScalarIndexParams {
69 pub force_index_type: Option<ScalarIndexType>,
71}
72
73impl ScalarIndexParams {
74 pub fn new(index_type: ScalarIndexType) -> Self {
75 Self {
76 force_index_type: Some(index_type),
77 }
78 }
79}
80
81impl IndexParams for ScalarIndexParams {
82 fn as_any(&self) -> &dyn std::any::Any {
83 self
84 }
85
86 fn index_type(&self) -> IndexType {
87 match self.force_index_type {
88 Some(ScalarIndexType::BTree) | None => IndexType::BTree,
89 Some(ScalarIndexType::Bitmap) => IndexType::Bitmap,
90 Some(ScalarIndexType::LabelList) => IndexType::LabelList,
91 Some(ScalarIndexType::Inverted) => IndexType::Inverted,
92 Some(ScalarIndexType::NGram) => IndexType::NGram,
93 }
94 }
95
96 fn index_name(&self) -> &str {
97 LANCE_SCALAR_INDEX
98 }
99}
100
101#[derive(Clone)]
102pub struct InvertedIndexParams {
103 pub with_position: bool,
108
109 pub tokenizer_config: TokenizerConfig,
110}
111
112impl Debug for InvertedIndexParams {
113 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114 f.debug_struct("InvertedIndexParams")
115 .field("with_position", &self.with_position)
116 .finish()
117 }
118}
119
120impl DeepSizeOf for InvertedIndexParams {
121 fn deep_size_of_children(&self, _: &mut deepsize::Context) -> usize {
122 0
123 }
124}
125
126impl Default for InvertedIndexParams {
127 fn default() -> Self {
128 Self {
129 with_position: true,
130 tokenizer_config: TokenizerConfig::default(),
131 }
132 }
133}
134
135impl InvertedIndexParams {
136 pub fn with_position(mut self, with_position: bool) -> Self {
137 self.with_position = with_position;
138 self
139 }
140}
141
142impl IndexParams for InvertedIndexParams {
143 fn as_any(&self) -> &dyn std::any::Any {
144 self
145 }
146
147 fn index_type(&self) -> IndexType {
148 IndexType::Inverted
149 }
150
151 fn index_name(&self) -> &str {
152 "INVERTED"
153 }
154}
155
/// Writes record batches into one file of an [`IndexStore`].
///
/// Writers are obtained from [`IndexStore::new_index_file`].
#[async_trait]
pub trait IndexWriter: Send {
    /// Appends `batch` to the file.
    ///
    /// NOTE(review): the returned `u64` is presumably the position/number of
    /// the batch just written — confirm against the implementations.
    async fn write_record_batch(&mut self, batch: RecordBatch) -> Result<u64>;
    /// Finishes writing the file without attaching extra metadata.
    async fn finish(&mut self) -> Result<()>;
    /// Finishes writing the file, attaching `metadata` as string key/value pairs.
    async fn finish_with_metadata(&mut self, metadata: HashMap<String, String>) -> Result<()>;
}
168
/// Reads data back from one file of an [`IndexStore`].
///
/// Readers are obtained from [`IndexStore::open_index_file`].
#[async_trait]
pub trait IndexReader: Send + Sync {
    /// Reads a single record batch.
    ///
    /// NOTE(review): presumably `n` is the batch number and `batch_size` the
    /// number of rows per batch — confirm against the implementations.
    async fn read_record_batch(&self, n: u64, batch_size: u64) -> Result<RecordBatch>;
    /// Reads `range` of the file as one record batch.
    ///
    /// `projection` optionally restricts which columns are returned.
    /// NOTE(review): presumably `range` is in row offsets, `projection` lists
    /// column names, and `None` means all columns — confirm.
    async fn read_range(
        &self,
        range: std::ops::Range<usize>,
        projection: Option<&[&str]>,
    ) -> Result<RecordBatch>;
    /// Number of batches stored in the file.
    async fn num_batches(&self) -> u32;
    /// Number of rows stored in the file.
    fn num_rows(&self) -> usize;
    /// The Lance schema of the file's data.
    fn schema(&self) -> &lance_core::datatypes::Schema;
}
190
/// Named-file storage that scalar indices are loaded from and written to.
///
/// Files are addressed by name. Writers come from
/// [`IndexStore::new_index_file`] and readers from
/// [`IndexStore::open_index_file`].
#[async_trait]
pub trait IndexStore: std::fmt::Debug + Send + Sync + DeepSizeOf {
    /// Returns `self` as [`Any`] so callers can downcast to the concrete store.
    fn as_any(&self) -> &dyn Any;

    /// NOTE(review): presumably the number of I/O operations this store should
    /// run concurrently — confirm against the implementations.
    fn io_parallelism(&self) -> usize;

    /// Creates the file `name` with the given Arrow `schema` and returns a
    /// writer for it.
    async fn new_index_file(&self, name: &str, schema: Arc<Schema>)
        -> Result<Box<dyn IndexWriter>>;

    /// Opens the file `name` for reading.
    async fn open_index_file(&self, name: &str) -> Result<Arc<dyn IndexReader>>;

    /// Copies the file `name` from this store into `dest_store`.
    async fn copy_index_file(&self, name: &str, dest_store: &dyn IndexStore) -> Result<()>;
}
215
/// A query that can be passed to [`ScalarIndex::search`].
///
/// Implemented in this file by [`SargableQuery`], [`LabelListQuery`] and
/// [`TextQuery`].
pub trait AnyQuery: std::fmt::Debug + Any + Send + Sync {
    /// Returns `self` as [`Any`], which lets `dyn_eq` implementations downcast.
    fn as_any(&self) -> &dyn Any;
    /// Renders the query, applied to column `col`, as human-readable text.
    fn format(&self, col: &str) -> String;
    /// Converts the query, applied to column `col`, into a DataFusion [`Expr`].
    fn to_expr(&self, col: String) -> Expr;
    /// Compares with a query of possibly different concrete type. The
    /// implementations in this file return `false` when the types differ.
    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool;
    /// Whether results for this query must be re-checked.
    ///
    /// Defaults to `false`; [`TextQuery`] overrides it to return `true`.
    /// NOTE(review): presumably this means the index result may over-approximate
    /// and the filter must be re-evaluated on the data — confirm with callers.
    fn needs_recheck(&self) -> bool {
        false
    }
}
240
241impl PartialEq for dyn AnyQuery {
242 fn eq(&self, other: &Self) -> bool {
243 self.dyn_eq(other)
244 }
245}
246
/// A full text search request.
#[derive(Clone, Debug, PartialEq)]
pub struct FullTextSearchQuery {
    /// Columns to search; empty after [`FullTextSearchQuery::new`].
    pub columns: Vec<String>,

    /// The search text.
    pub query: String,

    /// Optional limit on the number of results; `None` after `new`.
    pub limit: Option<i64>,

    /// Optional WAND factor; `None` after `new`.
    /// NOTE(review): presumably a pruning knob for WAND-style scoring in the
    /// inverted index — confirm.
    pub wand_factor: Option<f32>,
}

impl FullTextSearchQuery {
    /// Creates a query for `query` with no columns, no limit and no WAND factor.
    pub fn new(query: String) -> Self {
        Self {
            columns: Vec::new(),
            query,
            limit: None,
            wand_factor: None,
        }
    }

    /// Replaces the searched columns when `columns` is `Some`. A `None` keeps
    /// the current columns.
    pub fn columns(self, columns: Option<Vec<String>>) -> Self {
        match columns {
            Some(columns) => Self { columns, ..self },
            None => self,
        }
    }

    /// Sets the result limit; `None` clears any previous limit.
    pub fn limit(self, limit: Option<i64>) -> Self {
        Self { limit, ..self }
    }

    /// Sets the WAND factor; `None` clears any previous value.
    pub fn wand_factor(self, wand_factor: Option<f32>) -> Self {
        Self {
            wand_factor,
            ..self
        }
    }
}
291
292#[derive(Debug, Clone, PartialEq)]
302pub enum SargableQuery {
303 Range(Bound<ScalarValue>, Bound<ScalarValue>),
305 IsIn(Vec<ScalarValue>),
307 Equals(ScalarValue),
309 FullTextSearch(FullTextSearchQuery),
311 IsNull(),
313}
314
315impl AnyQuery for SargableQuery {
316 fn as_any(&self) -> &dyn Any {
317 self
318 }
319
320 fn format(&self, col: &str) -> String {
321 match self {
322 Self::Range(lower, upper) => match (lower, upper) {
323 (Bound::Unbounded, Bound::Unbounded) => "true".to_string(),
324 (Bound::Unbounded, Bound::Included(rhs)) => format!("{} <= {}", col, rhs),
325 (Bound::Unbounded, Bound::Excluded(rhs)) => format!("{} < {}", col, rhs),
326 (Bound::Included(lhs), Bound::Unbounded) => format!("{} >= {}", col, lhs),
327 (Bound::Included(lhs), Bound::Included(rhs)) => {
328 format!("{} >= {} && {} <= {}", col, lhs, col, rhs)
329 }
330 (Bound::Included(lhs), Bound::Excluded(rhs)) => {
331 format!("{} >= {} && {} < {}", col, lhs, col, rhs)
332 }
333 (Bound::Excluded(lhs), Bound::Unbounded) => format!("{} > {}", col, lhs),
334 (Bound::Excluded(lhs), Bound::Included(rhs)) => {
335 format!("{} > {} && {} <= {}", col, lhs, col, rhs)
336 }
337 (Bound::Excluded(lhs), Bound::Excluded(rhs)) => {
338 format!("{} > {} && {} < {}", col, lhs, col, rhs)
339 }
340 },
341 Self::IsIn(values) => {
342 format!(
343 "{} IN [{}]",
344 col,
345 values
346 .iter()
347 .map(|val| val.to_string())
348 .collect::<Vec<_>>()
349 .join(",")
350 )
351 }
352 Self::FullTextSearch(query) => {
353 format!("fts({})", query.query)
354 }
355 Self::IsNull() => {
356 format!("{} IS NULL", col)
357 }
358 Self::Equals(val) => {
359 format!("{} = {}", col, val)
360 }
361 }
362 }
363
364 fn to_expr(&self, col: String) -> Expr {
365 let col_expr = Expr::Column(Column::new_unqualified(col));
366 match self {
367 Self::Range(lower, upper) => match (lower, upper) {
368 (Bound::Unbounded, Bound::Unbounded) => {
369 Expr::Literal(ScalarValue::Boolean(Some(true)))
370 }
371 (Bound::Unbounded, Bound::Included(rhs)) => {
372 col_expr.lt_eq(Expr::Literal(rhs.clone()))
373 }
374 (Bound::Unbounded, Bound::Excluded(rhs)) => col_expr.lt(Expr::Literal(rhs.clone())),
375 (Bound::Included(lhs), Bound::Unbounded) => {
376 col_expr.gt_eq(Expr::Literal(lhs.clone()))
377 }
378 (Bound::Included(lhs), Bound::Included(rhs)) => {
379 col_expr.between(Expr::Literal(lhs.clone()), Expr::Literal(rhs.clone()))
380 }
381 (Bound::Included(lhs), Bound::Excluded(rhs)) => col_expr
382 .clone()
383 .gt_eq(Expr::Literal(lhs.clone()))
384 .and(col_expr.lt(Expr::Literal(rhs.clone()))),
385 (Bound::Excluded(lhs), Bound::Unbounded) => col_expr.gt(Expr::Literal(lhs.clone())),
386 (Bound::Excluded(lhs), Bound::Included(rhs)) => col_expr
387 .clone()
388 .gt(Expr::Literal(lhs.clone()))
389 .and(col_expr.lt_eq(Expr::Literal(rhs.clone()))),
390 (Bound::Excluded(lhs), Bound::Excluded(rhs)) => col_expr
391 .clone()
392 .gt(Expr::Literal(lhs.clone()))
393 .and(col_expr.lt(Expr::Literal(rhs.clone()))),
394 },
395 Self::IsIn(values) => col_expr.in_list(
396 values
397 .iter()
398 .map(|val| Expr::Literal(val.clone()))
399 .collect::<Vec<_>>(),
400 false,
401 ),
402 Self::FullTextSearch(query) => {
403 col_expr.like(Expr::Literal(ScalarValue::Utf8(Some(query.query.clone()))))
404 }
405 Self::IsNull() => col_expr.is_null(),
406 Self::Equals(value) => col_expr.eq(Expr::Literal(value.clone())),
407 }
408 }
409
410 fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
411 match other.as_any().downcast_ref::<Self>() {
412 Some(o) => self == o,
413 None => false,
414 }
415 }
416}
417
418#[derive(Debug, Clone, PartialEq)]
420pub enum LabelListQuery {
421 HasAllLabels(Vec<ScalarValue>),
423 HasAnyLabel(Vec<ScalarValue>),
425}
426
427impl AnyQuery for LabelListQuery {
428 fn as_any(&self) -> &dyn Any {
429 self
430 }
431
432 fn format(&self, col: &str) -> String {
433 format!("{}", self.to_expr(col.to_string()))
434 }
435
436 fn to_expr(&self, col: String) -> Expr {
437 match self {
438 Self::HasAllLabels(labels) => {
439 let labels_arr = ScalarValue::iter_to_array(labels.iter().cloned()).unwrap();
440 let offsets_buffer =
441 OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, labels_arr.len() as i32]));
442 let labels_list = ListArray::try_new(
443 Arc::new(Field::new("item", labels_arr.data_type().clone(), false)),
444 offsets_buffer,
445 labels_arr,
446 None,
447 )
448 .unwrap();
449 let labels_arr = Arc::new(labels_list);
450 Expr::ScalarFunction(ScalarFunction {
451 func: Arc::new(array_has::ArrayHasAll::new().into()),
452 args: vec![
453 Expr::Column(Column::new_unqualified(col)),
454 Expr::Literal(ScalarValue::List(labels_arr)),
455 ],
456 })
457 }
458 Self::HasAnyLabel(labels) => {
459 let labels_arr = ScalarValue::iter_to_array(labels.iter().cloned()).unwrap();
460 let offsets_buffer =
461 OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, labels_arr.len() as i32]));
462 let labels_list = ListArray::try_new(
463 Arc::new(Field::new("item", labels_arr.data_type().clone(), false)),
464 offsets_buffer,
465 labels_arr,
466 None,
467 )
468 .unwrap();
469 let labels_arr = Arc::new(labels_list);
470 Expr::ScalarFunction(ScalarFunction {
471 func: Arc::new(array_has::ArrayHasAny::new().into()),
472 args: vec![
473 Expr::Column(Column::new_unqualified(col)),
474 Expr::Literal(ScalarValue::List(labels_arr)),
475 ],
476 })
477 }
478 }
479 }
480
481 fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
482 match other.as_any().downcast_ref::<Self>() {
483 Some(o) => self == o,
484 None => false,
485 }
486 }
487}
488
489#[derive(Debug, Clone, PartialEq)]
491pub enum TextQuery {
492 StringContains(String),
494 }
498
499impl AnyQuery for TextQuery {
500 fn as_any(&self) -> &dyn Any {
501 self
502 }
503
504 fn format(&self, col: &str) -> String {
505 format!("{}", self.to_expr(col.to_string()))
506 }
507
508 fn to_expr(&self, col: String) -> Expr {
509 match self {
510 Self::StringContains(substr) => Expr::ScalarFunction(ScalarFunction {
511 func: Arc::new(ContainsFunc::new().into()),
512 args: vec![
513 Expr::Column(Column::new_unqualified(col)),
514 Expr::Literal(ScalarValue::Utf8(Some(substr.clone()))),
515 ],
516 }),
517 }
518 }
519
520 fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
521 match other.as_any().downcast_ref::<Self>() {
522 Some(o) => self == o,
523 None => false,
524 }
525 }
526
527 fn needs_recheck(&self) -> bool {
528 true
529 }
530}
531
532#[derive(Debug)]
534pub enum SearchResult {
535 Exact(RowIdTreeMap),
537 AtMost(RowIdTreeMap),
541 AtLeast(RowIdTreeMap),
546}
547
548impl SearchResult {
549 pub fn row_ids(&self) -> &RowIdTreeMap {
550 match self {
551 Self::Exact(row_ids) => row_ids,
552 Self::AtMost(row_ids) => row_ids,
553 Self::AtLeast(row_ids) => row_ids,
554 }
555 }
556
557 pub fn is_exact(&self) -> bool {
558 matches!(self, Self::Exact(_))
559 }
560}
561
/// A scalar index that can be searched, loaded from an [`IndexStore`], remapped
/// and updated.
#[async_trait]
pub trait ScalarIndex: Send + Sync + std::fmt::Debug + Index + DeepSizeOf {
    /// Searches the index for rows matching `query`.
    ///
    /// The [`SearchResult`] variant indicates whether the returned row ids are
    /// exact.
    async fn search(&self, query: &dyn AnyQuery) -> Result<SearchResult>;

    /// Whether this index can answer `query` exactly.
    ///
    /// NOTE(review): presumably `true` means `search` returns
    /// `SearchResult::Exact` for this query — confirm against implementations.
    fn can_answer_exact(&self, query: &dyn AnyQuery) -> bool;

    /// Loads the index from `store`.
    async fn load(store: Arc<dyn IndexStore>) -> Result<Arc<Self>>
    where
        Self: Sized;

    /// Writes a remapped copy of this index into `dest_store`.
    ///
    /// NOTE(review): presumably `mapping` maps old row ids to new ones, with
    /// `None` marking a removed row — confirm against implementations.
    async fn remap(
        &self,
        mapping: &HashMap<u64, Option<u64>>,
        dest_store: &dyn IndexStore,
    ) -> Result<()>;

    /// Writes an updated index into `dest_store`, taking `new_data` into account.
    ///
    /// NOTE(review): presumably the result covers both the existing data and
    /// `new_data` — confirm against implementations.
    async fn update(
        &self,
        new_data: SendableRecordBatchStream,
        dest_store: &dyn IndexStore,
    ) -> Result<()>;
}