1use std::collections::{HashMap, HashSet};
7use std::fmt::Debug;
8use std::{any::Any, ops::Bound, sync::Arc};
9
10use arrow::buffer::{OffsetBuffer, ScalarBuffer};
11use arrow_array::{ListArray, RecordBatch};
12use arrow_schema::{Field, Schema};
13use async_trait::async_trait;
14use datafusion::functions::string::contains::ContainsFunc;
15use datafusion::functions_array::array_has;
16use datafusion::physical_plan::SendableRecordBatchStream;
17use datafusion_common::{scalar::ScalarValue, Column};
18
19use datafusion_expr::expr::ScalarFunction;
20use datafusion_expr::Expr;
21use deepsize::DeepSizeOf;
22use inverted::query::{fill_fts_query_column, FtsQuery, FtsQueryNode, FtsSearchParams, MatchQuery};
23use lance_core::utils::mask::RowIdTreeMap;
24use lance_core::{Error, Result};
25use snafu::location;
26
27use crate::metrics::MetricsCollector;
28use crate::{Index, IndexParams, IndexType};
29
30pub mod bitmap;
31pub mod btree;
32pub mod expression;
33pub mod flat;
34pub mod inverted;
35pub mod label_list;
36pub mod lance_format;
37pub mod ngram;
38
39use crate::frag_reuse::FragReuseIndex;
40pub use inverted::tokenizer::InvertedIndexParams;
41
/// Index name reported by `ScalarIndexParams::index_name`, regardless of the
/// forced scalar index type.
pub const LANCE_SCALAR_INDEX: &str = "__lance_scalar_index";
43
/// The kinds of scalar index that can be requested.
///
/// Converts losslessly into [`IndexType`], and can be recovered from an
/// [`IndexType`] via `TryFrom` (which fails for non-scalar index types).
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum ScalarIndexType {
    /// B-tree index. Also what `ScalarIndexParams` reports when no type is forced.
    BTree,
    /// Bitmap index.
    Bitmap,
    /// Label-list index.
    LabelList,
    /// N-gram index.
    NGram,
    /// Inverted (full-text) index.
    Inverted,
}
52
53impl TryFrom<IndexType> for ScalarIndexType {
54 type Error = Error;
55
56 fn try_from(value: IndexType) -> Result<Self> {
57 match value {
58 IndexType::BTree | IndexType::Scalar => Ok(Self::BTree),
59 IndexType::Bitmap => Ok(Self::Bitmap),
60 IndexType::LabelList => Ok(Self::LabelList),
61 IndexType::NGram => Ok(Self::NGram),
62 IndexType::Inverted => Ok(Self::Inverted),
63 _ => Err(Error::InvalidInput {
64 source: format!("Index type {:?} is not a scalar index", value).into(),
65 location: location!(),
66 }),
67 }
68 }
69}
70
71impl From<ScalarIndexType> for IndexType {
72 fn from(val: ScalarIndexType) -> Self {
73 match val {
74 ScalarIndexType::BTree => Self::BTree,
75 ScalarIndexType::Bitmap => Self::Bitmap,
76 ScalarIndexType::LabelList => Self::LabelList,
77 ScalarIndexType::NGram => Self::NGram,
78 ScalarIndexType::Inverted => Self::Inverted,
79 }
80 }
81}
82
/// Parameters for building a scalar index.
#[derive(Default)]
pub struct ScalarIndexParams {
    /// The scalar index type to build. When `None`, this struct's
    /// `IndexParams::index_type` reports `IndexType::BTree`.
    pub force_index_type: Option<ScalarIndexType>,
}
88
89impl ScalarIndexParams {
90 pub fn new(index_type: ScalarIndexType) -> Self {
91 Self {
92 force_index_type: Some(index_type),
93 }
94 }
95}
96
97impl IndexParams for ScalarIndexParams {
98 fn as_any(&self) -> &dyn std::any::Any {
99 self
100 }
101
102 fn index_type(&self) -> IndexType {
103 match self.force_index_type {
104 Some(ScalarIndexType::BTree) | None => IndexType::BTree,
105 Some(ScalarIndexType::Bitmap) => IndexType::Bitmap,
106 Some(ScalarIndexType::LabelList) => IndexType::LabelList,
107 Some(ScalarIndexType::Inverted) => IndexType::Inverted,
108 Some(ScalarIndexType::NGram) => IndexType::NGram,
109 }
110 }
111
112 fn index_name(&self) -> &str {
113 LANCE_SCALAR_INDEX
114 }
115}
116
117impl IndexParams for InvertedIndexParams {
118 fn as_any(&self) -> &dyn std::any::Any {
119 self
120 }
121
122 fn index_type(&self) -> IndexType {
123 IndexType::Inverted
124 }
125
126 fn index_name(&self) -> &str {
127 "INVERTED"
128 }
129}
130
/// Writer for an index file, as returned by `IndexStore::new_index_file`.
#[async_trait]
pub trait IndexWriter: Send {
    /// Writes one record batch to the file.
    ///
    /// NOTE(review): the meaning of the returned `u64` is not visible here —
    /// presumably the batch's position/offset within the file; confirm against
    /// implementations.
    async fn write_record_batch(&mut self, batch: RecordBatch) -> Result<u64>;
    /// Finishes writing the file.
    async fn finish(&mut self) -> Result<()>;
    /// Finishes writing the file, attaching the given string key/value metadata.
    async fn finish_with_metadata(&mut self, metadata: HashMap<String, String>) -> Result<()>;
}
143
/// Reader for an index file, as returned by `IndexStore::open_index_file`.
#[async_trait]
pub trait IndexReader: Send + Sync {
    /// Reads batch number `n` when the file is split into batches of `batch_size`.
    /// NOTE(review): assumes `batch_size` counts rows — confirm.
    async fn read_record_batch(&self, n: u64, batch_size: u64) -> Result<RecordBatch>;
    /// Reads the rows in `range`, optionally restricted to the columns named in
    /// `projection` (presumably `None` reads all columns — confirm).
    async fn read_range(
        &self,
        range: std::ops::Range<usize>,
        projection: Option<&[&str]>,
    ) -> Result<RecordBatch>;
    /// Number of batches the file splits into at the given `batch_size`.
    async fn num_batches(&self, batch_size: u64) -> u32;
    /// Number of rows in the file.
    fn num_rows(&self) -> usize;
    /// Schema of the data stored in the file.
    fn schema(&self) -> &lance_core::datatypes::Schema;
}
165
/// Storage for the named files that make up an index.
#[async_trait]
pub trait IndexStore: std::fmt::Debug + Send + Sync + DeepSizeOf {
    /// Casts to `&dyn Any` so callers can downcast to a concrete store.
    fn as_any(&self) -> &dyn Any;

