1use std::collections::{HashMap, HashSet};
7use std::fmt::Debug;
8use std::{any::Any, ops::Bound, sync::Arc};
9
10use arrow::buffer::{OffsetBuffer, ScalarBuffer};
11use arrow_array::{ListArray, RecordBatch};
12use arrow_schema::{Field, Schema};
13use async_trait::async_trait;
14use datafusion::functions::string::contains::ContainsFunc;
15use datafusion::functions_array::array_has;
16use datafusion::physical_plan::SendableRecordBatchStream;
17use datafusion_common::{scalar::ScalarValue, Column};
18
19use datafusion_expr::expr::ScalarFunction;
20use datafusion_expr::Expr;
21use deepsize::DeepSizeOf;
22use inverted::query::{fill_fts_query_column, FtsQuery, FtsQueryNode, FtsSearchParams, MatchQuery};
23use lance_core::utils::mask::RowIdTreeMap;
24use lance_core::{Error, Result};
25use snafu::location;
26
27use crate::metrics::MetricsCollector;
28use crate::{Index, IndexParams, IndexType};
29
30pub mod bitmap;
31pub mod btree;
32pub mod expression;
33pub mod flat;
34pub mod inverted;
35pub mod label_list;
36pub mod lance_format;
37pub mod ngram;
38
39use crate::frag_reuse::FragReuseIndex;
40pub use inverted::tokenizer::InvertedIndexParams;
41
/// Index name reported by `ScalarIndexParams` through `IndexParams::index_name`.
pub const LANCE_SCALAR_INDEX: &str = "__lance_scalar_index";
43
/// The kinds of scalar index this module can build.
///
/// Converts to and from the general `IndexType`; `IndexType::Scalar` is accepted
/// as an alias for `BTree` when converting back.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum ScalarIndexType {
    /// B-tree index (also the default when no type is forced).
    BTree,
    /// Bitmap index.
    Bitmap,
    /// Index over list-typed columns, queried with `LabelListQuery`.
    LabelList,
    /// N-gram index, queried with `TextQuery`.
    NGram,
    /// Inverted (full-text) index.
    Inverted,
}
52
53impl TryFrom<IndexType> for ScalarIndexType {
54 type Error = Error;
55
56 fn try_from(value: IndexType) -> Result<Self> {
57 match value {
58 IndexType::BTree | IndexType::Scalar => Ok(Self::BTree),
59 IndexType::Bitmap => Ok(Self::Bitmap),
60 IndexType::LabelList => Ok(Self::LabelList),
61 IndexType::NGram => Ok(Self::NGram),
62 IndexType::Inverted => Ok(Self::Inverted),
63 _ => Err(Error::InvalidInput {
64 source: format!("Index type {:?} is not a scalar index", value).into(),
65 location: location!(),
66 }),
67 }
68 }
69}
70
71impl From<ScalarIndexType> for IndexType {
72 fn from(val: ScalarIndexType) -> Self {
73 match val {
74 ScalarIndexType::BTree => Self::BTree,
75 ScalarIndexType::Bitmap => Self::Bitmap,
76 ScalarIndexType::LabelList => Self::LabelList,
77 ScalarIndexType::NGram => Self::NGram,
78 ScalarIndexType::Inverted => Self::Inverted,
79 }
80 }
81}
82
83#[derive(Default)]
84pub struct ScalarIndexParams {
85 pub force_index_type: Option<ScalarIndexType>,
87}
88
89impl ScalarIndexParams {
90 pub fn new(index_type: ScalarIndexType) -> Self {
91 Self {
92 force_index_type: Some(index_type),
93 }
94 }
95}
96
97impl IndexParams for ScalarIndexParams {
98 fn as_any(&self) -> &dyn std::any::Any {
99 self
100 }
101
102 fn index_type(&self) -> IndexType {
103 match self.force_index_type {
104 Some(ScalarIndexType::BTree) | None => IndexType::BTree,
105 Some(ScalarIndexType::Bitmap) => IndexType::Bitmap,
106 Some(ScalarIndexType::LabelList) => IndexType::LabelList,
107 Some(ScalarIndexType::Inverted) => IndexType::Inverted,
108 Some(ScalarIndexType::NGram) => IndexType::NGram,
109 }
110 }
111
112 fn index_name(&self) -> &str {
113 LANCE_SCALAR_INDEX
114 }
115}
116
117impl IndexParams for InvertedIndexParams {
118 fn as_any(&self) -> &dyn std::any::Any {
119 self
120 }
121
122 fn index_type(&self) -> IndexType {
123 IndexType::Inverted
124 }
125
126 fn index_name(&self) -> &str {
127 "INVERTED"
128 }
129}
130
/// Sink used to write one index file as a sequence of Arrow record batches.
#[async_trait]
pub trait IndexWriter: Send {
    /// Appends `batch` to the file.
    ///
    /// NOTE(review): the meaning of the returned `u64` (e.g. batch index or row offset)
    /// is not visible here — confirm against the implementations.
    async fn write_record_batch(&mut self, batch: RecordBatch) -> Result<u64>;
    /// Finalizes the file; presumably no further batches may be written afterwards — confirm.
    async fn finish(&mut self) -> Result<()>;
    /// Finalizes the file while also recording the string key/value `metadata`
    /// (presumably stored alongside the schema — confirm against implementations).
    async fn finish_with_metadata(&mut self, metadata: HashMap<String, String>) -> Result<()>;
}
143
/// Random-access reader over an index file previously written as record batches.
#[async_trait]
pub trait IndexReader: Send + Sync {
    /// Reads batch number `n`, where the file is viewed as batches of `batch_size` rows.
    ///
    /// NOTE(review): semantics inferred from the parameter names — confirm.
    async fn read_record_batch(&self, n: u64, batch_size: u64) -> Result<RecordBatch>;
    /// Reads the rows in `range`. When `projection` is `Some`, only the named columns
    /// are presumably returned — confirm against implementations.
    async fn read_range(
        &self,
        range: std::ops::Range<usize>,
        projection: Option<&[&str]>,
    ) -> Result<RecordBatch>;
    /// Number of batches the file splits into at `batch_size` rows per batch
    /// (presumably a ceiling division of `num_rows` — confirm).
    async fn num_batches(&self, batch_size: u64) -> u32;
    /// Total number of rows in the file.
    fn num_rows(&self) -> usize;
    /// Schema of the file's data.
    fn schema(&self) -> &lance_core::datatypes::Schema;
}
165
/// Storage backend that holds the named files making up an index.
#[async_trait]
pub trait IndexStore: std::fmt::Debug + Send + Sync + DeepSizeOf {
    /// Upcast for downcasting to the concrete store type.
    fn as_any(&self) -> &dyn Any;

