use std::collections::HashMap;
use std::fmt::Debug;
use std::{any::Any, ops::Bound, sync::Arc};

use arrow::buffer::{OffsetBuffer, ScalarBuffer};
use arrow_array::{new_empty_array, ListArray, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use async_trait::async_trait;
use datafusion::functions_array::array_has;
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion_common::{scalar::ScalarValue, Column};

use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::Expr;
use deepsize::DeepSizeOf;
use inverted::TokenizerConfig;
use lance_core::utils::mask::RowIdTreeMap;
use lance_core::{Error, Result};
use snafu::location;

use crate::{Index, IndexParams, IndexType};
27
28pub mod bitmap;
29pub mod btree;
30pub mod expression;
31pub mod flat;
32pub mod inverted;
33pub mod label_list;
34pub mod lance_format;
35
/// Index name reported by `ScalarIndexParams::index_name`, whichever scalar index type is forced.
pub const LANCE_SCALAR_INDEX: &str = "__lance_scalar_index";
37
/// The kinds of scalar index that can be requested.
///
/// When no kind is forced, `ScalarIndexParams` falls back to a B-tree.
#[derive(Clone, Copy, Debug)]
pub enum ScalarIndexType {
    /// B-tree index.
    BTree,
    /// Bitmap index.
    Bitmap,
    /// Label-list index.
    LabelList,
    /// Inverted index, configured through `InvertedIndexParams`.
    Inverted,
}
45
46impl TryFrom<IndexType> for ScalarIndexType {
47 type Error = Error;
48
49 fn try_from(value: IndexType) -> Result<Self> {
50 match value {
51 IndexType::BTree | IndexType::Scalar => Ok(Self::BTree),
52 IndexType::Bitmap => Ok(Self::Bitmap),
53 IndexType::LabelList => Ok(Self::LabelList),
54 IndexType::Inverted => Ok(Self::Inverted),
55 _ => Err(Error::InvalidInput {
56 source: format!("Index type {:?} is not a scalar index", value).into(),
57 location: location!(),
58 }),
59 }
60 }
61}
62
63#[derive(Default)]
64pub struct ScalarIndexParams {
65 pub force_index_type: Option<ScalarIndexType>,
67}
68
69impl ScalarIndexParams {
70 pub fn new(index_type: ScalarIndexType) -> Self {
71 Self {
72 force_index_type: Some(index_type),
73 }
74 }
75}
76
77impl IndexParams for ScalarIndexParams {
78 fn as_any(&self) -> &dyn std::any::Any {
79 self
80 }
81
82 fn index_type(&self) -> IndexType {
83 match self.force_index_type {
84 Some(ScalarIndexType::BTree) | None => IndexType::BTree,
85 Some(ScalarIndexType::Bitmap) => IndexType::Bitmap,
86 Some(ScalarIndexType::LabelList) => IndexType::LabelList,
87 Some(ScalarIndexType::Inverted) => IndexType::Inverted,
88 }
89 }
90
91 fn index_name(&self) -> &str {
92 LANCE_SCALAR_INDEX
93 }
94}
95
96#[derive(Clone)]
97pub struct InvertedIndexParams {
98 pub with_position: bool,
103
104 pub tokenizer_config: TokenizerConfig,
105}
106
107impl Debug for InvertedIndexParams {
108 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
109 f.debug_struct("InvertedIndexParams")
110 .field("with_position", &self.with_position)
111 .finish()
112 }
113}
114
115impl DeepSizeOf for InvertedIndexParams {
116 fn deep_size_of_children(&self, _: &mut deepsize::Context) -> usize {
117 0
118 }
119}
120
121impl Default for InvertedIndexParams {
122 fn default() -> Self {
123 Self {
124 with_position: true,
125 tokenizer_config: TokenizerConfig::default(),
126 }
127 }
128}
129
130impl InvertedIndexParams {
131 pub fn with_position(mut self, with_position: bool) -> Self {
132 self.with_position = with_position;
133 self
134 }
135}
136
137impl IndexParams for InvertedIndexParams {
138 fn as_any(&self) -> &dyn std::any::Any {
139 self
140 }
141
142 fn index_type(&self) -> IndexType {
143 IndexType::Inverted
144 }
145
146 fn index_name(&self) -> &str {
147 "INVERTED"
148 }
149}
150
/// Writer for a single index file, as returned by `IndexStore::new_index_file`.
#[async_trait]
pub trait IndexWriter: Send {
    /// Appends `batch` to the file.
    ///
    /// NOTE(review): the meaning of the returned `u64` (e.g. batch number or offset) is not
    /// visible here. Confirm with the implementors.
    async fn write_record_batch(&mut self, batch: RecordBatch) -> Result<u64>;
    /// Finishes writing the file.
    async fn finish(&mut self) -> Result<()>;
    /// Finishes writing the file and supplies string key/value `metadata`.
    ///
    /// Presumably this is persisted with the file. Confirm with the implementors.
    async fn finish_with_metadata(&mut self, metadata: HashMap<String, String>) -> Result<()>;
}
163
/// Reader for a single index file, as returned by `IndexStore::open_index_file`.
#[async_trait]
pub trait IndexReader: Send + Sync {
    /// Reads batch number `n`.
    ///
    /// NOTE(review): `batch_size` is presumably the number of rows per batch. Confirm.
    async fn read_record_batch(&self, n: u64, batch_size: u64) -> Result<RecordBatch>;
    /// Reads the rows in `range`, restricted to the columns named in `projection` when given.
    ///
    /// `None` presumably means all columns.
    async fn read_range(
        &self,
        range: std::ops::Range<usize>,
        projection: Option<&[&str]>,
    ) -> Result<RecordBatch>;
    /// Number of batches in the file.
    async fn num_batches(&self) -> u32;
    /// Number of rows in the file.
    fn num_rows(&self) -> usize;
    /// Lance schema of the file's contents.
    fn schema(&self) -> &lance_core::datatypes::Schema;
}
185
/// Storage for the named files that make up a scalar index.
#[async_trait]
pub trait IndexStore: std::fmt::Debug + Send + Sync + DeepSizeOf {
    /// Returns `self` as [`Any`] so callers can downcast to a concrete store type.
    fn as_any(&self) -> &dyn Any;

