1use arrow::buffer::{OffsetBuffer, ScalarBuffer};
7use arrow_array::{BooleanArray, ListArray, RecordBatch, UInt64Array};
8use arrow_schema::{Field, Schema};
9use async_trait::async_trait;
10use bytes::Bytes;
11use datafusion::functions::string::contains::ContainsFunc;
12use datafusion::functions_nested::array_has;
13use datafusion::physical_plan::SendableRecordBatchStream;
14use datafusion_common::{Column, scalar::ScalarValue};
15use std::collections::{HashMap, HashSet};
16use std::fmt::Debug;
17use std::pin::Pin;
18use std::{any::Any, ops::Bound, sync::Arc};
19
20use datafusion_expr::Expr;
21use datafusion_expr::expr::ScalarFunction;
22use deepsize::DeepSizeOf;
23use inverted::query::{FtsQuery, FtsQueryNode, FtsSearchParams, MatchQuery, fill_fts_query_column};
24use lance_core::utils::mask::{NullableRowAddrSet, RowAddrTreeMap, RowSetOps};
25use lance_core::{Error, Result};
26use lance_io::stream::{RecordBatchStream, RecordBatchStreamAdapter};
27use roaring::RoaringBitmap;
28use serde::Serialize;
29
30use crate::metrics::MetricsCollector;
31use crate::scalar::registry::TrainingCriteria;
32use crate::{Index, IndexParams, IndexType};
33pub use lance_table::format::IndexFile;
34
35pub mod bitmap;
36pub mod bloomfilter;
37pub mod btree;
38pub mod expression;
39pub mod inverted;
40pub mod json;
41pub mod label_list;
42pub mod lance_format;
43pub mod ngram;
44pub mod registry;
45#[cfg(feature = "geo")]
46pub mod rtree;
47pub mod zoned;
48pub mod zonemap;
49
50use crate::frag_reuse::FragReuseIndex;
51pub use inverted::tokenizer::InvertedIndexParams;
52use lance_datafusion::udf::CONTAINS_TOKENS_UDF;
53
/// Index name reported by [`ScalarIndexParams`] through `IndexParams::index_name`.
pub const LANCE_SCALAR_INDEX: &str = "__lance_scalar_index";
55
/// The scalar index kinds that have a built-in name (see `as_str`) and can be
/// converted from an `IndexType` via `TryFrom`.
#[derive(Debug, Clone, PartialEq, Eq, DeepSizeOf)]
pub enum BuiltinIndexType {
    BTree,
    Bitmap,
    LabelList,
    NGram,
    ZoneMap,
    BloomFilter,
    // NOTE(review): this variant is always present, even though the `rtree`
    // module itself is gated behind the `geo` feature.
    RTree,
    Inverted,
}
72
73impl BuiltinIndexType {
74 pub fn as_str(&self) -> &str {
75 match self {
76 Self::BTree => "btree",
77 Self::Bitmap => "bitmap",
78 Self::LabelList => "labellist",
79 Self::NGram => "ngram",
80 Self::ZoneMap => "zonemap",
81 Self::Inverted => "inverted",
82 Self::BloomFilter => "bloomfilter",
83 Self::RTree => "rtree",
84 }
85 }
86}
87
88impl TryFrom<IndexType> for BuiltinIndexType {
89 type Error = Error;
90
91 fn try_from(value: IndexType) -> Result<Self> {
92 match value {
93 IndexType::BTree => Ok(Self::BTree),
94 IndexType::Bitmap => Ok(Self::Bitmap),
95 IndexType::LabelList => Ok(Self::LabelList),
96 IndexType::NGram => Ok(Self::NGram),
97 IndexType::ZoneMap => Ok(Self::ZoneMap),
98 IndexType::Inverted => Ok(Self::Inverted),
99 IndexType::BloomFilter => Ok(Self::BloomFilter),
100 IndexType::RTree => Ok(Self::RTree),
101 _ => Err(Error::index("Invalid index type".to_string())),
102 }
103 }
104}
105
/// Parameters describing a scalar index to create.
#[derive(Debug, Clone, PartialEq)]
pub struct ScalarIndexParams {
    /// Name of the index type; for built-in types this is the value of
    /// `BuiltinIndexType::as_str` (e.g. `"btree"`).
    pub index_type: String,
    /// Optional index-specific parameters; `with_params` stores them as a
    /// JSON string.
    pub params: Option<String>,
}
118
119impl Default for ScalarIndexParams {
120 fn default() -> Self {
121 Self {
122 index_type: BuiltinIndexType::BTree.as_str().to_string(),
123 params: None,
124 }
125 }
126}
127
128impl ScalarIndexParams {
129 pub fn for_builtin(index_type: BuiltinIndexType) -> Self {
131 Self {
132 index_type: index_type.as_str().to_string(),
133 params: None,
134 }
135 }
136
137 pub fn new(index_type: String) -> Self {
139 Self {
140 index_type,
141 params: None,
142 }
143 }
144
145 pub fn with_params<ParamsType: Serialize>(mut self, params: &ParamsType) -> Self {
147 self.params = Some(serde_json::to_string(params).unwrap());
148 self
149 }
150}
151
152impl IndexParams for ScalarIndexParams {
153 fn as_any(&self) -> &dyn std::any::Any {
154 self
155 }
156
157 fn index_name(&self) -> &str {
158 LANCE_SCALAR_INDEX
159 }
160}
161
162impl IndexParams for InvertedIndexParams {
163 fn as_any(&self) -> &dyn std::any::Any {
164 self
165 }
166
167 fn index_name(&self) -> &str {
168 "INVERTED"
169 }
170}
171
/// Writer for a single index file, as handed out by `IndexStore::new_index_file`.
#[async_trait]
pub trait IndexWriter: Send {
    /// Writes one record batch to the index file.
    ///
    /// NOTE(review): the meaning of the returned `u64` is not visible here
    /// (presumably it identifies the written batch) — confirm against implementors.
    async fn write_record_batch(&mut self, batch: RecordBatch) -> Result<u64>;
    /// Adds a global buffer to the file.
    ///
    /// The default implementation rejects the call with a "not supported"
    /// error; writers that support global buffers override it.
    /// NOTE(review): the returned `u32` is presumably the index later passed to
    /// `IndexReader::read_global_buffer` — confirm.
    async fn add_global_buffer(&mut self, _data: Bytes) -> Result<u32> {
        Err(Error::not_supported(
            "global buffers are not supported by this index writer",
        ))
    }
    /// Finishes writing the file.
    async fn finish(&mut self) -> Result<()>;
    /// Finishes writing the file, attaching the given key/value metadata.
    async fn finish_with_metadata(&mut self, metadata: HashMap<String, String>) -> Result<()>;
}
190
/// Reader for a single index file, as handed out by `IndexStore::open_index_file`.
#[async_trait]
pub trait IndexReader: Send + Sync {
    /// Reads one record batch from the file.
    ///
    /// NOTE(review): presumably `n` is the batch number when the file is split
    /// into batches of `batch_size` rows — confirm against implementors.
    async fn read_record_batch(&self, n: u64, batch_size: u64) -> Result<RecordBatch>;
    /// Reads a global buffer by index.
    ///
    /// The default implementation rejects the call with a "not supported"
    /// error; readers that support global buffers override it.
    async fn read_global_buffer(&self, _index: u32) -> Result<Bytes> {
        Err(Error::not_supported(
            "global buffers are not supported by this index reader",
        ))
    }
    /// Reads the given range as a single record batch, optionally restricted
    /// to the columns named in `projection`.
    ///
    /// NOTE(review): `range` presumably addresses rows — confirm.
    async fn read_range(
        &self,
        range: std::ops::Range<usize>,
        projection: Option<&[&str]>,
    ) -> Result<RecordBatch>;
    /// Streaming variant of `read_range`.
    ///
    /// The default implementation reads the whole range with `read_range` and
    /// wraps the resulting batch in a stream that yields it once.
    async fn read_range_stream(
        &self,
        range: std::ops::Range<usize>,
        projection: Option<&[&str]>,
    ) -> Result<Pin<Box<dyn RecordBatchStream>>> {
        let batch = self.read_range(range, projection).await?;
        let schema = batch.schema();
        Ok(Box::pin(RecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move { Ok(batch) }),
        )))
    }
    /// Number of batches in the file for the given batch size.
    async fn num_batches(&self, batch_size: u64) -> u32;
    /// Number of rows in the file.
    fn num_rows(&self) -> usize;
    /// Schema of the file.
    fn schema(&self) -> &lance_core::datatypes::Schema;
}
237
/// Storage abstraction through which index files are created, opened, copied,
/// renamed, deleted and listed.
#[async_trait]
pub trait IndexStore: std::fmt::Debug + Send + Sync + DeepSizeOf {
    /// Exposes the store as `Any` so callers can downcast to the concrete type.
    fn as_any(&self) -> &dyn Any;
    /// Returns the store as a shared trait object.
    fn clone_arc(&self) -> Arc<dyn IndexStore>;