    /// Degree of I/O parallelism this store reports (how it is used is up to callers).
    fn io_parallelism(&self) -> usize;

    /// Creates a new index file `name` with the Arrow `schema` and returns a writer for it.
    async fn new_index_file(&self, name: &str, schema: Arc<Schema>)
        -> Result<Box<dyn IndexWriter>>;

    /// Opens the existing index file `name` for reading.
    async fn open_index_file(&self, name: &str) -> Result<Arc<dyn IndexReader>>;

    /// Copies the index file `name` from this store into `dest_store`.
    async fn copy_index_file(&self, name: &str, dest_store: &dyn IndexStore) -> Result<()>;

    /// Renames the index file `name` to `new_name` within this store.
    async fn rename_index_file(&self, name: &str, new_name: &str) -> Result<()>;

    /// Deletes the index file `name` from this store.
    async fn delete_index_file(&self, name: &str) -> Result<()>;
}
196
/// A type-erased query that a scalar index may be asked to answer.
///
/// Implementations in this module: `SargableQuery`, `LabelListQuery`, `TextQuery`.
pub trait AnyQuery: std::fmt::Debug + Any + Send + Sync {
    /// Upcast used to downcast back to the concrete query type (see `dyn_eq`).
    fn as_any(&self) -> &dyn Any;
    /// Human-readable rendering of this query applied to column `col`.
    fn format(&self, col: &str) -> String;
    /// Builds an equivalent DataFusion expression over the unqualified column `col`.
    fn to_expr(&self, col: String) -> Expr;
    /// Equality across trait objects; implementations in this module return `false`
    /// when `other` has a different concrete type.
    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool;
    /// Whether index results for this query must be re-verified against the data.
    /// Defaults to `false`; `TextQuery` overrides it to `true`.
    fn needs_recheck(&self) -> bool {
        false
    }
}
221
222impl PartialEq for dyn AnyQuery {
223 fn eq(&self, other: &Self) -> bool {
224 self.dyn_eq(other)
225 }
226}
/// A full-text search request: the query tree plus optional search tuning.
#[derive(Debug, Clone, PartialEq)]
pub struct FullTextSearchQuery {
    /// The full-text query tree to evaluate.
    pub query: FtsQuery,

    /// Optional cap on the number of results; converted to `usize` by `params()`.
    pub limit: Option<i64>,

