1use std::collections::HashMap;
7use std::fmt::Debug;
8use std::{any::Any, ops::Bound, sync::Arc};
9
10use arrow::buffer::{OffsetBuffer, ScalarBuffer};
11use arrow_array::{ListArray, RecordBatch};
12use arrow_schema::{Field, Schema};
13use async_trait::async_trait;
14use datafusion::functions::string::contains::ContainsFunc;
15use datafusion::functions_array::array_has;
16use datafusion::physical_plan::SendableRecordBatchStream;
17use datafusion_common::{scalar::ScalarValue, Column};
18
19use datafusion_expr::expr::ScalarFunction;
20use datafusion_expr::Expr;
21use deepsize::DeepSizeOf;
22use inverted::TokenizerConfig;
23use lance_core::utils::mask::RowIdTreeMap;
24use lance_core::{Error, Result};
25use snafu::location;
26
27use crate::metrics::MetricsCollector;
28use crate::{Index, IndexParams, IndexType};
29
30pub mod bitmap;
31pub mod btree;
32pub mod expression;
33pub mod flat;
34pub mod inverted;
35pub mod label_list;
36pub mod lance_format;
37pub mod ngram;
38
/// Index name reported by `ScalarIndexParams::index_name`.
pub const LANCE_SCALAR_INDEX: &str = "__lance_scalar_index";
40
/// The kinds of scalar index that can be requested explicitly.
///
/// Each variant corresponds to the `IndexType` variant of the same name
/// (see the `TryFrom<IndexType>` impl and `ScalarIndexParams::index_type`).
#[derive(Debug, Copy, Clone)]
pub enum ScalarIndexType {
    /// B-tree index; also the result of converting `IndexType::Scalar`.
    BTree,
    /// Bitmap index.
    Bitmap,
    /// Label-list index.
    LabelList,
    /// N-gram index.
    NGram,
    /// Inverted index.
    Inverted,
}
49
50impl TryFrom<IndexType> for ScalarIndexType {
51 type Error = Error;
52
53 fn try_from(value: IndexType) -> Result<Self> {
54 match value {
55 IndexType::BTree | IndexType::Scalar => Ok(Self::BTree),
56 IndexType::Bitmap => Ok(Self::Bitmap),
57 IndexType::LabelList => Ok(Self::LabelList),
58 IndexType::NGram => Ok(Self::NGram),
59 IndexType::Inverted => Ok(Self::Inverted),
60 _ => Err(Error::InvalidInput {
61 source: format!("Index type {:?} is not a scalar index", value).into(),
62 location: location!(),
63 }),
64 }
65 }
66}
67
/// Parameters for creating a scalar index.
#[derive(Default)]
pub struct ScalarIndexParams {
    /// Explicitly requested index type. When `None`, `index_type()` reports
    /// `IndexType::BTree`.
    pub force_index_type: Option<ScalarIndexType>,
}
73
74impl ScalarIndexParams {
75 pub fn new(index_type: ScalarIndexType) -> Self {
76 Self {
77 force_index_type: Some(index_type),
78 }
79 }
80}
81
82impl IndexParams for ScalarIndexParams {
83 fn as_any(&self) -> &dyn std::any::Any {
84 self
85 }
86
87 fn index_type(&self) -> IndexType {
88 match self.force_index_type {
89 Some(ScalarIndexType::BTree) | None => IndexType::BTree,
90 Some(ScalarIndexType::Bitmap) => IndexType::Bitmap,
91 Some(ScalarIndexType::LabelList) => IndexType::LabelList,
92 Some(ScalarIndexType::Inverted) => IndexType::Inverted,
93 Some(ScalarIndexType::NGram) => IndexType::NGram,
94 }
95 }
96
97 fn index_name(&self) -> &str {
98 LANCE_SCALAR_INDEX
99 }
100}
101
/// Parameters for creating an inverted index.
#[derive(Clone)]
pub struct InvertedIndexParams {
    /// Defaults to `true`.
    ///
    /// NOTE(review): presumably controls whether token positions are stored in
    /// the index — confirm against the inverted index builder.
    pub with_position: bool,