    /// Degree of I/O parallelism for this store.
    fn io_parallelism(&self) -> usize;

    /// Creates a new index file with the given name and schema and returns a
    /// writer for it.
    async fn new_index_file(&self, name: &str, schema: Arc<Schema>)
    -> Result<Box<dyn IndexWriter>>;

    /// Opens the index file with the given name for reading.
    async fn open_index_file(&self, name: &str) -> Result<Arc<dyn IndexReader>>;

    /// Copies the named index file from this store into `dest_store`.
    async fn copy_index_file(&self, name: &str, dest_store: &dyn IndexStore) -> Result<()>;

    /// Renames an index file within this store.
    async fn rename_index_file(&self, name: &str, new_name: &str) -> Result<()>;

    /// Deletes the named index file from this store.
    async fn delete_index_file(&self, name: &str) -> Result<()>;

    /// Lists the files in this store as [`IndexFile`] entries.
    async fn list_files_with_sizes(&self) -> Result<Vec<IndexFile>>;
}
275
/// A type-erased query that can be passed to `ScalarIndex::search`.
pub trait AnyQuery: std::fmt::Debug + Any + Send + Sync {
    /// Exposes the query as `Any` so it can be downcast to its concrete type.
    fn as_any(&self) -> &dyn Any;
    /// Renders the query as text, applied to the column named `col`.
    fn format(&self, col: &str) -> String;
    /// Converts the query to a DataFusion expression over the column named `col`.
    fn to_expr(&self, col: String) -> Expr;
    /// Equality against another type-erased query; this backs
    /// `PartialEq for dyn AnyQuery`.
    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool;
}
296
297impl PartialEq for dyn AnyQuery {
298 fn eq(&self, other: &Self) -> bool {
299 self.dyn_eq(other)
300 }
301}
/// A full text search query together with its search options.
#[derive(Debug, Clone, PartialEq)]
pub struct FullTextSearchQuery {
    /// The query to run.
    pub query: FtsQuery,