    /// Degree of I/O parallelism for this store.
    ///
    /// Presumably this is the suggested number of concurrent reads/writes. Confirm.
    fn io_parallelism(&self) -> usize;

    /// Creates a new index file called `name` with the given Arrow `schema` and returns a writer.
    async fn new_index_file(&self, name: &str, schema: Arc<Schema>)
        -> Result<Box<dyn IndexWriter>>;

    /// Opens the index file called `name` for reading.
    async fn open_index_file(&self, name: &str) -> Result<Arc<dyn IndexReader>>;

    /// Copies the index file called `name` from this store into `dest_store`.
    async fn copy_index_file(&self, name: &str, dest_store: &dyn IndexStore) -> Result<()>;
}
210
/// A query that can be passed to `ScalarIndex::search`.
///
/// Different query shapes (e.g. `SargableQuery`, `LabelListQuery`) implement this trait.
/// `as_any` and `dyn_eq` allow trait objects to be downcast and compared.
pub trait AnyQuery: std::fmt::Debug + Any + Send + Sync {
    /// Returns `self` as [`Any`] for downcasting.
    fn as_any(&self) -> &dyn Any;
    /// Renders the query as human-readable text against column `col`.
    fn format(&self, col: &str) -> String;
    /// Converts the query into a DataFusion [`Expr`] over column `col`.
    fn to_expr(&self, col: String) -> Expr;
    /// Compares against another query trait object; backs `PartialEq for dyn AnyQuery`.
    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool;
}
231
232impl PartialEq for dyn AnyQuery {
233 fn eq(&self, other: &Self) -> bool {
234 self.dyn_eq(other)
235 }
236}
237
/// A full-text search query with optional column, limit and WAND settings.
#[derive(Debug, Clone, PartialEq)]
pub struct FullTextSearchQuery {
    /// Columns to search; empty unless set through [`FullTextSearchQuery::columns`].
    pub columns: Vec<String>,

    /// The query text.
    pub query: String,

    /// Optional limit on the number of results.
    pub limit: Option<i64>,

    /// Optional WAND factor.
    pub wand_factor: Option<f32>,
}

impl FullTextSearchQuery {
    /// Creates a query for `query` with no columns, no limit and no WAND factor.
    pub fn new(query: String) -> Self {
        Self {
            columns: Vec::new(),
            query,
            limit: None,
            wand_factor: None,
        }
    }

    /// Replaces the target columns when `columns` is `Some`; `None` leaves them untouched.
    pub fn columns(self, columns: Option<Vec<String>>) -> Self {
        match columns {
            Some(columns) => Self { columns, ..self },
            None => self,
        }
    }