    /// Suggested I/O parallelism for this store
    /// (NOTE(review): presumably the number of concurrent requests — confirm).
    fn io_parallelism(&self) -> usize;

    /// Creates a new index file called `name` with the given Arrow `schema` and
    /// returns a writer for it.
    async fn new_index_file(&self, name: &str, schema: Arc<Schema>)
        -> Result<Box<dyn IndexWriter>>;

    /// Opens the index file called `name` for reading.
    async fn open_index_file(&self, name: &str) -> Result<Arc<dyn IndexReader>>;

    /// Copies the index file called `name` from this store into `dest_store`.
    async fn copy_index_file(&self, name: &str, dest_store: &dyn IndexStore) -> Result<()>;

    /// Renames the index file `name` to `new_name`.
    async fn rename_index_file(&self, name: &str, new_name: &str) -> Result<()>;

    /// Deletes the index file called `name`.
    async fn delete_index_file(&self, name: &str) -> Result<()>;
}
196
/// A type-erased query that can be handed to a [`ScalarIndex`].
///
/// Queries travel as `&dyn AnyQuery`. `as_any` and `dyn_eq` let implementations
/// downcast to, and compare against, their concrete type.
pub trait AnyQuery: std::fmt::Debug + Any + Send + Sync {
    /// Casts to `&dyn Any` so the query can be downcast to its concrete type.
    fn as_any(&self) -> &dyn Any;
    /// Renders the query against column `col` as a human-readable string.
    fn format(&self, col: &str) -> String;
    /// Converts the query into a DataFusion [`Expr`] on column `col`.
    fn to_expr(&self, col: String) -> Expr;
    /// Compares this query with another type-erased query.
    /// The implementations in this module return `false` when `other` has a
    /// different concrete type.
    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool;
    /// Whether index results for this query need to be rechecked
    /// (presumably against the actual column values — confirm). Defaults to `false`.
    fn needs_recheck(&self) -> bool {
        false
    }
}
221
222impl PartialEq for dyn AnyQuery {
223 fn eq(&self, other: &Self) -> bool {
224 self.dyn_eq(other)
225 }
226}
/// A full-text search request: the query tree plus search options.
#[derive(Debug, Clone, PartialEq)]
pub struct FullTextSearchQuery {
    /// The full-text query to evaluate.
    pub query: FtsQuery,