    /// Optional limit, forwarded to `FtsSearchParams::with_limit` by `params`.
    pub limit: Option<i64>,

    /// Optional WAND factor; `params` falls back to `1.0` when unset.
    pub wand_factor: Option<f32>,
}
316
317impl FullTextSearchQuery {
318 pub fn new(query: String) -> Self {
320 let query = MatchQuery::new(query).into();
321 Self {
322 query,
323 limit: None,
324 wand_factor: None,
325 }
326 }
327
328 pub fn new_fuzzy(term: String, max_distance: Option<u32>) -> Self {
330 let query = MatchQuery::new(term).with_fuzziness(max_distance).into();
331 Self {
332 query,
333 limit: None,
334 wand_factor: None,
335 }
336 }
337
338 pub fn new_query(query: FtsQuery) -> Self {
340 Self {
341 query,
342 limit: None,
343 wand_factor: None,
344 }
345 }
346
347 pub fn with_column(mut self, column: String) -> Result<Self> {
350 self.query = fill_fts_query_column(&self.query, &[column], true)?;
351 Ok(self)
352 }
353
354 pub fn with_columns(mut self, columns: &[String]) -> Result<Self> {
357 self.query = fill_fts_query_column(&self.query, columns, true)?;
358 Ok(self)
359 }
360
361 pub fn limit(mut self, limit: Option<i64>) -> Self {
364 self.limit = limit;
365 self
366 }
367
368 pub fn wand_factor(mut self, wand_factor: Option<f32>) -> Self {
369 self.wand_factor = wand_factor;
370 self
371 }
372
373 pub fn columns(&self) -> HashSet<String> {
374 self.query.columns()
375 }
376
377 pub fn params(&self) -> FtsSearchParams {
378 FtsSearchParams::new()
379 .with_limit(self.limit.map(|limit| limit as usize))
380 .with_wand_factor(self.wand_factor.unwrap_or(1.0))
381 }
382}
383
/// A query over scalar values: ranges, equality, set membership, null checks,
/// prefix matching and full text search.
#[derive(Debug, Clone, PartialEq)]
pub enum SargableQuery {
    /// Values between a lower and an upper bound; each bound may be
    /// inclusive, exclusive or unbounded.
    Range(Bound<ScalarValue>, Bound<ScalarValue>),
    /// Values equal to any value in the list.
    IsIn(Vec<ScalarValue>),
    /// Values equal to the given value.
    Equals(ScalarValue),
    /// A full text search.
    FullTextSearch(FullTextSearchQuery),
    /// Null values.
    IsNull(),
    /// Values matching `LIKE '<prefix>%'`.
    LikePrefix(ScalarValue),
}
409
410impl AnyQuery for SargableQuery {
411 fn as_any(&self) -> &dyn Any {
412 self
413 }
414
415 fn format(&self, col: &str) -> String {
416 match self {
417 Self::Range(lower, upper) => match (lower, upper) {
418 (Bound::Unbounded, Bound::Unbounded) => "true".to_string(),
419 (Bound::Unbounded, Bound::Included(rhs)) => format!("{} <= {}", col, rhs),
420 (Bound::Unbounded, Bound::Excluded(rhs)) => format!("{} < {}", col, rhs),
421 (Bound::Included(lhs), Bound::Unbounded) => format!("{} >= {}", col, lhs),
422 (Bound::Included(lhs), Bound::Included(rhs)) => {
423 format!("{} >= {} && {} <= {}", col, lhs, col, rhs)
424 }
425 (Bound::Included(lhs), Bound::Excluded(rhs)) => {
426 format!("{} >= {} && {} < {}", col, lhs, col, rhs)
427 }
428 (Bound::Excluded(lhs), Bound::Unbounded) => format!("{} > {}", col, lhs),
429 (Bound::Excluded(lhs), Bound::Included(rhs)) => {
430 format!("{} > {} && {} <= {}", col, lhs, col, rhs)
431 }
432 (Bound::Excluded(lhs), Bound::Excluded(rhs)) => {
433 format!("{} > {} && {} < {}", col, lhs, col, rhs)
434 }
435 },
436 Self::IsIn(values) => {
437 format!(
438 "{} IN [{}]",
439 col,
440 values
441 .iter()
442 .map(|val| val.to_string())
443 .collect::<Vec<_>>()
444 .join(",")
445 )
446 }
447 Self::FullTextSearch(query) => {
448 format!("fts({})", query.query)
449 }
450 Self::IsNull() => {
451 format!("{} IS NULL", col)
452 }
453 Self::Equals(val) => {
454 format!("{} = {}", col, val)
455 }
456 Self::LikePrefix(prefix) => {
457 format!("{} LIKE '{}%'", col, prefix)
458 }
459 }
460 }
461
462 fn to_expr(&self, col: String) -> Expr {
463 let col_expr = Expr::Column(Column::new_unqualified(col));
464 match self {
465 Self::Range(lower, upper) => match (lower, upper) {
466 (Bound::Unbounded, Bound::Unbounded) => {
467 Expr::Literal(ScalarValue::Boolean(Some(true)), None)
468 }
469 (Bound::Unbounded, Bound::Included(rhs)) => {
470 col_expr.lt_eq(Expr::Literal(rhs.clone(), None))
471 }
472 (Bound::Unbounded, Bound::Excluded(rhs)) => {
473 col_expr.lt(Expr::Literal(rhs.clone(), None))
474 }
475 (Bound::Included(lhs), Bound::Unbounded) => {
476 col_expr.gt_eq(Expr::Literal(lhs.clone(), None))
477 }
478 (Bound::Included(lhs), Bound::Included(rhs)) => col_expr.between(
479 Expr::Literal(lhs.clone(), None),
480 Expr::Literal(rhs.clone(), None),
481 ),
482 (Bound::Included(lhs), Bound::Excluded(rhs)) => col_expr
483 .clone()
484 .gt_eq(Expr::Literal(lhs.clone(), None))
485 .and(col_expr.lt(Expr::Literal(rhs.clone(), None))),
486 (Bound::Excluded(lhs), Bound::Unbounded) => {
487 col_expr.gt(Expr::Literal(lhs.clone(), None))
488 }
489 (Bound::Excluded(lhs), Bound::Included(rhs)) => col_expr
490 .clone()
491 .gt(Expr::Literal(lhs.clone(), None))
492 .and(col_expr.lt_eq(Expr::Literal(rhs.clone(), None))),
493 (Bound::Excluded(lhs), Bound::Excluded(rhs)) => col_expr
494 .clone()
495 .gt(Expr::Literal(lhs.clone(), None))
496 .and(col_expr.lt(Expr::Literal(rhs.clone(), None))),
497 },
498 Self::IsIn(values) => col_expr.in_list(
499 values
500 .iter()
501 .map(|val| Expr::Literal(val.clone(), None))
502 .collect::<Vec<_>>(),
503 false,
504 ),
505 Self::FullTextSearch(query) => col_expr.like(Expr::Literal(
506 ScalarValue::Utf8(Some(query.query.to_string())),
507 None,
508 )),
509 Self::IsNull() => col_expr.is_null(),
510 Self::Equals(value) => col_expr.eq(Expr::Literal(value.clone(), None)),
511 Self::LikePrefix(prefix) => {
512 let pattern = match prefix {
513 ScalarValue::Utf8(Some(s)) => ScalarValue::Utf8(Some(format!("{}%", s))),
514 ScalarValue::LargeUtf8(Some(s)) => {
515 ScalarValue::LargeUtf8(Some(format!("{}%", s)))
516 }
517 other => other.clone(),
518 };
519 col_expr.like(Expr::Literal(pattern, None))
520 }
521 }
522 }
523
524 fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
525 match other.as_any().downcast_ref::<Self>() {
526 Some(o) => self == o,
527 None => false,
528 }
529 }
530}
531
/// A query over a list-of-labels column.
#[derive(Debug, Clone, PartialEq)]
pub enum LabelListQuery {
    /// Expressed as `array_has_all(column, labels)`.
    HasAllLabels(Vec<ScalarValue>),
    /// Expressed as `array_has_any(column, labels)`.
    HasAnyLabel(Vec<ScalarValue>),
}
540
541impl AnyQuery for LabelListQuery {
542 fn as_any(&self) -> &dyn Any {
543 self
544 }
545
546 fn format(&self, col: &str) -> String {
547 format!("{}", self.to_expr(col.to_string()))
548 }
549
550 fn to_expr(&self, col: String) -> Expr {
551 match self {
552 Self::HasAllLabels(labels) => {
553 let labels_arr = ScalarValue::iter_to_array(labels.iter().cloned()).unwrap();
554 let offsets_buffer =
555 OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, labels_arr.len() as i32]));
556 let labels_list = ListArray::try_new(
557 Arc::new(Field::new("item", labels_arr.data_type().clone(), true)),
558 offsets_buffer,
559 labels_arr,
560 None,
561 )
562 .unwrap();
563 let labels_arr = Arc::new(labels_list);
564 Expr::ScalarFunction(ScalarFunction {
565 func: Arc::new(array_has::ArrayHasAll::new().into()),
566 args: vec![
567 Expr::Column(Column::new_unqualified(col)),
568 Expr::Literal(ScalarValue::List(labels_arr), None),
569 ],
570 })
571 }
572 Self::HasAnyLabel(labels) => {
573 let labels_arr = ScalarValue::iter_to_array(labels.iter().cloned()).unwrap();
574 let offsets_buffer =
575 OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, labels_arr.len() as i32]));
576 let labels_list = ListArray::try_new(
577 Arc::new(Field::new("item", labels_arr.data_type().clone(), true)),
578 offsets_buffer,
579 labels_arr,
580 None,
581 )
582 .unwrap();
583 let labels_arr = Arc::new(labels_list);
584 Expr::ScalarFunction(ScalarFunction {
585 func: Arc::new(array_has::ArrayHasAny::new().into()),
586 args: vec![
587 Expr::Column(Column::new_unqualified(col)),
588 Expr::Literal(ScalarValue::List(labels_arr), None),
589 ],
590 })
591 }
592 }
593 }
594
595 fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
596 match other.as_any().downcast_ref::<Self>() {
597 Some(o) => self == o,
598 None => false,
599 }
600 }
601}
602
/// A query over a text column.
#[derive(Debug, Clone, PartialEq)]
pub enum TextQuery {
    /// Expressed as `contains(column, substring)`.
    StringContains(String),
}
612
613impl AnyQuery for TextQuery {
614 fn as_any(&self) -> &dyn Any {
615 self
616 }
617
618 fn format(&self, col: &str) -> String {
619 format!("{}", self.to_expr(col.to_string()))
620 }
621
622 fn to_expr(&self, col: String) -> Expr {
623 match self {
624 Self::StringContains(substr) => Expr::ScalarFunction(ScalarFunction {
625 func: Arc::new(ContainsFunc::new().into()),
626 args: vec![
627 Expr::Column(Column::new_unqualified(col)),
628 Expr::Literal(ScalarValue::Utf8(Some(substr.clone())), None),
629 ],
630 }),
631 }
632 }
633
634 fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
635 match other.as_any().downcast_ref::<Self>() {
636 Some(o) => self == o,
637 None => false,
638 }
639 }
640}
641
/// A token-based query over a text column.
#[derive(Debug, Clone, PartialEq)]
pub enum TokenQuery {
    /// Expressed as a call to the `CONTAINS_TOKENS_UDF` function with the
    /// column and this string as arguments.
    TokensContains(String),
}
649
/// The queries expressible against a bloom filter index: equality, null
/// checks and set membership.
#[derive(Debug, Clone, PartialEq)]
pub enum BloomFilterQuery {
    /// Values equal to the given value.
    Equals(ScalarValue),
    /// Null values.
    IsNull(),
    /// Values equal to any value in the list.
    IsIn(Vec<ScalarValue>),
}
663
664impl AnyQuery for BloomFilterQuery {
665 fn as_any(&self) -> &dyn Any {
666 self
667 }
668
669 fn format(&self, col: &str) -> String {
670 match self {
671 Self::Equals(val) => {
672 format!("{} = {}", col, val)
673 }
674 Self::IsNull() => {
675 format!("{} IS NULL", col)
676 }
677 Self::IsIn(values) => {
678 format!(
679 "{} IN [{}]",
680 col,
681 values
682 .iter()
683 .map(|val| val.to_string())
684 .collect::<Vec<_>>()
685 .join(",")
686 )
687 }
688 }
689 }
690
691 fn to_expr(&self, col: String) -> Expr {
692 let col_expr = Expr::Column(Column::new_unqualified(col));
693 match self {
694 Self::Equals(value) => col_expr.eq(Expr::Literal(value.clone(), None)),
695 Self::IsNull() => col_expr.is_null(),
696 Self::IsIn(values) => col_expr.in_list(
697 values
698 .iter()
699 .map(|val| Expr::Literal(val.clone(), None))
700 .collect::<Vec<_>>(),
701 false,
702 ),
703 }
704 }
705
706 fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
707 match other.as_any().downcast_ref::<Self>() {
708 Some(o) => self == o,
709 None => false,
710 }
711 }
712}
713
714impl AnyQuery for TokenQuery {
715 fn as_any(&self) -> &dyn Any {
716 self
717 }
718
719 fn format(&self, col: &str) -> String {
720 format!("{}", self.to_expr(col.to_string()))
721 }
722
723 fn to_expr(&self, col: String) -> Expr {
724 match self {
725 Self::TokensContains(substr) => Expr::ScalarFunction(ScalarFunction {
726 func: Arc::new(CONTAINS_TOKENS_UDF.clone()),
727 args: vec![
728 Expr::Column(Column::new_unqualified(col)),
729 Expr::Literal(ScalarValue::Utf8(Some(substr.clone())), None),
730 ],
731 }),
732 }
733 }
734
735 fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
736 match other.as_any().downcast_ref::<Self>() {
737 Some(o) => self == o,
738 None => false,
739 }
740 }
741}
742
/// Operand of a geo relation query (only built with the `geo` feature).
#[cfg(feature = "geo")]
#[derive(Debug, Clone, PartialEq)]
pub struct RelationQuery {
    /// The value to relate against.
    pub value: ScalarValue,
    /// NOTE(review): presumably the Arrow field describing `value` — confirm.
    pub field: Field,
}
749
/// A geo query (only built with the `geo` feature).
#[cfg(feature = "geo")]
#[derive(Debug, Clone, PartialEq)]
pub enum GeoQuery {
    /// Formatted as `Intersect(<col> <value>)`.
    IntersectQuery(RelationQuery),
    /// Null values.
    IsNull,
}
757
#[cfg(feature = "geo")]
impl AnyQuery for GeoQuery {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Renders the query as a human-readable predicate on `col`.
    fn format(&self, col: &str) -> String {
        match self {
            Self::IntersectQuery(relation) => format!("Intersect({col} {})", relation.value),
            Self::IsNull => format!("{col} IS NULL"),
        }
    }

