use std::collections::{HashMap, HashSet};
use std::sync::{Arc, OnceLock};

use arrow_schema::{DataType, Schema};
use async_trait::async_trait;
use datafusion::execution::SendableRecordBatchStream;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use futures::{stream, StreamExt, TryStreamExt};
use itertools::Itertools;
use lance_core::cache::{CacheKey, UnsizedCacheKey};
use lance_core::utils::address::RowAddress;
use lance_core::utils::parse::str_is_truthy;
use lance_core::utils::tracing::{
    IO_TYPE_OPEN_FRAG_REUSE, IO_TYPE_OPEN_MEM_WAL, IO_TYPE_OPEN_SCALAR, IO_TYPE_OPEN_VECTOR,
    TRACE_IO_EVENTS,
};
use lance_file::previous::reader::FileReader as PreviousFileReader;
use lance_file::reader::FileReaderOptions;
use lance_index::frag_reuse::{FragReuseIndex, FRAG_REUSE_INDEX_NAME};
use lance_index::mem_wal::{MemWalIndex, MEM_WAL_INDEX_NAME};
use lance_index::optimize::OptimizeOptions;
use lance_index::pb::index::Implementation;
use lance_index::scalar::expression::{
    IndexInformationProvider, MultiQueryParser, ScalarQueryParser,
};
use lance_index::scalar::inverted::InvertedIndexPlugin;
use lance_index::scalar::lance_format::LanceIndexStore;
use lance_index::scalar::registry::{TrainingCriteria, TrainingOrdering};
use lance_index::scalar::{CreatedIndex, ScalarIndex};
use lance_index::vector::bq::builder::RabitQuantizer;
use lance_index::vector::flat::index::{FlatBinQuantizer, FlatIndex, FlatQuantizer};
use lance_index::vector::hnsw::HNSW;
use lance_index::vector::pq::ProductQuantizer;
use lance_index::vector::sq::ScalarQuantizer;
pub use lance_index::IndexParams;
use lance_index::{
    is_system_index,
    metrics::{MetricsCollector, NoOpMetricsCollector},
    IndexCriteria,
};
use lance_index::{pb, vector::VectorIndex, Index, IndexType, INDEX_FILE_NAME};
use lance_index::{
    DatasetIndexExt, IndexDescription, INDEX_METADATA_SCHEMA_KEY, VECTOR_INDEX_VERSION,
};
use lance_io::scheduler::{ScanScheduler, SchedulerConfig};
use lance_io::traits::Reader;
use lance_io::utils::{
    read_last_block, read_message, read_message_from_buf, read_metadata_offset, read_version,
    CachedFileSize,
};
use lance_table::format::IndexMetadata;
use lance_table::format::{Fragment, SelfDescribingFileReader};
use lance_table::io::manifest::read_manifest_indexes;
use roaring::RoaringBitmap;
use scalar::index_matches_criteria;
use serde_json::json;
use snafu::location;
use tracing::{info, instrument};
use uuid::Uuid;
use vector::ivf::v2::IVFIndex;
use vector::utils::get_vector_type;

pub(crate) mod append;
mod create;
pub mod frag_reuse;
pub mod mem_wal;
pub mod prefilter;
pub mod scalar;
pub mod vector;

use self::append::merge_indices;
use self::vector::remap_vector_index;
use crate::dataset::index::LanceIndexStoreExt;
use crate::dataset::optimize::remapping::RemapResult;
use crate::dataset::optimize::RemappedIndex;
use crate::dataset::transaction::{Operation, Transaction};
use crate::index::frag_reuse::{load_frag_reuse_index_details, open_frag_reuse_index};
use crate::index::mem_wal::open_mem_wal_index;
pub use crate::index::prefilter::{FilterLoader, PreFilter};
use crate::index::scalar::{fetch_index_details, load_training_data, IndexDetails};
use crate::session::index_caches::{FragReuseIndexKey, IndexMetadataKey};
use crate::{dataset::Dataset, Error, Result};
pub use create::CreateIndexBuilder;

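/// Cache key for a loaded scalar index.
///
/// The key combines the index UUID with the UUID of the fragment reuse index,
/// if one exists, so that cached entries are not reused after fragment reuse
/// remaps row addresses.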
#[derive(Debug, Clone)]
pub struct ScalarIndexCacheKey<'a> {
    pub uuid: &'a str,
    pub fri_uuid: Option<&'a Uuid>,
}

impl<'a> ScalarIndexCacheKey<'a> {
    pub fn new(uuid: &'a str, fri_uuid: Option<&'a Uuid>) -> Self {
        Self { uuid, fri_uuid }
    }
}

impl UnsizedCacheKey for ScalarIndexCacheKey<'_> {
    type ValueType = dyn ScalarIndex;

    fn key(&self) -> std::borrow::Cow<'_, str> {
        if let Some(fri_uuid) = self.fri_uuid {
            format!("{}-{}", self.uuid, fri_uuid).into()
        } else {
            self.uuid.into()
        }
    }
}

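/// Cache key for a loaded vector index.
///
/// As with [`ScalarIndexCacheKey`], the fragment reuse index UUID (if any)
/// is folded into the key.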
#[derive(Debug, Clone)]
pub struct VectorIndexCacheKey<'a> {
    pub uuid: &'a str,
    pub fri_uuid: Option<&'a Uuid>,
}

impl<'a> VectorIndexCacheKey<'a> {
    pub fn new(uuid: &'a str, fri_uuid: Option<&'a Uuid>) -> Self {
        Self { uuid, fri_uuid }
    }
}

impl UnsizedCacheKey for VectorIndexCacheKey<'_> {
    type ValueType = dyn VectorIndex;

    fn key(&self) -> std::borrow::Cow<'_, str> {
        if let Some(fri_uuid) = self.fri_uuid {
            format!("{}-{}", self.uuid, fri_uuid).into()
        } else {
            self.uuid.into()
        }
    }
}

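/// Cache key for the loaded fragment reuse index itself.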
#[derive(Debug, Clone)]
pub struct FragReuseIndexCacheKey<'a> {
    pub uuid: &'a str,
    pub fri_uuid: Option<&'a Uuid>,
}

impl<'a> FragReuseIndexCacheKey<'a> {
    pub fn new(uuid: &'a str, fri_uuid: Option<&'a Uuid>) -> Self {
        Self { uuid, fri_uuid }
    }
}

impl CacheKey for FragReuseIndexCacheKey<'_> {
    type ValueType = FragReuseIndex;

    fn key(&self) -> std::borrow::Cow<'_, str> {
        if let Some(fri_uuid) = self.fri_uuid {
            format!("{}-{}", self.uuid, fri_uuid).into()
        } else {
            self.uuid.into()
        }
    }
}

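/// Cache key for the loaded MemWAL index.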
#[derive(Debug, Clone)]
pub struct MemWalCacheKey<'a> {
    pub uuid: &'a Uuid,
    pub fri_uuid: Option<&'a Uuid>,
}

impl<'a> MemWalCacheKey<'a> {
    pub fn new(uuid: &'a Uuid, fri_uuid: Option<&'a Uuid>) -> Self {
        Self { uuid, fri_uuid }
    }
}

impl CacheKey for MemWalCacheKey<'_> {
    type ValueType = MemWalIndex;

    fn key(&self) -> std::borrow::Cow<'_, str> {
        if let Some(fri_uuid) = self.fri_uuid {
            format!("{}-{}", self.uuid, fri_uuid).into()
        } else {
            self.uuid.to_string().into()
        }
    }
}

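/// Returns whether datasets with corrupted fragment/index metadata should be
/// migrated automatically. Controlled by the `LANCE_AUTO_MIGRATION`
/// environment variable (default: `true`); the value is read once per process.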
fn auto_migrate_corruption() -> bool {
    static LANCE_AUTO_MIGRATION: OnceLock<bool> = OnceLock::new();
    *LANCE_AUTO_MIGRATION.get_or_init(|| {
        std::env::var("LANCE_AUTO_MIGRATION")
            .ok()
            .map(|s| str_is_truthy(&s))
            .unwrap_or(true)
    })
}

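/// Builds an index of a fixed [`IndexType`] over a dataset.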
#[async_trait]
pub trait IndexBuilder {
    fn index_type() -> IndexType;

    async fn build(&self) -> Result<()>;
}

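/// Remaps an index to use new row addresses after compaction.
///
/// If `row_id_map` contains only deletions and the deleted fragments exactly
/// match the index's fragment bitmap, the existing index is kept unchanged.
/// Indices that cannot be remapped are dropped; otherwise the remapped index
/// is written under a fresh UUID and returned as [`RemapResult::Remapped`].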
pub(crate) async fn remap_index(
    dataset: &Dataset,
    index_id: &Uuid,
    row_id_map: &HashMap<u64, Option<u64>>,
) -> Result<RemapResult> {
    let indices = dataset.load_indices().await?;
    let matched = indices
        .iter()
        .find(|i| i.uuid == *index_id)
        .ok_or_else(|| Error::Index {
            message: format!("Index with id {} does not exist", index_id),
            location: location!(),
        })?;

    if matched.fields.len() > 1 {
        return Err(Error::Index {
            message: "Remapping indices with multiple fields is not supported".to_string(),
            location: location!(),
        });
    }

    if row_id_map.values().all(|v| v.is_none()) {
        let deleted_bitmap = RoaringBitmap::from_iter(
            row_id_map
                .keys()
                .map(|row_id| RowAddress::new_from_u64(*row_id))
                .map(|addr| addr.fragment_id()),
        );
        if Some(deleted_bitmap) == matched.fragment_bitmap {
            return Ok(RemapResult::Keep(*index_id));
        }
    }

    let field_id = matched
        .fields
        .first()
        .expect("An index existed with no fields");
    let field_path = dataset.schema().field_path(*field_id)?;

    let new_id = Uuid::new_v4();

    let generic = dataset
        .open_generic_index(&field_path, &index_id.to_string(), &NoOpMetricsCollector)
        .await?;

    let created_index = match generic.index_type() {
        it if it.is_scalar() => {
            let new_store = LanceIndexStore::from_dataset_for_new(dataset, &new_id.to_string())?;

            let scalar_index = dataset
                .open_scalar_index(&field_path, &index_id.to_string(), &NoOpMetricsCollector)
                .await?;
            if !scalar_index.can_remap() {
                return Ok(RemapResult::Drop);
            }

            match scalar_index.index_type() {
                IndexType::Inverted => {
                    let inverted_index = scalar_index
                        .as_any()
                        .downcast_ref::<lance_index::scalar::inverted::InvertedIndex>()
                        .ok_or(Error::Index {
                            message: "expected inverted index".to_string(),
                            location: location!(),
                        })?;
                    if inverted_index.is_legacy() {
                        log::warn!(
                            "reindex because of legacy format, index_type: {}, index_id: {}, field: {}",
                            scalar_index.index_type(),
                            index_id,
                            field_path
                        );
                        let training_data = load_training_data(
                            dataset,
                            &field_path,
                            &TrainingCriteria::new(TrainingOrdering::None),
                            None,
                            true,
                            None,
                        )
                        .await?;
                        InvertedIndexPlugin::train_inverted_index(
                            training_data,
                            &new_store,
                            inverted_index.params().clone(),
                            None,
                        )
                        .await?
                    } else {
                        scalar_index.remap(row_id_map, &new_store).await?
                    }
                }
                _ => scalar_index.remap(row_id_map, &new_store).await?,
            }
        }
        it if it.is_vector() => {
            remap_vector_index(
                Arc::new(dataset.clone()),
                &field_path,
                index_id,
                &new_id,
                matched,
                row_id_map,
            )
            .await?;
            CreatedIndex {
                index_details: prost_types::Any::from_msg(
                    &lance_table::format::pb::VectorIndexDetails::default(),
                )
                .unwrap(),
                index_version: VECTOR_INDEX_VERSION,
            }
        }
        _ => {
            return Err(Error::Index {
                message: format!("Index type {} is not supported", generic.index_type()),
                location: location!(),
            });
        }
    };

    Ok(RemapResult::Remapped(RemappedIndex {
        old_id: *index_id,
        new_id,
        index_details: created_index.index_details,
        index_version: created_index.index_version,
    }))
}

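/// Information about the scalar indices of a dataset, mapping each indexed
/// column to its data type and the query parsers of its indices. Used to
/// decide which filter expressions can be pushed down into an index.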
#[derive(Debug)]
pub struct ScalarIndexInfo {
    indexed_columns: HashMap<String, (DataType, Box<MultiQueryParser>)>,
}

impl IndexInformationProvider for ScalarIndexInfo {
    fn get_index(&self, col: &str) -> Option<(&DataType, &dyn ScalarQueryParser)> {
        self.indexed_columns
            .get(col)
            .map(|(ty, parser)| (ty, parser.as_ref() as &dyn ScalarQueryParser))
    }
}

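/// Reads the index protobuf message from the footer of a legacy index file.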
async fn open_index_proto(reader: &dyn Reader) -> Result<pb::Index> {
    let file_size = reader.size().await?;
    let tail_bytes = read_last_block(reader).await?;
    let metadata_pos = read_metadata_offset(&tail_bytes)?;
    let proto: pb::Index = if metadata_pos < file_size - tail_bytes.len() {
        read_message(reader, metadata_pos).await?
    } else {
        let offset = tail_bytes.len() - (file_size - metadata_pos);
        read_message_from_buf(&tail_bytes.slice(offset..))?
    };
    Ok(proto)
}

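/// The default details message recorded for vector indices.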
fn vector_index_details() -> prost_types::Any {
    let details = lance_table::format::pb::VectorIndexDetails::default();
    prost_types::Any::from_msg(&details).unwrap()
}

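/// An [`IndexDescription`] assembled from the delta segments of one index.
///
/// All segments must share the same name, fields, and index details type URL.
/// `rows_indexed` is the total logical row count of the fragments covered by
/// the segments' fragment bitmaps.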
struct IndexDescriptionImpl {
    name: String,
    field_ids: Vec<u32>,
    segments: Vec<IndexMetadata>,
    index_type: String,
    details: IndexDetails,
    rows_indexed: u64,
}

impl IndexDescriptionImpl {
    fn try_new(segments: Vec<IndexMetadata>, dataset: &Dataset) -> Result<Self> {
        if segments.is_empty() {
            return Err(Error::Index {
                message: "Index metadata is empty".to_string(),
                location: location!(),
            });
        }

        let example_metadata = &segments[0];

        let name = example_metadata.name.clone();
        if !segments.iter().all(|shard| shard.name == name) {
            return Err(Error::Index {
                message: "Index name should be identical across all segments".to_string(),
                location: location!(),
            });
        }

        let field_ids = &example_metadata.fields;
        if !segments.iter().all(|shard| shard.fields == *field_ids) {
            return Err(Error::Index {
                message: "Index fields should be identical across all segments".to_string(),
                location: location!(),
            });
        }
        let field_ids = field_ids.iter().map(|id| *id as u32).collect();

        let index_details = example_metadata.index_details.as_ref().ok_or(Error::Index {
            message:
                "Index details are required for index description. This index must be retrained to support this method."
                    .to_string(),
            location: location!(),
        })?;
        let type_url = &index_details.type_url;
        if !segments.iter().all(|shard| {
            shard
                .index_details
                .as_ref()
                .map(|d| d.type_url == *type_url)
                .unwrap_or(false)
        }) {
            return Err(Error::Index {
                message: "Index type URL should be present and identical across all segments"
                    .to_string(),
                location: location!(),
            });
        }

        let details = IndexDetails(index_details.clone());
        let mut rows_indexed = 0;

        let index_type = details
            .get_plugin()
            .map(|p| p.name().to_string())
            .unwrap_or_else(|_| "Unknown".to_string());

        for shard in &segments {
            let fragment_bitmap = shard
                .fragment_bitmap
                .as_ref()
                .ok_or_else(|| Error::Index {
                    message: "Fragment bitmap is required for index description. This index must be retrained to support this method.".to_string(),
                    location: location!(),
                })?;

            for fragment in dataset.get_fragments() {
                if fragment_bitmap.contains(fragment.id() as u32) {
                    rows_indexed += fragment.fast_logical_rows()? as u64;
                }
            }
        }

        Ok(Self {
            name,
            field_ids,
            index_type,
            segments,
            details,
            rows_indexed,
        })
    }
}

impl IndexDescription for IndexDescriptionImpl {
    fn name(&self) -> &str {
        &self.name
    }

    fn field_ids(&self) -> &[u32] {
        &self.field_ids
    }

    fn index_type(&self) -> &str {
        &self.index_type
    }

    fn metadata(&self) -> &[IndexMetadata] {
        &self.segments
    }

    fn type_url(&self) -> &str {
        self.details.0.type_url.as_str()
    }

    fn rows_indexed(&self) -> u64 {
        self.rows_indexed
    }

    fn details(&self) -> Result<String> {
        let plugin = self.details.get_plugin()?;
        plugin
            .details_as_json(&self.details.0)
            .map(|v| v.to_string())
    }
}

#[async_trait]
impl DatasetIndexExt for Dataset {
    type IndexBuilder<'a> = CreateIndexBuilder<'a>;

    fn create_index_builder<'a>(
        &'a mut self,
        columns: &[&str],
        index_type: IndexType,
        params: &'a dyn IndexParams,
    ) -> CreateIndexBuilder<'a> {
        CreateIndexBuilder::new(self, columns, index_type, params)
    }

    #[instrument(skip_all)]
    async fn create_index(
        &mut self,
        columns: &[&str],
        index_type: IndexType,
        name: Option<String>,
        params: &dyn IndexParams,
        replace: bool,
    ) -> Result<()> {
        let mut builder = self.create_index_builder(columns, index_type, params);

        if let Some(name) = name {
            builder = builder.name(name);
        }

        builder.replace(replace).await
    }

    async fn drop_index(&mut self, name: &str) -> Result<()> {
        let indices = self.load_indices_by_name(name).await?;
        if indices.is_empty() {
            return Err(Error::IndexNotFound {
                identity: format!("name={}", name),
                location: location!(),
            });
        }

        let transaction = Transaction::new(
            self.manifest.version,
            Operation::CreateIndex {
                new_indices: vec![],
                removed_indices: indices.clone(),
            },
            None,
        );

        self.apply_commit(transaction, &Default::default(), &Default::default())
            .await?;

        Ok(())
    }

    async fn prewarm_index(&self, name: &str) -> Result<()> {
        let indices = self.load_indices_by_name(name).await?;
        if indices.is_empty() {
            return Err(Error::IndexNotFound {
                identity: format!("name={}", name),
                location: location!(),
            });
        }

        let index = self
            .open_generic_index(name, &indices[0].uuid.to_string(), &NoOpMetricsCollector)
            .await?;
        index.prewarm().await?;

        Ok(())
    }

    async fn describe_indices<'a, 'b>(
        &'a self,
        criteria: Option<IndexCriteria<'b>>,
    ) -> Result<Vec<Arc<dyn IndexDescription>>> {
        let indices = self.load_indices().await?;
        let mut indices = if let Some(criteria) = criteria {
            indices
                .iter()
                .filter(|idx| {
                    if idx.index_details.is_none() {
                        log::warn!(
                            "The method describe_indices does not support indexes without index details. Please retrain the index {}",
                            idx.name
                        );
                        return false;
                    }
                    let fields = idx
                        .fields
                        .iter()
                        .filter_map(|id| self.schema().field_by_id(*id))
                        .collect::<Vec<_>>();
                    match index_matches_criteria(idx, &criteria, &fields, false, self.schema()) {
                        Ok(matched) => matched,
                        Err(err) => {
                            log::warn!("Could not describe index {}: {}", idx.name, err);
                            false
                        }
                    }
                })
                .collect::<Vec<_>>()
        } else {
            indices.iter().collect::<Vec<_>>()
        };
        indices.sort_by_key(|idx| &idx.name);

        indices
            .into_iter()
            .chunk_by(|idx| &idx.name)
            .into_iter()
            .map(|(_, segments)| {
                let segments = segments.cloned().collect::<Vec<_>>();
                let desc = IndexDescriptionImpl::try_new(segments, self)?;
                Ok(Arc::new(desc) as Arc<dyn IndexDescription>)
            })
            .collect::<Result<Vec<_>>>()
    }

    async fn load_indices(&self) -> Result<Arc<Vec<IndexMetadata>>> {
        let metadata_key = IndexMetadataKey {
            version: self.version().version,
        };
        let indices = match self.index_cache.get_with_key(&metadata_key).await {
            Some(indices) => indices,
            None => {
                let mut loaded_indices = read_manifest_indexes(
                    &self.object_store,
                    &self.manifest_location,
                    &self.manifest,
                )
                .await?;
                retain_supported_indices(&mut loaded_indices);
                let loaded_indices = Arc::new(loaded_indices);
                self.index_cache
                    .insert_with_key(&metadata_key, loaded_indices.clone())
                    .await;
                loaded_indices
            }
        };

        if let Some(frag_reuse_index_meta) =
            indices.iter().find(|idx| idx.name == FRAG_REUSE_INDEX_NAME)
        {
            let uuid = frag_reuse_index_meta.uuid.to_string();
            let fri_key = FragReuseIndexKey { uuid: &uuid };
            let frag_reuse_index = self
                .index_cache
                .get_or_insert_with_key(fri_key, || async move {
                    let index_details =
                        load_frag_reuse_index_details(self, frag_reuse_index_meta).await?;
                    open_frag_reuse_index(frag_reuse_index_meta.uuid, index_details.as_ref()).await
                })
                .await?;
            let mut indices = indices.as_ref().clone();
            indices.iter_mut().for_each(|idx| {
                if let Some(bitmap) = idx.fragment_bitmap.as_mut() {
                    frag_reuse_index.remap_fragment_bitmap(bitmap).unwrap();
                }
            });
            Ok(Arc::new(indices))
        } else {
            Ok(indices)
        }
    }

    async fn commit_existing_index(
        &mut self,
        index_name: &str,
        column: &str,
        index_id: Uuid,
    ) -> Result<()> {
        let Some(field) = self.schema().field(column) else {
            return Err(Error::Index {
                message: format!("CreateIndex: column '{column}' does not exist"),
                location: location!(),
            });
        };

        let new_idx = IndexMetadata {
            uuid: index_id,
            name: index_name.to_string(),
            fields: vec![field.id],
            dataset_version: self.manifest.version,
            fragment_bitmap: Some(self.get_fragments().iter().map(|f| f.id() as u32).collect()),
            index_details: None,
            index_version: 0,
            created_at: Some(chrono::Utc::now()),
            base_id: None,
        };

        let transaction = Transaction::new(
            self.manifest.version,
            Operation::CreateIndex {
                new_indices: vec![new_idx],
                removed_indices: vec![],
            },
            None,
        );

        self.apply_commit(transaction, &Default::default(), &Default::default())
            .await?;

        Ok(())
    }

    async fn load_scalar_index<'a, 'b>(
        &'a self,
        criteria: IndexCriteria<'b>,
    ) -> Result<Option<IndexMetadata>> {
        let indices = self.load_indices().await?;

        let mut indices = indices
            .iter()
            .filter(|idx| {
                if idx.fields.is_empty() {
                    if idx.name != FRAG_REUSE_INDEX_NAME {
                        log::error!("Index {} has no fields", idx.name);
                    }
                    false
                } else {
                    true
                }
            })
            .collect::<Vec<_>>();
        indices.sort_by_key(|idx| idx.fields[0]);
        let indices_by_field = indices.into_iter().chunk_by(|idx| idx.fields[0]);
        for (field_id, indices) in &indices_by_field {
            let indices = indices.collect::<Vec<_>>();
            let has_multiple = indices.len() > 1;
            for idx in indices {
                let field = self.schema().field_by_id(field_id);
                if let Some(field) = field {
                    if index_matches_criteria(
                        idx,
                        &criteria,
                        &[field],
                        has_multiple,
                        self.schema(),
                    )? {
                        let non_empty = idx.fragment_bitmap.as_ref().is_some_and(|bitmap| {
                            bitmap.intersection_len(self.fragment_bitmap.as_ref()) > 0
                        });
                        let is_fts_index = if let Some(details) = &idx.index_details {
                            IndexDetails(details.clone()).supports_fts()
                        } else {
                            false
                        };
                        if non_empty || is_fts_index {
                            return Ok(Some(idx.clone()));
                        }
                    }
                }
            }
        }
        Ok(None)
    }

    #[instrument(skip_all)]
    async fn optimize_indices(&mut self, options: &OptimizeOptions) -> Result<()> {
        let dataset = Arc::new(self.clone());
        let indices = self.load_indices().await?;

        let indices_to_optimize = options
            .index_names
            .as_ref()
            .map(|names| names.iter().collect::<HashSet<_>>());
        let name_to_indices = indices
            .iter()
            .filter(|idx| {
                indices_to_optimize
                    .as_ref()
                    .is_none_or(|names| names.contains(&idx.name))
                    && !is_system_index(idx)
            })
            .map(|idx| (idx.name.clone(), idx))
            .into_group_map();

        let mut new_indices = vec![];
        let mut removed_indices = vec![];
        for deltas in name_to_indices.values() {
            let Some(res) = merge_indices(dataset.clone(), deltas.as_slice(), options).await?
            else {
                continue;
            };

            let last_idx = deltas.last().expect("Delta indices should not be empty");
            let new_idx = IndexMetadata {
                uuid: res.new_uuid,
                name: last_idx.name.clone(),
                fields: last_idx.fields.clone(),
                dataset_version: self.manifest.version,
                fragment_bitmap: Some(res.new_fragment_bitmap),
                index_details: Some(Arc::new(res.new_index_details)),
                index_version: res.new_index_version,
                created_at: Some(chrono::Utc::now()),
                base_id: None,
            };
            removed_indices.extend(res.removed_indices.iter().map(|&idx| idx.clone()));
            if deltas.len() > res.removed_indices.len() {
                new_indices.extend(
                    deltas[0..(deltas.len() - res.removed_indices.len())]
                        .iter()
                        .map(|&idx| idx.clone()),
                );
            }
            new_indices.push(new_idx);
        }

        if new_indices.is_empty() {
            return Ok(());
        }

        let transaction = Transaction::new(
            self.manifest.version,
            Operation::CreateIndex {
                new_indices,
                removed_indices,
            },
            None,
        );

        self.apply_commit(transaction, &Default::default(), &Default::default())
            .await?;

        Ok(())
    }

    async fn index_statistics(&self, index_name: &str) -> Result<String> {
        let metadatas = self.load_indices_by_name(index_name).await?;
        if metadatas.is_empty() {
            return Err(Error::IndexNotFound {
                identity: format!("name={}", index_name),
                location: location!(),
            });
        }

        if index_name == FRAG_REUSE_INDEX_NAME {
            let index = self
                .open_frag_reuse_index(&NoOpMetricsCollector)
                .await?
                .expect("FragmentReuse index does not exist");
            return serde_json::to_string(&index.statistics()?).map_err(|e| Error::Index {
                message: format!("Failed to serialize index statistics: {}", e),
                location: location!(),
            });
        }

        if index_name == MEM_WAL_INDEX_NAME {
            let index = self
                .open_mem_wal_index(&NoOpMetricsCollector)
                .await?
                .expect("MemWal index does not exist");
            return serde_json::to_string(&index.statistics()?).map_err(|e| Error::Index {
                message: format!("Failed to serialize index statistics: {}", e),
                location: location!(),
            });
        }

        let field_id = metadatas[0].fields[0];
        let field_path = self.schema().field_path(field_id)?;

        let indices = stream::iter(metadatas.iter())
            .then(|m| {
                let field_path = field_path.clone();
                async move {
                    self.open_generic_index(&field_path, &m.uuid.to_string(), &NoOpMetricsCollector)
                        .await
                }
            })
            .try_collect::<Vec<_>>()
            .await?;

        let indices_stats = indices
            .iter()
            .map(|idx| idx.statistics())
            .collect::<Result<Vec<_>>>()?;

        let index_type = indices[0].index_type().to_string();

        let indexed_fragments_per_delta = self.indexed_fragments(index_name).await?;

        let res = indexed_fragments_per_delta
            .iter()
            .map(|frags| {
                let mut sum = 0;
                for frag in frags.iter() {
                    sum += frag.num_rows().ok_or_else(|| Error::Internal {
                        message: "Fragment should have row counts, please upgrade lance and \
                                  trigger a single write to fix this"
                            .to_string(),
                        location: location!(),
                    })?;
                }
                Ok(sum)
            })
            .collect::<Result<Vec<_>>>();

        async fn migrate_and_recompute(ds: &Dataset, index_name: &str) -> Result<String> {
            let mut ds = ds.clone();
            log::warn!(
                "Detected outdated fragment metadata, migrating dataset. \
                 To disable migration, set LANCE_AUTO_MIGRATION=false"
            );
            ds.delete("false").await.map_err(|err| Error::Execution {
                message: format!(
                    "Failed to migrate dataset while calculating index statistics. \
                     To disable migration, set LANCE_AUTO_MIGRATION=false. Original error: {}",
                    err
                ),
                location: location!(),
            })?;
            ds.index_statistics(index_name).await
        }

        let num_indexed_rows_per_delta = match res {
            Ok(rows) => rows,
            Err(Error::Internal { message, .. })
                if auto_migrate_corruption() && message.contains("trigger a single write") =>
            {
                return migrate_and_recompute(self, index_name).await;
            }
            Err(e) => return Err(e),
        };

        let mut fragment_ids = HashSet::new();
        for frags in indexed_fragments_per_delta.iter() {
            for frag in frags.iter() {
                if !fragment_ids.insert(frag.id) {
                    if auto_migrate_corruption() {
                        return migrate_and_recompute(self, index_name).await;
                    } else {
                        return Err(Error::Internal {
                            message:
                                "Overlap in indexed fragments. Please upgrade to lance >= 0.23.0 \
                                 and trigger a single write to fix this"
                                    .to_string(),
                            location: location!(),
                        });
                    }
                }
            }
        }
        let num_indexed_fragments = fragment_ids.len();

        let num_unindexed_fragments = self.fragments().len() - num_indexed_fragments;
        let num_indexed_rows: usize = num_indexed_rows_per_delta.iter().cloned().sum();
        let num_unindexed_rows = self.count_rows(None).await? - num_indexed_rows;

        let updated_at = metadatas
            .iter()
            .filter_map(|m| m.created_at)
            .max()
            .map(|dt| dt.timestamp_millis() as u64);

        let stats = json!({
            "index_type": index_type,
            "name": index_name,
            "num_indices": metadatas.len(),
            "indices": indices_stats,
            "num_indexed_fragments": num_indexed_fragments,
            "num_indexed_rows": num_indexed_rows,
            "num_unindexed_fragments": num_unindexed_fragments,
            "num_unindexed_rows": num_unindexed_rows,
            "num_indexed_rows_per_delta": num_indexed_rows_per_delta,
            "updated_at_timestamp_ms": updated_at,
        });

        serde_json::to_string(&stats).map_err(|e| Error::Index {
            message: format!("Failed to serialize index statistics: {}", e),
            location: location!(),
        })
    }

    async fn read_index_partition(
        &self,
        index_name: &str,
        partition_id: usize,
        with_vector: bool,
    ) -> Result<SendableRecordBatchStream> {
        let indices = self.load_indices_by_name(index_name).await?;
        if indices.is_empty() {
            return Err(Error::IndexNotFound {
                identity: format!("name={}", index_name),
                location: location!(),
            });
        }
        let column = self.schema().field_by_id(indices[0].fields[0]).unwrap();

        let mut schema: Option<Arc<Schema>> = None;
        let mut partition_streams = Vec::with_capacity(indices.len());
        for index in indices {
            let index = self
                .open_vector_index(&column.name, &index.uuid.to_string(), &NoOpMetricsCollector)
                .await?;

            let stream = index
                .partition_reader(partition_id, with_vector, &NoOpMetricsCollector)
                .await?;
            if schema.is_none() {
                schema = Some(stream.schema());
            }
            partition_streams.push(stream);
        }

        match schema {
            Some(schema) => {
                let merged = stream::select_all(partition_streams);
                let stream = RecordBatchStreamAdapter::new(schema, merged);
                Ok(Box::pin(stream))
            }
            None => Ok(Box::pin(RecordBatchStreamAdapter::new(
                Arc::new(Schema::empty()),
                stream::empty(),
            ))),
        }
    }
}

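/// Retains only the indices whose `index_version` is supported by the
/// corresponding registered plugin, logging a warning for each index that is
/// ignored.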
pub(crate) fn retain_supported_indices(indices: &mut Vec<IndexMetadata>) {
    indices.retain(|idx| {
        let max_supported_version = idx
            .index_details
            .as_ref()
            .map(|details| {
                IndexDetails(details.clone())
                    .index_version()
                    .unwrap_or(i32::MAX as u32)
            })
            .unwrap_or_default();
        let is_valid = idx.index_version <= max_supported_version as i32;
        if !is_valid {
            log::warn!(
                "Index {} has version {}, which is not supported (<={}), ignoring it",
                idx.name,
                idx.index_version,
                max_supported_version,
            );
        }
        is_valid
    })
}

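/// Internal extensions to [`DatasetIndexExt`] for opening and inspecting
/// indices. These are building blocks used by query planning and index
/// maintenance; the stable entry points live on [`DatasetIndexExt`].
///
/// A minimal sketch of how these helpers compose (not compiled here; the URI
/// and index name are placeholders):
///
/// ```ignore
/// let dataset = Dataset::open("memory://example").await?;
/// // Which fragments does the "my_idx" index not cover yet?
/// let unindexed = dataset.unindexed_fragments("my_idx").await?;
/// // Which columns have scalar indices usable for filter pushdown?
/// let info = dataset.scalar_index_info().await?;
/// ```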
#[async_trait]
pub trait DatasetIndexInternalExt: DatasetIndexExt {
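    /// Opens an index (scalar or vector) as a generic [`Index`]. The scalar,
    /// vector, and fragment-reuse caches are consulted first; on a miss the
    /// index directory is probed to decide which kind of index to open.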
    async fn open_generic_index(
        &self,
        column: &str,
        uuid: &str,
        metrics: &dyn MetricsCollector,
    ) -> Result<Arc<dyn Index>>;
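
    /// Opens the scalar index with the given `uuid` on `column`, caching the
    /// opened index.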
    async fn open_scalar_index(
        &self,
        column: &str,
        uuid: &str,
        metrics: &dyn MetricsCollector,
    ) -> Result<Arc<dyn ScalarIndex>>;
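
    /// Opens the vector index with the given `uuid` on `column`, dispatching
    /// on the version of the index file format.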
    async fn open_vector_index(
        &self,
        column: &str,
        uuid: &str,
        metrics: &dyn MetricsCollector,
    ) -> Result<Arc<dyn VectorIndex>>;

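    /// Opens the fragment reuse index, if the dataset has one.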
    async fn open_frag_reuse_index(
        &self,
        metrics: &dyn MetricsCollector,
    ) -> Result<Option<Arc<FragReuseIndex>>>;

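    /// Opens the MemWAL index, if the dataset has one.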
    async fn open_mem_wal_index(
        &self,
        metrics: &dyn MetricsCollector,
    ) -> Result<Option<Arc<MemWalIndex>>>;

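    /// Returns the UUID of the fragment reuse index, if the dataset has one.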
    async fn frag_reuse_index_uuid(&self) -> Option<Uuid>;

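    /// Loads information about all scalar indices usable for filter pushdown.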
    async fn scalar_index_info(&self) -> Result<ScalarIndexInfo>;

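    /// Returns the fragments that are not covered by any delta of the named
    /// index.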
    async fn unindexed_fragments(&self, idx_name: &str) -> Result<Vec<Fragment>>;

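    /// Returns, for each delta of the named index, the fragments that delta
    /// covers.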
    async fn indexed_fragments(&self, idx_name: &str) -> Result<Vec<Vec<Fragment>>>;

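    /// Initializes an index in this dataset from the index with the same name
    /// in `source_dataset`, verifying that the indexed fields exist here with
    /// matching types.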
    async fn initialize_index(&mut self, source_dataset: &Dataset, index_name: &str) -> Result<()>;

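    /// Initializes all non-system indices of `source_dataset` in this dataset.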
    async fn initialize_indices(&mut self, source_dataset: &Dataset) -> Result<()>;
}

#[async_trait]
impl DatasetIndexInternalExt for Dataset {
    async fn open_generic_index(
        &self,
        column: &str,
        uuid: &str,
        metrics: &dyn MetricsCollector,
    ) -> Result<Arc<dyn Index>> {
        let frag_reuse_uuid = self.frag_reuse_index_uuid().await;
        let cache_key = ScalarIndexCacheKey::new(uuid, frag_reuse_uuid.as_ref());
        if let Some(index) = self.index_cache.get_unsized_with_key(&cache_key).await {
            return Ok(index.as_index());
        }

        let vector_cache_key = VectorIndexCacheKey::new(uuid, frag_reuse_uuid.as_ref());
        if let Some(index) = self
            .index_cache
            .get_unsized_with_key(&vector_cache_key)
            .await
        {
            return Ok(index.as_index());
        }

        let frag_reuse_cache_key = FragReuseIndexCacheKey::new(uuid, frag_reuse_uuid.as_ref());
        if let Some(index) = self.index_cache.get_with_key(&frag_reuse_cache_key).await {
            return Ok(index.as_index());
        }

        let index_meta = self.load_index(uuid).await?.ok_or_else(|| Error::Index {
            message: format!("Index with id {} does not exist", uuid),
            location: location!(),
        })?;
        let index_dir = self.indice_files_dir(&index_meta)?;
        let index_file = index_dir.child(uuid).child(INDEX_FILE_NAME);
        if self.object_store.exists(&index_file).await? {
            let index = self.open_vector_index(column, uuid, metrics).await?;
            Ok(index.as_index())
        } else {
            let index = self.open_scalar_index(column, uuid, metrics).await?;
            Ok(index.as_index())
        }
    }

    async fn open_scalar_index(
        &self,
        column: &str,
        uuid: &str,
        metrics: &dyn MetricsCollector,
    ) -> Result<Arc<dyn ScalarIndex>> {
        let frag_reuse_uuid = self.frag_reuse_index_uuid().await;
        let cache_key = ScalarIndexCacheKey::new(uuid, frag_reuse_uuid.as_ref());
        if let Some(index) = self.index_cache.get_unsized_with_key(&cache_key).await {
            return Ok(index);
        }

        let index_meta = self.load_index(uuid).await?.ok_or_else(|| Error::Index {
            message: format!("Index with id {} does not exist", uuid),
            location: location!(),
        })?;

        let index = scalar::open_scalar_index(self, column, &index_meta, metrics).await?;

        info!(target: TRACE_IO_EVENTS, index_uuid=uuid, r#type=IO_TYPE_OPEN_SCALAR, index_type=index.index_type().to_string());
        metrics.record_index_load();

        self.index_cache
            .insert_unsized_with_key(&cache_key, index.clone())
            .await;
        Ok(index)
    }

    async fn open_vector_index(
        &self,
        column: &str,
        uuid: &str,
        metrics: &dyn MetricsCollector,
    ) -> Result<Arc<dyn VectorIndex>> {
        let frag_reuse_uuid = self.frag_reuse_index_uuid().await;
        let cache_key = VectorIndexCacheKey::new(uuid, frag_reuse_uuid.as_ref());

        if let Some(index) = self.index_cache.get_unsized_with_key(&cache_key).await {
            log::debug!("Found vector index in cache uuid: {}", uuid);
            return Ok(index);
        }

        let frag_reuse_index = self.open_frag_reuse_index(metrics).await?;
        let index_meta = self.load_index(uuid).await?.ok_or_else(|| Error::Index {
            message: format!("Index with id {} does not exist", uuid),
            location: location!(),
        })?;
        let index_dir = self.indice_files_dir(&index_meta)?;
        let index_file = index_dir.child(uuid).child(INDEX_FILE_NAME);
        let reader: Arc<dyn Reader> = self.object_store.open(&index_file).await?.into();

        let tail_bytes = read_last_block(reader.as_ref()).await?;
        let (major_version, minor_version) = read_version(&tail_bytes)?;

        let index_cache = self.index_cache.with_key_prefix(&cache_key.key());

        let index = match (major_version, minor_version) {
            (0, 1) | (0, 0) => {
                info!(target: TRACE_IO_EVENTS, index_uuid=uuid, r#type=IO_TYPE_OPEN_VECTOR, version="0.1", index_type="IVF_PQ");
                let proto = open_index_proto(reader.as_ref()).await?;
                match &proto.implementation {
                    Some(Implementation::VectorIndex(vector_index)) => {
                        let dataset = Arc::new(self.clone());
                        vector::open_vector_index(
                            dataset,
                            uuid,
                            vector_index,
                            reader,
                            frag_reuse_index,
                        )
                        .await
                    }
                    None => Err(Error::Internal {
                        message: "Index proto was missing implementation field".into(),
                        location: location!(),
                    }),
                }
            }

            (0, 2) => {
                info!(target: TRACE_IO_EVENTS, index_uuid=uuid, r#type=IO_TYPE_OPEN_VECTOR, version="0.2", index_type="IVF_PQ");
                let reader = PreviousFileReader::try_new_self_described_from_reader(
                    reader.clone(),
                    Some(&self.metadata_cache.file_metadata_cache(&index_file)),
                )
                .await?;
                vector::open_vector_index_v2(
                    Arc::new(self.clone()),
                    column,
                    uuid,
                    reader,
                    frag_reuse_index,
                )
                .await
            }

            (0, 3) | (2, _) => {
                let scheduler = ScanScheduler::new(
                    self.object_store.clone(),
                    SchedulerConfig::max_bandwidth(&self.object_store),
                );
                let file = scheduler
                    .open_file(&index_file, &CachedFileSize::unknown())
                    .await?;
                let reader = lance_file::reader::FileReader::try_open(
                    file,
                    None,
                    Default::default(),
                    &self.metadata_cache.file_metadata_cache(&index_file),
                    FileReaderOptions::default(),
                )
                .await?;
                let index_metadata = reader
                    .schema()
                    .metadata
                    .get(INDEX_METADATA_SCHEMA_KEY)
                    .ok_or(Error::Index {
                        message: "Index Metadata not found".to_owned(),
                        location: location!(),
                    })?;
                let index_metadata: lance_index::IndexMetadata =
                    serde_json::from_str(index_metadata)?;
                let field = self.schema().field(column).ok_or_else(|| Error::Index {
                    message: format!("Column {} does not exist in the schema", column),
                    location: location!(),
                })?;

                let (_, element_type) = get_vector_type(self.schema(), column)?;

                info!(target: TRACE_IO_EVENTS, index_uuid=uuid, r#type=IO_TYPE_OPEN_VECTOR, version="0.3", index_type=index_metadata.index_type);

                match index_metadata.index_type.as_str() {
                    "IVF_FLAT" => match element_type {
                        DataType::Float16 | DataType::Float32 | DataType::Float64 => {
                            let ivf = IVFIndex::<FlatIndex, FlatQuantizer>::try_new(
                                self.object_store.clone(),
                                index_dir,
                                uuid.to_owned(),
                                frag_reuse_index,
                                self.metadata_cache.as_ref(),
                                index_cache,
                            )
                            .await?;
                            Ok(Arc::new(ivf) as Arc<dyn VectorIndex>)
                        }
                        DataType::UInt8 => {
                            let ivf = IVFIndex::<FlatIndex, FlatBinQuantizer>::try_new(
                                self.object_store.clone(),
                                index_dir,
                                uuid.to_owned(),
                                frag_reuse_index,
                                self.metadata_cache.as_ref(),
                                index_cache,
                            )
                            .await?;
                            Ok(Arc::new(ivf) as Arc<dyn VectorIndex>)
                        }
                        _ => Err(Error::Index {
                            message: format!(
                                "the field type {} is not supported for FLAT index",
                                field.data_type()
                            ),
                            location: location!(),
                        }),
                    },

                    "IVF_PQ" => {
                        let ivf = IVFIndex::<FlatIndex, ProductQuantizer>::try_new(
                            self.object_store.clone(),
                            index_dir,
                            uuid.to_owned(),
                            frag_reuse_index,
                            self.metadata_cache.as_ref(),
                            index_cache,
                        )
                        .await?;
                        Ok(Arc::new(ivf) as Arc<dyn VectorIndex>)
                    }

                    "IVF_SQ" => {
                        let ivf = IVFIndex::<FlatIndex, ScalarQuantizer>::try_new(
                            self.object_store.clone(),
                            index_dir,
                            uuid.to_owned(),
                            frag_reuse_index,
                            self.metadata_cache.as_ref(),
                            index_cache,
                        )
                        .await?;
                        Ok(Arc::new(ivf) as Arc<dyn VectorIndex>)
                    }

                    "IVF_RQ" => {
                        let ivf = IVFIndex::<FlatIndex, RabitQuantizer>::try_new(
                            self.object_store.clone(),
                            self.indices_dir(),
                            uuid.to_owned(),
                            frag_reuse_index,
                            self.metadata_cache.as_ref(),
                            index_cache,
                        )
                        .await?;
                        Ok(Arc::new(ivf) as Arc<dyn VectorIndex>)
                    }

                    "IVF_HNSW_FLAT" => {
                        let uri = index_dir.child(uuid).child("index.pb");
                        let file_metadata_cache =
                            self.session.metadata_cache.file_metadata_cache(&uri);
                        let ivf = IVFIndex::<HNSW, FlatQuantizer>::try_new(
                            self.object_store.clone(),
                            index_dir,
                            uuid.to_owned(),
                            frag_reuse_index,
                            &file_metadata_cache,
                            index_cache,
                        )
                        .await?;
                        Ok(Arc::new(ivf) as Arc<dyn VectorIndex>)
                    }

                    "IVF_HNSW_SQ" => {
                        let ivf = IVFIndex::<HNSW, ScalarQuantizer>::try_new(
                            self.object_store.clone(),
                            index_dir,
                            uuid.to_owned(),
                            frag_reuse_index,
                            self.metadata_cache.as_ref(),
                            index_cache,
                        )
                        .await?;
                        Ok(Arc::new(ivf) as Arc<dyn VectorIndex>)
                    }

                    "IVF_HNSW_PQ" => {
                        let ivf = IVFIndex::<HNSW, ProductQuantizer>::try_new(
                            self.object_store.clone(),
                            index_dir,
                            uuid.to_owned(),
                            frag_reuse_index,
                            self.metadata_cache.as_ref(),
                            index_cache,
                        )
                        .await?;
                        Ok(Arc::new(ivf) as Arc<dyn VectorIndex>)
                    }

                    _ => Err(Error::Index {
                        message: format!("Unsupported index type: {}", index_metadata.index_type),
                        location: location!(),
                    }),
                }
            }

            _ => Err(Error::Index {
                message: "unsupported index version (maybe need to upgrade your lance version)"
                    .to_owned(),
                location: location!(),
            }),
        };
        let index = index?;
        metrics.record_index_load();
        self.index_cache
            .insert_unsized_with_key(&cache_key, index.clone())
            .await;
        Ok(index)
    }

    async fn open_frag_reuse_index(
        &self,
        metrics: &dyn MetricsCollector,
    ) -> Result<Option<Arc<FragReuseIndex>>> {
        if let Some(frag_reuse_index_meta) = self.load_index_by_name(FRAG_REUSE_INDEX_NAME).await? {
            let uuid = frag_reuse_index_meta.uuid.to_string();
            let frag_reuse_key = FragReuseIndexKey { uuid: &uuid };
            let uuid_clone = uuid.clone();

            let index = self
                .index_cache
                .get_or_insert_with_key(frag_reuse_key, || async move {
                    let index_meta =
                        self.load_index(&uuid_clone)
                            .await?
                            .ok_or_else(|| Error::Index {
                                message: format!("Index with id {} does not exist", uuid_clone),
                                location: location!(),
                            })?;
                    let index_details = load_frag_reuse_index_details(self, &index_meta).await?;
                    let index =
                        open_frag_reuse_index(frag_reuse_index_meta.uuid, index_details.as_ref())
                            .await?;

                    info!(target: TRACE_IO_EVENTS, index_uuid=uuid_clone, r#type=IO_TYPE_OPEN_FRAG_REUSE);
                    metrics.record_index_load();

                    Ok(index)
                })
                .await?;

            Ok(Some(index))
        } else {
            Ok(None)
        }
    }

    async fn open_mem_wal_index(
        &self,
        metrics: &dyn MetricsCollector,
    ) -> Result<Option<Arc<MemWalIndex>>> {
        let Some(mem_wal_meta) = self.load_index_by_name(MEM_WAL_INDEX_NAME).await? else {
            return Ok(None);
        };

        let frag_reuse_uuid = self.frag_reuse_index_uuid().await;
        let cache_key = MemWalCacheKey::new(&mem_wal_meta.uuid, frag_reuse_uuid.as_ref());
        if let Some(index) = self.index_cache.get_with_key(&cache_key).await {
            log::debug!("Found MemWAL index in cache uuid: {}", mem_wal_meta.uuid);
            return Ok(Some(index));
        }

        let uuid = mem_wal_meta.uuid.to_string();

        let index_meta = self.load_index(&uuid).await?.ok_or_else(|| Error::Index {
            message: format!("Index with id {} does not exist", uuid),
            location: location!(),
        })?;
        let index = open_mem_wal_index(index_meta)?;

        info!(target: TRACE_IO_EVENTS, index_uuid=uuid, r#type=IO_TYPE_OPEN_MEM_WAL);
        metrics.record_index_load();

        self.index_cache
            .insert_with_key(&cache_key, index.clone())
            .await;
        Ok(Some(index))
    }

    async fn frag_reuse_index_uuid(&self) -> Option<Uuid> {
        if let Ok(indices) = self.load_indices().await {
            indices
                .iter()
                .find(|idx| idx.name == FRAG_REUSE_INDEX_NAME)
                .map(|idx| idx.uuid)
        } else {
            None
        }
    }

    #[instrument(level = "trace", skip_all)]
    async fn scalar_index_info(&self) -> Result<ScalarIndexInfo> {
        let indices = self.load_indices().await?;
        let schema = self.schema();
        let mut indexed_fields = Vec::new();
        for index in indices.iter().filter(|idx| {
            let idx_schema = schema.project_by_ids(idx.fields.as_slice(), true);
            let is_vector_index = idx_schema
                .fields
                .iter()
                .any(|f| is_vector_field(f.data_type()));

            let is_fts_index = if let Some(details) = &idx.index_details {
                IndexDetails(details.clone()).supports_fts()
            } else {
                false
            };

            let has_non_empty_bitmap = idx.fragment_bitmap.as_ref().is_some_and(|bitmap| {
                !bitmap.is_empty() && !(bitmap & self.fragment_bitmap.as_ref()).is_empty()
            });

            idx.fields.len() == 1 && !is_vector_index && (has_non_empty_bitmap || is_fts_index)
        }) {
            let field = index.fields[0];
            let field = schema.field_by_id(field).ok_or_else(|| Error::Internal {
                message: format!(
                    "Index referenced a field with id {field} which did not exist in the schema"
                ),
                location: location!(),
            })?;

            let field_path = if let Some(ancestors) = schema.field_ancestry_by_id(field.id) {
                let field_refs: Vec<&str> = ancestors.iter().map(|f| f.name.as_str()).collect();
                lance_core::datatypes::format_field_path(&field_refs)
            } else {
                field.name.clone()
            };

            let index_details = IndexDetails(fetch_index_details(self, &field_path, index).await?);
            if index_details.is_vector() {
                continue;
            }

            let plugin = index_details.get_plugin()?;
            let query_parser = plugin.new_query_parser(index.name.clone(), &index_details.0);

            if let Some(query_parser) = query_parser {
                indexed_fields.push((field_path, (field.data_type(), query_parser)));
            }
        }
        let mut index_info_map = HashMap::with_capacity(indexed_fields.len());
        for indexed_field in indexed_fields {
            let mut parser = Some(indexed_field.1 .1);
            let parser = &mut parser;
            index_info_map
                .entry(indexed_field.0)
                .and_modify(|existing: &mut (DataType, Box<MultiQueryParser>)| {
                    debug_assert_eq!(existing.0, indexed_field.1 .0);
                    existing.1.add(parser.take().unwrap());
                })
                .or_insert_with(|| {
                    (
                        indexed_field.1 .0,
                        Box::new(MultiQueryParser::single(parser.take().unwrap())),
                    )
                });
        }
        Ok(ScalarIndexInfo {
            indexed_columns: index_info_map,
        })
    }

    async fn unindexed_fragments(&self, name: &str) -> Result<Vec<Fragment>> {
        let indices = self.load_indices_by_name(name).await?;
        let mut total_fragment_bitmap = RoaringBitmap::new();
        for idx in indices.iter() {
            total_fragment_bitmap |= idx.fragment_bitmap.as_ref().ok_or(Error::Index {
                message: "Please upgrade lance to 0.8+ to use this function".to_string(),
                location: location!(),
            })?;
        }
        Ok(self
            .fragments()
            .iter()
            .filter(|f| !total_fragment_bitmap.contains(f.id as u32))
            .cloned()
            .collect())
    }

    async fn indexed_fragments(&self, name: &str) -> Result<Vec<Vec<Fragment>>> {
        let indices = self.load_indices_by_name(name).await?;
        indices
            .iter()
            .map(|index| {
                let fragment_bitmap = index.fragment_bitmap.as_ref().ok_or(Error::Index {
                    message: "Please upgrade lance to 0.8+ to use this function".to_string(),
                    location: location!(),
                })?;
                let mut indexed_frags = Vec::with_capacity(fragment_bitmap.len() as usize);
                for frag in self.fragments().iter() {
                    if fragment_bitmap.contains(frag.id as u32) {
                        indexed_frags.push(frag.clone());
                    }
                }
                Ok(indexed_frags)
            })
            .collect()
    }

    async fn initialize_index(&mut self, source_dataset: &Dataset, index_name: &str) -> Result<()> {
        let source_indices = source_dataset.load_indices_by_name(index_name).await?;

        if source_indices.is_empty() {
            return Err(Error::Index {
                message: format!("Index '{}' not found in source dataset", index_name),
                location: location!(),
            });
        }

        let source_index = source_indices
            .iter()
            .min_by_key(|idx| idx.created_at)
            .ok_or_else(|| Error::Index {
                message: format!("Could not determine oldest index for '{}'", index_name),
                location: location!(),
            })?;

        let mut field_names = Vec::new();
        for field_id in source_index.fields.iter() {
            let source_field = source_dataset
                .schema()
                .field_by_id(*field_id)
                .ok_or_else(|| Error::Index {
                    message: format!("Field with id {} not found in source dataset", field_id),
                    location: location!(),
                })?;

            let target_field =
                self.schema()
                    .field(&source_field.name)
                    .ok_or_else(|| Error::Index {
                        message: format!(
                            "Field '{}' required by index '{}' not found in target dataset",
                            source_field.name, index_name
                        ),
                        location: location!(),
                    })?;

            if source_field.data_type() != target_field.data_type() {
                return Err(Error::Index {
                    message: format!(
                        "Field '{}' has different types in source ({:?}) and target ({:?}) datasets",
                        source_field.name,
                        source_field.data_type(),
                        target_field.data_type()
                    ),
                    location: location!(),
                });
            }

            field_names.push(source_field.name.as_str());
        }

        if field_names.is_empty() {
            return Err(Error::Index {
                message: format!("Index '{}' has no fields", index_name),
                location: location!(),
            });
        }

        if let Some(index_details) = &source_index.index_details {
            let index_details_wrapper = IndexDetails(index_details.clone());

            if index_details_wrapper.is_vector() {
                vector::initialize_vector_index(self, source_dataset, source_index, &field_names)
                    .await?;
            } else {
                scalar::initialize_scalar_index(self, source_dataset, source_index, &field_names)
                    .await?;
            }
        } else {
            log::warn!(
                "Index '{}' has no index_details, skipping",
                source_index.name
            );
        }

        Ok(())
    }

    async fn initialize_indices(&mut self, source_dataset: &Dataset) -> Result<()> {
        let source_indices = source_dataset.load_indices().await?;
        let non_system_indices: Vec<_> = source_indices
            .iter()
            .filter(|idx| !lance_index::is_system_index(idx))
            .collect();

        if non_system_indices.is_empty() {
            return Ok(());
        }

        let mut unique_index_names = HashSet::new();
        for index in non_system_indices.iter() {
            unique_index_names.insert(index.name.clone());
        }

        for index_name in unique_index_names {
            self.initialize_index(source_dataset, &index_name).await?;
        }

        Ok(())
    }
}

fn is_vector_field(data_type: DataType) -> bool {
    match data_type {
        DataType::FixedSizeList(_, _) => true,
        DataType::List(inner) => {
            matches!(inner.data_type(), DataType::FixedSizeList(_, _))
        }
        _ => false,
    }
}

1783#[cfg(test)]
1784mod tests {
1785 use crate::dataset::builder::DatasetBuilder;
1786 use crate::dataset::optimize::{compact_files, CompactionOptions};
1787 use crate::dataset::{WriteMode, WriteParams};
1788 use crate::index::vector::VectorIndexParams;
1789 use crate::session::Session;
1790 use crate::utils::test::{copy_test_data_to_tmp, DatagenExt, FragmentCount, FragmentRowCount};
1791 use arrow_array::Int32Array;
1792
1793 use lance_io::{assert_io_eq, assert_io_lt};
1794
1795 use super::*;
1796
1797 use arrow::array::AsArray;
1798 use arrow::datatypes::{Float32Type, Int32Type};
1799 use arrow_array::{
1800 FixedSizeListArray, Float32Array, RecordBatch, RecordBatchIterator, StringArray,
1801 };
1802 use arrow_schema::{Field, Schema};
1803 use lance_arrow::*;
1804 use lance_core::utils::tempfile::TempStrDir;
1805 use lance_datagen::gen_batch;
1806 use lance_datagen::{array, BatchCount, Dimension, RowCount};
1807 use lance_index::scalar::{FullTextSearchQuery, InvertedIndexParams, ScalarIndexParams};
1808 use lance_index::vector::{
1809 hnsw::builder::HnswBuildParams, ivf::IvfBuildParams, sq::builder::SQBuildParams,
1810 };
1811
1812 use lance_linalg::distance::{DistanceType, MetricType};
1813 use lance_testing::datagen::generate_random_array;
1814 use rstest::rstest;
1815 use std::collections::HashSet;
1816
1817 #[tokio::test]
1818 async fn test_recreate_index() {
1819 const DIM: i32 = 8;
1820 let schema = Arc::new(Schema::new(vec![
1821 Field::new(
1822 "v",
1823 DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), DIM),
1824 true,
1825 ),
1826 Field::new(
1827 "o",
1828 DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), DIM),
1829 true,
1830 ),
1831 ]));
1832 let data = generate_random_array(2048 * DIM as usize);
1833 let batches: Vec<RecordBatch> = vec![RecordBatch::try_new(
1834 schema.clone(),
1835 vec![
1836 Arc::new(FixedSizeListArray::try_new_from_values(data.clone(), DIM).unwrap()),
1837 Arc::new(FixedSizeListArray::try_new_from_values(data, DIM).unwrap()),
1838 ],
1839 )
1840 .unwrap()];
1841
1842 let test_dir = TempStrDir::default();
1843 let test_uri = &test_dir;
1844 let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
1845 let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
1846
1847 let params = VectorIndexParams::ivf_pq(2, 8, 2, MetricType::L2, 2);
1848 dataset
1849 .create_index(&["v"], IndexType::Vector, None, ¶ms, true)
1850 .await
1851 .unwrap();
1852 dataset
1853 .create_index(&["o"], IndexType::Vector, None, ¶ms, true)
1854 .await
1855 .unwrap();
1856
1857 dataset
1859 .create_index(&["v"], IndexType::Vector, None, ¶ms, true)
1860 .await
1861 .unwrap();
1862
1863 assert!(dataset
1865 .create_index(
1866 &["v"],
1867 IndexType::Vector,
1868 Some("o_idx".to_string()),
1869 ¶ms,
1870 true,
1871 )
1872 .await
1873 .is_err());
1874 }
1875
1876 fn sample_vector_field() -> Field {
1877 let dimensions = 16;
1878 let column_name = "vec";
1879 Field::new(
1880 column_name,
1881 DataType::FixedSizeList(
1882 Arc::new(Field::new("item", DataType::Float32, true)),
1883 dimensions,
1884 ),
1885 false,
1886 )
1887 }
1888
1889 #[tokio::test]
1890 async fn test_drop_index() {
1891 let test_dir = TempStrDir::default();
1892 let schema = Schema::new(vec![
1893 sample_vector_field(),
1894 Field::new("ints", DataType::Int32, false),
1895 ]);
1896 let mut dataset = lance_datagen::rand(&schema)
1897 .into_dataset(
1898 &test_dir,
1899 FragmentCount::from(1),
1900 FragmentRowCount::from(256),
1901 )
1902 .await
1903 .unwrap();
1904
1905 let idx_name = "name".to_string();
1906 dataset
1907 .create_index(
1908 &["vec"],
1909 IndexType::Vector,
1910 Some(idx_name.clone()),
1911 &VectorIndexParams::ivf_pq(2, 8, 2, MetricType::L2, 10),
1912 true,
1913 )
1914 .await
1915 .unwrap();
1916 dataset
1917 .create_index(
1918 &["ints"],
1919 IndexType::BTree,
1920 None,
1921 &ScalarIndexParams::default(),
1922 true,
1923 )
1924 .await
1925 .unwrap();
1926
1927 assert_eq!(dataset.load_indices().await.unwrap().len(), 2);
1928
1929 dataset.drop_index(&idx_name).await.unwrap();
1930
1931 assert_eq!(dataset.load_indices().await.unwrap().len(), 1);
1932
1933 let scalar_idx_name = &dataset.load_indices().await.unwrap()[0].name;
1935 dataset.drop_index(scalar_idx_name).await.unwrap();
1936
1937 assert_eq!(dataset.load_indices().await.unwrap().len(), 0);
1938
1939 assert!(dataset.drop_index(scalar_idx_name).await.is_err());
1941 }
1942
1943 #[tokio::test]
1944 async fn test_count_index_rows() {
1945 let test_dir = TempStrDir::default();
1946 let dimensions = 16;
1947 let column_name = "vec";
1948 let field = sample_vector_field();
1949 let schema = Arc::new(Schema::new(vec![field]));
1950
1951 let float_arr = generate_random_array(512 * dimensions as usize);
1952
1953 let vectors =
1954 arrow_array::FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap();
1955
1956 let record_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(vectors)]).unwrap();
1957
1958 let reader =
1959 RecordBatchIterator::new(vec![record_batch].into_iter().map(Ok), schema.clone());
1960 let test_uri = &test_dir;
1961 let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
1962 dataset.validate().await.unwrap();
1963
1964 assert!(dataset.index_statistics("bad_id").await.is_err());
1966 let params = VectorIndexParams::ivf_pq(10, 8, 2, MetricType::L2, 10);
1968 dataset
1969 .create_index(
1970 &[column_name],
1971 IndexType::Vector,
1972 Some("vec_idx".into()),
1973 ¶ms,
1974 true,
1975 )
1976 .await
1977 .unwrap();
1978
1979 let stats: serde_json::Value =
1980 serde_json::from_str(&dataset.index_statistics("vec_idx").await.unwrap()).unwrap();
1981 assert_eq!(stats["num_unindexed_rows"], 0);
1982 assert_eq!(stats["num_indexed_rows"], 512);
1983
1984 let float_arr = generate_random_array(512 * dimensions as usize);
1987 let vectors =
1988 arrow_array::FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap();
1989
1990 let record_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(vectors)]).unwrap();
1991
1992 let reader = RecordBatchIterator::new(vec![record_batch].into_iter().map(Ok), schema);
1993 dataset.append(reader, None).await.unwrap();
1994
1995 let stats: serde_json::Value =
1996 serde_json::from_str(&dataset.index_statistics("vec_idx").await.unwrap()).unwrap();
1997 assert_eq!(stats["num_unindexed_rows"], 512);
1998 assert_eq!(stats["num_indexed_rows"], 512);
1999 }
2000
2001 #[tokio::test]
2002 async fn test_optimize_delta_indices() {
2003 let dimensions = 16;
2004 let column_name = "vec";
2005 let vec_field = Field::new(
2006 column_name,
2007 DataType::FixedSizeList(
2008 Arc::new(Field::new("item", DataType::Float32, true)),
2009 dimensions,
2010 ),
2011 false,
2012 );
2013 let other_column_name = "other_vec";
2014 let other_vec_field = Field::new(
2015 other_column_name,
2016 DataType::FixedSizeList(
2017 Arc::new(Field::new("item", DataType::Float32, true)),
2018 dimensions,
2019 ),
2020 false,
2021 );
2022 let schema = Arc::new(Schema::new(vec![vec_field, other_vec_field]));
2023
2024 let float_arr = generate_random_array(512 * dimensions as usize);
2025
2026 let vectors = Arc::new(
2027 arrow_array::FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap(),
2028 );
2029
2030 let record_batch =
2031 RecordBatch::try_new(schema.clone(), vec![vectors.clone(), vectors.clone()]).unwrap();
2032
2033 let reader = RecordBatchIterator::new(
2034 vec![record_batch.clone()].into_iter().map(Ok),
2035 schema.clone(),
2036 );
2037
2038 let mut dataset = Dataset::write(reader, "memory://", None).await.unwrap();
2039 let params = VectorIndexParams::ivf_pq(10, 8, 2, MetricType::L2, 10);
2040 dataset
2041 .create_index(
2042 &[column_name],
2043 IndexType::Vector,
2044 Some("vec_idx".into()),
2045 ¶ms,
2046 true,
2047 )
2048 .await
2049 .unwrap();
2050 dataset
2051 .create_index(
2052 &[other_column_name],
2053 IndexType::Vector,
2054 Some("other_vec_idx".into()),
2055 ¶ms,
2056 true,
2057 )
2058 .await
2059 .unwrap();
2060
2061 async fn get_stats(dataset: &Dataset, name: &str) -> serde_json::Value {
2062 serde_json::from_str(&dataset.index_statistics(name).await.unwrap()).unwrap()
2063 }
2064 async fn get_meta(dataset: &Dataset, name: &str) -> Vec<IndexMetadata> {
2065 dataset
2066 .load_indices()
2067 .await
2068 .unwrap()
2069 .iter()
2070 .filter(|m| m.name == name)
2071 .cloned()
2072 .collect()
2073 }
2074 fn get_bitmap(meta: &IndexMetadata) -> Vec<u32> {
2075 meta.fragment_bitmap.as_ref().unwrap().iter().collect()
2076 }
2077
2078 let stats = get_stats(&dataset, "vec_idx").await;
2079 assert_eq!(stats["num_unindexed_rows"], 0);
2080 assert_eq!(stats["num_indexed_rows"], 512);
2081 assert_eq!(stats["num_indexed_fragments"], 1);
2082 assert_eq!(stats["num_indices"], 1);
2083 let meta = get_meta(&dataset, "vec_idx").await;
2084 assert_eq!(meta.len(), 1);
2085 assert_eq!(get_bitmap(&meta[0]), vec![0]);
2086
2087 let reader =
2088 RecordBatchIterator::new(vec![record_batch].into_iter().map(Ok), schema.clone());
2089 dataset.append(reader, None).await.unwrap();
2090 let stats = get_stats(&dataset, "vec_idx").await;
2091 assert_eq!(stats["num_unindexed_rows"], 512);
2092 assert_eq!(stats["num_indexed_rows"], 512);
2093 assert_eq!(stats["num_indexed_fragments"], 1);
2094 assert_eq!(stats["num_unindexed_fragments"], 1);
2095 assert_eq!(stats["num_indices"], 1);
2096 let meta = get_meta(&dataset, "vec_idx").await;
2097 assert_eq!(meta.len(), 1);
2098 assert_eq!(get_bitmap(&meta[0]), vec![0]);
2099
        dataset
            .optimize_indices(&OptimizeOptions::append().index_names(vec![]))
            .await
            .unwrap();
2104 let stats = get_stats(&dataset, "vec_idx").await;
2105 assert_eq!(stats["num_unindexed_rows"], 512);
2106 assert_eq!(stats["num_indexed_rows"], 512);
2107 assert_eq!(stats["num_indexed_fragments"], 1);
2108 assert_eq!(stats["num_unindexed_fragments"], 1);
2109 assert_eq!(stats["num_indices"], 1);
2110 let meta = get_meta(&dataset, "vec_idx").await;
2111 assert_eq!(meta.len(), 1);
2112 assert_eq!(get_bitmap(&meta[0]), vec![0]);
2113
2114 dataset
2116 .optimize_indices(
2117 &OptimizeOptions::append().index_names(vec!["other_vec_idx".to_owned()]),
2118 )
2119 .await
2120 .unwrap();
2121 let stats = get_stats(&dataset, "vec_idx").await;
2122 assert_eq!(stats["num_unindexed_rows"], 512);
2123 assert_eq!(stats["num_indexed_rows"], 512);
2124 assert_eq!(stats["num_indexed_fragments"], 1);
2125 assert_eq!(stats["num_unindexed_fragments"], 1);
2126 assert_eq!(stats["num_indices"], 1);
2127 let meta = get_meta(&dataset, "vec_idx").await;
2128 assert_eq!(meta.len(), 1);
2129 assert_eq!(get_bitmap(&meta[0]), vec![0]);
2130
2131 let stats = get_stats(&dataset, "other_vec_idx").await;
2132 assert_eq!(stats["num_unindexed_rows"], 0);
2133 assert_eq!(stats["num_indexed_rows"], 1024);
2134 assert_eq!(stats["num_indexed_fragments"], 2);
2135 assert_eq!(stats["num_unindexed_fragments"], 0);
2136 assert_eq!(stats["num_indices"], 2);
2137 let meta = get_meta(&dataset, "other_vec_idx").await;
2138 assert_eq!(meta.len(), 2);
2139 assert_eq!(get_bitmap(&meta[0]), vec![0]);
2140 assert_eq!(get_bitmap(&meta[1]), vec![1]);
2141
2142 dataset
2143 .optimize_indices(&OptimizeOptions::retrain())
2144 .await
2145 .unwrap();
2146
2147 let stats = get_stats(&dataset, "vec_idx").await;
2148 assert_eq!(stats["num_unindexed_rows"], 0);
2149 assert_eq!(stats["num_indexed_rows"], 1024);
2150 assert_eq!(stats["num_indexed_fragments"], 2);
2151 assert_eq!(stats["num_unindexed_fragments"], 0);
2152 assert_eq!(stats["num_indices"], 1);
2153 let meta = get_meta(&dataset, "vec_idx").await;
2154 assert_eq!(meta.len(), 1);
2155 assert_eq!(get_bitmap(&meta[0]), vec![0, 1]);
2156
2157 dataset
2158 .optimize_indices(&OptimizeOptions::retrain())
2159 .await
2160 .unwrap();
2161 let stats = get_stats(&dataset, "other_vec_idx").await;
2162 assert_eq!(stats["num_unindexed_rows"], 0);
2163 assert_eq!(stats["num_indexed_rows"], 1024);
2164 assert_eq!(stats["num_indexed_fragments"], 2);
2165 assert_eq!(stats["num_unindexed_fragments"], 0);
2166 assert_eq!(stats["num_indices"], 1);
2167 let meta = get_meta(&dataset, "other_vec_idx").await;
2168 assert_eq!(meta.len(), 1);
2169 assert_eq!(get_bitmap(&meta[0]), vec![0, 1]);
2170 }
2171
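    // Same delta lifecycle for an IVF_HNSW_SQ index: append() adds a second
    // delta index, retrain() collapses the deltas back into one.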
2172 #[tokio::test]
2173 async fn test_optimize_ivf_hnsw_sq_delta_indices() {
2174 let test_dir = TempStrDir::default();
2175 let dimensions = 16;
2176 let column_name = "vec";
2177 let field = Field::new(
2178 column_name,
2179 DataType::FixedSizeList(
2180 Arc::new(Field::new("item", DataType::Float32, true)),
2181 dimensions,
2182 ),
2183 false,
2184 );
2185 let schema = Arc::new(Schema::new(vec![field]));
2186
2187 let float_arr = generate_random_array(512 * dimensions as usize);
2188
2189 let vectors =
2190 arrow_array::FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap();
2191
2192 let record_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(vectors)]).unwrap();
2193
2194 let reader = RecordBatchIterator::new(
2195 vec![record_batch.clone()].into_iter().map(Ok),
2196 schema.clone(),
2197 );
2198
2199 let test_uri = &test_dir;
2200 let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
2201
2202 let ivf_params = IvfBuildParams::default();
2203 let hnsw_params = HnswBuildParams::default();
2204 let sq_params = SQBuildParams::default();
2205 let params = VectorIndexParams::with_ivf_hnsw_sq_params(
2206 MetricType::L2,
2207 ivf_params,
2208 hnsw_params,
2209 sq_params,
2210 );
2211 dataset
2212 .create_index(
2213 &[column_name],
2214 IndexType::Vector,
2215 Some("vec_idx".into()),
2216 ¶ms,
2217 true,
2218 )
2219 .await
2220 .unwrap();
2221
2222 let stats: serde_json::Value =
2223 serde_json::from_str(&dataset.index_statistics("vec_idx").await.unwrap()).unwrap();
2224 assert_eq!(stats["num_unindexed_rows"], 0);
2225 assert_eq!(stats["num_indexed_rows"], 512);
2226 assert_eq!(stats["num_indexed_fragments"], 1);
2227 assert_eq!(stats["num_indices"], 1);
2228
2229 let reader =
2230 RecordBatchIterator::new(vec![record_batch].into_iter().map(Ok), schema.clone());
2231 dataset.append(reader, None).await.unwrap();
2232 let mut dataset = DatasetBuilder::from_uri(test_uri).load().await.unwrap();
2233 let stats: serde_json::Value =
2234 serde_json::from_str(&dataset.index_statistics("vec_idx").await.unwrap()).unwrap();
2235 assert_eq!(stats["num_unindexed_rows"], 512);
2236 assert_eq!(stats["num_indexed_rows"], 512);
2237 assert_eq!(stats["num_indexed_fragments"], 1);
2238 assert_eq!(stats["num_unindexed_fragments"], 1);
2239 assert_eq!(stats["num_indices"], 1);
2240
2241 dataset
2242 .optimize_indices(&OptimizeOptions::append())
2243 .await
2244 .unwrap();
2245
2246 let stats: serde_json::Value =
2247 serde_json::from_str(&dataset.index_statistics("vec_idx").await.unwrap()).unwrap();
2248 assert_eq!(stats["num_unindexed_rows"], 0);
2249 assert_eq!(stats["num_indexed_rows"], 1024);
2250 assert_eq!(stats["num_indexed_fragments"], 2);
2251 assert_eq!(stats["num_unindexed_fragments"], 0);
2252 assert_eq!(stats["num_indices"], 2);
2253
2254 dataset
2255 .optimize_indices(&OptimizeOptions::retrain())
2256 .await
2257 .unwrap();
2258 let stats: serde_json::Value =
2259 serde_json::from_str(&dataset.index_statistics("vec_idx").await.unwrap()).unwrap();
2260 assert_eq!(stats["num_unindexed_rows"], 0);
2261 assert_eq!(stats["num_indexed_rows"], 1024);
2262 assert_eq!(stats["num_indexed_fragments"], 2);
2263 assert_eq!(stats["num_unindexed_fragments"], 0);
2264 assert_eq!(stats["num_indices"], 1);
2265 }
2266
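    // The inverted index is built with lower_case(false), so matching is
    // case-sensitive: uppercase queries must miss until uppercase rows are
    // appended, and optimize_indices() must fold new rows in without losing
    // existing matches.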
2267 #[rstest]
2268 #[tokio::test]
2269 async fn test_optimize_fts(#[values(false, true)] with_position: bool) {
2270 let words = ["apple", "banana", "cherry", "date"];
2271
2272 let dir = TempStrDir::default();
2273 let schema = Arc::new(Schema::new(vec![Field::new("text", DataType::Utf8, false)]));
2274 let data = StringArray::from_iter_values(words.iter().map(|s| s.to_string()));
2275 let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(data)]).unwrap();
2276 let batch_iterator = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
2277
2278 let mut dataset = Dataset::write(batch_iterator, &dir, None).await.unwrap();
2279
2280 let params = InvertedIndexParams::default()
2281 .lower_case(false)
2282 .with_position(with_position);
2283 dataset
2284 .create_index(&["text"], IndexType::Inverted, None, ¶ms, true)
2285 .await
2286 .unwrap();
2287
2288 async fn assert_indexed_rows(dataset: &Dataset, expected_indexed_rows: usize) {
2289 let stats = dataset.index_statistics("text_idx").await.unwrap();
2290 let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
2291 let indexed_rows = stats["num_indexed_rows"].as_u64().unwrap() as usize;
2292 let unindexed_rows = stats["num_unindexed_rows"].as_u64().unwrap() as usize;
2293 let num_rows = dataset.count_all_rows().await.unwrap();
2294 assert_eq!(indexed_rows, expected_indexed_rows);
2295 assert_eq!(unindexed_rows, num_rows - expected_indexed_rows);
2296 }
2297
2298 let num_rows = dataset.count_all_rows().await.unwrap();
2299 assert_indexed_rows(&dataset, num_rows).await;
2300
2301 let new_words = ["elephant", "fig", "grape", "honeydew"];
2302 let new_data = StringArray::from_iter_values(new_words.iter().map(|s| s.to_string()));
2303 let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(new_data)]).unwrap();
2304 let batch_iter = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
2305 dataset.append(batch_iter, None).await.unwrap();
2306 assert_indexed_rows(&dataset, num_rows).await;
2307
2308 dataset
2309 .optimize_indices(&OptimizeOptions::append())
2310 .await
2311 .unwrap();
2312 let num_rows = dataset.count_all_rows().await.unwrap();
2313 assert_indexed_rows(&dataset, num_rows).await;
2314
2315 for &word in words.iter().chain(new_words.iter()) {
2316 let query_result = dataset
2317 .scan()
2318 .project(&["text"])
2319 .unwrap()
2320 .full_text_search(FullTextSearchQuery::new(word.to_string()))
2321 .unwrap()
2322 .limit(Some(10), None)
2323 .unwrap()
2324 .try_into_batch()
2325 .await
2326 .unwrap();
2327
2328 let texts = query_result["text"]
2329 .as_string::<i32>()
2330 .iter()
2331 .map(|v| match v {
2332 None => "".to_string(),
2333 Some(v) => v.to_string(),
2334 })
2335 .collect::<Vec<String>>();
2336
2337 assert_eq!(texts.len(), 1);
2338 assert_eq!(texts[0], word);
2339 }
2340
2341 let uppercase_words = ["Apple", "Banana", "Cherry", "Date"];
2342 for &word in uppercase_words.iter() {
2343 let query_result = dataset
2344 .scan()
2345 .project(&["text"])
2346 .unwrap()
2347 .full_text_search(FullTextSearchQuery::new(word.to_string()))
2348 .unwrap()
2349 .limit(Some(10), None)
2350 .unwrap()
2351 .try_into_batch()
2352 .await
2353 .unwrap();
2354
2355 let texts = query_result["text"]
2356 .as_string::<i32>()
2357 .iter()
2358 .map(|v| match v {
2359 None => "".to_string(),
2360 Some(v) => v.to_string(),
2361 })
2362 .collect::<Vec<String>>();
2363
2364 assert_eq!(texts.len(), 0);
2365 }
        let new_data =
            StringArray::from_iter_values(uppercase_words.iter().map(|s| s.to_string()));
2367 let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(new_data)]).unwrap();
2368 let batch_iter = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
2369 dataset.append(batch_iter, None).await.unwrap();
2370 assert_indexed_rows(&dataset, num_rows).await;
2371
2372 for &word in uppercase_words.iter() {
2374 let query_result = dataset
2375 .scan()
2376 .project(&["text"])
2377 .unwrap()
2378 .full_text_search(FullTextSearchQuery::new(word.to_string()))
2379 .unwrap()
2380 .limit(Some(10), None)
2381 .unwrap()
2382 .try_into_batch()
2383 .await
2384 .unwrap();
2385
2386 let texts = query_result["text"]
2387 .as_string::<i32>()
2388 .iter()
2389 .map(|v| match v {
2390 None => "".to_string(),
2391 Some(v) => v.to_string(),
2392 })
2393 .collect::<Vec<String>>();
2394
2395 assert_eq!(texts.len(), 1, "query: {}, texts: {:?}", word, texts);
2396 assert_eq!(texts[0], word, "query: {}, texts: {:?}", word, texts);
2397 }
2398
2399 dataset
2400 .optimize_indices(&OptimizeOptions::append())
2401 .await
2402 .unwrap();
2403 let num_rows = dataset.count_all_rows().await.unwrap();
2404 assert_indexed_rows(&dataset, num_rows).await;
2405
2406 for &word in uppercase_words.iter() {
2408 let query_result = dataset
2409 .scan()
2410 .project(&["text"])
2411 .unwrap()
2412 .full_text_search(FullTextSearchQuery::new(word.to_string()))
2413 .unwrap()
2414 .limit(Some(10), None)
2415 .unwrap()
2416 .try_into_batch()
2417 .await
2418 .unwrap();
2419
2420 let texts = query_result["text"]
2421 .as_string::<i32>()
2422 .iter()
2423 .map(|v| match v {
2424 None => "".to_string(),
2425 Some(v) => v.to_string(),
2426 })
2427 .collect::<Vec<String>>();
2428
2429 assert_eq!(texts.len(), 1, "query: {}, texts: {:?}", word, texts);
2430 assert_eq!(texts[0], word, "query: {}, texts: {:?}", word, texts);
2431
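            // Compaction rewrites the data files; FTS results must survive it.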
2432 compact_files(&mut dataset, CompactionOptions::default(), None)
2434 .await
2435 .unwrap();
2436 for &word in uppercase_words.iter() {
2437 let query_result = dataset
2438 .scan()
2439 .project(&["text"])
2440 .unwrap()
2441 .full_text_search(FullTextSearchQuery::new(word.to_string()))
2442 .unwrap()
2443 .try_into_batch()
2444 .await
2445 .unwrap();
2446 let texts = query_result["text"]
2447 .as_string::<i32>()
2448 .iter()
2449 .map(|v| match v {
2450 None => "".to_string(),
2451 Some(v) => v.to_string(),
2452 })
2453 .collect::<Vec<String>>();
2454 assert_eq!(texts.len(), 1, "query: {}, texts: {:?}", word, texts);
2455 assert_eq!(texts[0], word, "query: {}, texts: {:?}", word, texts);
2456 }
2457 assert_indexed_rows(&dataset, num_rows).await;
2458 }
2459 }
2460
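    // PQ training with 8-bit codes needs at least 2^8 = 256 training rows;
    // building the index on only 100 rows must fail with a clear error.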
2461 #[tokio::test]
2462 async fn test_create_index_too_small_for_pq() {
2463 let test_dir = TempStrDir::default();
2464 let dimensions = 1536;
2465
2466 let field = Field::new(
2467 "vector",
2468 DataType::FixedSizeList(
2469 Arc::new(Field::new("item", DataType::Float32, true)),
2470 dimensions,
2471 ),
2472 false,
2473 );
2474
2475 let schema = Arc::new(Schema::new(vec![field]));
2476 let float_arr = generate_random_array(100 * dimensions as usize);
2477
2478 let vectors =
2479 arrow_array::FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap();
2480 let record_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(vectors)]).unwrap();
2481 let reader = RecordBatchIterator::new(
2482 vec![record_batch.clone()].into_iter().map(Ok),
2483 schema.clone(),
2484 );
2485
2486 let test_uri = &test_dir;
2487 let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
2488
2489 let params = VectorIndexParams::ivf_pq(1, 8, 96, DistanceType::L2, 1);
2490 let result = dataset
2491 .create_index(&["vector"], IndexType::Vector, None, ¶ms, false)
2492 .await;
2493
2494 assert!(matches!(result, Err(Error::Unprocessable { .. })));
2495 if let Error::Unprocessable { message, .. } = result.unwrap_err() {
2496 assert_eq!(
2497 message,
2498 "Not enough rows to train PQ. Requires 256 rows but only 100 available",
2499 )
2500 }
2501 }
2502
2503 #[tokio::test]
2504 async fn test_create_bitmap_index() {
2505 let test_dir = TempStrDir::default();
2506 let field = Field::new("tag", DataType::Utf8, false);
2507 let schema = Arc::new(Schema::new(vec![field]));
2508 let array = StringArray::from_iter_values((0..128).map(|i| ["a", "b", "c"][i % 3]));
2509 let record_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
2510 let reader = RecordBatchIterator::new(
2511 vec![record_batch.clone()].into_iter().map(Ok),
2512 schema.clone(),
2513 );
2514
2515 let test_uri = &test_dir;
2516 let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
2517 dataset
2518 .create_index(
2519 &["tag"],
2520 IndexType::Bitmap,
2521 None,
2522 &ScalarIndexParams::default(),
2523 false,
2524 )
2525 .await
2526 .unwrap();
2527 let indices = dataset.load_indices().await.unwrap();
2528 let index = dataset
2529 .open_generic_index("tag", &indices[0].uuid.to_string(), &NoOpMetricsCollector)
2530 .await
2531 .unwrap();
2532 assert_eq!(index.index_type(), IndexType::Bitmap);
2533 }
2534
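    // Index metadata should come from the session cache: repeated loads issue
    // zero reads, and a cold re-open reads only a bounded amount of metadata.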
2535 #[lance_test_macros::test(tokio::test)]
2537 async fn test_load_indices() {
2538 let session = Arc::new(Session::default());
2539 let write_params = WriteParams {
2540 session: Some(session.clone()),
2541 ..Default::default()
2542 };
2543
2544 let test_dir = TempStrDir::default();
2545 let field = Field::new("tag", DataType::Utf8, false);
2546 let schema = Arc::new(Schema::new(vec![field]));
2547 let array = StringArray::from_iter_values((0..128).map(|i| ["a", "b", "c"][i % 3]));
2548 let record_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
2549 let reader = RecordBatchIterator::new(
2550 vec![record_batch.clone()].into_iter().map(Ok),
2551 schema.clone(),
2552 );
2553
2554 let test_uri = &test_dir;
2555 let mut dataset = Dataset::write(reader, test_uri, Some(write_params))
2556 .await
2557 .unwrap();
2558 dataset
2559 .create_index(
2560 &["tag"],
2561 IndexType::Bitmap,
2562 None,
2563 &ScalarIndexParams::default(),
2564 false,
2565 )
2566 .await
2567 .unwrap();
        // Drain the incremental IO counters, then verify that load_indices()
        // issues no further reads (metadata is already cached in the session).
        dataset.object_store().io_stats_incremental();
        let indices = dataset.load_indices().await.unwrap();
        let stats = dataset.object_store().io_stats_incremental();
        assert_io_eq!(stats, read_iops, 0);
        assert_io_eq!(stats, read_bytes, 0);
2575 assert_eq!(indices.len(), 1);
2576
        // Clear the session's index cache and re-open the dataset; the fresh
        // open should read only a bounded amount of metadata (asserted below).
        session.index_cache.clear().await;
        let dataset2 = DatasetBuilder::from_uri(test_uri)
            .with_session(session.clone())
            .load()
            .await
            .unwrap();
        let stats = dataset2.object_store().io_stats_incremental();
        assert_io_lt!(stats, read_bytes, 64 * 1024);
2586
2587 let indices2 = dataset2.load_indices().await.unwrap();
2590 let stats = dataset2.object_store().io_stats_incremental();
2591 assert_io_eq!(stats, read_iops, 0);
2592 assert_io_eq!(stats, read_bytes, 0);
2593 assert_eq!(indices2.len(), 1);
2594 }
2595
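    // Remapping every row to None (all rows gone) should keep the existing
    // index as-is rather than writing a new, empty one.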
2596 #[tokio::test]
2597 async fn test_remap_empty() {
2598 let data = gen_batch()
2599 .col("int", array::step::<Int32Type>())
2600 .col(
2601 "vector",
2602 array::rand_vec::<Float32Type>(Dimension::from(16)),
2603 )
2604 .into_reader_rows(RowCount::from(256), BatchCount::from(1));
2605 let mut dataset = Dataset::write(data, "memory://", None).await.unwrap();
2606
2607 let params = VectorIndexParams::ivf_pq(1, 8, 1, DistanceType::L2, 1);
2608 dataset
2609 .create_index(&["vector"], IndexType::Vector, None, ¶ms, false)
2610 .await
2611 .unwrap();
2612
2613 let index_uuid = dataset.load_indices().await.unwrap()[0].uuid;
2614 let remap_to_empty = (0..dataset.count_all_rows().await.unwrap())
2615 .map(|i| (i as u64, None))
2616 .collect::<HashMap<_, _>>();
2617 let new_uuid = remap_index(&dataset, &index_uuid, &remap_to_empty)
2618 .await
2619 .unwrap();
2620 assert_eq!(new_uuid, RemapResult::Keep(index_uuid));
2621 }
2622
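    // Optimizing an index that already covers every fragment should be a
    // no-op: search results stay valid and free of duplicate ids.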
2623 #[tokio::test]
2624 async fn test_optimize_ivf_pq_up_to_date() {
2625 let nrows = 256;
2627 let dimensions = 16;
2628 let column_name = "vector";
2629 let schema = Arc::new(Schema::new(vec![
2630 Field::new("id", DataType::Int32, false),
2631 Field::new(
2632 column_name,
2633 DataType::FixedSizeList(
2634 Arc::new(Field::new("item", DataType::Float32, true)),
2635 dimensions,
2636 ),
2637 false,
2638 ),
2639 ]));
2640
2641 let float_arr = generate_random_array(nrows * dimensions as usize);
2642 let vectors =
2643 arrow_array::FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap();
2644 let record_batch = RecordBatch::try_new(
2645 schema.clone(),
2646 vec![
2647 Arc::new(arrow_array::Int32Array::from_iter_values(0..nrows as i32)),
2648 Arc::new(vectors),
2649 ],
2650 )
2651 .unwrap();
2652
2653 let reader = RecordBatchIterator::new(
2654 vec![record_batch.clone()].into_iter().map(Ok),
2655 schema.clone(),
2656 );
2657 let mut dataset = Dataset::write(reader, "memory://", None).await.unwrap();
2658
2659 let params = VectorIndexParams::ivf_pq(1, 8, 2, MetricType::L2, 2);
2660 dataset
2661 .create_index(&[column_name], IndexType::Vector, None, ¶ms, true)
2662 .await
2663 .unwrap();
2664
2665 let query_vector = generate_random_array(dimensions as usize);
2666
2667 let nearest = dataset
2668 .scan()
2669 .nearest(column_name, &query_vector, 5)
2670 .unwrap()
2671 .try_into_batch()
2672 .await
2673 .unwrap();
2674
2675 let ids = nearest["id"].as_primitive::<Int32Type>();
2676 let mut seen = HashSet::new();
2677 for id in ids.values() {
2678 assert!(seen.insert(*id), "Duplicate id found: {}", id);
2679 }
2680
2681 dataset
2682 .optimize_indices(&OptimizeOptions::default())
2683 .await
2684 .unwrap();
2685
2686 dataset.validate().await.unwrap();
2687
2688 let nearest_after = dataset
2689 .scan()
2690 .nearest(column_name, &query_vector, 5)
2691 .unwrap()
2692 .try_into_batch()
2693 .await
2694 .unwrap();
2695
2696 let ids = nearest_after["id"].as_primitive::<Int32Type>();
2697 let mut seen = HashSet::new();
2698 for id in ids.values() {
2699 assert!(seen.insert(*id), "Duplicate id found: {}", id);
2700 }
2701 }
2702
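    // A newly created index must carry a created_at timestamp bracketed by
    // the instants sampled just before and just after creation.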
2703 #[tokio::test]
2704 async fn test_index_created_at_timestamp() {
2705 let schema = Arc::new(Schema::new(vec![
2707 Field::new("id", DataType::Int32, false),
2708 Field::new("values", DataType::Utf8, false),
2709 ]));
2710
2711 let values = StringArray::from_iter_values(["hello", "world", "foo", "bar"]);
2712 let record_batch = RecordBatch::try_new(
2713 schema.clone(),
2714 vec![
2715 Arc::new(Int32Array::from_iter_values(0..4)),
2716 Arc::new(values),
2717 ],
2718 )
2719 .unwrap();
2720
2721 let reader =
2722 RecordBatchIterator::new(vec![record_batch].into_iter().map(Ok), schema.clone());
2723
2724 let mut dataset = Dataset::write(reader, "memory://", None).await.unwrap();
2725
2726 let before_index = chrono::Utc::now();
2728
2729 dataset
2731 .create_index(
2732 &["values"],
2733 IndexType::Scalar,
2734 Some("test_idx".to_string()),
2735 &ScalarIndexParams::default(),
2736 false,
2737 )
2738 .await
2739 .unwrap();
2740
2741 let after_index = chrono::Utc::now();
2743
2744 let indices = dataset.load_indices().await.unwrap();
2746 let test_index = indices.iter().find(|idx| idx.name == "test_idx").unwrap();
2747
2748 assert!(test_index.created_at.is_some());
2750 let created_at = test_index.created_at.unwrap();
2751 assert!(created_at >= before_index);
2752 assert!(created_at <= after_index);
2753 }
2754
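    // With a single delta, updated_at_timestamp_ms in the statistics should
    // equal the index's created_at time.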
2755 #[tokio::test]
2756 async fn test_index_statistics_updated_at() {
2757 let schema = Arc::new(Schema::new(vec![
2759 Field::new("id", DataType::Int32, false),
2760 Field::new("values", DataType::Utf8, false),
2761 ]));
2762
2763 let values = StringArray::from_iter_values(["hello", "world", "foo", "bar"]);
2764 let record_batch = RecordBatch::try_new(
2765 schema.clone(),
2766 vec![
2767 Arc::new(Int32Array::from_iter_values(0..4)),
2768 Arc::new(values),
2769 ],
2770 )
2771 .unwrap();
2772
2773 let reader =
2774 RecordBatchIterator::new(vec![record_batch].into_iter().map(Ok), schema.clone());
2775
2776 let mut dataset = Dataset::write(reader, "memory://", None).await.unwrap();
2777
2778 dataset
2780 .create_index(
2781 &["values"],
2782 IndexType::Scalar,
2783 Some("test_idx".to_string()),
2784 &ScalarIndexParams::default(),
2785 false,
2786 )
2787 .await
2788 .unwrap();
2789
2790 let stats_str = dataset.index_statistics("test_idx").await.unwrap();
2792 let stats: serde_json::Value = serde_json::from_str(&stats_str).unwrap();
2793
2794 assert!(stats["updated_at_timestamp_ms"].is_number());
2796 let updated_at = stats["updated_at_timestamp_ms"].as_u64().unwrap();
2797
2798 let indices = dataset.load_indices().await.unwrap();
2800 let test_index = indices.iter().find(|idx| idx.name == "test_idx").unwrap();
2801 let created_at = test_index.created_at.unwrap().timestamp_millis() as u64;
2802
2803 assert_eq!(updated_at, created_at);
2805 }
2806
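    // updated_at_timestamp_ms should reflect the most recent delta; after an
    // append and a second create_index the timestamp can only move forward.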
2807 #[tokio::test]
2808 async fn test_index_statistics_updated_at_multiple_deltas() {
2809 let schema = Arc::new(Schema::new(vec![
2811 Field::new("id", DataType::Int32, false),
2812 Field::new(
2813 "vector",
2814 DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), 4),
2815 false,
2816 ),
2817 ]));
2818
2819 let num_rows = 300;
2821 let float_arr = generate_random_array(4 * num_rows);
2822 let vectors = FixedSizeListArray::try_new_from_values(float_arr, 4).unwrap();
2823 let record_batch = RecordBatch::try_new(
2824 schema.clone(),
2825 vec![
2826 Arc::new(Int32Array::from_iter_values(0..num_rows as i32)),
2827 Arc::new(vectors),
2828 ],
2829 )
2830 .unwrap();
2831
2832 let reader =
2833 RecordBatchIterator::new(vec![record_batch].into_iter().map(Ok), schema.clone());
2834
2835 let mut dataset = Dataset::write(reader, "memory://", None).await.unwrap();
2836
2837 let params = VectorIndexParams::ivf_pq(1, 8, 2, MetricType::L2, 2);
2839 dataset
2840 .create_index(
2841 &["vector"],
2842 IndexType::Vector,
2843 Some("test_vec_idx".to_string()),
2844 ¶ms,
2845 false,
2846 )
2847 .await
2848 .unwrap();
2849
2850 let stats_str_1 = dataset.index_statistics("test_vec_idx").await.unwrap();
2852 let stats_1: serde_json::Value = serde_json::from_str(&stats_str_1).unwrap();
2853 let initial_updated_at = stats_1["updated_at_timestamp_ms"].as_u64().unwrap();
2854
        // Sleep briefly so the second index creation gets a later timestamp.
        std::thread::sleep(std::time::Duration::from_millis(10));

        let num_rows_2 = 50;
2859 let float_arr_2 = generate_random_array(4 * num_rows_2);
2860 let vectors_2 = FixedSizeListArray::try_new_from_values(float_arr_2, 4).unwrap();
2861 let record_batch_2 = RecordBatch::try_new(
2862 schema.clone(),
2863 vec![
2864 Arc::new(Int32Array::from_iter_values(
2865 num_rows as i32..(num_rows + num_rows_2) as i32,
2866 )),
2867 Arc::new(vectors_2),
2868 ],
2869 )
2870 .unwrap();
2871
2872 let reader_2 =
2873 RecordBatchIterator::new(vec![record_batch_2].into_iter().map(Ok), schema.clone());
2874
2875 dataset.append(reader_2, None).await.unwrap();
2876
2877 dataset
2879 .create_index(
2880 &["vector"],
2881 IndexType::Vector,
2882 Some("test_vec_idx".to_string()),
2883 ¶ms,
2884 true,
2885 )
2886 .await
2887 .unwrap();
2888
2889 let stats_str_2 = dataset.index_statistics("test_vec_idx").await.unwrap();
2891 let stats_2: serde_json::Value = serde_json::from_str(&stats_str_2).unwrap();
2892 let final_updated_at = stats_2["updated_at_timestamp_ms"].as_u64().unwrap();
2893
2894 assert!(final_updated_at >= initial_updated_at);
2896 }
2897
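    // Datasets written before created_at existed (pre-0.30.0) carry no
    // timestamps, so updated_at_timestamp_ms must be reported as null.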
2898 #[tokio::test]
2899 async fn test_index_statistics_updated_at_none_when_no_created_at() {
2900 let test_dir =
2905 copy_test_data_to_tmp("v0.30.0_pre_created_at/index_without_created_at").unwrap();
2906 let test_uri = test_dir.path_str();
2907 let test_uri = &test_uri;
2908
2909 let dataset = Dataset::open(test_uri).await.unwrap();
2910
2911 let indices = dataset.load_indices().await.unwrap();
2913 assert!(!indices.is_empty(), "Test dataset should have indices");
2914
2915 for index in indices.iter() {
2917 assert!(
2918 index.created_at.is_none(),
2919 "Index from old version should have created_at = None"
2920 );
2921 }
2922
2923 let index_name = &indices[0].name;
2925 let stats_str = dataset.index_statistics(index_name).await.unwrap();
2926 let stats: serde_json::Value = serde_json::from_str(&stats_str).unwrap();
2927
2928 assert!(
2930 stats["updated_at_timestamp_ms"].is_null(),
2931 "updated_at_timestamp_ms should be null when no indices have created_at timestamps"
2932 );
2933 }
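
    // An index created with train(false) starts empty: it reports zero
    // indexed rows even across appends, until optimize_indices() folds the
    // data in.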
2934 #[rstest]
2935 #[case::btree("i", IndexType::BTree, Box::new(ScalarIndexParams::default()))]
2936 #[case::bitmap("i", IndexType::Bitmap, Box::new(ScalarIndexParams::default()))]
2937 #[case::inverted("text", IndexType::Inverted, Box::new(InvertedIndexParams::default()))]
2938 #[tokio::test]
2939 async fn test_create_empty_scalar_index(
2940 #[case] column_name: &str,
2941 #[case] index_type: IndexType,
2942 #[case] params: Box<dyn IndexParams>,
2943 ) {
2944 use lance_datagen::{array, BatchCount, ByteCount, RowCount};
2945
2946 let reader = lance_datagen::gen_batch()
2948 .col("i", array::step::<Int32Type>())
2949 .col("text", array::rand_utf8(ByteCount::from(10), false))
2950 .into_reader_rows(RowCount::from(100), BatchCount::from(1));
2951 let mut dataset = Dataset::write(reader, "memory://test", None).await.unwrap();
2952
2953 dataset
2956 .create_index_builder(&[column_name], index_type, params.as_ref())
2957 .name("index".to_string())
2958 .train(false)
2959 .await
2960 .unwrap();
2961
2962 let stats = dataset.index_statistics("index").await.unwrap();
2964 let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
2965 assert_eq!(
2966 stats["num_indexed_rows"], 0,
2967 "Empty index should have zero indexed rows"
2968 );
2969
2970 let append_reader = lance_datagen::gen_batch()
2972 .col("i", array::step::<Int32Type>())
2973 .col("text", array::rand_utf8(ByteCount::from(10), false))
2974 .into_reader_rows(RowCount::from(50), BatchCount::from(1));
2975
2976 dataset.append(append_reader, None).await.unwrap();
2977
2978 let indices_after_append = dataset.load_indices().await.unwrap();
2980 assert_eq!(
2981 indices_after_append.len(),
2982 1,
2983 "Index should be retained after append for index type {:?}",
2984 index_type
2985 );
2986
2987 let stats = dataset.index_statistics("index").await.unwrap();
2988 let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
2989 assert_eq!(
2990 stats["num_indexed_rows"], 0,
2991 "Empty index should still have zero indexed rows after append"
2992 );
2993
2994 dataset.optimize_indices(&Default::default()).await.unwrap();
2996
2997 let indices_after_optimize = dataset.load_indices().await.unwrap();
2999 assert_eq!(
3000 indices_after_optimize.len(),
3001 1,
3002 "Index should still exist after optimization"
3003 );
3004
3005 let stats = dataset.index_statistics("index").await.unwrap();
3007 let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
        assert_eq!(
            stats["num_unindexed_rows"], 0,
            "Empty index should cover all rows after optimization"
        );
3012 }
3013
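    // Checks an explain_plan() string for index usage: FTS queries surface as
    // a MatchQuery node, scalar index lookups as a ScalarIndexQuery node.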
3014 fn assert_index_usage(plan: &str, column_name: &str, should_use_index: bool, context: &str) {
3016 let index_used = if column_name == "text" {
3017 plan.contains("MatchQuery")
3019 } else {
3020 plan.contains("ScalarIndexQuery")
3022 };
3023
3024 if should_use_index {
3025 assert!(
3026 index_used,
3027 "Query plan should use index {}: {}",
3028 context, plan
3029 );
3030 } else {
3031 assert!(
3032 !index_used,
3033 "Query plan should NOT use index {}: {}",
3034 context, plan
3035 );
3036 }
3037 }
3038
3039 #[rstest]
3047 #[case::btree("i", IndexType::BTree, Box::new(ScalarIndexParams::default()))]
3048 #[case::bitmap("i", IndexType::Bitmap, Box::new(ScalarIndexParams::default()))]
3049 #[case::inverted("text", IndexType::Inverted, Box::new(InvertedIndexParams::default()))]
3050 #[tokio::test]
3051 async fn test_scalar_index_retained_after_delete_all(
3052 #[case] column_name: &str,
3053 #[case] index_type: IndexType,
3054 #[case] params: Box<dyn IndexParams>,
3055 ) {
3056 use lance_datagen::{array, BatchCount, ByteCount, RowCount};
3057
3058 let reader = lance_datagen::gen_batch()
3060 .col("i", array::step::<Int32Type>())
3061 .col("text", array::rand_utf8(ByteCount::from(10), false))
3062 .into_reader_rows(RowCount::from(100), BatchCount::from(1));
3063 let mut dataset = Dataset::write(reader, "memory://test", None).await.unwrap();
3064
3065 dataset
3067 .create_index_builder(&[column_name], index_type, params.as_ref())
3068 .name("index".to_string())
3069 .train(true)
3070 .await
3071 .unwrap();
3072
3073 let stats = dataset.index_statistics("index").await.unwrap();
3075 let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
3076 assert_eq!(
3077 stats["num_indexed_rows"], 100,
3078 "Index should have indexed all 100 rows"
3079 );
3080
3081 let plan = if column_name == "text" {
3083 dataset
3085 .scan()
3086 .full_text_search(FullTextSearchQuery::new("test".to_string()))
3087 .unwrap()
3088 .explain_plan(false)
3089 .await
3090 .unwrap()
3091 } else {
3092 dataset
3094 .scan()
3095 .filter(format!("{} = 50", column_name).as_str())
3096 .unwrap()
3097 .explain_plan(false)
3098 .await
3099 .unwrap()
3100 };
3101 assert_index_usage(&plan, column_name, true, "before delete");
3103
3104 let indexes = dataset.load_indices().await.unwrap();
3105 let original_index = indexes[0].clone();
3106
3107 dataset.delete("true").await.unwrap();
3109
3110 let row_count = dataset.count_rows(None).await.unwrap();
3112 assert_eq!(row_count, 0, "Table should be empty after delete all");
3113
3114 let indices_after_delete = dataset.load_indices().await.unwrap();
3116 assert_eq!(
3117 indices_after_delete.len(),
3118 1,
3119 "Index should be retained after deleting all data"
3120 );
3121 assert_eq!(
3122 indices_after_delete[0].name, "index",
3123 "Index name should remain the same after delete"
3124 );
3125
3126 let index_after_delete = &indices_after_delete[0];
3128 let effective_bitmap = index_after_delete
3129 .effective_fragment_bitmap(&dataset.fragment_bitmap)
3130 .unwrap();
3131 assert!(
3132 effective_bitmap.is_empty(),
3133 "Effective bitmap should be empty after deleting all data"
3134 );
3135 assert_eq!(
3136 index_after_delete.fragment_bitmap, original_index.fragment_bitmap,
3137 "Fragment bitmap should remain the same after delete"
3138 );
3139
3140 let stats = dataset.index_statistics("index").await.unwrap();
3142 let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
3143 assert_eq!(
3144 stats["num_indexed_rows"], 0,
3145 "Index should now report zero indexed rows after delete all"
3146 );
3147 assert_eq!(
3148 stats["num_unindexed_rows"], 0,
3149 "Index should report zero unindexed rows after delete all"
3150 );
3151 assert_eq!(
3152 stats["num_indexed_fragments"], 0,
3153 "Index should report zero indexed fragments after delete all"
3154 );
3155 assert_eq!(
3156 stats["num_unindexed_fragments"], 0,
3157 "Index should report zero unindexed fragments after delete all"
3158 );
3159
3160 if column_name == "text" {
            let plan_after_delete = dataset
                .scan()
                .project(&[column_name])
                .unwrap()
                .full_text_search(FullTextSearchQuery::new("test".to_string()))
                .unwrap()
                .explain_plan(false)
                .await
                .unwrap();
            assert_index_usage(
                &plan_after_delete,
                column_name,
                true,
                "after delete (empty bitmap)",
            );
3180 } else {
            let plan_after_delete = dataset
                .scan()
                .filter(format!("{} = 50", column_name).as_str())
                .unwrap()
                .explain_plan(false)
                .await
                .unwrap();
            assert_index_usage(
                &plan_after_delete,
                column_name,
                false,
                "after delete (empty bitmap)",
            );
3196 }
3197
3198 let append_reader = lance_datagen::gen_batch()
3200 .col("i", array::step::<Int32Type>())
3201 .col("text", array::rand_utf8(ByteCount::from(10), false))
3202 .into_reader_rows(RowCount::from(50), BatchCount::from(1));
3203
3204 dataset.append(append_reader, None).await.unwrap();
3205
3206 let indices_after_append = dataset.load_indices().await.unwrap();
3208 assert_eq!(
3209 indices_after_append.len(),
3210 1,
3211 "Index should still exist after appending to empty table"
3212 );
3213 let stats = dataset.index_statistics("index").await.unwrap();
3214 let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
        assert_eq!(
            stats["num_indexed_rows"], 0,
            "Index should still report zero indexed rows after new data is added"
        );
3219
3220 dataset.optimize_indices(&Default::default()).await.unwrap();
3222
3223 let indices_after_optimize = dataset.load_indices().await.unwrap();
3225 assert_eq!(
3226 indices_after_optimize.len(),
3227 1,
3228 "Index should still exist after optimization following delete all"
3229 );
3230
3231 let stats = dataset.index_statistics("index").await.unwrap();
3233 let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
3234 assert_eq!(
3235 stats["num_indexed_rows"],
3236 dataset.count_rows(None).await.unwrap(),
3237 "Index should now cover all newly added rows after optimization"
3238 );
3239 }
3240
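    // Deleting every row keeps the index registered, but its effective
    // fragment bitmap becomes empty: scalar filters stop planning through the
    // index while FTS queries still do.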
3241 #[rstest]
3249 #[case::btree("i", IndexType::BTree, Box::new(ScalarIndexParams::default()))]
3250 #[case::bitmap("i", IndexType::Bitmap, Box::new(ScalarIndexParams::default()))]
3251 #[case::inverted("text", IndexType::Inverted, Box::new(InvertedIndexParams::default()))]
3252 #[tokio::test]
3253 async fn test_scalar_index_retained_after_update(
3254 #[case] column_name: &str,
3255 #[case] index_type: IndexType,
3256 #[case] params: Box<dyn IndexParams>,
3257 ) {
3258 use crate::dataset::UpdateBuilder;
3259 use lance_datagen::{array, BatchCount, ByteCount, RowCount};
3260
3261 let reader = lance_datagen::gen_batch()
3263 .col("i", array::step::<Int32Type>())
3264 .col("text", array::rand_utf8(ByteCount::from(10), false))
3265 .into_reader_rows(RowCount::from(100), BatchCount::from(1));
3266 let mut dataset = Dataset::write(reader, "memory://test", None).await.unwrap();
3267
3268 dataset
3270 .create_index_builder(&[column_name], index_type, params.as_ref())
3271 .name("index".to_string())
3272 .train(true)
3273 .await
3274 .unwrap();
3275
3276 let stats = dataset.index_statistics("index").await.unwrap();
3278 let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
3279 assert_eq!(
3280 stats["num_indexed_rows"], 100,
3281 "Index should have indexed all 100 rows"
3282 );
3283
3284 let plan = if column_name == "text" {
3286 dataset
3288 .scan()
3289 .project(&[column_name])
3290 .unwrap()
3291 .full_text_search(FullTextSearchQuery::new("test".to_string()))
3292 .unwrap()
3293 .explain_plan(false)
3294 .await
3295 .unwrap()
3296 } else {
3297 dataset
3299 .scan()
3300 .filter(format!("{} = 50", column_name).as_str())
3301 .unwrap()
3302 .explain_plan(false)
3303 .await
3304 .unwrap()
3305 };
3306 assert_index_usage(&plan, column_name, true, "before update");
3308
3309 let update_result = UpdateBuilder::new(Arc::new(dataset))
3311 .set("i", "i + 1000")
3312 .unwrap()
3313 .set("text", "'updated_' || text")
3314 .unwrap()
3315 .build()
3316 .unwrap()
3317 .execute()
3318 .await
3319 .unwrap();
3320
3321 let mut dataset = update_result.new_dataset.as_ref().clone();
3322
3323 let row_count = dataset.count_rows(None).await.unwrap();
3325 assert_eq!(row_count, 100, "Row count should remain 100 after update");
3326
3327 let indices_after_update = dataset.load_indices().await.unwrap();
3329 assert_eq!(
3330 indices_after_update.len(),
3331 1,
3332 "Index should be retained after updating rows"
3333 );
3334
3335 let indices = dataset.load_indices().await.unwrap();
3337 let index = &indices[0];
3338 let effective_bitmap = index
3339 .effective_fragment_bitmap(&dataset.fragment_bitmap)
3340 .unwrap();
3341 assert!(
3342 effective_bitmap.is_empty(),
3343 "Effective fragment bitmap should be empty after updating all data"
3344 );
3345
3346 let stats_after_update = dataset.index_statistics("index").await.unwrap();
3348 let stats_after_update: serde_json::Value =
3349 serde_json::from_str(&stats_after_update).unwrap();
3350
3351 assert_eq!(
3353 stats_after_update["num_indexed_rows"], 0,
3354 "Index statistics should be zero after update, as it is not re-trained"
3355 );
3356
3357 if column_name == "text" {
            let plan_after_update = dataset
                .scan()
                .project(&[column_name])
                .unwrap()
                .full_text_search(FullTextSearchQuery::new("test".to_string()))
                .unwrap()
                .explain_plan(false)
                .await
                .unwrap();
            assert_index_usage(
                &plan_after_update,
                column_name,
                true,
                "after update (empty bitmap)",
            );
3379 } else {
            let plan_after_update = dataset
                .scan()
                .filter(format!("{} = 50", column_name).as_str())
                .unwrap()
                .explain_plan(false)
                .await
                .unwrap();
            assert_index_usage(
                &plan_after_update,
                column_name,
                false,
                "after update (empty effective bitmap)",
            );
3396 }
3397
3398 dataset.optimize_indices(&Default::default()).await.unwrap();
3400
3401 let indices_after_optimize = dataset.load_indices().await.unwrap();
3403 assert_eq!(
3404 indices_after_optimize.len(),
3405 1,
3406 "Index should still exist after optimization following update"
3407 );
3408
3409 let stats_after_optimization = dataset.index_statistics("index").await.unwrap();
3410 let stats_after_optimization: serde_json::Value =
3411 serde_json::from_str(&stats_after_optimization).unwrap();
3412
3413 assert_eq!(
3415 stats_after_optimization["num_unindexed_rows"], 0,
3416 "Index should have zero unindexed rows after optimization"
3417 );
3418 }
3419
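    // Shared assertions for the shallow-clone test below: both indices must
    // survive the clone and immediately serve vector and scalar queries.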
3420 async fn validate_indices_after_clone(
3422 dataset: &Dataset,
3423 round: usize,
3424 expected_scalar_rows: usize,
3425 dimensions: u32,
3426 ) {
3427 let indices = dataset.load_indices().await.unwrap();
3429 assert_eq!(
3430 indices.len(),
3431 2,
3432 "Round {}: Cloned dataset should have 2 indices",
3433 round
3434 );
3435 let index_names: HashSet<String> = indices.iter().map(|idx| idx.name.clone()).collect();
3436 assert!(
3437 index_names.contains("vector_idx"),
3438 "Round {}: Should contain vector_idx",
3439 round
3440 );
3441 assert!(
3442 index_names.contains("category_idx"),
3443 "Round {}: Should contain category_idx",
3444 round
3445 );
3446
3447 let expected_total_rows = 300 + (round - 1) * 50;
3452 let total_rows = dataset.count_rows(None).await.unwrap();
3453 assert_eq!(
3454 total_rows, expected_total_rows,
3455 "Round {}: Should have {} rows after clone (chain cloning accumulates data)",
3456 round, expected_total_rows
3457 );
3458
3459 let query_vector = generate_random_array(dimensions as usize);
3461 let search_results = dataset
3462 .scan()
3463 .nearest("vector", &query_vector, 5)
3464 .unwrap()
3465 .limit(Some(5), None)
3466 .unwrap()
3467 .try_into_batch()
3468 .await
3469 .unwrap();
3470 assert!(
3471 search_results.num_rows() > 0,
3472 "Round {}: Vector search should return results immediately after clone",
3473 round
3474 );
3475
3476 let scalar_results = dataset
3478 .scan()
3479 .filter("category = 'category_0'")
3480 .unwrap()
3481 .try_into_batch()
3482 .await
3483 .unwrap();
3484
3485 assert_eq!(
3486 expected_scalar_rows,
3487 scalar_results.num_rows(),
3488 "Round {}: Scalar query should return {} results",
3489 round,
3490 expected_scalar_rows
3491 );
3492 }
3493
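    // Chained shallow clones: each round clones from a tag, appends rows, and
    // re-optimizes. New index files must land under the clone's own _indices
    // directory (with base_id cleared), never under the source dataset's.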
3494 #[tokio::test]
3495 async fn test_shallow_clone_with_index() {
3496 let test_dir = TempStrDir::default();
3497 let test_uri = &test_dir;
3498
3499 let dimensions = 16u32;
3501 let data = gen_batch()
3503 .col("id", array::step::<Int32Type>())
3504 .col("category", array::fill_utf8("category_0".to_string()))
3505 .col(
3506 "vector",
3507 array::rand_vec::<Float32Type>(Dimension::from(dimensions)),
3508 )
3509 .into_reader_rows(RowCount::from(300), BatchCount::from(1));
3510
3511 let mut dataset = Dataset::write(data, test_uri, None).await.unwrap();
3513 let vector_params = VectorIndexParams::ivf_pq(2, 8, 2, MetricType::L2, 10);
3515 dataset
3516 .create_index(
3517 &["vector"],
3518 IndexType::Vector,
3519 Some("vector_idx".to_string()),
3520 &vector_params,
3521 true,
3522 )
3523 .await
3524 .unwrap();
3525
3526 dataset
3528 .create_index(
3529 &["category"],
3530 IndexType::BTree,
3531 Some("category_idx".to_string()),
3532 &ScalarIndexParams::default(),
3533 true,
3534 )
3535 .await
3536 .unwrap();
3537
3538 let indices = dataset.load_indices().await.unwrap();
3540 assert_eq!(indices.len(), 2, "Should have 2 indices");
3541 let index_names: HashSet<String> = indices.iter().map(|idx| idx.name.clone()).collect();
3542 assert!(index_names.contains("vector_idx"));
3543 assert!(index_names.contains("category_idx"));
3544
3545 let scalar_results = dataset
3547 .scan()
3548 .filter("category = 'category_0'")
3549 .unwrap()
3550 .try_into_batch()
3551 .await
3552 .unwrap();
3553
3554 let source_scalar_query_rows = scalar_results.num_rows();
3555 assert!(
3556 scalar_results.num_rows() > 0,
3557 "Scalar query should return results"
3558 );
3559
3560 let clone_rounds = 3;
3562 let mut current_dataset = dataset;
3563
3564 for round in 1..=clone_rounds {
3565 let round_clone_dir = format!("{}/clone_round_{}", test_dir, round);
3566 let round_cloned_uri = &round_clone_dir;
3567 let tag_name = format!("shallow_clone_test_{}", round);
3568
3569 let current_version = current_dataset.version().version;
3571 current_dataset
3572 .tags()
3573 .create(&tag_name, current_version)
3574 .await
3575 .unwrap();
3576
3577 let mut round_cloned_dataset = current_dataset
3579 .shallow_clone(round_cloned_uri, tag_name.as_str(), None)
3580 .await
3581 .unwrap();
3582
3583 validate_indices_after_clone(
3585 &round_cloned_dataset,
3586 round,
3587 source_scalar_query_rows,
3588 dimensions,
3589 )
3590 .await;
3591
3592 let new_data = gen_batch()
3595 .col(
3596 "id",
3597 array::step_custom::<Int32Type>(300 + (round * 50) as i32, 1),
3598 )
3599 .col("category", array::fill_utf8(format!("category_{}", round)))
3600 .col(
3601 "vector",
3602 array::rand_vec::<Float32Type>(Dimension::from(dimensions)),
3603 )
3604 .into_reader_rows(RowCount::from(50), BatchCount::from(1));
3605
3606 round_cloned_dataset = Dataset::write(
3607 new_data,
3608 round_cloned_uri,
3609 Some(WriteParams {
3610 mode: WriteMode::Append,
3611 ..Default::default()
3612 }),
3613 )
3614 .await
3615 .unwrap();
3616
3617 let expected_rows = 300 + round * 50;
3619 let total_rows = round_cloned_dataset.count_rows(None).await.unwrap();
3620 assert_eq!(
3621 total_rows, expected_rows,
3622 "Round {}: Should have {} rows after append",
3623 round, expected_rows
3624 );
3625
3626 let indices_before_optimize = round_cloned_dataset.load_indices().await.unwrap();
3627 let vector_idx_before = indices_before_optimize
3628 .iter()
3629 .find(|idx| idx.name == "vector_idx")
3630 .unwrap();
3631 let category_idx_before = indices_before_optimize
3632 .iter()
3633 .find(|idx| idx.name == "category_idx")
3634 .unwrap();
3635
3636 round_cloned_dataset
3638 .optimize_indices(&OptimizeOptions::merge(indices_before_optimize.len()))
3639 .await
3640 .unwrap();
3641
3642 let optimized_indices = round_cloned_dataset.load_indices().await.unwrap();
3644 let new_vector_idx = optimized_indices
3645 .iter()
3646 .find(|idx| idx.name == "vector_idx")
3647 .unwrap();
3648 let new_category_idx = optimized_indices
3649 .iter()
3650 .find(|idx| idx.name == "category_idx")
3651 .unwrap();
3652
3653 assert_ne!(
3654 new_vector_idx.uuid, vector_idx_before.uuid,
3655 "Round {}: Vector index should have a new UUID after optimization",
3656 round
3657 );
3658 assert_ne!(
3659 new_category_idx.uuid, category_idx_before.uuid,
3660 "Round {}: Category index should have a new UUID after optimization",
3661 round
3662 );
3663
3664 use std::path::PathBuf;
3666 let clone_indices_dir = PathBuf::from(round_cloned_uri).join("_indices");
3667 let vector_index_dir = clone_indices_dir.join(new_vector_idx.uuid.to_string());
3668 let category_index_dir = clone_indices_dir.join(new_category_idx.uuid.to_string());
3669
3670 assert!(
3671 vector_index_dir.exists(),
3672 "Round {}: New vector index directory should exist in cloned dataset location: {:?}",
3673 round, vector_index_dir
3674 );
3675 assert!(
3676 category_index_dir.exists(),
3677 "Round {}: New category index directory should exist in cloned dataset location: {:?}",
3678 round, category_index_dir
3679 );
3680
3681 assert!(
3683 new_vector_idx.base_id.is_none(),
3684 "Round {}: New vector index should not have base_id after optimization in cloned dataset",
3685 round
3686 );
3687 assert!(
3688 new_category_idx.base_id.is_none(),
3689 "Round {}: New category index should not have base_id after optimization in cloned dataset",
3690 round
3691 );
3692
3693 let original_indices_dir = PathBuf::from(current_dataset.uri()).join("_indices");
3695 let wrong_vector_dir = original_indices_dir.join(new_vector_idx.uuid.to_string());
3696 let wrong_category_dir = original_indices_dir.join(new_category_idx.uuid.to_string());
3697
3698 assert!(
3699 !wrong_vector_dir.exists(),
3700 "Round {}: New vector index should NOT be in original dataset location: {:?}",
3701 round,
3702 wrong_vector_dir
3703 );
3704 assert!(
3705 !wrong_category_dir.exists(),
3706 "Round {}: New category index should NOT be in original dataset location: {:?}",
3707 round,
3708 wrong_category_dir
3709 );
3710
3711 let old_category_results = round_cloned_dataset
3713 .scan()
3714 .filter("category = 'category_0'")
3715 .unwrap()
3716 .try_into_batch()
3717 .await
3718 .unwrap();
3719
3720 let new_category_results = round_cloned_dataset
3721 .scan()
3722 .filter(&format!("category = 'category_{}'", round))
3723 .unwrap()
3724 .try_into_batch()
3725 .await
3726 .unwrap();
3727
3728 assert_eq!(
3729 source_scalar_query_rows,
3730 old_category_results.num_rows(),
3731 "Round {}: Should find old category data with {} rows",
3732 round,
3733 source_scalar_query_rows
3734 );
3735 assert!(
3736 new_category_results.num_rows() > 0,
3737 "Round {}: Should find new category data",
3738 round
3739 );
3740
3741 let query_vector = generate_random_array(dimensions as usize);
3743 let search_results = round_cloned_dataset
3744 .scan()
3745 .nearest("vector", &query_vector, 10)
3746 .unwrap()
3747 .limit(Some(10), None)
3748 .unwrap()
3749 .try_into_batch()
3750 .await
3751 .unwrap();
3752
3753 assert!(
3754 search_results.num_rows() > 0,
3755 "Round {}: Vector search should return results after optimization",
3756 round
3757 );
3758
3759 let vector_stats: serde_json::Value = serde_json::from_str(
3761 &round_cloned_dataset
3762 .index_statistics("vector_idx")
3763 .await
3764 .unwrap(),
3765 )
3766 .unwrap();
3767 let category_stats: serde_json::Value = serde_json::from_str(
3768 &round_cloned_dataset
3769 .index_statistics("category_idx")
3770 .await
3771 .unwrap(),
3772 )
3773 .unwrap();
3774
3775 assert_eq!(
3776 vector_stats["num_indexed_rows"].as_u64().unwrap(),
3777 expected_rows as u64,
3778 "Round {}: Vector index should have {} indexed rows",
3779 round,
3780 expected_rows
3781 );
3782 assert_eq!(
3783 category_stats["num_indexed_rows"].as_u64().unwrap(),
3784 expected_rows as u64,
3785 "Round {}: Category index should have {} indexed rows",
3786 round,
3787 expected_rows
3788 );
3789
3790 current_dataset = round_cloned_dataset;
3792 }
3793
3794 let final_cloned_dataset = current_dataset;
3796
3797 let cloned_indices = final_cloned_dataset.load_indices().await.unwrap();
3799 assert_eq!(
3800 cloned_indices.len(),
3801 2,
3802 "Final cloned dataset should have 2 indices"
3803 );
3804 let cloned_index_names: HashSet<String> =
3805 cloned_indices.iter().map(|idx| idx.name.clone()).collect();
3806 assert!(cloned_index_names.contains("vector_idx"));
3807 assert!(cloned_index_names.contains("category_idx"));
3808
3809 let query_vector = generate_random_array(dimensions as usize);
3811 let search_results = final_cloned_dataset
3812 .scan()
3813 .nearest("vector", &query_vector, 5)
3814 .unwrap()
3815 .limit(Some(5), None)
3816 .unwrap()
3817 .try_into_batch()
3818 .await
3819 .unwrap();
3820
3821 assert!(
3822 search_results.num_rows() > 0,
3823 "Vector search should return results on final dataset"
3824 );
3825
3826 let scalar_results = final_cloned_dataset
3828 .scan()
3829 .filter("category = 'category_0'")
3830 .unwrap()
3831 .try_into_batch()
3832 .await
3833 .unwrap();
3834
3835 assert_eq!(
3836 source_scalar_query_rows,
3837 scalar_results.num_rows(),
3838 "Scalar query should return results on final dataset"
3839 );
3840 }
3841
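    // initialize_indices() should carry all three index types (vector, FTS,
    // BTree) over to a target dataset with a compatible schema and leave them
    // immediately queryable against the target's own data.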
3842 #[tokio::test]
3843 async fn test_initialize_indices() {
3844 use crate::dataset::Dataset;
3845 use arrow_array::types::Float32Type;
3846 use lance_core::utils::tempfile::TempStrDir;
3847 use lance_datagen::{array, BatchCount, RowCount};
3848 use lance_index::scalar::{InvertedIndexParams, ScalarIndexParams};
3849 use lance_linalg::distance::MetricType;
3850 use std::collections::HashSet;
3851
3852 let test_dir = TempStrDir::default();
3854 let source_uri = format!("{}/{}", test_dir, "source");
3855 let target_uri = format!("{}/{}", test_dir, "target");
3856
3857 let source_reader = lance_datagen::gen_batch()
3859 .col("vector", array::rand_vec::<Float32Type>(8.into()))
3860 .col(
3861 "text",
3862 array::cycle_utf8_literals(&["hello world", "foo bar", "test data"]),
3863 )
3864 .col("id", array::step::<Int32Type>())
3865 .into_reader_rows(RowCount::from(300), BatchCount::from(1));
3866
3867 let mut source_dataset = Dataset::write(source_reader, &source_uri, None)
3869 .await
3870 .unwrap();
3871
3872 let vector_params = VectorIndexParams::ivf_pq(4, 8, 2, MetricType::L2, 10);
3875 source_dataset
3876 .create_index(
3877 &["vector"],
3878 IndexType::Vector,
3879 Some("vec_idx".to_string()),
3880 &vector_params,
3881 false,
3882 )
3883 .await
3884 .unwrap();
3885
3886 let fts_params = InvertedIndexParams::default();
3888 source_dataset
3889 .create_index(
3890 &["text"],
3891 IndexType::Inverted,
3892 Some("text_idx".to_string()),
3893 &fts_params,
3894 false,
3895 )
3896 .await
3897 .unwrap();
3898
3899 let scalar_params = ScalarIndexParams::default();
3901 source_dataset
3902 .create_index(
3903 &["id"],
3904 IndexType::BTree,
3905 Some("id_idx".to_string()),
3906 &scalar_params,
3907 false,
3908 )
3909 .await
3910 .unwrap();
3911
3912 let source_dataset = Dataset::open(&source_uri).await.unwrap();
3914
3915 let source_indices = source_dataset.load_indices().await.unwrap();
3917 assert_eq!(
3918 source_indices.len(),
3919 3,
3920 "Source dataset should have 3 indices"
3921 );
3922
3923 let target_reader = lance_datagen::gen_batch()
3925 .col("vector", array::rand_vec::<Float32Type>(8.into()))
3926 .col(
3927 "text",
3928 array::cycle_utf8_literals(&["foo bar", "test data", "hello world"]),
3929 )
3930 .col("id", array::step_custom::<Int32Type>(100, 1))
3931 .into_reader_rows(RowCount::from(300), BatchCount::from(1));
3932 let mut target_dataset = Dataset::write(target_reader, &target_uri, None)
3933 .await
3934 .unwrap();
3935
3936 target_dataset
3938 .initialize_indices(&source_dataset)
3939 .await
3940 .unwrap();
3941
3942 let target_indices = target_dataset.load_indices().await.unwrap();
3944 assert_eq!(
3945 target_indices.len(),
3946 3,
3947 "Target dataset should have 3 indices after initialization"
3948 );
3949
3950 let source_names: HashSet<String> =
3952 source_indices.iter().map(|idx| idx.name.clone()).collect();
3953 let target_names: HashSet<String> =
3954 target_indices.iter().map(|idx| idx.name.clone()).collect();
3955 assert_eq!(
3956 source_names, target_names,
3957 "Index names should match between source and target"
3958 );
3959
3960 let query_vector = generate_random_array(8);
3963 let search_results = target_dataset
3964 .scan()
3965 .nearest("vector", &query_vector, 5)
3966 .unwrap()
3967 .limit(Some(5), None)
3968 .unwrap()
3969 .try_into_batch()
3970 .await
3971 .unwrap();
3972 assert!(
3973 search_results.num_rows() > 0,
3974 "Vector index should be functional"
3975 );
3976
3977 let scalar_results = target_dataset
3979 .scan()
3980 .filter("id = 125")
3981 .unwrap()
3982 .try_into_batch()
3983 .await
3984 .unwrap();
3985 assert_eq!(
3986 scalar_results.num_rows(),
3987 1,
3988 "Scalar index should find exact match"
3989 );
3990 }
3991
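    // Initializing from a source whose indexed column does not exist on the
    // target must fail with a "not found in target dataset" error.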
3992 #[tokio::test]
3993 async fn test_initialize_indices_with_missing_field() {
3994 use crate::dataset::Dataset;
3995 use arrow_array::types::Int32Type;
3996 use lance_core::utils::tempfile::TempStrDir;
3997 use lance_datagen::{array, BatchCount, RowCount};
3998 use lance_index::scalar::ScalarIndexParams;
3999
4000 let test_dir = TempStrDir::default();
4002 let source_uri = format!("{}/{}", test_dir, "source");
4003 let target_uri = format!("{}/{}", test_dir, "target");
4004
        let source_reader = lance_datagen::gen_batch()
            .col("id", array::step::<Int32Type>())
            .col("extra", array::cycle_utf8_literals(&["test"]))
            .into_reader_rows(RowCount::from(10), BatchCount::from(1));
        let mut source_dataset = Dataset::write(source_reader, &source_uri, None)
            .await
            .unwrap();

        source_dataset
            .create_index(
                &["extra"],
                IndexType::BTree,
                None,
                &ScalarIndexParams::default(),
                false,
            )
            .await
            .unwrap();

        let target_reader = lance_datagen::gen_batch()
            .col("id", array::step_custom::<Int32Type>(10, 1))
            .into_reader_rows(RowCount::from(10), BatchCount::from(1));
        let mut target_dataset = Dataset::write(target_reader, &target_uri, None)
            .await
            .unwrap();

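        // Initialization must fail because the indexed field does not exist on the target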
        let result = target_dataset.initialize_indices(&source_dataset).await;

        assert!(result.is_err(), "Should error when field is missing");
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("not found in target dataset"));
    }

    #[tokio::test]
    async fn test_initialize_single_index() {
        use crate::dataset::Dataset;
        use crate::index::vector::VectorIndexParams;
        use arrow_array::types::{Float32Type, Int32Type};
        use lance_core::utils::tempfile::TempStrDir;
        use lance_datagen::{array, BatchCount, RowCount};
        use lance_index::scalar::ScalarIndexParams;
        use lance_linalg::distance::MetricType;

        let test_dir = TempStrDir::default();
        let source_uri = format!("{}/{}", test_dir, "source");
        let target_uri = format!("{}/{}", test_dir, "target");

        let source_reader = lance_datagen::gen_batch()
            .col("id", array::step::<Int32Type>())
            .col("name", array::rand_utf8(4.into(), false))
            .col("vector", array::rand_vec::<Float32Type>(8.into()))
            .into_reader_rows(RowCount::from(300), BatchCount::from(1));
        let mut source_dataset = Dataset::write(source_reader, &source_uri, None)
            .await
            .unwrap();

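        // Give the source one scalar index and one vector index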
        let scalar_params = ScalarIndexParams::default();
        source_dataset
            .create_index(
                &["id"],
                IndexType::BTree,
                Some("id_index".to_string()),
                &scalar_params,
                false,
            )
            .await
            .unwrap();

        let vector_params = VectorIndexParams::ivf_pq(16, 8, 4, MetricType::L2, 50);
        source_dataset
            .create_index(
                &["vector"],
                IndexType::Vector,
                Some("vector_index".to_string()),
                &vector_params,
                false,
            )
            .await
            .unwrap();

        let source_dataset = Dataset::open(&source_uri).await.unwrap();

        let target_reader = lance_datagen::gen_batch()
            .col("id", array::step::<Int32Type>())
            .col("name", array::rand_utf8(4.into(), false))
            .col("vector", array::rand_vec::<Float32Type>(8.into()))
            .into_reader_rows(RowCount::from(300), BatchCount::from(1));
        let mut target_dataset = Dataset::write(target_reader, &target_uri, None)
            .await
            .unwrap();

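        // Copy only the vector index first; the scalar index should not come along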
        target_dataset
            .initialize_index(&source_dataset, "vector_index")
            .await
            .unwrap();

        let target_indices = target_dataset.load_indices().await.unwrap();
        assert_eq!(target_indices.len(), 1, "Should have only 1 index");
        assert_eq!(
            target_indices[0].name, "vector_index",
            "Should have the vector index"
        );

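        // Then copy the scalar index as well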
        target_dataset
            .initialize_index(&source_dataset, "id_index")
            .await
            .unwrap();

        let target_indices = target_dataset.load_indices().await.unwrap();
        assert_eq!(target_indices.len(), 2, "Should have 2 indices");

        let index_names: HashSet<String> =
            target_indices.iter().map(|idx| idx.name.clone()).collect();
        assert!(
            index_names.contains("vector_index"),
            "Should have vector index"
        );
        assert!(index_names.contains("id_index"), "Should have id index");

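        // Requesting an index the source does not have should fail cleanly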
        let result = target_dataset
            .initialize_index(&source_dataset, "non_existent")
            .await;
        assert!(result.is_err(), "Should error for non-existent index");
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("not found in source dataset"));
    }

    #[tokio::test]
    async fn test_vector_index_on_nested_field_with_dots() {
        let dimensions = 16;
        let num_rows = 256;

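        // Struct children whose names contain literal dots ("vector.v1", "vector.v2")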
        let struct_field = Field::new(
            "embedding_data",
            DataType::Struct(
                vec![
                    Field::new(
                        "vector.v1",
                        DataType::FixedSizeList(
                            Arc::new(Field::new("item", DataType::Float32, true)),
                            dimensions,
                        ),
                        false,
                    ),
                    Field::new(
                        "vector.v2",
                        DataType::FixedSizeList(
                            Arc::new(Field::new("item", DataType::Float32, true)),
                            dimensions,
                        ),
                        false,
                    ),
                    Field::new("metadata", DataType::Utf8, false),
                ]
                .into(),
            ),
            false,
        );

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            struct_field,
        ]));

        let float_arr_v1 = generate_random_array(num_rows * dimensions as usize);
        let vectors_v1 = FixedSizeListArray::try_new_from_values(float_arr_v1, dimensions).unwrap();

        let float_arr_v2 = generate_random_array(num_rows * dimensions as usize);
        let vectors_v2 = FixedSizeListArray::try_new_from_values(float_arr_v2, dimensions).unwrap();

        let ids = Int32Array::from_iter_values(0..num_rows as i32);
        let metadata = StringArray::from_iter_values((0..num_rows).map(|i| format!("meta_{}", i)));

        let struct_array = arrow_array::StructArray::from(vec![
            (
                Arc::new(Field::new(
                    "vector.v1",
                    DataType::FixedSizeList(
                        Arc::new(Field::new("item", DataType::Float32, true)),
                        dimensions,
                    ),
                    false,
                )),
                Arc::new(vectors_v1) as Arc<dyn arrow_array::Array>,
            ),
            (
                Arc::new(Field::new(
                    "vector.v2",
                    DataType::FixedSizeList(
                        Arc::new(Field::new("item", DataType::Float32, true)),
                        dimensions,
                    ),
                    false,
                )),
                Arc::new(vectors_v2) as Arc<dyn arrow_array::Array>,
            ),
            (
                Arc::new(Field::new("metadata", DataType::Utf8, false)),
                Arc::new(metadata) as Arc<dyn arrow_array::Array>,
            ),
        ]);

        let batch =
            RecordBatch::try_new(schema.clone(), vec![Arc::new(ids), Arc::new(struct_array)])
                .unwrap();

        let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone());

        let test_dir = TempStrDir::default();
        let test_uri = &test_dir;
        let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();

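        // Backticks quote the child name so its dot is not parsed as a path separator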
        let nested_column_path_v1 = "embedding_data.`vector.v1`";
        let params = VectorIndexParams::ivf_pq(10, 8, 2, MetricType::L2, 10);

        dataset
            .create_index(
                &[nested_column_path_v1],
                IndexType::Vector,
                Some("vec_v1_idx".to_string()),
                &params,
                true,
            )
            .await
            .unwrap();

        let indices = dataset.load_indices().await.unwrap();
        assert_eq!(indices.len(), 1);
        assert_eq!(indices[0].name, "vec_v1_idx");

        let field_id = indices[0].fields[0];
        let field_path = dataset.schema().field_path(field_id).unwrap();
        assert_eq!(field_path, "embedding_data.`vector.v1`");

        let nested_column_path_v2 = "embedding_data.`vector.v2`";
        dataset
            .create_index(
                &[nested_column_path_v2],
                IndexType::Vector,
                Some("vec_v2_idx".to_string()),
                &params,
                true,
            )
            .await
            .unwrap();

        let indices = dataset.load_indices().await.unwrap();
        assert_eq!(indices.len(), 2);

        let query_vector = generate_random_array(dimensions as usize);

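        // Both plans should hit the ANN index rather than falling back to a flat scan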
        let plan_v1 = dataset
            .scan()
            .nearest(nested_column_path_v1, &query_vector, 5)
            .unwrap()
            .explain_plan(false)
            .await
            .unwrap();

        assert!(
            plan_v1.contains("ANNSubIndex") || plan_v1.contains("ANNIvfPartition"),
            "Query plan should use vector index for nested field with dots. Plan: {}",
            plan_v1
        );

        let search_results_v1 = dataset
            .scan()
            .nearest(nested_column_path_v1, &query_vector, 5)
            .unwrap()
            .try_into_batch()
            .await
            .unwrap();

        assert_eq!(search_results_v1.num_rows(), 5);

        let plan_v2 = dataset
            .scan()
            .nearest(nested_column_path_v2, &query_vector, 5)
            .unwrap()
            .explain_plan(false)
            .await
            .unwrap();

        assert!(
            plan_v2.contains("ANNSubIndex") || plan_v2.contains("ANNIvfPartition"),
            "Query plan should use vector index for second nested field with dots. Plan: {}",
            plan_v2
        );

        let search_results_v2 = dataset
            .scan()
            .nearest(nested_column_path_v2, &query_vector, 5)
            .unwrap()
            .try_into_batch()
            .await
            .unwrap();

        assert_eq!(search_results_v2.num_rows(), 5);
    }

    #[tokio::test]
    async fn test_vector_index_on_simple_nested_field() {
        let dimensions = 16;
        let num_rows = 256;

        let struct_field = Field::new(
            "data",
            DataType::Struct(
                vec![
                    Field::new(
                        "embedding",
                        DataType::FixedSizeList(
                            Arc::new(Field::new("item", DataType::Float32, true)),
                            dimensions,
                        ),
                        false,
                    ),
                    Field::new("label", DataType::Utf8, false),
                ]
                .into(),
            ),
            false,
        );

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            struct_field,
        ]));

        let float_arr = generate_random_array(num_rows * dimensions as usize);
        let vectors = FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap();

        let ids = Int32Array::from_iter_values(0..num_rows as i32);
        let labels = StringArray::from_iter_values((0..num_rows).map(|i| format!("label_{}", i)));

        let struct_array = arrow_array::StructArray::from(vec![
            (
                Arc::new(Field::new(
                    "embedding",
                    DataType::FixedSizeList(
                        Arc::new(Field::new("item", DataType::Float32, true)),
                        dimensions,
                    ),
                    false,
                )),
                Arc::new(vectors) as Arc<dyn arrow_array::Array>,
            ),
            (
                Arc::new(Field::new("label", DataType::Utf8, false)),
                Arc::new(labels) as Arc<dyn arrow_array::Array>,
            ),
        ]);

        let batch =
            RecordBatch::try_new(schema.clone(), vec![Arc::new(ids), Arc::new(struct_array)])
                .unwrap();

        let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone());

        let test_dir = TempStrDir::default();
        let test_uri = &test_dir;
        let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();

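        // A plain dotted path works when the child name itself contains no dots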
        let nested_column_path = "data.embedding";
        let params = VectorIndexParams::ivf_pq(2, 8, 2, MetricType::L2, 10);

        dataset
            .create_index(
                &[nested_column_path],
                IndexType::Vector,
                Some("vec_idx".to_string()),
                &params,
                true,
            )
            .await
            .unwrap();

        let indices = dataset.load_indices().await.unwrap();
        assert_eq!(indices.len(), 1);
        assert_eq!(indices[0].name, "vec_idx");

        let field_id = indices[0].fields[0];
        let field_path = dataset.schema().field_path(field_id).unwrap();
        assert_eq!(field_path, "data.embedding");

        let query_vector = generate_random_array(dimensions as usize);

        let plan = dataset
            .scan()
            .nearest(nested_column_path, &query_vector, 5)
            .unwrap()
            .explain_plan(false)
            .await
            .unwrap();

        assert!(
            plan.contains("ANNSubIndex") || plan.contains("ANNIvfPartition"),
            "Query plan should use vector index for nested field. Plan: {}",
            plan
        );

        let search_results = dataset
            .scan()
            .nearest(nested_column_path, &query_vector, 5)
            .unwrap()
            .try_into_batch()
            .await
            .unwrap();

        assert_eq!(search_results.num_rows(), 5);
    }

    #[tokio::test]
    async fn test_btree_index_on_nested_field_with_dots() {
        let test_dir = TempStrDir::default();
        let test_uri = &test_dir;

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new(
                "data",
                DataType::Struct(
                    vec![
                        Field::new("value.v1", DataType::Int32, false),
                        Field::new("value.v2", DataType::Float32, false),
                        Field::new("text", DataType::Utf8, false),
                    ]
                    .into(),
                ),
                false,
            ),
        ]));

        let num_rows = 1000;
        let ids = Int32Array::from_iter_values(0..num_rows);
        let values_v1 = Int32Array::from_iter_values((0..num_rows).map(|i| i % 100));
        let values_v2 = Float32Array::from_iter_values((0..num_rows).map(|i| (i as f32) * 0.1));
        let texts = StringArray::from_iter_values((0..num_rows).map(|i| format!("text_{}", i)));

        let struct_array = arrow_array::StructArray::from(vec![
            (
                Arc::new(Field::new("value.v1", DataType::Int32, false)),
                Arc::new(values_v1) as Arc<dyn arrow_array::Array>,
            ),
            (
                Arc::new(Field::new("value.v2", DataType::Float32, false)),
                Arc::new(values_v2) as Arc<dyn arrow_array::Array>,
            ),
            (
                Arc::new(Field::new("text", DataType::Utf8, false)),
                Arc::new(texts) as Arc<dyn arrow_array::Array>,
            ),
        ]);

        let batch =
            RecordBatch::try_new(schema.clone(), vec![Arc::new(ids), Arc::new(struct_array)])
                .unwrap();

        let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone());
        let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();

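        // Again, backticks keep the dotted child name from being split into path segments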
        let nested_column_path = "data.`value.v1`";
        let params = ScalarIndexParams::default();

        dataset
            .create_index(
                &[nested_column_path],
                IndexType::BTree,
                Some("btree_v1_idx".to_string()),
                &params,
                true,
            )
            .await
            .unwrap();

        dataset = Dataset::open(test_uri).await.unwrap();

        let indices = dataset.load_indices().await.unwrap();
        assert_eq!(indices.len(), 1);
        assert_eq!(indices[0].name, "btree_v1_idx");

        let field_id = indices[0].fields[0];
        let field_path = dataset.schema().field_path(field_id).unwrap();
        assert_eq!(field_path, "data.`value.v1`");

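        // The filter should be answered through the BTree index, not a full scan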
        let plan = dataset
            .scan()
            .filter("data.`value.v1` = 42")
            .unwrap()
            .prefilter(true)
            .explain_plan(false)
            .await
            .unwrap();

        assert!(
            plan.contains("ScalarIndexQuery"),
            "Query plan should show optimized read. Plan: {}",
            plan
        );

        let results = dataset
            .scan()
            .filter("data.`value.v1` = 42")
            .unwrap()
            .prefilter(true)
            .try_into_batch()
            .await
            .unwrap();

        assert!(results.num_rows() > 0);
    }

    #[tokio::test]
    async fn test_bitmap_index_on_nested_field_with_dots() {
        let test_dir = TempStrDir::default();
        let test_uri = &test_dir;

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new(
                "metadata",
                DataType::Struct(
                    vec![
                        Field::new("status.code", DataType::Int32, false),
                        Field::new("category.name", DataType::Utf8, false),
                    ]
                    .into(),
                ),
                false,
            ),
        ]));

        let num_rows = 1000;
        let ids = Int32Array::from_iter_values(0..num_rows);
        let status_codes = Int32Array::from_iter_values((0..num_rows).map(|i| i % 10));
        let categories =
            StringArray::from_iter_values((0..num_rows).map(|i| format!("category_{}", i % 5)));

        let struct_array = arrow_array::StructArray::from(vec![
            (
                Arc::new(Field::new("status.code", DataType::Int32, false)),
                Arc::new(status_codes) as Arc<dyn arrow_array::Array>,
            ),
            (
                Arc::new(Field::new("category.name", DataType::Utf8, false)),
                Arc::new(categories) as Arc<dyn arrow_array::Array>,
            ),
        ]);

        let batch =
            RecordBatch::try_new(schema.clone(), vec![Arc::new(ids), Arc::new(struct_array)])
                .unwrap();

        let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone());
        let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();

        let nested_column_path = "metadata.`status.code`";
        let params = ScalarIndexParams::default();

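        // A bitmap index suits this low-cardinality field (only 10 distinct status codes)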
        dataset
            .create_index(
                &[nested_column_path],
                IndexType::Bitmap,
                Some("bitmap_status_idx".to_string()),
                &params,
                true,
            )
            .await
            .unwrap();

        dataset = Dataset::open(test_uri).await.unwrap();

        let indices = dataset.load_indices().await.unwrap();
        assert_eq!(indices.len(), 1);
        assert_eq!(indices[0].name, "bitmap_status_idx");

        let field_id = indices[0].fields[0];
        let field_path = dataset.schema().field_path(field_id).unwrap();
        assert_eq!(field_path, "metadata.`status.code`");

        let plan = dataset
            .scan()
            .filter("metadata.`status.code` = 5")
            .unwrap()
            .explain_plan(false)
            .await
            .unwrap();

        assert!(
            plan.contains("ScalarIndexQuery"),
            "Query plan should show optimized read. Plan: {}",
            plan
        );

        let results = dataset
            .scan()
            .filter("metadata.`status.code` = 5")
            .unwrap()
            .try_into_batch()
            .await
            .unwrap();

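        // 1000 rows with codes cycling 0..=9, so exactly 100 rows have status.code = 5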
        assert!(results.num_rows() > 0);
        assert_eq!(results.num_rows(), 100);
    }

    #[tokio::test]
    async fn test_inverted_index_on_nested_field_with_dots() {
        use lance_index::scalar::inverted::tokenizer::InvertedIndexParams;

        let test_dir = TempStrDir::default();
        let test_uri = &test_dir;

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new(
                "document",
                DataType::Struct(
                    vec![
                        Field::new("content.text", DataType::Utf8, false),
                        Field::new("content.summary", DataType::Utf8, false),
                    ]
                    .into(),
                ),
                false,
            ),
        ]));

        let num_rows = 100;
        let ids = Int32Array::from_iter_values(0..num_rows as i32);
        let content_texts = StringArray::from_iter_values((0..num_rows).map(|i| match i % 3 {
            0 => format!("The quick brown fox jumps over the lazy dog {}", i),
            1 => format!(
                "Machine learning and artificial intelligence document {}",
                i
            ),
            _ => format!("Data science and analytics content piece {}", i),
        }));
        let summaries = StringArray::from_iter_values(
            (0..num_rows).map(|i| format!("Summary of document {}", i)),
        );

        let struct_array = arrow_array::StructArray::from(vec![
            (
                Arc::new(Field::new("content.text", DataType::Utf8, false)),
                Arc::new(content_texts) as Arc<dyn arrow_array::Array>,
            ),
            (
                Arc::new(Field::new("content.summary", DataType::Utf8, false)),
                Arc::new(summaries) as Arc<dyn arrow_array::Array>,
            ),
        ]);

        let batch =
            RecordBatch::try_new(schema.clone(), vec![Arc::new(ids), Arc::new(struct_array)])
                .unwrap();

        let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone());
        let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();

        let nested_column_path = "document.`content.text`";
        let params = InvertedIndexParams::default();

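        // Full-text index over a struct child whose name contains a dot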
        dataset
            .create_index(
                &[nested_column_path],
                IndexType::Inverted,
                Some("inverted_content_idx".to_string()),
                &params,
                true,
            )
            .await
            .unwrap();

        dataset = Dataset::open(test_uri).await.unwrap();

        let indices = dataset.load_indices().await.unwrap();
        assert_eq!(indices.len(), 1);
        assert_eq!(indices[0].name, "inverted_content_idx");

        let field_id = indices[0].fields[0];
        let field_path = dataset.schema().field_path(field_id).unwrap();
        assert_eq!(field_path, "document.`content.text`");

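        // Scope the full-text query to the indexed nested column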
        let query = FullTextSearchQuery::new("machine learning".to_string())
            .with_column(field_path.clone())
            .unwrap();

        let plan = dataset
            .scan()
            .full_text_search(query.clone())
            .unwrap()
            .explain_plan(false)
            .await
            .unwrap();

        assert!(
            plan.contains("MatchQuery") || plan.contains("PhraseQuery"),
            "Query plan should use inverted index for nested field with dots. Plan: {}",
            plan
        );

        let results = dataset
            .scan()
            .full_text_search(query)
            .unwrap()
            .try_into_stream()
            .await
            .unwrap()
            .try_collect::<Vec<_>>()
            .await
            .unwrap();

        assert!(
            !results.is_empty(),
            "Full-text search should return results"
        );

        let mut found_count = 0;
        for batch in results {
            found_count += batch.num_rows();
        }
        assert!(
            found_count > 0,
            "Should find at least some documents with 'machine learning'"
        );
        assert!(found_count < num_rows, "Should not match all documents");
    }
}