    /// Optional WAND factor; `params()` substitutes `1.0` when it is unset.
    pub wand_factor: Option<f32>,
}
241
242impl FullTextSearchQuery {
243 pub fn new(query: String) -> Self {
245 let query = MatchQuery::new(query).into();
246 Self {
247 query,
248 limit: None,
249 wand_factor: None,
250 }
251 }
252
253 pub fn new_fuzzy(term: String, max_distance: Option<u32>) -> Self {
255 let query = MatchQuery::new(term).with_fuzziness(max_distance).into();
256 Self {
257 query,
258 limit: None,
259 wand_factor: None,
260 }
261 }
262
263 pub fn new_query(query: FtsQuery) -> Self {
265 Self {
266 query,
267 limit: None,
268 wand_factor: None,
269 }
270 }
271
272 pub fn with_column(mut self, column: String) -> Result<Self> {
275 self.query = fill_fts_query_column(&self.query, &[column], true)?;
276 Ok(self)
277 }
278
279 pub fn with_columns(mut self, columns: &[String]) -> Result<Self> {
282 self.query = fill_fts_query_column(&self.query, columns, true)?;
283 Ok(self)
284 }
285
286 pub fn limit(mut self, limit: Option<i64>) -> Self {
289 self.limit = limit;
290 self
291 }
292
293 pub fn wand_factor(mut self, wand_factor: Option<f32>) -> Self {
294 self.wand_factor = wand_factor;
295 self
296 }
297
298 pub fn columns(&self) -> HashSet<String> {
299 self.query.columns()
300 }
301
302 pub fn params(&self) -> FtsSearchParams {
303 let params = FtsSearchParams::new()
304 .with_limit(self.limit.map(|limit| limit as usize))
305 .with_wand_factor(self.wand_factor.unwrap_or(1.0));
306 match self.query {
307 FtsQuery::Phrase(ref query) => params.with_phrase_slop(Some(query.slop)),
308 _ => params,
309 }
310 }
311}
312
/// A predicate over the values of a single column, as rendered by `to_expr`:
/// a range, set membership, equality, full-text search or null check.
#[derive(Debug, Clone, PartialEq)]
pub enum SargableQuery {
    /// Value lies between the lower and upper bounds; both unbounded matches everything.
    Range(Bound<ScalarValue>, Bound<ScalarValue>),
    /// Value is one of the listed values.
    IsIn(Vec<ScalarValue>),
    /// Value equals the given value.
    Equals(ScalarValue),
    /// A full-text search query.
    FullTextSearch(FullTextSearchQuery),
    /// Value is null.
    IsNull(),
}
335
336impl AnyQuery for SargableQuery {
337 fn as_any(&self) -> &dyn Any {
338 self
339 }
340
341 fn format(&self, col: &str) -> String {
342 match self {
343 Self::Range(lower, upper) => match (lower, upper) {
344 (Bound::Unbounded, Bound::Unbounded) => "true".to_string(),
345 (Bound::Unbounded, Bound::Included(rhs)) => format!("{} <= {}", col, rhs),
346 (Bound::Unbounded, Bound::Excluded(rhs)) => format!("{} < {}", col, rhs),
347 (Bound::Included(lhs), Bound::Unbounded) => format!("{} >= {}", col, lhs),
348 (Bound::Included(lhs), Bound::Included(rhs)) => {
349 format!("{} >= {} && {} <= {}", col, lhs, col, rhs)
350 }
351 (Bound::Included(lhs), Bound::Excluded(rhs)) => {
352 format!("{} >= {} && {} < {}", col, lhs, col, rhs)
353 }
354 (Bound::Excluded(lhs), Bound::Unbounded) => format!("{} > {}", col, lhs),
355 (Bound::Excluded(lhs), Bound::Included(rhs)) => {
356 format!("{} > {} && {} <= {}", col, lhs, col, rhs)
357 }
358 (Bound::Excluded(lhs), Bound::Excluded(rhs)) => {
359 format!("{} > {} && {} < {}", col, lhs, col, rhs)
360 }
361 },
362 Self::IsIn(values) => {
363 format!(
364 "{} IN [{}]",
365 col,
366 values
367 .iter()
368 .map(|val| val.to_string())
369 .collect::<Vec<_>>()
370 .join(",")
371 )
372 }
373 Self::FullTextSearch(query) => {
374 format!("fts({})", query.query)
375 }
376 Self::IsNull() => {
377 format!("{} IS NULL", col)
378 }
379 Self::Equals(val) => {
380 format!("{} = {}", col, val)
381 }
382 }
383 }
384
385 fn to_expr(&self, col: String) -> Expr {
386 let col_expr = Expr::Column(Column::new_unqualified(col));
387 match self {
388 Self::Range(lower, upper) => match (lower, upper) {
389 (Bound::Unbounded, Bound::Unbounded) => {
390 Expr::Literal(ScalarValue::Boolean(Some(true)), None)
391 }
392 (Bound::Unbounded, Bound::Included(rhs)) => {
393 col_expr.lt_eq(Expr::Literal(rhs.clone(), None))
394 }
395 (Bound::Unbounded, Bound::Excluded(rhs)) => {
396 col_expr.lt(Expr::Literal(rhs.clone(), None))
397 }
398 (Bound::Included(lhs), Bound::Unbounded) => {
399 col_expr.gt_eq(Expr::Literal(lhs.clone(), None))
400 }
401 (Bound::Included(lhs), Bound::Included(rhs)) => col_expr.between(
402 Expr::Literal(lhs.clone(), None),
403 Expr::Literal(rhs.clone(), None),
404 ),
405 (Bound::Included(lhs), Bound::Excluded(rhs)) => col_expr
406 .clone()
407 .gt_eq(Expr::Literal(lhs.clone(), None))
408 .and(col_expr.lt(Expr::Literal(rhs.clone(), None))),
409 (Bound::Excluded(lhs), Bound::Unbounded) => {
410 col_expr.gt(Expr::Literal(lhs.clone(), None))
411 }
412 (Bound::Excluded(lhs), Bound::Included(rhs)) => col_expr
413 .clone()
414 .gt(Expr::Literal(lhs.clone(), None))
415 .and(col_expr.lt_eq(Expr::Literal(rhs.clone(), None))),
416 (Bound::Excluded(lhs), Bound::Excluded(rhs)) => col_expr
417 .clone()
418 .gt(Expr::Literal(lhs.clone(), None))
419 .and(col_expr.lt(Expr::Literal(rhs.clone(), None))),
420 },
421 Self::IsIn(values) => col_expr.in_list(
422 values
423 .iter()
424 .map(|val| Expr::Literal(val.clone(), None))
425 .collect::<Vec<_>>(),
426 false,
427 ),
428 Self::FullTextSearch(query) => col_expr.like(Expr::Literal(
429 ScalarValue::Utf8(Some(query.query.to_string())),
430 None,
431 )),
432 Self::IsNull() => col_expr.is_null(),
433 Self::Equals(value) => col_expr.eq(Expr::Literal(value.clone(), None)),
434 }
435 }
436
437 fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
438 match other.as_any().downcast_ref::<Self>() {
439 Some(o) => self == o,
440 None => false,
441 }
442 }
443}
444
/// A label-membership query over a list-typed column.
#[derive(Debug, Clone, PartialEq)]
pub enum LabelListQuery {
    /// Rows whose list contains every given label (rendered as `array_has_all`).
    HasAllLabels(Vec<ScalarValue>),
    /// Rows whose list contains at least one given label (rendered as `array_has_any`).
    HasAnyLabel(Vec<ScalarValue>),
}
453
454impl AnyQuery for LabelListQuery {
455 fn as_any(&self) -> &dyn Any {
456 self
457 }
458
459 fn format(&self, col: &str) -> String {
460 format!("{}", self.to_expr(col.to_string()))
461 }
462
463 fn to_expr(&self, col: String) -> Expr {
464 match self {
465 Self::HasAllLabels(labels) => {
466 let labels_arr = ScalarValue::iter_to_array(labels.iter().cloned()).unwrap();
467 let offsets_buffer =
468 OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, labels_arr.len() as i32]));
469 let labels_list = ListArray::try_new(
470 Arc::new(Field::new("item", labels_arr.data_type().clone(), false)),
471 offsets_buffer,
472 labels_arr,
473 None,
474 )
475 .unwrap();
476 let labels_arr = Arc::new(labels_list);
477 Expr::ScalarFunction(ScalarFunction {
478 func: Arc::new(array_has::ArrayHasAll::new().into()),
479 args: vec![
480 Expr::Column(Column::new_unqualified(col)),
481 Expr::Literal(ScalarValue::List(labels_arr), None),
482 ],
483 })
484 }
485 Self::HasAnyLabel(labels) => {
486 let labels_arr = ScalarValue::iter_to_array(labels.iter().cloned()).unwrap();
487 let offsets_buffer =
488 OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, labels_arr.len() as i32]));
489 let labels_list = ListArray::try_new(
490 Arc::new(Field::new("item", labels_arr.data_type().clone(), false)),
491 offsets_buffer,
492 labels_arr,
493 None,
494 )
495 .unwrap();
496 let labels_arr = Arc::new(labels_list);
497 Expr::ScalarFunction(ScalarFunction {
498 func: Arc::new(array_has::ArrayHasAny::new().into()),
499 args: vec![
500 Expr::Column(Column::new_unqualified(col)),
501 Expr::Literal(ScalarValue::List(labels_arr), None),
502 ],
503 })
504 }
505 }
506 }
507
508 fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
509 match other.as_any().downcast_ref::<Self>() {
510 Some(o) => self == o,
511 None => false,
512 }
513 }
514}
515
/// A query over a text column; `to_expr` renders it with DataFusion's `contains`.
#[derive(Debug, Clone, PartialEq)]
pub enum TextQuery {
    /// Rows whose text contains the given substring.
    StringContains(String),
}
525
526impl AnyQuery for TextQuery {
527 fn as_any(&self) -> &dyn Any {
528 self
529 }
530
531 fn format(&self, col: &str) -> String {
532 format!("{}", self.to_expr(col.to_string()))
533 }
534
535 fn to_expr(&self, col: String) -> Expr {
536 match self {
537 Self::StringContains(substr) => Expr::ScalarFunction(ScalarFunction {
538 func: Arc::new(ContainsFunc::new().into()),
539 args: vec![
540 Expr::Column(Column::new_unqualified(col)),
541 Expr::Literal(ScalarValue::Utf8(Some(substr.clone())), None),
542 ],
543 }),
544 }
545 }
546
547 fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
548 match other.as_any().downcast_ref::<Self>() {
549 Some(o) => self == o,
550 None => false,
551 }
552 }
553
554 fn needs_recheck(&self) -> bool {
555 true
556 }
557}
558
/// Row ids produced by a scalar index search, tagged with how precise they are.
#[derive(Debug, PartialEq)]
pub enum SearchResult {
    /// Presumably exactly the matching rows (the only variant `is_exact` accepts).
    Exact(RowIdTreeMap),
    /// NOTE(review): by its name, the matching rows are at most these row ids (a superset
    /// that may hold false positives) — confirm against index implementations.
    AtMost(RowIdTreeMap),
    /// NOTE(review): by its name, the matching rows include at least these row ids (a subset;
    /// more rows may match) — confirm against index implementations.
    AtLeast(RowIdTreeMap),
}
574
575impl SearchResult {
576 pub fn row_ids(&self) -> &RowIdTreeMap {
577 match self {
578 Self::Exact(row_ids) => row_ids,
579 Self::AtMost(row_ids) => row_ids,
580 Self::AtLeast(row_ids) => row_ids,
581 }
582 }
583
584 pub fn is_exact(&self) -> bool {
585 matches!(self, Self::Exact(_))
586 }
587}
588
/// A loaded scalar index that can answer `AnyQuery` searches and be rewritten.
#[async_trait]
pub trait ScalarIndex: Send + Sync + std::fmt::Debug + Index + DeepSizeOf {
    /// Searches the index for rows matching `query`, reporting to `metrics`.
    /// The returned `SearchResult` variant says how precise the row ids are.
    async fn search(
        &self,
        query: &dyn AnyQuery,
        metrics: &dyn MetricsCollector,
    ) -> Result<SearchResult>;

    /// Presumably whether `search` can return an exact result for `query` — confirm
    /// against implementations.
    fn can_answer_exact(&self, query: &dyn AnyQuery) -> bool;

    /// Loads the index from `store`. `fri` is an optional fragment-reuse index
    /// (presumably used to remap row ids — confirm).
    async fn load(
        store: Arc<dyn IndexStore>,
        fri: Option<Arc<FragReuseIndex>>,
    ) -> Result<Arc<Self>>
    where
        Self: Sized;

    /// Writes a copy of this index into `dest_store` with row ids rewritten by `mapping`.
    ///
    /// NOTE(review): presumably old row id -> new row id, with `None` meaning the row was
    /// removed — confirm against implementations.
    async fn remap(
        &self,
        mapping: &HashMap<u64, Option<u64>>,
        dest_store: &dyn IndexStore,
    ) -> Result<()>;

    /// Writes an updated index into `dest_store` that also covers the rows in `new_data`.
    async fn update(
        &self,
        new_data: SendableRecordBatchStream,
        dest_store: &dyn IndexStore,
    ) -> Result<()>;
}