    /// Not implemented yet.
    ///
    /// # Panics
    ///
    /// Always panics via `todo!()`.
    fn to_expr(&self, _col: String) -> Expr {
        todo!()
    }

    /// Equal only to another `GeoQuery` that compares equal.
    fn dyn_eq(&self, other: &dyn AnyQuery) -> bool {
        other
            .as_any()
            .downcast_ref::<Self>()
            .is_some_and(|same_kind| self == same_kind)
    }
}
786
/// The result of a scalar index search: a set of row addresses tagged with
/// how precisely it answers the query.
#[derive(Debug, PartialEq)]
pub enum SearchResult {
    /// The set is the exact answer (this is what `is_exact` reports).
    Exact(NullableRowAddrSet),
    /// NOTE(review): presumably an upper bound — the true matches are a
    /// subset of this set and need rechecking — confirm.
    AtMost(NullableRowAddrSet),
    /// NOTE(review): presumably a lower bound — every row in this set matches
    /// but more matches may exist — confirm.
    AtLeast(NullableRowAddrSet),
}
802
803impl SearchResult {
804 pub fn exact(row_ids: impl Into<RowAddrTreeMap>) -> Self {
805 Self::Exact(NullableRowAddrSet::new(row_ids.into(), Default::default()))
806 }
807
808 pub fn at_most(row_ids: impl Into<RowAddrTreeMap>) -> Self {
809 Self::AtMost(NullableRowAddrSet::new(row_ids.into(), Default::default()))
810 }
811
812 pub fn at_least(row_ids: impl Into<RowAddrTreeMap>) -> Self {
813 Self::AtLeast(NullableRowAddrSet::new(row_ids.into(), Default::default()))
814 }
815
816 pub fn with_nulls(self, nulls: impl Into<RowAddrTreeMap>) -> Self {
817 match self {
818 Self::Exact(row_ids) => Self::Exact(row_ids.with_nulls(nulls.into())),
819 Self::AtMost(row_ids) => Self::AtMost(row_ids.with_nulls(nulls.into())),
820 Self::AtLeast(row_ids) => Self::AtLeast(row_ids.with_nulls(nulls.into())),
821 }
822 }
823
824 pub fn row_addrs(&self) -> &NullableRowAddrSet {
825 match self {
826 Self::Exact(row_addrs) => row_addrs,
827 Self::AtMost(row_addrs) => row_addrs,
828 Self::AtLeast(row_addrs) => row_addrs,
829 }
830 }
831
832 pub fn is_exact(&self) -> bool {
833 matches!(self, Self::Exact(_))
834 }
835}
836
/// Description of an index returned by `ScalarIndex::remap` and
/// `ScalarIndex::update`.
pub struct CreatedIndex {
    /// Index details as a protobuf `Any` message.
    pub index_details: prost_types::Any,
    /// Version number of the index.
    pub index_version: u32,
    /// The files making up the index, when known.
    pub files: Option<Vec<IndexFile>>,
}
854
/// What an index needs in order to be updated, as reported by
/// `ScalarIndex::update_criteria`.
pub struct UpdateCriteria {
    /// Set by `requires_old_data(..)`, cleared by `only_new_data(..)`.
    pub requires_old_data: bool,
    /// Criteria for the data supplied to the update.
    pub data_criteria: TrainingCriteria,
}
864
/// Filter selecting which rows of the old index data are kept during an update.
#[derive(Debug, Clone)]
pub enum OldIndexDataFilter {
    /// Filter by fragment: `filter_row_ids` keeps a row when the upper 32 bits
    /// of its id are contained in `to_keep`.
    Fragments {
        to_keep: RoaringBitmap,
        // NOTE(review): not consulted by `filter_row_ids`; presumably the
        // fragments whose rows must be dropped — confirm against callers.
        to_remove: RoaringBitmap,
    },
    /// Filter by exact id: `filter_row_ids` keeps a row when its id is
    /// contained in this set.
    RowIds(RowAddrTreeMap),
}
886
887impl OldIndexDataFilter {
888 pub fn filter_row_ids(&self, row_ids: &UInt64Array) -> BooleanArray {
890 match self {
891 Self::Fragments { to_keep, .. } => row_ids
892 .iter()
893 .map(|id| id.map(|id| to_keep.contains((id >> 32) as u32)))
894 .collect(),
895 Self::RowIds(valid_row_ids) => row_ids
896 .iter()
897 .map(|id| id.map(|id| valid_row_ids.contains(id)))
898 .collect(),
899 }
900 }
901}
902
903impl UpdateCriteria {
904 pub fn requires_old_data(data_criteria: TrainingCriteria) -> Self {
905 Self {
906 requires_old_data: true,
907 data_criteria,
908 }
909 }
910
911 pub fn only_new_data(data_criteria: TrainingCriteria) -> Self {
912 Self {
913 requires_old_data: false,
914 data_criteria,
915 }
916 }
917}
918
/// Computes the string obtained by incrementing `prefix` at its rightmost
/// incrementable character and dropping everything after that character.
///
/// Characters that are already `char::MAX` cannot be incremented and are
/// skipped from the right. Returns `None` when `prefix` is empty or consists
/// solely of `char::MAX`.
///
/// For example `"abc"` becomes `"abd"` and `"ab\u{10FFFF}"` becomes `"ac"`.
pub fn compute_next_prefix(prefix: &str) -> Option<String> {
    prefix.char_indices().rev().find_map(|(byte_pos, ch)| {
        let bumped = next_unicode_char(ch)?;
        let mut upper = String::with_capacity(byte_pos + bumped.len_utf8());
        upper.push_str(&prefix[..byte_pos]);
        upper.push(bumped);
        Some(upper)
    })
}