    /// Tokenizer settings; defaults to `TokenizerConfig::default()`.
    pub tokenizer_config: TokenizerConfig,
}
112
113impl Debug for InvertedIndexParams {
114 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
115 f.debug_struct("InvertedIndexParams")
116 .field("with_position", &self.with_position)
117 .finish()
118 }
119}
120
121impl DeepSizeOf for InvertedIndexParams {
122 fn deep_size_of_children(&self, _: &mut deepsize::Context) -> usize {
123 0
124 }
125}
126
127impl Default for InvertedIndexParams {
128 fn default() -> Self {
129 Self {
130 with_position: true,
131 tokenizer_config: TokenizerConfig::default(),
132 }
133 }
134}
135
136impl InvertedIndexParams {
137 pub fn with_position(mut self, with_position: bool) -> Self {
138 self.with_position = with_position;
139 self
140 }
141}
142
143impl IndexParams for InvertedIndexParams {
144 fn as_any(&self) -> &dyn std::any::Any {
145 self
146 }
147
148 fn index_type(&self) -> IndexType {
149 IndexType::Inverted
150 }
151
152 fn index_name(&self) -> &str {
153 "INVERTED"
154 }
155}
156
/// Writer for a single index file, accepting data as record batches.
///
/// Writers are handed out by `IndexStore::new_index_file`.
#[async_trait]
pub trait IndexWriter: Send {
    /// Writes one record batch.
    ///
    /// NOTE(review): the meaning of the returned `u64` is not visible here
    /// (presumably the index of the batch just written) — confirm against the
    /// implementations.
    async fn write_record_batch(&mut self, batch: RecordBatch) -> Result<u64>;
    /// Finishes writing the file.
    async fn finish(&mut self) -> Result<()>;
    /// Finishes writing the file, supplying string key/value `metadata`.
    ///
    /// NOTE(review): presumably the metadata is stored alongside the file —
    /// confirm against the implementations.
    async fn finish_with_metadata(&mut self, metadata: HashMap<String, String>) -> Result<()>;
}
169
/// Reader for a single index file, returning data as record batches.
///
/// Readers are handed out by `IndexStore::open_index_file`.
#[async_trait]
pub trait IndexReader: Send + Sync {
    /// Reads one record batch.
    ///
    /// NOTE(review): presumably `n` is the batch index and `batch_size` the
    /// number of rows per batch — confirm against the implementations.
    async fn read_record_batch(&self, n: u64, batch_size: u64) -> Result<RecordBatch>;
    /// Reads the given `range` as a single record batch.
    ///
    /// `projection` optionally names columns; NOTE(review): presumably `None`
    /// means all columns and `range` is a row range — confirm.
    async fn read_range(
        &self,
        range: std::ops::Range<usize>,
        projection: Option<&[&str]>,
    ) -> Result<RecordBatch>;
    /// Returns the number of batches in the file.
    async fn num_batches(&self) -> u32;
    /// Returns the number of rows in the file.
    fn num_rows(&self) -> usize;
    /// Returns the schema of the file.
    fn schema(&self) -> &lance_core::datatypes::Schema;
}
191
/// Storage abstraction for the files that make up an index.
///
/// Files are addressed by name. They are created through [`IndexWriter`]s and
/// read back through [`IndexReader`]s.
#[async_trait]
pub trait IndexStore: std::fmt::Debug + Send + Sync + DeepSizeOf {
    /// Exposes `self` as `&dyn Any` so callers can downcast to the concrete store.
    fn as_any(&self) -> &dyn Any;

    /// NOTE(review): presumably the suggested number of concurrent I/O
    /// operations against this store — confirm against the implementations.
    fn io_parallelism(&self) -> usize;

    /// Creates a new index file called `name` with the given arrow `schema`
    /// and returns a writer for it.
    async fn new_index_file(&self, name: &str, schema: Arc<Schema>)
        -> Result<Box<dyn IndexWriter>>;

    /// Opens the existing index file called `name` for reading.
    async fn open_index_file(&self, name: &str) -> Result<Arc<dyn IndexReader>>;

    /// Copies the index file called `name` from this store into `dest_store`.
    async fn copy_index_file(&self, name: &str, dest_store: &dyn IndexStore) -> Result<()>;

    /// Renames the index file `name` to `new_name`.
    async fn rename_index_file(&self, name: &str, new_name: &str) -> Result<()>;