    /// Result limit forwarded to `FtsSearchParams::with_limit` by `params()`;
    /// `None` forwards no limit.
    pub limit: Option<i64>,

    /// WAND factor forwarded to `FtsSearchParams::with_wand_factor` by `params()`;
    /// `None` falls back to `1.0`.
    pub wand_factor: Option<f32>,
}
241
242impl FullTextSearchQuery {
243 pub fn new(query: String) -> Self {
245 let query = MatchQuery::new(query).into();
246 Self {
247 query,
248 limit: None,
249 wand_factor: None,
250 }
251 }
252
253 pub fn new_fuzzy(term: String, max_distance: Option<u32>) -> Self {
255 let query = MatchQuery::new(term).with_fuzziness(max_distance).into();
256 Self {
257 query,
258 limit: None,
259 wand_factor: None,
260 }
261 }
262
263 pub fn new_query(query: FtsQuery) -> Self {
265 Self {
266 query,
267 limit: None,
268 wand_factor: None,
269 }
270 }
271
272 pub fn with_column(mut self, column: String) -> Result<Self> {
275 self.query = fill_fts_query_column(&self.query, &[column], true)?;
276 Ok(self)
277 }
278
279 pub fn with_columns(mut self, columns: &[String]) -> Result<Self> {
282 self.query = fill_fts_query_column(&self.query, columns, true)?;
283 Ok(self)
284 }
285
286 pub fn limit(mut self, limit: Option<i64>) -> Self {
289 self.limit = limit;
290 self
291 }
292
293 pub fn wand_factor(mut self, wand_factor: Option<f32>) -> Self {
294 self.wand_factor = wand_factor;
295 self
296 }
297
298 pub fn columns(&self) -> HashSet<String> {
299 self.query.columns()
300 }
301
302 pub fn params(&self) -> FtsSearchParams {
303 let params = FtsSearchParams::new()
304 .with_limit(self.limit.map(|limit| limit as usize))
305 .with_wand_factor(self.wand_factor.unwrap_or(1.0));
306 match self.query {
307 FtsQuery::Phrase(ref query) => params.with_phrase_slop(Some(query.slop)),
308 _ => params,
309 }
310 }
311}
312
/// A scalar query over a single column: a range, set membership, equality, a
/// null check, or a full-text search.
#[derive(Debug, Clone, PartialEq)]
pub enum SargableQuery {
    /// Values between the `(lower, upper)` bounds. Unbounded on both sides matches
    /// everything (`to_expr` yields the literal `true`).
    Range(Bound<ScalarValue>, Bound<ScalarValue>),
    /// Values equal to any of the listed values.
    IsIn(Vec<ScalarValue>),
    /// Values equal to the given value.
    Equals(ScalarValue),
    /// A full-text search query.
    FullTextSearch(FullTextSearchQuery),
    /// Null values.
    IsNull(),
}
335
336impl AnyQuery for SargableQuery {
337 fn as_any(&self) -> &dyn Any {
338 self
339 }
340
341 fn format(&self, col: &str) -> String {
342 match self {
343 Self::Range(lower, upper) => match (lower, upper) {
344 (Bound::Unbounded, Bound::Unbounded) => "true".to_string(),
345 (Bound::Unbounded, Bound::Included(rhs)) => format!("{} <= {}", col, rhs),
346 (Bound::Unbounded, Bound::Excluded(rhs)) => format!("{} < {}", col, rhs),
347 (Bound::Included(lhs), Bound::Unbounded) => format!("{} >= {}", col, lhs),
348 (Bound::Included(lhs), Bound::Included(rhs)) => {
349 format!("{} >= {} && {} <= {}", col, lhs, col, rhs)
350 }
351 (Bound::Included(lhs), Bound::Excluded(rhs)) => {
352 format!("{} >= {} && {} < {}", col, lhs, col, rhs)
353 }
354 (Bound::Excluded(lhs), Bound::Unbounded) => format!("{} > {}", col, lhs),
355 (Bound::Excluded(lhs), Bound::Included(rhs)) => {
356 format!("{} > {} && {} <= {}", col, lhs, col, rhs)
357 }
358 (Bound::Excluded(lhs), Bound::Excluded(rhs)) => {
359 format!("{} > {} && {} < {}", col, lhs, col, rhs)
360 }
361 },
362 Self::IsIn(values) => {
363 format!(
364 "{} IN [{}]",
365 col,
366 values
367 .iter()
368 .map(|val| val.to_string())
369 .collect::<Vec<_>>()
370 .join(",")
371 )
372 }
373 Self::FullTextSearch(query) => {
374 format!("fts({})", query.query)
375 }
376 Self::IsNull() => {
377 format!("{} IS NULL", col)
378 }
379 Self::Equals(val) => {
380 format!("{} = {}", col, val)
381 }
382 }
383 }
384
385 fn to_expr(&self, col: String) -> Expr {
386 let col_expr = Expr::Column(Column::new_unqualified(col));
387 match self {
388 Self::Range(lower, upper) => match (lower, upper) {
389 (Bound::Unbounded, Bound::Unbounded) => {
390 Expr::Literal(ScalarValue::Boolean(Some(true)))
391 }
392 (Bound::Unbounded, Bound::Included(rhs)) => {
393 col_expr.lt_eq(Expr::Literal(rhs.clone()))
394 }
395 (Bound::Unbounded, Bound::Excluded(rhs)) => col_expr.lt(Expr::Literal(rhs.clone())),
396 (Bound::Included(lhs), Bound::Unbounded) => {
397 col_expr.gt_eq(Expr::Literal(lhs.clone()))
398 }
399 (Bound::Included(lhs), Bound::Included(rhs)) => {
400 col_expr.between(Expr::Literal(lhs.clone()), Expr::Literal(rhs.clone()))
401 }
402 (Bound::Included(lhs), Bound::Excluded(rhs)) => col_expr
403 .clone()
404 .gt_eq(Expr::Literal(lhs.clone()))
405 .and(col_expr.lt(Expr::Literal(rhs.clone()))),
406 (Bound::Excluded(lhs), Bound::Unbounded) => col_expr.gt(Expr::Literal(lhs.clone())),
407 (Bound::Excluded(lhs), Bound::Included(rhs)) => col_expr
408 .clone()
409 .gt(Expr::Literal(lhs.clone()))
410 .and(col_expr.lt_eq(Expr::Literal(rhs.clone()))),
411 (Bound::Excluded(lhs), Bound::Excluded(rhs)) => col_expr
412 .clone()
413 .gt(Expr::Literal(lhs.clone()))
414 .and(col_expr.lt(Expr::Literal(rhs.clone()))),
415 },
416 Self::IsIn(values) => col_expr.in_list(
417 values
418 .iter()
419 .map(|val| Expr::Literal(val.clone()))
420 .collect::<Vec<_>>(),
421 false,
422 ),
423 Self::FullTextSearch(query) => col_expr.like(Expr::Literal(ScalarValue::Utf8(Some(
424 query.query.to_string(),
425 )))),
426 Self::IsNull() => col_expr.is_null(),
427 Self::Equals(value) => col_expr.eq(Expr::Literal(value.clone())),
428 }
429 }
430
431 fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
432 match other.as_any().downcast_ref::<Self>() {
433 Some(o) => self == o,
434 None => false,
435 }
436 }
437}
438
/// A query over a list-of-labels column.
#[derive(Debug, Clone, PartialEq)]
pub enum LabelListQuery {
    /// Rows whose label list contains all of the given labels
    /// (`to_expr` emits `array_has_all`).
    HasAllLabels(Vec<ScalarValue>),
    /// Rows whose label list contains any of the given labels
    /// (`to_expr` emits `array_has_any`).
    HasAnyLabel(Vec<ScalarValue>),
}
447
448impl AnyQuery for LabelListQuery {
449 fn as_any(&self) -> &dyn Any {
450 self
451 }
452
453 fn format(&self, col: &str) -> String {
454 format!("{}", self.to_expr(col.to_string()))
455 }
456
457 fn to_expr(&self, col: String) -> Expr {
458 match self {
459 Self::HasAllLabels(labels) => {
460 let labels_arr = ScalarValue::iter_to_array(labels.iter().cloned()).unwrap();
461 let offsets_buffer =
462 OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, labels_arr.len() as i32]));
463 let labels_list = ListArray::try_new(
464 Arc::new(Field::new("item", labels_arr.data_type().clone(), false)),
465 offsets_buffer,
466 labels_arr,
467 None,
468 )
469 .unwrap();
470 let labels_arr = Arc::new(labels_list);
471 Expr::ScalarFunction(ScalarFunction {
472 func: Arc::new(array_has::ArrayHasAll::new().into()),
473 args: vec![
474 Expr::Column(Column::new_unqualified(col)),
475 Expr::Literal(ScalarValue::List(labels_arr)),
476 ],
477 })
478 }
479 Self::HasAnyLabel(labels) => {
480 let labels_arr = ScalarValue::iter_to_array(labels.iter().cloned()).unwrap();
481 let offsets_buffer =
482 OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, labels_arr.len() as i32]));
483 let labels_list = ListArray::try_new(
484 Arc::new(Field::new("item", labels_arr.data_type().clone(), false)),
485 offsets_buffer,
486 labels_arr,
487 None,
488 )
489 .unwrap();
490 let labels_arr = Arc::new(labels_list);
491 Expr::ScalarFunction(ScalarFunction {
492 func: Arc::new(array_has::ArrayHasAny::new().into()),
493 args: vec![
494 Expr::Column(Column::new_unqualified(col)),
495 Expr::Literal(ScalarValue::List(labels_arr)),
496 ],
497 })
498 }
499 }
500 }
501
502 fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
503 match other.as_any().downcast_ref::<Self>() {
504 Some(o) => self == o,
505 None => false,
506 }
507 }
508}
509
/// A text-matching query. Its `needs_recheck` returns `true`.
#[derive(Debug, Clone, PartialEq)]
pub enum TextQuery {
    /// Values containing the given substring (`to_expr` emits DataFusion's
    /// `contains` function).
    StringContains(String),
}
519
520impl AnyQuery for TextQuery {
521 fn as_any(&self) -> &dyn Any {
522 self
523 }
524
525 fn format(&self, col: &str) -> String {
526 format!("{}", self.to_expr(col.to_string()))
527 }
528
529 fn to_expr(&self, col: String) -> Expr {
530 match self {
531 Self::StringContains(substr) => Expr::ScalarFunction(ScalarFunction {
532 func: Arc::new(ContainsFunc::new().into()),
533 args: vec![
534 Expr::Column(Column::new_unqualified(col)),
535 Expr::Literal(ScalarValue::Utf8(Some(substr.clone()))),
536 ],
537 }),
538 }
539 }
540
541 fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
542 match other.as_any().downcast_ref::<Self>() {
543 Some(o) => self == o,
544 None => false,
545 }
546 }
547
548 fn needs_recheck(&self) -> bool {
549 true
550 }
551}
552
/// The result of `ScalarIndex::search`: a set of row ids tagged with how precise
/// it is. Only `Exact` is reported as exact by `is_exact`.
#[derive(Debug, PartialEq)]
pub enum SearchResult {
    /// The row ids are presumably exactly the matching rows.
    Exact(RowIdTreeMap),
    /// NOTE(review): presumably a superset of the matches (at most these rows
    /// match) — confirm.
    AtMost(RowIdTreeMap),
    /// NOTE(review): presumably a subset of the matches (at least these rows
    /// match) — confirm.
    AtLeast(RowIdTreeMap),
}
568
569impl SearchResult {
570 pub fn row_ids(&self) -> &RowIdTreeMap {
571 match self {
572 Self::Exact(row_ids) => row_ids,
573 Self::AtMost(row_ids) => row_ids,
574 Self::AtLeast(row_ids) => row_ids,
575 }
576 }
577
578 pub fn is_exact(&self) -> bool {
579 matches!(self, Self::Exact(_))
580 }
581}
582
/// A scalar index that answers [`AnyQuery`] queries with a [`SearchResult`].
#[async_trait]
pub trait ScalarIndex: Send + Sync + std::fmt::Debug + Index + DeepSizeOf {
    /// Searches the index for rows matching `query`, recording into `metrics`.
    async fn search(
        &self,
        query: &dyn AnyQuery,
        metrics: &dyn MetricsCollector,
    ) -> Result<SearchResult>;

    /// Whether this index can answer `query` exactly.
    /// NOTE(review): presumably `true` means `search` yields `SearchResult::Exact`
    /// for this query — confirm.
    fn can_answer_exact(&self, query: &dyn AnyQuery) -> bool;

    /// Loads the index from `store`.
    /// `fri` is an optional fragment-reuse index (NOTE(review): presumably used
    /// to remap row ids on load — confirm).
    async fn load(
        store: Arc<dyn IndexStore>,
        fri: Option<Arc<FragReuseIndex>>,
    ) -> Result<Arc<Self>>
    where
        Self: Sized;

    /// Remaps the index's row ids using `mapping` and writes the result to
    /// `dest_store`. NOTE(review): presumably a `None` target marks a removed
    /// row — confirm.
    async fn remap(
        &self,
        mapping: &HashMap<u64, Option<u64>>,
        dest_store: &dyn IndexStore,
    ) -> Result<()>;

    /// Writes an index that incorporates `new_data` into `dest_store`.
    /// NOTE(review): the expected schema of `new_data` is not visible here — confirm.
    async fn update(
        &self,
        new_data: SendableRecordBatchStream,
        dest_store: &dyn IndexStore,
    ) -> Result<()>;
}