/// Returns the character following `c` in code point order, or `None` when
/// `c` is `char::MAX`.
fn next_unicode_char(c: char) -> Option<char> {
    match c {
        char::MAX => None,
        // U+D7FF is followed by the surrogate block, which holds no valid
        // `char`; jump straight past it.
        '\u{D7FF}' => Some('\u{E000}'),
        _ => char::from_u32(c as u32 + 1),
    }
}
974
/// A scalar index: answers [`AnyQuery`] searches and can be remapped and updated.
#[async_trait]
pub trait ScalarIndex: Send + Sync + std::fmt::Debug + Index + DeepSizeOf {
    /// Searches the index with `query`, reporting through `metrics`, and
    /// returns the resulting [`SearchResult`].
    async fn search(
        &self,
        query: &dyn AnyQuery,
        metrics: &dyn MetricsCollector,
    ) -> Result<SearchResult>;

    /// Whether this index supports `remap`.
    fn can_remap(&self) -> bool;

    /// Remaps the index according to `mapping`, writing the result to `dest_store`.
    ///
    /// NOTE(review): presumably `mapping` goes from old row id to new row id,
    /// with `None` marking a removed row — confirm against implementors.
    async fn remap(
        &self,
        mapping: &HashMap<u64, Option<u64>>,
        dest_store: &dyn IndexStore,
    ) -> Result<CreatedIndex>;

    /// Updates the index with `new_data`, writing the result to `dest_store`.
    ///
    /// `old_data_filter`, when present, selects which rows of the old index
    /// data are kept (see [`OldIndexDataFilter`]).
    async fn update(
        &self,
        new_data: SendableRecordBatchStream,
        dest_store: &dyn IndexStore,
        old_data_filter: Option<OldIndexDataFilter>,
    ) -> Result<CreatedIndex>;

    /// The criteria that apply when updating this index.
    fn update_criteria(&self) -> UpdateCriteria;

    /// Derives the [`ScalarIndexParams`] corresponding to this index.
    fn derive_index_params(&self) -> Result<ScalarIndexParams>;
}