    /// Deletes the index file called `name`.
    async fn delete_index_file(&self, name: &str) -> Result<()>;
}
222
/// A type-erased query that can be handed to a scalar index.
///
/// Concrete query types in this module (`SargableQuery`, `LabelListQuery`,
/// `TextQuery`) implement this trait so they can be passed as `&dyn AnyQuery`.
pub trait AnyQuery: std::fmt::Debug + Any + Send + Sync {
    /// Exposes `self` as `&dyn Any` so callers can downcast to the concrete query.
    fn as_any(&self) -> &dyn Any;
    /// Renders the query as a string, applied to the column named `col`.
    fn format(&self, col: &str) -> String;
    /// Converts the query into a DataFusion expression over the column `col`.
    fn to_expr(&self, col: String) -> Expr;
    /// Equality against another type-erased query; backs `PartialEq for dyn AnyQuery`.
    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool;
    /// Defaults to `false`.
    ///
    /// NOTE(review): presumably `true` means rows returned by the index must be
    /// re-checked against the original expression — confirm with callers.
    fn needs_recheck(&self) -> bool {
        false
    }
}
247
248impl PartialEq for dyn AnyQuery {
249 fn eq(&self, other: &Self) -> bool {
250 self.dyn_eq(other)
251 }
252}
253
/// A full text search request.
#[derive(Debug, Clone, PartialEq)]
pub struct FullTextSearchQuery {
    /// Columns named by the query; empty by default.
    ///
    /// NOTE(review): presumably an empty list means "search every indexed
    /// column" — confirm with the code that consumes this query.
    pub columns: Vec<String>,
    /// The search text.
    pub query: String,
    /// Optional limit; `None` by default.
    ///
    /// NOTE(review): presumably the maximum number of results — confirm.
    pub limit: Option<i64>,
    /// Optional WAND factor; `None` by default.
    ///
    /// NOTE(review): semantics are not visible here — confirm with the search
    /// implementation.
    pub wand_factor: Option<f32>,
}
270
271impl FullTextSearchQuery {
272 pub fn new(query: String) -> Self {
273 Self {
274 query,
275 limit: None,
276 columns: vec![],
277 wand_factor: None,
278 }
279 }
280
281 pub fn columns(mut self, columns: Option<Vec<String>>) -> Self {
282 if let Some(columns) = columns {
283 self.columns = columns;
284 }
285 self
286 }
287
288 pub fn limit(mut self, limit: Option<i64>) -> Self {
289 self.limit = limit;
290 self
291 }
292
293 pub fn wand_factor(mut self, wand_factor: Option<f32>) -> Self {
294 self.wand_factor = wand_factor;
295 self
296 }
297}
298
/// A query expressed as a simple predicate on a single column.
#[derive(Debug, Clone, PartialEq)]
pub enum SargableQuery {
    /// Values between a lower and an upper bound, each of which may be
    /// inclusive, exclusive or unbounded.
    Range(Bound<ScalarValue>, Bound<ScalarValue>),
    /// Values contained in the given list.
    IsIn(Vec<ScalarValue>),
    /// Values equal to the given value.
    Equals(ScalarValue),
    /// Full text search.
    FullTextSearch(FullTextSearchQuery),
    /// Null values.
    IsNull(),
}
321
322impl AnyQuery for SargableQuery {
323 fn as_any(&self) -> &dyn Any {
324 self
325 }
326
327 fn format(&self, col: &str) -> String {
328 match self {
329 Self::Range(lower, upper) => match (lower, upper) {
330 (Bound::Unbounded, Bound::Unbounded) => "true".to_string(),
331 (Bound::Unbounded, Bound::Included(rhs)) => format!("{} <= {}", col, rhs),
332 (Bound::Unbounded, Bound::Excluded(rhs)) => format!("{} < {}", col, rhs),
333 (Bound::Included(lhs), Bound::Unbounded) => format!("{} >= {}", col, lhs),
334 (Bound::Included(lhs), Bound::Included(rhs)) => {
335 format!("{} >= {} && {} <= {}", col, lhs, col, rhs)
336 }
337 (Bound::Included(lhs), Bound::Excluded(rhs)) => {
338 format!("{} >= {} && {} < {}", col, lhs, col, rhs)
339 }
340 (Bound::Excluded(lhs), Bound::Unbounded) => format!("{} > {}", col, lhs),
341 (Bound::Excluded(lhs), Bound::Included(rhs)) => {
342 format!("{} > {} && {} <= {}", col, lhs, col, rhs)
343 }
344 (Bound::Excluded(lhs), Bound::Excluded(rhs)) => {
345 format!("{} > {} && {} < {}", col, lhs, col, rhs)
346 }
347 },
348 Self::IsIn(values) => {
349 format!(
350 "{} IN [{}]",
351 col,
352 values
353 .iter()
354 .map(|val| val.to_string())
355 .collect::<Vec<_>>()
356 .join(",")
357 )
358 }
359 Self::FullTextSearch(query) => {
360 format!("fts({})", query.query)
361 }
362 Self::IsNull() => {
363 format!("{} IS NULL", col)
364 }
365 Self::Equals(val) => {
366 format!("{} = {}", col, val)
367 }
368 }
369 }
370
371 fn to_expr(&self, col: String) -> Expr {
372 let col_expr = Expr::Column(Column::new_unqualified(col));
373 match self {
374 Self::Range(lower, upper) => match (lower, upper) {
375 (Bound::Unbounded, Bound::Unbounded) => {
376 Expr::Literal(ScalarValue::Boolean(Some(true)))
377 }
378 (Bound::Unbounded, Bound::Included(rhs)) => {
379 col_expr.lt_eq(Expr::Literal(rhs.clone()))
380 }
381 (Bound::Unbounded, Bound::Excluded(rhs)) => col_expr.lt(Expr::Literal(rhs.clone())),
382 (Bound::Included(lhs), Bound::Unbounded) => {
383 col_expr.gt_eq(Expr::Literal(lhs.clone()))
384 }
385 (Bound::Included(lhs), Bound::Included(rhs)) => {
386 col_expr.between(Expr::Literal(lhs.clone()), Expr::Literal(rhs.clone()))
387 }
388 (Bound::Included(lhs), Bound::Excluded(rhs)) => col_expr
389 .clone()
390 .gt_eq(Expr::Literal(lhs.clone()))
391 .and(col_expr.lt(Expr::Literal(rhs.clone()))),
392 (Bound::Excluded(lhs), Bound::Unbounded) => col_expr.gt(Expr::Literal(lhs.clone())),
393 (Bound::Excluded(lhs), Bound::Included(rhs)) => col_expr
394 .clone()
395 .gt(Expr::Literal(lhs.clone()))
396 .and(col_expr.lt_eq(Expr::Literal(rhs.clone()))),
397 (Bound::Excluded(lhs), Bound::Excluded(rhs)) => col_expr
398 .clone()
399 .gt(Expr::Literal(lhs.clone()))
400 .and(col_expr.lt(Expr::Literal(rhs.clone()))),
401 },
402 Self::IsIn(values) => col_expr.in_list(
403 values
404 .iter()
405 .map(|val| Expr::Literal(val.clone()))
406 .collect::<Vec<_>>(),
407 false,
408 ),
409 Self::FullTextSearch(query) => {
410 col_expr.like(Expr::Literal(ScalarValue::Utf8(Some(query.query.clone()))))
411 }
412 Self::IsNull() => col_expr.is_null(),
413 Self::Equals(value) => col_expr.eq(Expr::Literal(value.clone())),
414 }
415 }
416
417 fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
418 match other.as_any().downcast_ref::<Self>() {
419 Some(o) => self == o,
420 None => false,
421 }
422 }
423}
424
/// A query against a column holding lists of labels.
#[derive(Debug, Clone, PartialEq)]
pub enum LabelListQuery {
    /// Expressed as `array_has_all(column, labels)` by `to_expr`.
    HasAllLabels(Vec<ScalarValue>),
    /// Expressed as `array_has_any(column, labels)` by `to_expr`.
    HasAnyLabel(Vec<ScalarValue>),
}
433
434impl AnyQuery for LabelListQuery {
435 fn as_any(&self) -> &dyn Any {
436 self
437 }
438
439 fn format(&self, col: &str) -> String {
440 format!("{}", self.to_expr(col.to_string()))
441 }
442
443 fn to_expr(&self, col: String) -> Expr {
444 match self {
445 Self::HasAllLabels(labels) => {
446 let labels_arr = ScalarValue::iter_to_array(labels.iter().cloned()).unwrap();
447 let offsets_buffer =
448 OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, labels_arr.len() as i32]));
449 let labels_list = ListArray::try_new(
450 Arc::new(Field::new("item", labels_arr.data_type().clone(), false)),
451 offsets_buffer,
452 labels_arr,
453 None,
454 )
455 .unwrap();
456 let labels_arr = Arc::new(labels_list);
457 Expr::ScalarFunction(ScalarFunction {
458 func: Arc::new(array_has::ArrayHasAll::new().into()),
459 args: vec![
460 Expr::Column(Column::new_unqualified(col)),
461 Expr::Literal(ScalarValue::List(labels_arr)),
462 ],
463 })
464 }
465 Self::HasAnyLabel(labels) => {
466 let labels_arr = ScalarValue::iter_to_array(labels.iter().cloned()).unwrap();
467 let offsets_buffer =
468 OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, labels_arr.len() as i32]));
469 let labels_list = ListArray::try_new(
470 Arc::new(Field::new("item", labels_arr.data_type().clone(), false)),
471 offsets_buffer,
472 labels_arr,
473 None,
474 )
475 .unwrap();
476 let labels_arr = Arc::new(labels_list);
477 Expr::ScalarFunction(ScalarFunction {
478 func: Arc::new(array_has::ArrayHasAny::new().into()),
479 args: vec![
480 Expr::Column(Column::new_unqualified(col)),
481 Expr::Literal(ScalarValue::List(labels_arr)),
482 ],
483 })
484 }
485 }
486 }
487
488 fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
489 match other.as_any().downcast_ref::<Self>() {
490 Some(o) => self == o,
491 None => false,
492 }
493 }
494}
495
/// A query against a text column.
#[derive(Debug, Clone, PartialEq)]
pub enum TextQuery {
    /// Expressed as `contains(column, substring)` by `to_expr`.
    StringContains(String),
}
505
506impl AnyQuery for TextQuery {
507 fn as_any(&self) -> &dyn Any {
508 self
509 }
510
511 fn format(&self, col: &str) -> String {
512 format!("{}", self.to_expr(col.to_string()))
513 }
514
515 fn to_expr(&self, col: String) -> Expr {
516 match self {
517 Self::StringContains(substr) => Expr::ScalarFunction(ScalarFunction {
518 func: Arc::new(ContainsFunc::new().into()),
519 args: vec![
520 Expr::Column(Column::new_unqualified(col)),
521 Expr::Literal(ScalarValue::Utf8(Some(substr.clone()))),
522 ],
523 }),
524 }
525 }
526
527 fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
528 match other.as_any().downcast_ref::<Self>() {
529 Some(o) => self == o,
530 None => false,
531 }
532 }
533
534 fn needs_recheck(&self) -> bool {
535 true
536 }
537}
538
/// The row ids returned by a scalar index search, tagged with how precisely
/// they answer the query.
#[derive(Debug, PartialEq)]
pub enum SearchResult {
    /// The row ids exactly answer the query (`is_exact()` returns `true`).
    Exact(RowIdTreeMap),
    /// NOTE(review): presumably a superset of the matching rows, so a recheck is
    /// needed to drop false positives — confirm with the code that consumes
    /// results.
    AtMost(RowIdTreeMap),
    /// NOTE(review): presumably a subset of the matching rows, so further
    /// matches may exist — confirm with the code that consumes results.
    AtLeast(RowIdTreeMap),
}
554
555impl SearchResult {
556 pub fn row_ids(&self) -> &RowIdTreeMap {
557 match self {
558 Self::Exact(row_ids) => row_ids,
559 Self::AtMost(row_ids) => row_ids,
560 Self::AtLeast(row_ids) => row_ids,
561 }
562 }
563
564 pub fn is_exact(&self) -> bool {
565 matches!(self, Self::Exact(_))
566 }
567}
568
/// A scalar index: searched with an [`AnyQuery`], loaded from an
/// [`IndexStore`], and rewritten into another store by `remap` and `update`.
#[async_trait]
pub trait ScalarIndex: Send + Sync + std::fmt::Debug + Index + DeepSizeOf {
    /// Searches the index with `query` and returns the resulting row ids as a
    /// [`SearchResult`].
    ///
    /// `metrics` is a collector handed to the search; NOTE(review): what is
    /// recorded is up to each implementation — confirm there.
    async fn search(
        &self,
        query: &dyn AnyQuery,
        metrics: &dyn MetricsCollector,
    ) -> Result<SearchResult>;

    /// NOTE(review): presumably `true` when `search` would return
    /// `SearchResult::Exact` for `query` — confirm against the implementations.
    fn can_answer_exact(&self, query: &dyn AnyQuery) -> bool;

    /// Loads the index from `store`.
    async fn load(store: Arc<dyn IndexStore>) -> Result<Arc<Self>>
    where
        Self: Sized;

    /// Remaps the index according to `mapping`, using `dest_store` as the
    /// destination.
    ///
    /// NOTE(review): presumably `mapping` takes old row ids to new ones, with
    /// `None` marking a row that no longer exists — confirm with callers.
    async fn remap(
        &self,
        mapping: &HashMap<u64, Option<u64>>,
        dest_store: &dyn IndexStore,
    ) -> Result<()>;

    /// Updates the index with the batches from `new_data`, using `dest_store`
    /// as the destination.
    ///
    /// NOTE(review): presumably the result combines the existing index content
    /// with the new data — confirm against the implementations.
    async fn update(
        &self,
        new_data: SendableRecordBatchStream,
        dest_store: &dyn IndexStore,
    ) -> Result<()>;
}