    /// Sets the result limit (`None` clears it).
    pub fn limit(self, limit: Option<i64>) -> Self {
        Self { limit, ..self }
    }

    /// Sets the WAND factor (`None` clears it).
    pub fn wand_factor(self, wand_factor: Option<f32>) -> Self {
        Self {
            wand_factor,
            ..self
        }
    }
}
282
283#[derive(Debug, Clone, PartialEq)]
293pub enum SargableQuery {
294 Range(Bound<ScalarValue>, Bound<ScalarValue>),
296 IsIn(Vec<ScalarValue>),
298 Equals(ScalarValue),
300 FullTextSearch(FullTextSearchQuery),
302 IsNull(),
304}
305
306impl AnyQuery for SargableQuery {
307 fn as_any(&self) -> &dyn Any {
308 self
309 }
310
311 fn format(&self, col: &str) -> String {
312 match self {
313 Self::Range(lower, upper) => match (lower, upper) {
314 (Bound::Unbounded, Bound::Unbounded) => "true".to_string(),
315 (Bound::Unbounded, Bound::Included(rhs)) => format!("{} <= {}", col, rhs),
316 (Bound::Unbounded, Bound::Excluded(rhs)) => format!("{} < {}", col, rhs),
317 (Bound::Included(lhs), Bound::Unbounded) => format!("{} >= {}", col, lhs),
318 (Bound::Included(lhs), Bound::Included(rhs)) => {
319 format!("{} >= {} && {} <= {}", col, lhs, col, rhs)
320 }
321 (Bound::Included(lhs), Bound::Excluded(rhs)) => {
322 format!("{} >= {} && {} < {}", col, lhs, col, rhs)
323 }
324 (Bound::Excluded(lhs), Bound::Unbounded) => format!("{} > {}", col, lhs),
325 (Bound::Excluded(lhs), Bound::Included(rhs)) => {
326 format!("{} > {} && {} <= {}", col, lhs, col, rhs)
327 }
328 (Bound::Excluded(lhs), Bound::Excluded(rhs)) => {
329 format!("{} > {} && {} < {}", col, lhs, col, rhs)
330 }
331 },
332 Self::IsIn(values) => {
333 format!(
334 "{} IN [{}]",
335 col,
336 values
337 .iter()
338 .map(|val| val.to_string())
339 .collect::<Vec<_>>()
340 .join(",")
341 )
342 }
343 Self::FullTextSearch(query) => {
344 format!("fts({})", query.query)
345 }
346 Self::IsNull() => {
347 format!("{} IS NULL", col)
348 }
349 Self::Equals(val) => {
350 format!("{} = {}", col, val)
351 }
352 }
353 }
354
355 fn to_expr(&self, col: String) -> Expr {
356 let col_expr = Expr::Column(Column::new_unqualified(col));
357 match self {
358 Self::Range(lower, upper) => match (lower, upper) {
359 (Bound::Unbounded, Bound::Unbounded) => {
360 Expr::Literal(ScalarValue::Boolean(Some(true)))
361 }
362 (Bound::Unbounded, Bound::Included(rhs)) => {
363 col_expr.lt_eq(Expr::Literal(rhs.clone()))
364 }
365 (Bound::Unbounded, Bound::Excluded(rhs)) => col_expr.lt(Expr::Literal(rhs.clone())),
366 (Bound::Included(lhs), Bound::Unbounded) => {
367 col_expr.gt_eq(Expr::Literal(lhs.clone()))
368 }
369 (Bound::Included(lhs), Bound::Included(rhs)) => {
370 col_expr.between(Expr::Literal(lhs.clone()), Expr::Literal(rhs.clone()))
371 }
372 (Bound::Included(lhs), Bound::Excluded(rhs)) => col_expr
373 .clone()
374 .gt_eq(Expr::Literal(lhs.clone()))
375 .and(col_expr.lt(Expr::Literal(rhs.clone()))),
376 (Bound::Excluded(lhs), Bound::Unbounded) => col_expr.gt(Expr::Literal(lhs.clone())),
377 (Bound::Excluded(lhs), Bound::Included(rhs)) => col_expr
378 .clone()
379 .gt(Expr::Literal(lhs.clone()))
380 .and(col_expr.lt_eq(Expr::Literal(rhs.clone()))),
381 (Bound::Excluded(lhs), Bound::Excluded(rhs)) => col_expr
382 .clone()
383 .gt(Expr::Literal(lhs.clone()))
384 .and(col_expr.lt(Expr::Literal(rhs.clone()))),
385 },
386 Self::IsIn(values) => col_expr.in_list(
387 values
388 .iter()
389 .map(|val| Expr::Literal(val.clone()))
390 .collect::<Vec<_>>(),
391 false,
392 ),
393 Self::FullTextSearch(query) => {
394 col_expr.like(Expr::Literal(ScalarValue::Utf8(Some(query.query.clone()))))
395 }
396 Self::IsNull() => col_expr.is_null(),
397 Self::Equals(value) => col_expr.eq(Expr::Literal(value.clone())),
398 }
399 }
400
401 fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
402 match other.as_any().downcast_ref::<Self>() {
403 Some(o) => self == o,
404 None => false,
405 }
406 }
407}
408
409#[derive(Debug, Clone, PartialEq)]
411pub enum LabelListQuery {
412 HasAllLabels(Vec<ScalarValue>),
414 HasAnyLabel(Vec<ScalarValue>),
416}
417
418impl AnyQuery for LabelListQuery {
419 fn as_any(&self) -> &dyn Any {
420 self
421 }
422
423 fn format(&self, col: &str) -> String {
424 format!("{}", self.to_expr(col.to_string()))
425 }
426
427 fn to_expr(&self, col: String) -> Expr {
428 match self {
429 Self::HasAllLabels(labels) => {
430 let labels_arr = ScalarValue::iter_to_array(labels.iter().cloned()).unwrap();
431 let offsets_buffer =
432 OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, labels_arr.len() as i32]));
433 let labels_list = ListArray::try_new(
434 Arc::new(Field::new("item", labels_arr.data_type().clone(), false)),
435 offsets_buffer,
436 labels_arr,
437 None,
438 )
439 .unwrap();
440 let labels_arr = Arc::new(labels_list);
441 Expr::ScalarFunction(ScalarFunction {
442 func: Arc::new(array_has::ArrayHasAll::new().into()),
443 args: vec![
444 Expr::Column(Column::new_unqualified(col)),
445 Expr::Literal(ScalarValue::List(labels_arr)),
446 ],
447 })
448 }
449 Self::HasAnyLabel(labels) => {
450 let labels_arr = ScalarValue::iter_to_array(labels.iter().cloned()).unwrap();
451 let offsets_buffer =
452 OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, labels_arr.len() as i32]));
453 let labels_list = ListArray::try_new(
454 Arc::new(Field::new("item", labels_arr.data_type().clone(), false)),
455 offsets_buffer,
456 labels_arr,
457 None,
458 )
459 .unwrap();
460 let labels_arr = Arc::new(labels_list);
461 Expr::ScalarFunction(ScalarFunction {
462 func: Arc::new(array_has::ArrayHasAny::new().into()),
463 args: vec![
464 Expr::Column(Column::new_unqualified(col)),
465 Expr::Literal(ScalarValue::List(labels_arr)),
466 ],
467 })
468 }
469 }
470 }
471
472 fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
473 match other.as_any().downcast_ref::<Self>() {
474 Some(o) => self == o,
475 None => false,
476 }
477 }
478}
479
/// An index that answers [`AnyQuery`] queries with a set of matching row ids.
#[async_trait]
pub trait ScalarIndex: Send + Sync + std::fmt::Debug + Index + DeepSizeOf {
    /// Searches the index for `query`, returning the matching rows as a [`RowIdTreeMap`].
    ///
    /// NOTE(review): handling of query types an index does not support is up to each
    /// implementor. Check before relying on it.
    async fn search(&self, query: &dyn AnyQuery) -> Result<RowIdTreeMap>;

    /// Loads the index from `store`.
    async fn load(store: Arc<dyn IndexStore>) -> Result<Arc<Self>>
    where
        Self: Sized;

    /// Remaps row ids through `mapping`, writing the result into `dest_store` (`self` is only
    /// borrowed).
    ///
    /// NOTE(review): a `None` target presumably means the row was removed. Confirm.
    async fn remap(
        &self,
        mapping: &HashMap<u64, Option<u64>>,
        dest_store: &dyn IndexStore,
    ) -> Result<()>;

    /// Writes an index incorporating `new_data` into `dest_store` (`self` is only borrowed).
    async fn update(
        &self,
        new_data: SendableRecordBatchStream,
        dest_store: &dyn IndexStore,
    ) -> Result<()>;
}