1use std::collections::{HashMap, HashSet};
8use std::sync::{Arc, OnceLock};
9
10use arrow_schema::{DataType, Schema};
11use async_trait::async_trait;
12use datafusion::execution::SendableRecordBatchStream;
13use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
14use futures::{stream, FutureExt};
15use itertools::Itertools;
16use lance_core::cache::{CacheKey, UnsizedCacheKey};
17use lance_core::datatypes::Field;
18use lance_core::datatypes::Schema as LanceSchema;
19use lance_core::utils::address::RowAddress;
20use lance_core::utils::parse::str_is_truthy;
21use lance_core::utils::tracing::{
22 IO_TYPE_OPEN_FRAG_REUSE, IO_TYPE_OPEN_MEM_WAL, IO_TYPE_OPEN_SCALAR, IO_TYPE_OPEN_VECTOR,
23 TRACE_IO_EVENTS,
24};
25use lance_file::previous::reader::FileReader as PreviousFileReader;
26use lance_file::reader::FileReaderOptions;
27use lance_index::frag_reuse::{FragReuseIndex, FRAG_REUSE_INDEX_NAME};
28use lance_index::mem_wal::{MemWalIndex, MEM_WAL_INDEX_NAME};
29use lance_index::optimize::OptimizeOptions;
30use lance_index::pb::index::Implementation;
31use lance_index::scalar::expression::{
32 IndexInformationProvider, MultiQueryParser, ScalarQueryParser,
33};
34use lance_index::scalar::inverted::InvertedIndexPlugin;
35use lance_index::scalar::lance_format::LanceIndexStore;
36use lance_index::scalar::registry::{TrainingCriteria, TrainingOrdering};
37use lance_index::scalar::{CreatedIndex, ScalarIndex};
38use lance_index::vector::bq::builder::RabitQuantizer;
39use lance_index::vector::flat::index::{FlatBinQuantizer, FlatIndex, FlatQuantizer};
40use lance_index::vector::hnsw::HNSW;
41use lance_index::vector::pq::ProductQuantizer;
42use lance_index::vector::sq::ScalarQuantizer;
43pub use lance_index::IndexParams;
44use lance_index::{
45 is_system_index,
46 metrics::{MetricsCollector, NoOpMetricsCollector},
47 IndexCriteria,
48};
49use lance_index::{pb, vector::VectorIndex, Index, IndexType, INDEX_FILE_NAME};
50use lance_index::{
51 DatasetIndexExt, IndexDescription, INDEX_METADATA_SCHEMA_KEY, VECTOR_INDEX_VERSION,
52};
53use lance_io::scheduler::{ScanScheduler, SchedulerConfig};
54use lance_io::traits::Reader;
55use lance_io::utils::{
56 read_last_block, read_message, read_message_from_buf, read_metadata_offset, read_version,
57 CachedFileSize,
58};
59use lance_table::format::IndexMetadata;
60use lance_table::format::{Fragment, SelfDescribingFileReader};
61use lance_table::io::manifest::read_manifest_indexes;
62use roaring::RoaringBitmap;
63use scalar::index_matches_criteria;
64use serde_json::json;
65use snafu::location;
66use tracing::{info, instrument};
67use uuid::Uuid;
68use vector::ivf::v2::IVFIndex;
69use vector::utils::get_vector_type;
70
71pub(crate) mod append;
72mod create;
73pub mod frag_reuse;
74pub mod mem_wal;
75pub mod prefilter;
76pub mod scalar;
77pub mod vector;
78
79use self::append::merge_indices;
80use self::vector::remap_vector_index;
81use crate::dataset::index::LanceIndexStoreExt;
82use crate::dataset::optimize::remapping::RemapResult;
83use crate::dataset::optimize::RemappedIndex;
84use crate::dataset::transaction::{Operation, Transaction};
85use crate::index::frag_reuse::{load_frag_reuse_index_details, open_frag_reuse_index};
86use crate::index::mem_wal::open_mem_wal_index;
87pub use crate::index::prefilter::{FilterLoader, PreFilter};
88use crate::index::scalar::{fetch_index_details, load_training_data, IndexDetails};
89use crate::session::index_caches::{FragReuseIndexKey, IndexMetadataKey};
90use crate::{dataset::Dataset, Error, Result};
91pub use create::CreateIndexBuilder;
92
93#[derive(Debug, Clone)]
95pub struct ScalarIndexCacheKey<'a> {
96 pub uuid: &'a str,
97 pub fri_uuid: Option<&'a Uuid>,
98}
99
100impl<'a> ScalarIndexCacheKey<'a> {
101 pub fn new(uuid: &'a str, fri_uuid: Option<&'a Uuid>) -> Self {
102 Self { uuid, fri_uuid }
103 }
104}
105
106impl UnsizedCacheKey for ScalarIndexCacheKey<'_> {
107 type ValueType = dyn ScalarIndex;
108
109 fn key(&self) -> std::borrow::Cow<'_, str> {
110 if let Some(fri_uuid) = self.fri_uuid {
111 format!("{}-{}", self.uuid, fri_uuid).into()
112 } else {
113 self.uuid.into()
114 }
115 }
116}
117
118#[derive(Debug, Clone)]
119pub struct VectorIndexCacheKey<'a> {
120 pub uuid: &'a str,
121 pub fri_uuid: Option<&'a Uuid>,
122}
123
124impl<'a> VectorIndexCacheKey<'a> {
125 pub fn new(uuid: &'a str, fri_uuid: Option<&'a Uuid>) -> Self {
126 Self { uuid, fri_uuid }
127 }
128}
129
130impl UnsizedCacheKey for VectorIndexCacheKey<'_> {
131 type ValueType = dyn VectorIndex;
132
133 fn key(&self) -> std::borrow::Cow<'_, str> {
134 if let Some(fri_uuid) = self.fri_uuid {
135 format!("{}-{}", self.uuid, fri_uuid).into()
136 } else {
137 self.uuid.into()
138 }
139 }
140}
141
142#[derive(Debug, Clone)]
143pub struct FragReuseIndexCacheKey<'a> {
144 pub uuid: &'a str,
145 pub fri_uuid: Option<&'a Uuid>,
146}
147
148impl<'a> FragReuseIndexCacheKey<'a> {
149 pub fn new(uuid: &'a str, fri_uuid: Option<&'a Uuid>) -> Self {
150 Self { uuid, fri_uuid }
151 }
152}
153
154impl CacheKey for FragReuseIndexCacheKey<'_> {
155 type ValueType = FragReuseIndex;
156
157 fn key(&self) -> std::borrow::Cow<'_, str> {
158 if let Some(fri_uuid) = self.fri_uuid {
159 format!("{}-{}", self.uuid, fri_uuid).into()
160 } else {
161 self.uuid.into()
162 }
163 }
164}
165
166#[derive(Debug, Clone)]
167pub struct MemWalCacheKey<'a> {
168 pub uuid: &'a Uuid,
169 pub fri_uuid: Option<&'a Uuid>,
170}
171
172impl<'a> MemWalCacheKey<'a> {
173 pub fn new(uuid: &'a Uuid, fri_uuid: Option<&'a Uuid>) -> Self {
174 Self { uuid, fri_uuid }
175 }
176}
177
178impl CacheKey for MemWalCacheKey<'_> {
179 type ValueType = MemWalIndex;
180
181 fn key(&self) -> std::borrow::Cow<'_, str> {
182 if let Some(fri_uuid) = self.fri_uuid {
183 format!("{}-{}", self.uuid, fri_uuid).into()
184 } else {
185 self.uuid.to_string().into()
186 }
187 }
188}
189
190fn auto_migrate_corruption() -> bool {
192 static LANCE_AUTO_MIGRATION: OnceLock<bool> = OnceLock::new();
193 *LANCE_AUTO_MIGRATION.get_or_init(|| {
194 std::env::var("LANCE_AUTO_MIGRATION")
195 .ok()
196 .map(|s| str_is_truthy(&s))
197 .unwrap_or(true)
198 })
199}
200
/// Extracts a short index-type name from a protobuf type URL / URI.
///
/// Takes the last `/`-separated segment, then the last `.`-separated
/// component of that segment, and strips any trailing `IndexDetails` suffix.
/// E.g. `".../lance.table.BTreeIndexDetails"` becomes `"BTree"`.
fn type_name_from_uri(index_uri: &str) -> String {
    let tail = match index_uri.rfind('/') {
        Some(pos) => &index_uri[pos + 1..],
        None => index_uri,
    };
    let component = match tail.rfind('.') {
        Some(pos) => &tail[pos + 1..],
        None => tail,
    };
    component.trim_end_matches("IndexDetails").to_string()
}
211
212fn legacy_type_name(index_uri: &str, index_type_hint: Option<&str>) -> String {
219 if let Some(hint) = index_type_hint {
220 return hint.to_string();
221 }
222
223 let base = type_name_from_uri(index_uri);
224
225 match base.as_str() {
226 "BTree" => IndexType::BTree.to_string(),
227 "Bitmap" => IndexType::Bitmap.to_string(),
228 "LabelList" => IndexType::LabelList.to_string(),
229 "NGram" => IndexType::NGram.to_string(),
230 "ZoneMap" => IndexType::ZoneMap.to_string(),
231 "BloomFilter" => IndexType::BloomFilter.to_string(),
232 "Inverted" => IndexType::Inverted.to_string(),
233 "Json" => IndexType::Scalar.to_string(),
234 "Flat" | "Vector" => IndexType::Vector.to_string(),
235 other if other.contains("Vector") => IndexType::Vector.to_string(),
236 _ => "N/A".to_string(),
237 }
238}
239
/// Minimal contract for something that can build an index of a fixed type.
#[async_trait]
pub trait IndexBuilder {
    /// The type of index this builder produces.
    fn index_type() -> IndexType;

    /// Runs the build to completion.
    async fn build(&self) -> Result<()>;
}
247
/// Remaps an existing index after compaction moved rows to new addresses.
///
/// `row_id_map` maps old row addresses to new ones (`None` = row deleted).
/// Returns:
/// - `RemapResult::Keep` when every mapped row was deleted and the deleted
///   fragments exactly cover the index's fragment bitmap (nothing to do);
/// - `RemapResult::Drop` when the scalar index cannot be remapped;
/// - `RemapResult::Remapped` with a freshly written index otherwise.
///
/// Only single-field indices are supported.
pub(crate) async fn remap_index(
    dataset: &Dataset,
    index_id: &Uuid,
    row_id_map: &HashMap<u64, Option<u64>>,
) -> Result<RemapResult> {
    // Locate the index metadata for the given id.
    let indices = dataset.load_indices().await?;
    let matched = indices
        .iter()
        .find(|i| i.uuid == *index_id)
        .ok_or_else(|| Error::Index {
            message: format!("Index with id {} does not exist", index_id),
            location: location!(),
        })?;

    if matched.fields.len() > 1 {
        return Err(Error::Index {
            message: "Remapping indices with multiple fields is not supported".to_string(),
            location: location!(),
        });
    }

    // Fast path: if the remap only deletes rows, and the deleted fragments are
    // exactly the fragments this index covers, the index can be kept as-is.
    if row_id_map.values().all(|v| v.is_none()) {
        let deleted_bitmap = RoaringBitmap::from_iter(
            row_id_map
                .keys()
                .map(|row_id| RowAddress::new_from_u64(*row_id))
                .map(|addr| addr.fragment_id()),
        );
        if Some(deleted_bitmap) == matched.fragment_bitmap {
            return Ok(RemapResult::Keep(*index_id));
        }
    }

    let field_id = matched
        .fields
        .first()
        .expect("An index existed with no fields");
    let field_path = dataset.schema().field_path(*field_id)?;

    // The remapped index is written under a brand-new UUID.
    let new_id = Uuid::new_v4();

    let generic = dataset
        .open_generic_index(&field_path, &index_id.to_string(), &NoOpMetricsCollector)
        .await?;

    let created_index = match generic.index_type() {
        it if it.is_scalar() => {
            let new_store = LanceIndexStore::from_dataset_for_new(dataset, &new_id.to_string())?;

            let scalar_index = dataset
                .open_scalar_index(&field_path, &index_id.to_string(), &NoOpMetricsCollector)
                .await?;
            // Some scalar index implementations cannot remap; drop them so they
            // get rebuilt later rather than left stale.
            if !scalar_index.can_remap() {
                return Ok(RemapResult::Drop);
            }

            match scalar_index.index_type() {
                IndexType::Inverted => {
                    let inverted_index = scalar_index
                        .as_any()
                        .downcast_ref::<lance_index::scalar::inverted::InvertedIndex>()
                        .ok_or(Error::Index {
                            message: "expected inverted index".to_string(),
                            location: location!(),
                        })?;
                    // Legacy-format inverted indices cannot be remapped in
                    // place: retrain from scratch instead.
                    if inverted_index.is_legacy() {
                        log::warn!("reindex because of legacy format, index_type: {}, index_id: {}, field: {}",
                            scalar_index.index_type(),
                            index_id,
                            field_path
                        );
                        let training_data = load_training_data(
                            dataset,
                            &field_path,
                            &TrainingCriteria::new(TrainingOrdering::None),
                            None,
                            true,
                            None,
                        )
                        .await?;
                        InvertedIndexPlugin::train_inverted_index(
                            training_data,
                            &new_store,
                            inverted_index.params().clone(),
                            None,
                        )
                        .await?
                    } else {
                        scalar_index.remap(row_id_map, &new_store).await?
                    }
                }
                _ => scalar_index.remap(row_id_map, &new_store).await?,
            }
        }
        it if it.is_vector() => {
            remap_vector_index(
                Arc::new(dataset.clone()),
                &field_path,
                index_id,
                &new_id,
                matched,
                row_id_map,
            )
            .await?;
            // Vector remap writes its own files; synthesize the details here.
            CreatedIndex {
                index_details: prost_types::Any::from_msg(
                    &lance_table::format::pb::VectorIndexDetails::default(),
                )
                .unwrap(),
                index_version: VECTOR_INDEX_VERSION,
            }
        }
        _ => {
            return Err(Error::Index {
                message: format!("Index type {} is not supported", generic.index_type()),
                location: location!(),
            });
        }
    };

    Ok(RemapResult::Remapped(RemappedIndex {
        old_id: *index_id,
        new_id,
        index_details: created_index.index_details,
        index_version: created_index.index_version,
    }))
}
380
/// Lookup table of scalar-indexed columns used during query planning.
#[derive(Debug)]
pub struct ScalarIndexInfo {
    // Column name -> (column data type, parser that turns filter expressions
    // on that column into index queries).
    indexed_columns: HashMap<String, (DataType, Box<MultiQueryParser>)>,
}
385
386impl IndexInformationProvider for ScalarIndexInfo {
387 fn get_index(&self, col: &str) -> Option<(&DataType, &dyn ScalarQueryParser)> {
388 self.indexed_columns
389 .get(col)
390 .map(|(ty, parser)| (ty, parser.as_ref() as &dyn ScalarQueryParser))
391 }
392}
393
/// Reads the `pb::Index` protobuf message stored at the tail of an index file.
///
/// The last block of the file carries the metadata offset. If the message
/// starts before the portion covered by that cached tail block, it is read
/// from the file with a dedicated request; otherwise it is decoded straight
/// out of the already-fetched tail bytes.
async fn open_index_proto(reader: &dyn Reader) -> Result<pb::Index> {
    let file_size = reader.size().await?;
    // Tail block containing (at least) the metadata offset trailer.
    let tail_bytes = read_last_block(reader).await?;
    let metadata_pos = read_metadata_offset(&tail_bytes)?;
    let proto: pb::Index = if metadata_pos < file_size - tail_bytes.len() {
        // The message begins before the tail block: fetch it separately.
        read_message(reader, metadata_pos).await?
    } else {
        // The message is fully inside the tail block already in memory;
        // translate the file offset into an offset within `tail_bytes`.
        let offset = tail_bytes.len() - (file_size - metadata_pos);
        read_message_from_buf(&tail_bytes.slice(offset..))?
    };
    Ok(proto)
}
407
408fn vector_index_details() -> prost_types::Any {
409 let details = lance_table::format::pb::VectorIndexDetails::default();
410 prost_types::Any::from_msg(&details).unwrap()
411}
412
/// Concrete [`IndexDescription`] assembled from the delta segments of one
/// named index.
struct IndexDescriptionImpl {
    // Index name, identical across all segments.
    name: String,
    // Field ids covered by the index, identical across all segments.
    field_ids: Vec<u32>,
    // The per-delta metadata entries that make up this logical index.
    segments: Vec<IndexMetadata>,
    // Plugin-reported type name, or "Unknown" if no plugin resolved.
    index_type: String,
    // Details proto shared (by type URL) across all segments.
    details: IndexDetails,
    // Total logical rows covered by the segments' fragment bitmaps.
    rows_indexed: u64,
}
421
impl IndexDescriptionImpl {
    /// Validates and aggregates the delta segments of one named index.
    ///
    /// All segments must agree on name, field ids, and details type URL;
    /// every segment must carry index details and a fragment bitmap.
    /// Errors direct the user to retrain indices missing that metadata.
    fn try_new(segments: Vec<IndexMetadata>, dataset: &Dataset) -> Result<Self> {
        if segments.is_empty() {
            return Err(Error::Index {
                message: "Index metadata is empty".to_string(),
                location: location!(),
            });
        }

        // Use the first segment as the reference all others must match.
        let example_metadata = &segments[0];

        let name = example_metadata.name.clone();
        if !segments.iter().all(|shard| shard.name == name) {
            return Err(Error::Index {
                message: "Index name should be identical across all segments".to_string(),
                location: location!(),
            });
        }

        let field_ids = &example_metadata.fields;
        if !segments.iter().all(|shard| shard.fields == *field_ids) {
            return Err(Error::Index {
                message: "Index fields should be identical across all segments".to_string(),
                location: location!(),
            });
        }
        let field_ids = field_ids.iter().map(|id| *id as u32).collect();

        let index_details = example_metadata.index_details.as_ref().ok_or(Error::Index {
            message:
                "Index details are required for index description. This index must be retrained to support this method."
                .to_string(),
            location: location!(),
        })?;
        let type_url = &index_details.type_url;
        if !segments.iter().all(|shard| {
            shard
                .index_details
                .as_ref()
                .map(|d| d.type_url == *type_url)
                .unwrap_or(false)
        }) {
            return Err(Error::Index {
                message: "Index type URL should be present and identical across all segments"
                    .to_string(),
                location: location!(),
            });
        }

        let details = IndexDetails(index_details.clone());
        let mut rows_indexed = 0;

        // Resolve a display name via the plugin registry; fall back to
        // "Unknown" rather than failing the whole description.
        let index_type = details
            .get_plugin()
            .map(|p| p.name().to_string())
            .unwrap_or_else(|_| "Unknown".to_string());

        // Sum logical rows of every fragment covered by each segment's bitmap.
        // NOTE(review): fragments covered by more than one segment would be
        // counted once per segment here — confirm deltas never overlap.
        for shard in &segments {
            let fragment_bitmap = shard
                .fragment_bitmap
                .as_ref()
                .ok_or_else(|| Error::Index {
                    message: "Fragment bitmap is required for index description. This index must be retrained to support this method.".to_string(),
                    location: location!(),
                })?;

            for fragment in dataset.get_fragments() {
                if fragment_bitmap.contains(fragment.id() as u32) {
                    rows_indexed += fragment.fast_logical_rows()? as u64;
                }
            }
        }

        Ok(Self {
            name,
            field_ids,
            index_type,
            segments,
            details,
            rows_indexed,
        })
    }
}
507
impl IndexDescription for IndexDescriptionImpl {
    /// Name shared by all segments of this index.
    fn name(&self) -> &str {
        &self.name
    }

    /// Field ids the index covers.
    fn field_ids(&self) -> &[u32] {
        &self.field_ids
    }

    /// Plugin-resolved type name (may be "Unknown").
    fn index_type(&self) -> &str {
        &self.index_type
    }

    /// Raw per-delta metadata entries.
    fn metadata(&self) -> &[IndexMetadata] {
        &self.segments
    }

    /// Protobuf type URL of the index details.
    fn type_url(&self) -> &str {
        self.details.0.type_url.as_str()
    }

    /// Total logical rows covered by the index's fragment bitmaps.
    fn rows_indexed(&self) -> u64 {
        self.rows_indexed
    }

    /// Plugin-rendered JSON details; errors if no plugin handles the type URL.
    fn details(&self) -> Result<String> {
        let plugin = self.details.get_plugin()?;
        plugin
            .details_as_json(&self.details.0)
            .map(|v| v.to_string())
    }
}
540
#[async_trait]
impl DatasetIndexExt for Dataset {
    type IndexBuilder<'a> = CreateIndexBuilder<'a>;

    // Entry point for the fluent index-creation API; all options are
    // configured on the returned builder.
    fn create_index_builder<'a>(
        &'a mut self,
        columns: &[&str],
        index_type: IndexType,
        params: &'a dyn IndexParams,
    ) -> CreateIndexBuilder<'a> {
        CreateIndexBuilder::new(self, columns, index_type, params)
    }

    // Convenience wrapper over `create_index_builder` for callers that do not
    // need the builder API.
    #[instrument(skip_all)]
    async fn create_index(
        &mut self,
        columns: &[&str],
        index_type: IndexType,
        name: Option<String>,
        params: &dyn IndexParams,
        replace: bool,
    ) -> Result<IndexMetadata> {
        let mut builder = self.create_index_builder(columns, index_type, params);

        if let Some(name) = name {
            builder = builder.name(name);
        }

        builder.replace(replace).await
    }

    // Removes every delta of the named index by committing a `CreateIndex`
    // transaction with no new indices and the old ones as removed.
    async fn drop_index(&mut self, name: &str) -> Result<()> {
        let indices = self.load_indices_by_name(name).await?;
        if indices.is_empty() {
            return Err(Error::IndexNotFound {
                identity: format!("name={}", name),
                location: location!(),
            });
        }

        let transaction = Transaction::new(
            self.manifest.version,
            Operation::CreateIndex {
                new_indices: vec![],
                removed_indices: indices.clone(),
            },
            None,
        );

        self.apply_commit(transaction, &Default::default(), &Default::default())
            .await?;

        Ok(())
    }

    // Opens the named index and asks it to pre-populate its internal caches.
    // NOTE(review): passes the index `name` where other call sites pass a
    // column/field path, and only prewarms the first delta's UUID — confirm
    // both are intended.
    async fn prewarm_index(&self, name: &str) -> Result<()> {
        let indices = self.load_indices_by_name(name).await?;
        if indices.is_empty() {
            return Err(Error::IndexNotFound {
                identity: format!("name={}", name),
                location: location!(),
            });
        }

        let index = self
            .open_generic_index(name, &indices[0].uuid.to_string(), &NoOpMetricsCollector)
            .await?;
        index.prewarm().await?;

        Ok(())
    }

    // Describes all (optionally criteria-filtered) indices, grouping delta
    // segments of the same name into a single description.
    async fn describe_indices<'a, 'b>(
        &'a self,
        criteria: Option<IndexCriteria<'b>>,
    ) -> Result<Vec<Arc<dyn IndexDescription>>> {
        let indices = self.load_indices().await?;
        let mut indices = if let Some(criteria) = criteria {
            indices.iter().filter(|idx| {
                // Indices without details cannot be described; skip with a warning.
                if idx.index_details.is_none() {
                    log::warn!("The method describe_indices does not support indexes without index details. Please retrain the index {}", idx.name);
                    return false;
                }
                let fields = idx
                    .fields
                    .iter()
                    .filter_map(|id| self.schema().field_by_id(*id))
                    .collect::<Vec<_>>();
                match index_matches_criteria(idx, &criteria, &fields, false, self.schema()) {
                    Ok(matched) => matched,
                    Err(err) => {
                        // A criteria failure for one index should not fail the
                        // whole listing; log and exclude it.
                        log::warn!("Could not describe index {}: {}", idx.name, err);
                        false
                    }
                }
            }).collect::<Vec<_>>()
        } else {
            indices.iter().collect::<Vec<_>>()
        };
        // chunk_by below only groups adjacent items, so sort by name first.
        indices.sort_by_key(|idx| &idx.name);

        indices
            .into_iter()
            .chunk_by(|idx| &idx.name)
            .into_iter()
            .map(|(_, segments)| {
                let segments = segments.cloned().collect::<Vec<_>>();
                let desc = IndexDescriptionImpl::try_new(segments, self)?;
                Ok(Arc::new(desc) as Arc<dyn IndexDescription>)
            })
            .collect::<Result<Vec<_>>>()
    }

    // Loads the index metadata for the current dataset version, serving from
    // the index cache when possible. If a fragment-reuse index exists, every
    // returned fragment bitmap is remapped through it.
    async fn load_indices(&self) -> Result<Arc<Vec<IndexMetadata>>> {
        // Cache key includes the dataset version so stale metadata is never served.
        let metadata_key = IndexMetadataKey {
            version: self.version().version,
        };
        let indices = match self.index_cache.get_with_key(&metadata_key).await {
            Some(indices) => indices,
            None => {
                let mut loaded_indices = read_manifest_indexes(
                    &self.object_store,
                    &self.manifest_location,
                    &self.manifest,
                )
                .await?;
                // Drop indices whose format version this build cannot read.
                retain_supported_indices(&mut loaded_indices);
                let loaded_indices = Arc::new(loaded_indices);
                self.index_cache
                    .insert_with_key(&metadata_key, loaded_indices.clone())
                    .await;
                loaded_indices
            }
        };

        if let Some(frag_reuse_index_meta) =
            indices.iter().find(|idx| idx.name == FRAG_REUSE_INDEX_NAME)
        {
            let uuid = frag_reuse_index_meta.uuid.to_string();
            let fri_key = FragReuseIndexKey { uuid: &uuid };
            let frag_reuse_index = self
                .index_cache
                .get_or_insert_with_key(fri_key, || async move {
                    let index_details =
                        load_frag_reuse_index_details(self, frag_reuse_index_meta).await?;
                    open_frag_reuse_index(frag_reuse_index_meta.uuid, index_details.as_ref()).await
                })
                .await?;
            // Rewrite fragment ids that were compacted away so bitmaps refer
            // to the current physical fragments. The clone keeps the cached
            // (un-remapped) copy intact.
            let mut indices = indices.as_ref().clone();
            indices.iter_mut().for_each(|idx| {
                if let Some(bitmap) = idx.fragment_bitmap.as_mut() {
                    frag_reuse_index.remap_fragment_bitmap(bitmap).unwrap();
                }
            });
            Ok(Arc::new(indices))
        } else {
            Ok(indices)
        }
    }

    // Registers an already-built index (files written out-of-band under
    // `index_id`) in the dataset manifest. The index is assumed to cover every
    // current fragment; no details proto is recorded (index_version = 0).
    async fn commit_existing_index(
        &mut self,
        index_name: &str,
        column: &str,
        index_id: Uuid,
    ) -> Result<()> {
        let Some(field) = self.schema().field(column) else {
            return Err(Error::Index {
                message: format!("CreateIndex: column '{column}' does not exist"),
                location: location!(),
            });
        };

        let new_idx = IndexMetadata {
            uuid: index_id,
            name: index_name.to_string(),
            fields: vec![field.id],
            dataset_version: self.manifest.version,
            // Assume full coverage of the dataset's current fragments.
            fragment_bitmap: Some(self.get_fragments().iter().map(|f| f.id() as u32).collect()),
            index_details: None,
            index_version: 0,
            created_at: Some(chrono::Utc::now()),
            base_id: None,
        };

        let transaction = Transaction::new(
            self.manifest.version,
            Operation::CreateIndex {
                new_indices: vec![new_idx],
                removed_indices: vec![],
            },
            None,
        );

        self.apply_commit(transaction, &Default::default(), &Default::default())
            .await?;

        Ok(())
    }

    // Finds the first scalar index matching the criteria whose coverage
    // intersects the dataset's live fragments (FTS indices are returned even
    // when empty so queries can still parse).
    async fn load_scalar_index<'a, 'b>(
        &'a self,
        criteria: IndexCriteria<'b>,
    ) -> Result<Option<IndexMetadata>> {
        let indices = self.load_indices().await?;

        let mut indices = indices
            .iter()
            .filter(|idx| {
                if idx.fields.is_empty() {
                    // System frag-reuse entries legitimately have no fields;
                    // anything else without fields is malformed.
                    if idx.name != FRAG_REUSE_INDEX_NAME {
                        log::error!("Index {} has no fields", idx.name);
                    }
                    false
                } else {
                    true
                }
            })
            .collect::<Vec<_>>();
        // Group by the first (only) field id; chunk_by needs sorted input.
        indices.sort_by_key(|idx| idx.fields[0]);
        let indice_by_field = indices.into_iter().chunk_by(|idx| idx.fields[0]);
        for (field_id, indices) in &indice_by_field {
            let indices = indices.collect::<Vec<_>>();
            // With multiple indices on one field, matching is stricter.
            let has_multiple = indices.len() > 1;
            for idx in indices {
                let field = self.schema().field_by_id(field_id);
                if let Some(field) = field {
                    if index_matches_criteria(
                        idx,
                        &criteria,
                        &[field],
                        has_multiple,
                        self.schema(),
                    )? {
                        // Skip indices that cover none of the live fragments…
                        let non_empty = idx.fragment_bitmap.as_ref().is_some_and(|bitmap| {
                            bitmap.intersection_len(self.fragment_bitmap.as_ref()) > 0
                        });
                        // …except FTS indices, which are useful even when empty.
                        let is_fts_index = if let Some(details) = &idx.index_details {
                            IndexDetails(details.clone()).supports_fts()
                        } else {
                            false
                        };
                        if non_empty || is_fts_index {
                            return Ok(Some(idx.clone()));
                        }
                    }
                }
            }
        }
        return Ok(None);
    }

    // Merges the delta segments of each (optionally name-filtered) index and
    // commits the result. System indices are never optimized here.
    #[instrument(skip_all)]
    async fn optimize_indices(&mut self, options: &OptimizeOptions) -> Result<()> {
        let dataset = Arc::new(self.clone());
        let indices = self.load_indices().await?;

        // None = optimize everything; Some(set) = only the listed names.
        let indices_to_optimize = options
            .index_names
            .as_ref()
            .map(|names| names.iter().collect::<HashSet<_>>());
        let name_to_indices = indices
            .iter()
            .filter(|idx| {
                indices_to_optimize
                    .as_ref()
                    .is_none_or(|names| names.contains(&idx.name))
                    && !is_system_index(idx)
            })
            .map(|idx| (idx.name.clone(), idx))
            .into_group_map();

        let mut new_indices = vec![];
        let mut removed_indices = vec![];
        for deltas in name_to_indices.values() {
            // merge_indices returns None when there is nothing to merge.
            let Some(res) = merge_indices(dataset.clone(), deltas.as_slice(), options).await?
            else {
                continue;
            };

            let last_idx = deltas.last().expect("Delta indices should not be empty");
            let new_idx = IndexMetadata {
                uuid: res.new_uuid,
                name: last_idx.name.clone(),
                fields: last_idx.fields.clone(),
                dataset_version: self.manifest.version,
                fragment_bitmap: Some(res.new_fragment_bitmap),
                index_details: Some(Arc::new(res.new_index_details)),
                index_version: res.new_index_version,
                created_at: Some(chrono::Utc::now()),
                base_id: None,
            };
            removed_indices.extend(res.removed_indices.iter().map(|&idx| idx.clone()));
            if deltas.len() > res.removed_indices.len() {
                // Deltas that were not consumed by the merge must be re-listed
                // or the commit would drop them.
                // NOTE(review): this keeps the LEADING deltas, assuming
                // merge_indices consumes a trailing run — confirm that contract.
                new_indices.extend(
                    deltas[0..(deltas.len() - res.removed_indices.len())]
                        .iter()
                        .map(|&idx| idx.clone()),
                );
            }
            new_indices.push(new_idx);
        }

        if new_indices.is_empty() {
            return Ok(());
        }

        let transaction = Transaction::new(
            self.manifest.version,
            Operation::CreateIndex {
                new_indices,
                removed_indices,
            },
            None,
        );

        self.apply_commit(transaction, &Default::default(), &Default::default())
            .await?;

        Ok(())
    }

    // Returns JSON statistics for the named index, dispatching to the
    // system-index helpers for frag-reuse / MemWAL entries.
    async fn index_statistics(&self, index_name: &str) -> Result<String> {
        let metadatas = self.load_indices_by_name(index_name).await?;
        if metadatas.is_empty() {
            return Err(Error::IndexNotFound {
                identity: format!("name={}", index_name),
                location: location!(),
            });
        }

        if index_name == FRAG_REUSE_INDEX_NAME {
            return index_statistics_frag_reuse(self).boxed().await;
        }

        if index_name == MEM_WAL_INDEX_NAME {
            return index_statistics_mem_wal(self).boxed().await;
        }

        index_statistics_scalar(self, index_name, metadatas)
            .boxed()
            .await
    }

    // Streams one partition of a vector index, merging the per-delta partition
    // readers into a single record batch stream.
    async fn read_index_partition(
        &self,
        index_name: &str,
        partition_id: usize,
        with_vector: bool,
    ) -> Result<SendableRecordBatchStream> {
        let indices = self.load_indices_by_name(index_name).await?;
        if indices.is_empty() {
            return Err(Error::IndexNotFound {
                identity: format!("name={}", index_name),
                location: location!(),
            });
        }
        // All deltas index the same field; resolve the column from the first.
        let column = self.schema().field_by_id(indices[0].fields[0]).unwrap();

        let mut schema: Option<Arc<Schema>> = None;
        let mut partition_streams = Vec::with_capacity(indices.len());
        for index in indices {
            let index = self
                .open_vector_index(&column.name, &index.uuid.to_string(), &NoOpMetricsCollector)
                .await?;

            let stream = index
                .partition_reader(partition_id, with_vector, &NoOpMetricsCollector)
                .await?;
            if schema.is_none() {
                schema = Some(stream.schema());
            }
            partition_streams.push(stream);
        }

        match schema {
            Some(schema) => {
                let merged = stream::select_all(partition_streams);
                let stream = RecordBatchStreamAdapter::new(schema, merged);
                Ok(Box::pin(stream))
            }
            // No streams at all: return an empty stream with an empty schema.
            None => Ok(Box::pin(RecordBatchStreamAdapter::new(
                Arc::new(Schema::empty()),
                stream::empty(),
            ))),
        }
    }
}
980
981fn sum_indexed_rows_per_delta(indexed_fragments_per_delta: &[Vec<Fragment>]) -> Result<Vec<usize>> {
982 let mut rows_per_delta = Vec::with_capacity(indexed_fragments_per_delta.len());
983 for frags in indexed_fragments_per_delta {
984 let mut sum = 0usize;
985 for frag in frags {
986 sum += frag.num_rows().ok_or_else(|| Error::Internal {
987 message: "Fragment should have row counts, please upgrade lance and \
988 trigger a single write to fix this"
989 .to_string(),
990 location: location!(),
991 })?;
992 }
993 rows_per_delta.push(sum);
994 }
995 Ok(rows_per_delta)
996}
997
998fn unique_indexed_fragment_count(indexed_fragments_per_delta: &[Vec<Fragment>]) -> Option<usize> {
999 let mut fragment_ids = HashSet::new();
1000 for frags in indexed_fragments_per_delta {
1001 for frag in frags {
1002 if !fragment_ids.insert(frag.id) {
1003 return None;
1004 }
1005 }
1006 }
1007 Some(fragment_ids.len())
1008}
1009
1010fn serialize_index_statistics(stats: &serde_json::Value) -> Result<String> {
1011 serde_json::to_string(stats).map_err(|e| Error::Index {
1012 message: format!("Failed to serialize index statistics: {}", e),
1013 location: location!(),
1014 })
1015}
1016
/// Migrates out-dated fragment metadata with a no-op write, then recomputes
/// the statistics on the migrated dataset.
///
/// Called when statistics gathering detects missing row counts or overlapping
/// deltas and `LANCE_AUTO_MIGRATION` is enabled.
async fn migrate_and_recompute_index_statistics(ds: &Dataset, index_name: &str) -> Result<String> {
    let mut ds = ds.clone();
    log::warn!(
        "Detecting out-dated fragment metadata, migrating dataset. \
        To disable migration, set LANCE_AUTO_MIGRATION=false"
    );
    // `delete("false")` matches no rows; it is issued purely to trigger a
    // write that rewrites fragment metadata into the current format.
    ds.delete("false").await.map_err(|err| Error::Execution {
        message: format!(
            "Failed to migrate dataset while calculating index statistics. \
            To disable migration, set LANCE_AUTO_MIGRATION=false. Original error: {}",
            err
        ),
        location: location!(),
    })?;
    // Retry on the freshly migrated copy.
    ds.index_statistics(index_name).await
}
1033
1034async fn index_statistics_frag_reuse(ds: &Dataset) -> Result<String> {
1035 let index = ds
1036 .open_frag_reuse_index(&NoOpMetricsCollector)
1037 .await?
1038 .expect("FragmentReuse index does not exist");
1039 serialize_index_statistics(&index.statistics()?)
1040}
1041
1042async fn index_statistics_mem_wal(ds: &Dataset) -> Result<String> {
1043 let index = ds
1044 .open_mem_wal_index(&NoOpMetricsCollector)
1045 .await?
1046 .expect("MemWal index does not exist");
1047 serialize_index_statistics(&index.statistics()?)
1048}
1049
/// Builds the JSON statistics payload for a regular (non-system) index.
///
/// Combines per-delta statistics with fragment-coverage counters; falls back
/// to auto-migration (and a recompute) when fragment metadata is out of date.
async fn index_statistics_scalar(
    ds: &Dataset,
    index_name: &str,
    metadatas: Vec<IndexMetadata>,
) -> Result<String> {
    // All deltas share the same field; resolve the column from the first one.
    let field_id = metadatas[0].fields[0];
    let field_path = ds.schema().field_path(field_id)?;

    let (indices_stats, index_uri, num_indices, updated_at) =
        collect_regular_indices_statistics(ds, metadatas, &field_path).await?;

    // Prefer the type name reported by the index itself; otherwise derive it
    // from the details type URL.
    let index_type_hint = indices_stats
        .first()
        .and_then(|stats| stats.get("index_type"))
        .and_then(|v| v.as_str());
    let index_type = legacy_type_name(&index_uri, index_type_hint);

    // `None` signals out-dated fragment metadata: migrate and recompute.
    let Some((
        num_indexed_rows_per_delta,
        num_indexed_fragments,
        num_unindexed_fragments,
        num_indexed_rows,
        num_unindexed_rows,
    )) = gather_fragment_statistics(ds, index_name).await?
    else {
        return migrate_and_recompute_index_statistics(ds, index_name).await;
    };

    let stats = json!({
        "index_type": index_type,
        "name": index_name,
        "num_indices": num_indices,
        "indices": indices_stats,
        "num_indexed_fragments": num_indexed_fragments,
        "num_indexed_rows": num_indexed_rows,
        "num_unindexed_fragments": num_unindexed_fragments,
        "num_unindexed_rows": num_unindexed_rows,
        "num_indexed_rows_per_delta": num_indexed_rows_per_delta,
        "updated_at_timestamp_ms": updated_at,
    });

    serialize_index_statistics(&stats)
}
1093
/// Collects per-delta statistics for a regular index.
///
/// Returns `(per-delta stats, details type URL of the first delta,
/// delta count, latest created_at in epoch millis)`. For each delta the
/// plugin's statistics loader is tried first; opening the index itself is the
/// fallback.
async fn collect_regular_indices_statistics(
    ds: &Dataset,
    metadatas: Vec<IndexMetadata>,
    field_path: &str,
) -> Result<(Vec<serde_json::Value>, String, usize, Option<u64>)> {
    let num_indices = metadatas.len();
    // Most recent creation time across deltas, if any delta records one.
    let updated_at = metadatas
        .iter()
        .filter_map(|m| m.created_at)
        .max()
        .map(|dt| dt.timestamp_millis() as u64);

    let mut indices_stats = Vec::with_capacity(num_indices);
    let mut index_uri: Option<String> = None;

    for meta in metadatas.iter() {
        let index_store = Arc::new(LanceIndexStore::from_dataset_for_existing(ds, meta)?);
        let index_details = scalar::fetch_index_details(ds, field_path, meta).await?;
        // Remember the first delta's type URL as the representative URI.
        if index_uri.is_none() {
            index_uri = Some(index_details.type_url.clone());
        }

        // Cheap path: let the plugin read statistics straight from the store.
        let index_details_wrapper = scalar::IndexDetails(index_details.clone());
        if let Ok(plugin) = index_details_wrapper.get_plugin() {
            if let Some(stats) = plugin
                .load_statistics(index_store.clone(), index_details.as_ref())
                .await?
            {
                indices_stats.push(stats);
                continue;
            }
        }

        // Fallback: fully open the index and ask it directly.
        let index = ds
            .open_generic_index(field_path, &meta.uuid.to_string(), &NoOpMetricsCollector)
            .await?;

        indices_stats.push(index.statistics()?);
    }

    Ok((
        indices_stats,
        index_uri.unwrap_or_else(|| "unknown".to_string()),
        num_indices,
        updated_at,
    ))
}
1141
/// Computes fragment/row coverage counters for the named index.
///
/// Returns `Ok(None)` when the dataset needs auto-migration (missing row
/// counts or overlapping delta coverage with `LANCE_AUTO_MIGRATION` enabled);
/// otherwise returns `(rows per delta, indexed fragments, unindexed
/// fragments, indexed rows, unindexed rows)`.
async fn gather_fragment_statistics(
    ds: &Dataset,
    index_name: &str,
) -> Result<Option<(Vec<usize>, usize, usize, usize, usize)>> {
    let indexed_fragments_per_delta = ds.indexed_fragments(index_name).await?;

    let num_indexed_rows_per_delta = match sum_indexed_rows_per_delta(&indexed_fragments_per_delta)
    {
        Ok(rows) => rows,
        // The "missing row counts" internal error is recognized by its message
        // text; with auto-migration enabled it becomes a migrate-and-retry.
        Err(Error::Internal { message, .. })
            if auto_migrate_corruption() && message.contains("trigger a single write") =>
        {
            return Ok(None);
        }
        Err(e) => return Err(e),
    };

    // `None` from the counter means deltas overlap on some fragment.
    let Some(num_indexed_fragments) = unique_indexed_fragment_count(&indexed_fragments_per_delta)
    else {
        if auto_migrate_corruption() {
            return Ok(None);
        }
        return Err(Error::Internal {
            message: "Overlap in indexed fragments. Please upgrade to lance >= 0.23.0 \
                and trigger a single write to fix this"
                .to_string(),
            location: location!(),
        });
    };

    let num_unindexed_fragments = ds.fragments().len() - num_indexed_fragments;
    let num_indexed_rows: usize = num_indexed_rows_per_delta.iter().sum();

    // Release the fragment lists before the (potentially slow) row count.
    drop(indexed_fragments_per_delta);
    let total_rows = ds.count_rows(None).await?;
    let num_unindexed_rows = total_rows - num_indexed_rows;

    Ok(Some((
        num_indexed_rows_per_delta,
        num_indexed_fragments,
        num_unindexed_fragments,
        num_indexed_rows,
        num_unindexed_rows,
    )))
}
1187
1188pub(crate) fn retain_supported_indices(indices: &mut Vec<IndexMetadata>) {
1189 indices.retain(|idx| {
1190 let max_supported_version = idx
1191 .index_details
1192 .as_ref()
1193 .map(|details| {
1194 IndexDetails(details.clone())
1195 .index_version()
1196 .unwrap_or(i32::MAX as u32)
1198 })
1199 .unwrap_or_default();
1200 let is_valid = idx.index_version <= max_supported_version as i32;
1201 if !is_valid {
1202 log::warn!(
1203 "Index {} has version {}, which is not supported (<={}), ignoring it",
1204 idx.name,
1205 idx.index_version,
1206 max_supported_version,
1207 );
1208 }
1209 is_valid
1210 })
1211}
1212
#[async_trait]
/// Crate-internal extensions for opening and inspecting indices on a
/// [`Dataset`], layered on top of the public [`DatasetIndexExt`].
pub trait DatasetIndexInternalExt: DatasetIndexExt {
    /// Opens an index by `uuid` without knowing its kind up front; resolves
    /// to a scalar, vector, or frag-reuse index behind the erased `Index`
    /// trait object.
    async fn open_generic_index(
        &self,
        column: &str,
        uuid: &str,
        metrics: &dyn MetricsCollector,
    ) -> Result<Arc<dyn Index>>;
    /// Opens the scalar index identified by `uuid` on `column`.
    async fn open_scalar_index(
        &self,
        column: &str,
        uuid: &str,
        metrics: &dyn MetricsCollector,
    ) -> Result<Arc<dyn ScalarIndex>>;
    /// Opens the vector index identified by `uuid` on `column`.
    async fn open_vector_index(
        &self,
        column: &str,
        uuid: &str,
        metrics: &dyn MetricsCollector,
    ) -> Result<Arc<dyn VectorIndex>>;

    /// Opens the fragment-reuse system index if the dataset has one,
    /// otherwise returns `Ok(None)`.
    async fn open_frag_reuse_index(
        &self,
        metrics: &dyn MetricsCollector,
    ) -> Result<Option<Arc<FragReuseIndex>>>;

    /// Opens the MemWAL system index if the dataset has one, otherwise
    /// returns `Ok(None)`.
    async fn open_mem_wal_index(
        &self,
        metrics: &dyn MetricsCollector,
    ) -> Result<Option<Arc<MemWalIndex>>>;

    /// Returns the uuid of the fragment-reuse index if present; `None` when
    /// it does not exist or the index list cannot be loaded.
    async fn frag_reuse_index_uuid(&self) -> Option<Uuid>;

    /// Builds the per-column scalar-index query information used by the
    /// query planner (one `MultiQueryParser` per indexed column).
    async fn scalar_index_info(&self) -> Result<ScalarIndexInfo>;

    /// Fragments not covered by any delta of the index named `idx_name`.
    async fn unindexed_fragments(&self, idx_name: &str) -> Result<Vec<Fragment>>;

    /// Per-delta lists of fragments covered by the index named `idx_name`.
    async fn indexed_fragments(&self, idx_name: &str) -> Result<Vec<Vec<Fragment>>>;

    /// Initializes, in this dataset, an index mirroring `index_name` from
    /// `source_dataset` (fields are matched by name and must agree on type).
    async fn initialize_index(&mut self, source_dataset: &Dataset, index_name: &str) -> Result<()>;

    /// Initializes every non-system index from `source_dataset` into this
    /// dataset.
    async fn initialize_indices(&mut self, source_dataset: &Dataset) -> Result<()>;
}
1271
#[async_trait]
impl DatasetIndexInternalExt for Dataset {
    /// Opens an index whose kind is unknown: first probes all three caches
    /// (scalar, vector, frag-reuse), then decides scalar vs. vector by
    /// checking whether the vector index file exists in object storage.
    async fn open_generic_index(
        &self,
        column: &str,
        uuid: &str,
        metrics: &dyn MetricsCollector,
    ) -> Result<Arc<dyn Index>> {
        // Cache keys incorporate the frag-reuse index uuid so cached entries
        // are invalidated when the frag-reuse index changes.
        let frag_reuse_uuid = self.frag_reuse_index_uuid().await;
        let cache_key = ScalarIndexCacheKey::new(uuid, frag_reuse_uuid.as_ref());
        if let Some(index) = self.index_cache.get_unsized_with_key(&cache_key).await {
            return Ok(index.as_index());
        }

        let vector_cache_key = VectorIndexCacheKey::new(uuid, frag_reuse_uuid.as_ref());
        if let Some(index) = self
            .index_cache
            .get_unsized_with_key(&vector_cache_key)
            .await
        {
            return Ok(index.as_index());
        }

        let frag_reuse_cache_key = FragReuseIndexCacheKey::new(uuid, frag_reuse_uuid.as_ref());
        if let Some(index) = self.index_cache.get_with_key(&frag_reuse_cache_key).await {
            return Ok(index.as_index());
        }

        let index_meta = self.load_index(uuid).await?.ok_or_else(|| Error::Index {
            message: format!("Index with id {} does not exist", uuid),
            location: location!(),
        })?;
        let index_dir = self.indice_files_dir(&index_meta)?;
        let index_file = index_dir.child(uuid).child(INDEX_FILE_NAME);
        // Vector indices store a file named INDEX_FILE_NAME under the index
        // uuid directory; scalar indices do not — use its presence to route.
        if self.object_store.exists(&index_file).await? {
            let index = self.open_vector_index(column, uuid, metrics).await?;
            Ok(index.as_index())
        } else {
            let index = self.open_scalar_index(column, uuid, metrics).await?;
            Ok(index.as_index())
        }
    }

    /// Opens a scalar index by uuid, serving from the session index cache
    /// when possible and populating it after a cold open.
    #[instrument(level = "debug", skip_all)]
    async fn open_scalar_index(
        &self,
        column: &str,
        uuid: &str,
        metrics: &dyn MetricsCollector,
    ) -> Result<Arc<dyn ScalarIndex>> {
        let frag_reuse_uuid = self.frag_reuse_index_uuid().await;
        let cache_key = ScalarIndexCacheKey::new(uuid, frag_reuse_uuid.as_ref());
        if let Some(index) = self.index_cache.get_unsized_with_key(&cache_key).await {
            return Ok(index);
        }

        let index_meta = self.load_index(uuid).await?.ok_or_else(|| Error::Index {
            message: format!("Index with id {} does not exist", uuid),
            location: location!(),
        })?;

        // Delegate the actual open to the scalar module (plugin dispatch).
        let index = scalar::open_scalar_index(self, column, &index_meta, metrics).await?;

        info!(target: TRACE_IO_EVENTS, index_uuid=uuid, r#type=IO_TYPE_OPEN_SCALAR, index_type=index.index_type().to_string());
        metrics.record_index_load();

        self.index_cache
            .insert_unsized_with_key(&cache_key, index.clone())
            .await;
        Ok(index)
    }

    /// Opens a vector index by uuid, dispatching on the (major, minor)
    /// version read from the index file's trailing block:
    /// - (0,0)/(0,1): legacy protobuf-described IVF_PQ
    /// - (0,2): v2 self-described file reader
    /// - (0,3)/(2,_): v3 lance file format; index type taken from the file's
    ///   schema metadata string (e.g. "IVF_PQ", "IVF_HNSW_SQ", ...)
    async fn open_vector_index(
        &self,
        column: &str,
        uuid: &str,
        metrics: &dyn MetricsCollector,
    ) -> Result<Arc<dyn VectorIndex>> {
        let frag_reuse_uuid = self.frag_reuse_index_uuid().await;
        let cache_key = VectorIndexCacheKey::new(uuid, frag_reuse_uuid.as_ref());

        if let Some(index) = self.index_cache.get_unsized_with_key(&cache_key).await {
            log::debug!("Found vector index in cache uuid: {}", uuid);
            return Ok(index);
        }

        // Vector indices need the frag-reuse mapping (if any) to remap rows.
        let frag_reuse_index = self.open_frag_reuse_index(metrics).await?;
        let index_meta = self.load_index(uuid).await?.ok_or_else(|| Error::Index {
            message: format!("Index with id {} does not exist", uuid),
            location: location!(),
        })?;
        let index_dir = self.indice_files_dir(&index_meta)?;
        let index_file = index_dir.child(uuid).child(INDEX_FILE_NAME);
        let reader: Arc<dyn Reader> = self.object_store.open(&index_file).await?.into();

        // The file format version lives in the last block of the file.
        let tailing_bytes = read_last_block(reader.as_ref()).await?;
        let (major_version, minor_version) = read_version(&tailing_bytes)?;

        // Scope nested cache entries (partitions etc.) under this index key.
        let index_cache = self.index_cache.with_key_prefix(&cache_key.key());

        let index = match (major_version, minor_version) {
            (0, 1) | (0, 0) => {
                info!(target: TRACE_IO_EVENTS, index_uuid=uuid, r#type=IO_TYPE_OPEN_VECTOR, version="0.1", index_type="IVF_PQ");
                let proto = open_index_proto(reader.as_ref()).await?;
                match &proto.implementation {
                    Some(Implementation::VectorIndex(vector_index)) => {
                        let dataset = Arc::new(self.clone());
                        vector::open_vector_index(
                            dataset,
                            uuid,
                            vector_index,
                            reader,
                            frag_reuse_index,
                        )
                        .await
                    }
                    None => Err(Error::Internal {
                        message: "Index proto was missing implementation field".into(),
                        location: location!(),
                    }),
                }
            }

            (0, 2) => {
                info!(target: TRACE_IO_EVENTS, index_uuid=uuid, r#type=IO_TYPE_OPEN_VECTOR, version="0.2", index_type="IVF_PQ");
                let reader = PreviousFileReader::try_new_self_described_from_reader(
                    reader.clone(),
                    Some(&self.metadata_cache.file_metadata_cache(&index_file)),
                )
                .await?;
                vector::open_vector_index_v2(
                    Arc::new(self.clone()),
                    column,
                    uuid,
                    reader,
                    frag_reuse_index,
                )
                .await
            }

            (0, 3) | (2, _) => {
                // v3: re-open through the scan scheduler / new file reader.
                let scheduler = ScanScheduler::new(
                    self.object_store.clone(),
                    SchedulerConfig::max_bandwidth(&self.object_store),
                );
                let file = scheduler
                    .open_file(&index_file, &CachedFileSize::unknown())
                    .await?;
                let reader = lance_file::reader::FileReader::try_open(
                    file,
                    None,
                    Default::default(),
                    &self.metadata_cache.file_metadata_cache(&index_file),
                    FileReaderOptions::default(),
                )
                .await?;
                // The index type is serialized as JSON in the file schema
                // metadata under INDEX_METADATA_SCHEMA_KEY.
                let index_metadata = reader
                    .schema()
                    .metadata
                    .get(INDEX_METADATA_SCHEMA_KEY)
                    .ok_or(Error::Index {
                        message: "Index Metadata not found".to_owned(),
                        location: location!(),
                    })?;
                let index_metadata: lance_index::IndexMetadata =
                    serde_json::from_str(index_metadata)?;

                // `column` may be a column path or the index name; resolve
                // either form to a concrete field path + field.
                let (field_path, field) = resolve_index_column(self.schema(), &index_meta, column)?;

                let (_, element_type) = get_vector_type(self.schema(), &field_path)?;

                info!(target: TRACE_IO_EVENTS, index_uuid=uuid, r#type=IO_TYPE_OPEN_VECTOR, version="0.3", index_type=index_metadata.index_type);

                match index_metadata.index_type.as_str() {
                    // Flat (unquantized) storage; the quantizer type depends
                    // on the vector element type.
                    "IVF_FLAT" => match element_type {
                        DataType::Float16 | DataType::Float32 | DataType::Float64 => {
                            let ivf = IVFIndex::<FlatIndex, FlatQuantizer>::try_new(
                                self.object_store.clone(),
                                index_dir,
                                uuid.to_owned(),
                                frag_reuse_index,
                                self.metadata_cache.as_ref(),
                                index_cache,
                            )
                            .await?;
                            Ok(Arc::new(ivf) as Arc<dyn VectorIndex>)
                        }
                        DataType::UInt8 => {
                            // Binary vectors use the binary flat quantizer.
                            let ivf = IVFIndex::<FlatIndex, FlatBinQuantizer>::try_new(
                                self.object_store.clone(),
                                index_dir,
                                uuid.to_owned(),
                                frag_reuse_index,
                                self.metadata_cache.as_ref(),
                                index_cache,
                            )
                            .await?;
                            Ok(Arc::new(ivf) as Arc<dyn VectorIndex>)
                        }
                        _ => Err(Error::Index {
                            message: format!(
                                "the field type {} is not supported for FLAT index",
                                field.data_type()
                            ),
                            location: location!(),
                        }),
                    },

                    "IVF_PQ" => {
                        let ivf = IVFIndex::<FlatIndex, ProductQuantizer>::try_new(
                            self.object_store.clone(),
                            index_dir,
                            uuid.to_owned(),
                            frag_reuse_index,
                            self.metadata_cache.as_ref(),
                            index_cache,
                        )
                        .await?;
                        Ok(Arc::new(ivf) as Arc<dyn VectorIndex>)
                    }

                    "IVF_SQ" => {
                        let ivf = IVFIndex::<FlatIndex, ScalarQuantizer>::try_new(
                            self.object_store.clone(),
                            index_dir,
                            uuid.to_owned(),
                            frag_reuse_index,
                            self.metadata_cache.as_ref(),
                            index_cache,
                        )
                        .await?;
                        Ok(Arc::new(ivf) as Arc<dyn VectorIndex>)
                    }

                    "IVF_RQ" => {
                        // NOTE(review): this branch passes `self.indices_dir()`
                        // where every sibling branch passes `index_dir`
                        // (derived from `indice_files_dir(&index_meta)`) —
                        // confirm this difference is intentional.
                        let ivf = IVFIndex::<FlatIndex, RabitQuantizer>::try_new(
                            self.object_store.clone(),
                            self.indices_dir(),
                            uuid.to_owned(),
                            frag_reuse_index,
                            self.metadata_cache.as_ref(),
                            index_cache,
                        )
                        .await?;
                        Ok(Arc::new(ivf) as Arc<dyn VectorIndex>)
                    }

                    "IVF_HNSW_FLAT" => {
                        // NOTE(review): unlike sibling branches, this one uses
                        // a file-scoped cache from `self.session.metadata_cache`
                        // keyed on the index.pb path — confirm intentional.
                        let uri = index_dir.child(uuid).child("index.pb");
                        let file_metadata_cache =
                            self.session.metadata_cache.file_metadata_cache(&uri);
                        let ivf = IVFIndex::<HNSW, FlatQuantizer>::try_new(
                            self.object_store.clone(),
                            index_dir,
                            uuid.to_owned(),
                            frag_reuse_index,
                            &file_metadata_cache,
                            index_cache,
                        )
                        .await?;
                        Ok(Arc::new(ivf) as Arc<dyn VectorIndex>)
                    }

                    "IVF_HNSW_SQ" => {
                        let ivf = IVFIndex::<HNSW, ScalarQuantizer>::try_new(
                            self.object_store.clone(),
                            index_dir,
                            uuid.to_owned(),
                            frag_reuse_index,
                            self.metadata_cache.as_ref(),
                            index_cache,
                        )
                        .await?;
                        Ok(Arc::new(ivf) as Arc<dyn VectorIndex>)
                    }

                    "IVF_HNSW_PQ" => {
                        let ivf = IVFIndex::<HNSW, ProductQuantizer>::try_new(
                            self.object_store.clone(),
                            index_dir,
                            uuid.to_owned(),
                            frag_reuse_index,
                            self.metadata_cache.as_ref(),
                            index_cache,
                        )
                        .await?;
                        Ok(Arc::new(ivf) as Arc<dyn VectorIndex>)
                    }

                    _ => Err(Error::Index {
                        message: format!("Unsupported index type: {}", index_metadata.index_type),
                        location: location!(),
                    }),
                }
            }

            _ => Err(Error::Index {
                message: "unsupported index version (maybe need to upgrade your lance version)"
                    .to_owned(),
                location: location!(),
            }),
        };
        let index = index?;
        metrics.record_index_load();
        self.index_cache
            .insert_unsized_with_key(&cache_key, index.clone())
            .await;
        Ok(index)
    }

    /// Opens the frag-reuse system index (if present), memoized through the
    /// index cache via `get_or_insert_with_key`.
    async fn open_frag_reuse_index(
        &self,
        metrics: &dyn MetricsCollector,
    ) -> Result<Option<Arc<FragReuseIndex>>> {
        if let Some(frag_reuse_index_meta) = self.load_index_by_name(FRAG_REUSE_INDEX_NAME).await? {
            let uuid = frag_reuse_index_meta.uuid.to_string();
            let frag_reuse_key = FragReuseIndexKey { uuid: &uuid };
            let uuid_clone = uuid.clone();

            // Only runs the loader closure on a cache miss.
            let index = self
                .index_cache
                .get_or_insert_with_key(frag_reuse_key, || async move {
                    let index_meta = self.load_index(&uuid_clone).await?.ok_or_else(|| Error::Index {
                        message: format!("Index with id {} does not exist", uuid_clone),
                        location: location!(),
                    })?;
                    let index_details = load_frag_reuse_index_details(self, &index_meta).await?;
                    let index =
                        open_frag_reuse_index(frag_reuse_index_meta.uuid, index_details.as_ref()).await?;

                    info!(target: TRACE_IO_EVENTS, index_uuid=uuid_clone, r#type=IO_TYPE_OPEN_FRAG_REUSE);
                    metrics.record_index_load();

                    Ok(index)
                })
                .await?;

            Ok(Some(index))
        } else {
            Ok(None)
        }
    }

    /// Opens the MemWAL system index (if present), serving from the index
    /// cache when possible.
    async fn open_mem_wal_index(
        &self,
        metrics: &dyn MetricsCollector,
    ) -> Result<Option<Arc<MemWalIndex>>> {
        let Some(mem_wal_meta) = self.load_index_by_name(MEM_WAL_INDEX_NAME).await? else {
            return Ok(None);
        };

        let frag_reuse_uuid = self.frag_reuse_index_uuid().await;
        let cache_key = MemWalCacheKey::new(&mem_wal_meta.uuid, frag_reuse_uuid.as_ref());
        if let Some(index) = self.index_cache.get_with_key(&cache_key).await {
            log::debug!("Found MemWAL index in cache uuid: {}", mem_wal_meta.uuid);
            return Ok(Some(index));
        }

        let uuid = mem_wal_meta.uuid.to_string();

        let index_meta = self.load_index(&uuid).await?.ok_or_else(|| Error::Index {
            message: format!("Index with id {} does not exist", uuid),
            location: location!(),
        })?;
        let index = open_mem_wal_index(index_meta)?;

        info!(target: TRACE_IO_EVENTS, index_uuid=uuid, r#type=IO_TYPE_OPEN_MEM_WAL);
        metrics.record_index_load();

        self.index_cache
            .insert_with_key(&cache_key, index.clone())
            .await;
        Ok(Some(index))
    }

    /// Looks up the frag-reuse index uuid by name; returns `None` both when
    /// the index is absent and when the index list fails to load.
    async fn frag_reuse_index_uuid(&self) -> Option<Uuid> {
        if let Ok(indices) = self.load_indices().await {
            indices
                .iter()
                .find(|idx| idx.name == FRAG_REUSE_INDEX_NAME)
                .map(|idx| idx.uuid)
        } else {
            None
        }
    }

    /// Collects query-parser information for every usable scalar index.
    ///
    /// An index qualifies when it covers exactly one field, is not a vector
    /// index, and either has a fragment bitmap that intersects the dataset's
    /// live fragments or supports full-text search.
    #[instrument(level = "trace", skip_all)]
    async fn scalar_index_info(&self) -> Result<ScalarIndexInfo> {
        let indices = self.load_indices().await?;
        let schema = self.schema();
        let mut indexed_fields = Vec::new();
        for index in indices.iter().filter(|idx| {
            let idx_schema = schema.project_by_ids(idx.fields.as_slice(), true);
            let is_vector_index = idx_schema
                .fields
                .iter()
                .any(|f| is_vector_field(f.data_type()));

            let is_fts_index = if let Some(details) = &idx.index_details {
                IndexDetails(details.clone()).supports_fts()
            } else {
                false
            };

            // The index must cover at least one fragment that still exists.
            let has_non_empty_bitmap = idx.fragment_bitmap.as_ref().is_some_and(|bitmap| {
                !bitmap.is_empty() && !(bitmap & self.fragment_bitmap.as_ref()).is_empty()
            });

            idx.fields.len() == 1 && !is_vector_index && (has_non_empty_bitmap || is_fts_index)
        }) {
            let field = index.fields[0];
            let field = schema.field_by_id(field).ok_or_else(|| Error::Internal {
                message: format!(
                    "Index referenced a field with id {field} which did not exist in the schema"
                ),
                location: location!(),
            })?;

            // Build the dotted path for nested fields; fall back to the bare
            // field name when no ancestry is available.
            let field_path = if let Some(ancestors) = schema.field_ancestry_by_id(field.id) {
                let field_refs: Vec<&str> = ancestors.iter().map(|f| f.name.as_str()).collect();
                lance_core::datatypes::format_field_path(&field_refs)
            } else {
                field.name.clone()
            };

            let index_details = IndexDetails(fetch_index_details(self, &field_path, index).await?);
            if index_details.is_vector() {
                continue;
            }

            let plugin = index_details.get_plugin()?;
            let query_parser = plugin.new_query_parser(index.name.clone(), &index_details.0);

            if let Some(query_parser) = query_parser {
                indexed_fields.push((field_path, (field.data_type(), query_parser)));
            }
        }
        // Merge parsers for columns with multiple (delta) indices into one
        // MultiQueryParser per column.
        let mut index_info_map = HashMap::with_capacity(indexed_fields.len());
        for indexed_field in indexed_fields {
            // `Option` + `take()` lets exactly one of the two entry closures
            // consume the parser by move.
            let mut parser = Some(indexed_field.1 .1);
            let parser = &mut parser;
            index_info_map
                .entry(indexed_field.0)
                .and_modify(|existing: &mut (DataType, Box<MultiQueryParser>)| {
                    debug_assert_eq!(existing.0, indexed_field.1 .0);

                    existing.1.add(parser.take().unwrap());
                })
                .or_insert_with(|| {
                    (
                        indexed_field.1 .0,
                        Box::new(MultiQueryParser::single(parser.take().unwrap())),
                    )
                });
        }
        Ok(ScalarIndexInfo {
            indexed_columns: index_info_map,
        })
    }

    /// Fragments not covered by any delta of the index `name`.
    ///
    /// Errors if any delta predates fragment bitmaps (lance < 0.8).
    async fn unindexed_fragments(&self, name: &str) -> Result<Vec<Fragment>> {
        let indices = self.load_indices_by_name(name).await?;
        // Union the coverage of all deltas, then keep the complement.
        let mut total_fragment_bitmap = RoaringBitmap::new();
        for idx in indices.iter() {
            total_fragment_bitmap |= idx.fragment_bitmap.as_ref().ok_or(Error::Index {
                message: "Please upgrade lance to 0.8+ to use this function".to_string(),
                location: location!(),
            })?;
        }
        Ok(self
            .fragments()
            .iter()
            .filter(|f| !total_fragment_bitmap.contains(f.id as u32))
            .cloned()
            .collect())
    }

    /// Per-delta lists of fragments covered by the index `name`.
    ///
    /// Errors if any delta predates fragment bitmaps (lance < 0.8).
    async fn indexed_fragments(&self, name: &str) -> Result<Vec<Vec<Fragment>>> {
        let indices = self.load_indices_by_name(name).await?;
        indices
            .iter()
            .map(|index| {
                let fragment_bitmap = index.fragment_bitmap.as_ref().ok_or(Error::Index {
                    message: "Please upgrade lance to 0.8+ to use this function".to_string(),
                    location: location!(),
                })?;
                let mut indexed_frags = Vec::with_capacity(fragment_bitmap.len() as usize);
                for frag in self.fragments().iter() {
                    if fragment_bitmap.contains(frag.id as u32) {
                        indexed_frags.push(frag.clone());
                    }
                }
                Ok(indexed_frags)
            })
            .collect()
    }

    /// Initializes, in this dataset, the index `index_name` from
    /// `source_dataset`.
    ///
    /// Uses the oldest delta (by `created_at`) as the template, maps its
    /// fields to this dataset's schema by name, validates that the types
    /// match, then delegates to the vector or scalar initializer based on
    /// the index details. Indices without details are skipped with a warning.
    async fn initialize_index(&mut self, source_dataset: &Dataset, index_name: &str) -> Result<()> {
        let source_indices = source_dataset.load_indices_by_name(index_name).await?;

        if source_indices.is_empty() {
            return Err(Error::Index {
                message: format!("Index '{}' not found in source dataset", index_name),
                location: location!(),
            });
        }

        // The oldest delta is the canonical definition of the index.
        let source_index = source_indices
            .iter()
            .min_by_key(|idx| idx.created_at)
            .ok_or_else(|| Error::Index {
                message: format!("Could not determine oldest index for '{}'", index_name),
                location: location!(),
            })?;

        // Resolve each source field id to a same-named, same-typed field in
        // the target schema.
        let mut field_names = Vec::new();
        for field_id in source_index.fields.iter() {
            let source_field = source_dataset
                .schema()
                .field_by_id(*field_id)
                .ok_or_else(|| Error::Index {
                    message: format!("Field with id {} not found in source dataset", field_id),
                    location: location!(),
                })?;

            let target_field =
                self.schema()
                    .field(&source_field.name)
                    .ok_or_else(|| Error::Index {
                        message: format!(
                            "Field '{}' required by index '{}' not found in target dataset",
                            source_field.name, index_name
                        ),
                        location: location!(),
                    })?;

            if source_field.data_type() != target_field.data_type() {
                return Err(Error::Index {
                    message: format!(
                        "Field '{}' has different types in source ({:?}) and target ({:?}) datasets",
                        source_field.name,
                        source_field.data_type(),
                        target_field.data_type()
                    ),
                    location: location!(),
                });
            }

            field_names.push(source_field.name.as_str());
        }

        if field_names.is_empty() {
            return Err(Error::Index {
                message: format!("Index '{}' has no fields", index_name),
                location: location!(),
            });
        }

        if let Some(index_details) = &source_index.index_details {
            let index_details_wrapper = IndexDetails(index_details.clone());

            if index_details_wrapper.is_vector() {
                vector::initialize_vector_index(self, source_dataset, source_index, &field_names)
                    .await?;
            } else {
                scalar::initialize_scalar_index(self, source_dataset, source_index, &field_names)
                    .await?;
            }
        } else {
            log::warn!(
                "Index '{}' has no index_details, skipping",
                source_index.name
            );
        }

        Ok(())
    }

    /// Initializes every non-system index from `source_dataset` into this
    /// dataset (one `initialize_index` call per distinct index name).
    async fn initialize_indices(&mut self, source_dataset: &Dataset) -> Result<()> {
        let source_indices = source_dataset.load_indices().await?;
        let non_system_indices: Vec<_> = source_indices
            .iter()
            .filter(|idx| !lance_index::is_system_index(idx))
            .collect();

        if non_system_indices.is_empty() {
            return Ok(());
        }

        // Deduplicate by name: delta indices share a name but should only be
        // initialized once.
        let mut unique_index_names = HashSet::new();
        for index in non_system_indices.iter() {
            unique_index_names.insert(index.name.clone());
        }

        for index_name in unique_index_names {
            self.initialize_index(source_dataset, &index_name).await?;
        }

        Ok(())
    }
}
1893
1894fn resolve_index_column(
1899 schema: &LanceSchema,
1900 index_meta: &IndexMetadata,
1901 column_arg: &str,
1902) -> Result<(String, Arc<Field>)> {
1903 if let Some(field) = schema.field(column_arg) {
1905 return Ok((column_arg.to_string(), Arc::new(field.clone())));
1907 }
1908
1909 if column_arg == index_meta.name {
1911 if let Some(field_id) = index_meta.fields.first() {
1913 let field = schema.field_by_id(*field_id).ok_or_else(|| Error::Index {
1914 message: format!(
1915 "Index '{}' references field with id {} which does not exist in schema",
1916 index_meta.name, field_id
1917 ),
1918 location: location!(),
1919 })?;
1920 let field_path = schema.field_path(*field_id)?;
1921 return Ok((field_path, Arc::new(field.clone())));
1922 } else {
1923 return Err(Error::Index {
1924 message: format!("Index '{}' has no fields", index_meta.name),
1925 location: location!(),
1926 });
1927 }
1928 }
1929
1930 Err(Error::Index {
1932 message: format!("Column '{}' does not exist in the schema", column_arg),
1933 location: location!(),
1934 })
1935}
1936
1937fn is_vector_field(data_type: DataType) -> bool {
1938 match data_type {
1939 DataType::FixedSizeList(_, _) => true,
1940 DataType::List(inner) => {
1941 matches!(inner.data_type(), DataType::FixedSizeList(_, _))
1943 }
1944 _ => false,
1945 }
1946}
1947
1948#[cfg(test)]
1949mod tests {
1950 use super::*;
1951 use crate::dataset::builder::DatasetBuilder;
1952 use crate::dataset::optimize::{compact_files, CompactionOptions};
1953 use crate::dataset::{WriteMode, WriteParams};
1954 use crate::index::vector::VectorIndexParams;
1955 use crate::session::Session;
1956 use crate::utils::test::{copy_test_data_to_tmp, DatagenExt, FragmentCount, FragmentRowCount};
1957 use arrow::array::AsArray;
1958 use arrow::datatypes::{Float32Type, Int32Type};
1959 use arrow_array::Int32Array;
1960 use arrow_array::{
1961 FixedSizeListArray, Float32Array, RecordBatch, RecordBatchIterator, StringArray,
1962 };
1963 use arrow_schema::{DataType, Field, Schema};
1964 use futures::stream::TryStreamExt;
1965 use lance_arrow::*;
1966 use lance_core::utils::tempfile::TempStrDir;
1967 use lance_datagen::gen_batch;
1968 use lance_datagen::{array, BatchCount, Dimension, RowCount};
1969 use lance_index::scalar::bitmap::BITMAP_LOOKUP_NAME;
1970 use lance_index::scalar::{
1971 BuiltinIndexType, FullTextSearchQuery, InvertedIndexParams, ScalarIndexParams,
1972 };
1973 use lance_index::vector::{
1974 hnsw::builder::HnswBuildParams, ivf::IvfBuildParams, sq::builder::SQBuildParams,
1975 };
1976 use lance_io::{assert_io_eq, assert_io_lt};
1977 use lance_linalg::distance::{DistanceType, MetricType};
1978 use lance_testing::datagen::generate_random_array;
1979 use rstest::rstest;
1980 use std::collections::HashSet;
1981
    /// Recreating an index (replace=true) on the same column succeeds, but
    /// reusing an existing index's name for a different column is an error.
    #[tokio::test]
    async fn test_recreate_index() {
        const DIM: i32 = 8;
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "v",
                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), DIM),
                true,
            ),
            Field::new(
                "o",
                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), DIM),
                true,
            ),
        ]));
        // Same random values for both vector columns.
        let data = generate_random_array(2048 * DIM as usize);
        let batches: Vec<RecordBatch> = vec![RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(FixedSizeListArray::try_new_from_values(data.clone(), DIM).unwrap()),
                Arc::new(FixedSizeListArray::try_new_from_values(data, DIM).unwrap()),
            ],
        )
        .unwrap()];

        let test_dir = TempStrDir::default();
        let test_uri = &test_dir;
        let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
        let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();

        let params = VectorIndexParams::ivf_pq(2, 8, 2, MetricType::L2, 2);
        dataset
            .create_index(&["v"], IndexType::Vector, None, &params, true)
            .await
            .unwrap();
        dataset
            .create_index(&["o"], IndexType::Vector, None, &params, true)
            .await
            .unwrap();

        // Re-creating the index on "v" with replace=true is allowed.
        dataset
            .create_index(&["v"], IndexType::Vector, None, &params, true)
            .await
            .unwrap();

        // Using the name of the existing "o" index for a "v" index must fail.
        assert!(dataset
            .create_index(
                &["v"],
                IndexType::Vector,
                Some("o_idx".to_string()),
                &params,
                true,
            )
            .await
            .is_err());
    }
2040
    /// `index_statistics` on a large bitmap index should touch only the index
    /// footer (one small range read, no writes), not the whole lookup file.
    #[tokio::test]
    async fn test_bitmap_index_statistics_minimal_io_via_dataset() {
        const NUM_ROWS: usize = 500_000;
        let test_dir = TempStrDir::default();
        let schema = Arc::new(Schema::new(vec![Field::new(
            "status",
            DataType::Int32,
            false,
        )]));
        // All-distinct values make the bitmap lookup file large.
        let values: Vec<i32> = (0..NUM_ROWS as i32).collect();
        let batch =
            RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(values))]).unwrap();
        let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());

        let mut dataset = Dataset::write(reader, &test_dir, None).await.unwrap();
        let io_tracker = dataset.object_store().io_tracker().clone();

        let params = ScalarIndexParams::for_builtin(BuiltinIndexType::Bitmap);
        dataset
            .create_index(
                &["status"],
                IndexType::Bitmap,
                Some("status_idx".to_string()),
                &params,
                true,
            )
            .await
            .unwrap();

        let indices = dataset.load_indices().await.unwrap();
        let index_meta = indices
            .iter()
            .find(|idx| idx.name == "status_idx")
            .expect("status_idx should exist");
        let lookup_path = dataset
            .indice_files_dir(index_meta)
            .unwrap()
            .child(index_meta.uuid.to_string())
            .child(BITMAP_LOOKUP_NAME);
        // Sanity: the lookup file must be big enough that reading it wholesale
        // would be clearly visible in the IO stats below.
        let meta = dataset.object_store.inner.head(&lookup_path).await.unwrap();
        assert!(
            meta.size >= 1_000_000,
            "bitmap index should be large enough to fail without metadata path, size={} bytes",
            meta.size
        );

        // Reset counters so the stats below cover only index_statistics().
        io_tracker.incremental_stats();

        dataset.index_statistics("status_idx").await.unwrap();

        let stats = io_tracker.incremental_stats();
        assert_io_eq!(
            stats,
            read_bytes,
            4096,
            "index_statistics should only read the index footer; got {} bytes",
            stats.read_bytes
        );
        assert_io_lt!(
            stats,
            read_iops,
            3,
            "index_statistics should only require a head plus one range read; got {} ops",
            stats.read_iops
        );
        assert_io_eq!(
            stats,
            written_bytes,
            0,
            "index_statistics should not perform writes"
        );
    }
2114
2115 fn sample_vector_field() -> Field {
2116 let dimensions = 16;
2117 let column_name = "vec";
2118 Field::new(
2119 column_name,
2120 DataType::FixedSizeList(
2121 Arc::new(Field::new("item", DataType::Float32, true)),
2122 dimensions,
2123 ),
2124 false,
2125 )
2126 }
2127
    /// Dropping indices one by one reduces the index count to zero, and
    /// dropping an already-dropped index is an error.
    #[tokio::test]
    async fn test_drop_index() {
        let test_dir = TempStrDir::default();
        let schema = Schema::new(vec![
            sample_vector_field(),
            Field::new("ints", DataType::Int32, false),
        ]);
        let mut dataset = lance_datagen::rand(&schema)
            .into_dataset(
                &test_dir,
                FragmentCount::from(1),
                FragmentRowCount::from(256),
            )
            .await
            .unwrap();

        // One vector index (named) and one scalar BTree index (default name).
        let idx_name = "name".to_string();
        dataset
            .create_index(
                &["vec"],
                IndexType::Vector,
                Some(idx_name.clone()),
                &VectorIndexParams::ivf_pq(2, 8, 2, MetricType::L2, 10),
                true,
            )
            .await
            .unwrap();
        dataset
            .create_index(
                &["ints"],
                IndexType::BTree,
                None,
                &ScalarIndexParams::default(),
                true,
            )
            .await
            .unwrap();

        assert_eq!(dataset.load_indices().await.unwrap().len(), 2);

        dataset.drop_index(&idx_name).await.unwrap();

        assert_eq!(dataset.load_indices().await.unwrap().len(), 1);

        // Only the scalar index remains; drop it by its (auto-generated) name.
        let scalar_idx_name = &dataset.load_indices().await.unwrap()[0].name;
        dataset.drop_index(scalar_idx_name).await.unwrap();

        assert_eq!(dataset.load_indices().await.unwrap().len(), 0);

        // Dropping a non-existent index must fail.
        assert!(dataset.drop_index(scalar_idx_name).await.is_err());
    }
2181
    /// `index_statistics` reports indexed vs. unindexed row counts: all rows
    /// indexed right after index creation, appended rows counted as
    /// unindexed until the index is updated.
    #[tokio::test]
    async fn test_count_index_rows() {
        let test_dir = TempStrDir::default();
        let dimensions = 16;
        let column_name = "vec";
        let field = sample_vector_field();
        let schema = Arc::new(Schema::new(vec![field]));

        let float_arr = generate_random_array(512 * dimensions as usize);

        let vectors =
            arrow_array::FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap();

        let record_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(vectors)]).unwrap();

        let reader =
            RecordBatchIterator::new(vec![record_batch].into_iter().map(Ok), schema.clone());
        let test_uri = &test_dir;
        let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
        dataset.validate().await.unwrap();

        // Statistics for an unknown index name must fail.
        assert!(dataset.index_statistics("bad_id").await.is_err());
        let params = VectorIndexParams::ivf_pq(10, 8, 2, MetricType::L2, 10);
        dataset
            .create_index(
                &[column_name],
                IndexType::Vector,
                Some("vec_idx".into()),
                &params,
                true,
            )
            .await
            .unwrap();

        // All 512 rows are covered immediately after index creation.
        let stats: serde_json::Value =
            serde_json::from_str(&dataset.index_statistics("vec_idx").await.unwrap()).unwrap();
        assert_eq!(stats["num_unindexed_rows"], 0);
        assert_eq!(stats["num_indexed_rows"], 512);

        // Append 512 more rows without updating the index.
        let float_arr = generate_random_array(512 * dimensions as usize);
        let vectors =
            arrow_array::FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap();

        let record_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(vectors)]).unwrap();

        let reader = RecordBatchIterator::new(vec![record_batch].into_iter().map(Ok), schema);
        dataset.append(reader, None).await.unwrap();

        // The new rows show up as unindexed; the original 512 stay indexed.
        let stats: serde_json::Value =
            serde_json::from_str(&dataset.index_statistics("vec_idx").await.unwrap()).unwrap();
        assert_eq!(stats["num_unindexed_rows"], 512);
        assert_eq!(stats["num_indexed_rows"], 512);
    }
2239
2240 #[tokio::test]
2241 async fn test_optimize_delta_indices() {
2242 let dimensions = 16;
2243 let column_name = "vec";
2244 let vec_field = Field::new(
2245 column_name,
2246 DataType::FixedSizeList(
2247 Arc::new(Field::new("item", DataType::Float32, true)),
2248 dimensions,
2249 ),
2250 false,
2251 );
2252 let other_column_name = "other_vec";
2253 let other_vec_field = Field::new(
2254 other_column_name,
2255 DataType::FixedSizeList(
2256 Arc::new(Field::new("item", DataType::Float32, true)),
2257 dimensions,
2258 ),
2259 false,
2260 );
2261 let schema = Arc::new(Schema::new(vec![vec_field, other_vec_field]));
2262
2263 let float_arr = generate_random_array(512 * dimensions as usize);
2264
2265 let vectors = Arc::new(
2266 arrow_array::FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap(),
2267 );
2268
2269 let record_batch =
2270 RecordBatch::try_new(schema.clone(), vec![vectors.clone(), vectors.clone()]).unwrap();
2271
2272 let reader = RecordBatchIterator::new(
2273 vec![record_batch.clone()].into_iter().map(Ok),
2274 schema.clone(),
2275 );
2276
2277 let mut dataset = Dataset::write(reader, "memory://", None).await.unwrap();
2278 let params = VectorIndexParams::ivf_pq(10, 8, 2, MetricType::L2, 10);
2279 dataset
2280 .create_index(
2281 &[column_name],
2282 IndexType::Vector,
2283 Some("vec_idx".into()),
2284 ¶ms,
2285 true,
2286 )
2287 .await
2288 .unwrap();
2289 dataset
2290 .create_index(
2291 &[other_column_name],
2292 IndexType::Vector,
2293 Some("other_vec_idx".into()),
2294 ¶ms,
2295 true,
2296 )
2297 .await
2298 .unwrap();
2299
2300 async fn get_stats(dataset: &Dataset, name: &str) -> serde_json::Value {
2301 serde_json::from_str(&dataset.index_statistics(name).await.unwrap()).unwrap()
2302 }
2303 async fn get_meta(dataset: &Dataset, name: &str) -> Vec<IndexMetadata> {
2304 dataset
2305 .load_indices()
2306 .await
2307 .unwrap()
2308 .iter()
2309 .filter(|m| m.name == name)
2310 .cloned()
2311 .collect()
2312 }
2313 fn get_bitmap(meta: &IndexMetadata) -> Vec<u32> {
2314 meta.fragment_bitmap.as_ref().unwrap().iter().collect()
2315 }
2316
2317 let stats = get_stats(&dataset, "vec_idx").await;
2318 assert_eq!(stats["num_unindexed_rows"], 0);
2319 assert_eq!(stats["num_indexed_rows"], 512);
2320 assert_eq!(stats["num_indexed_fragments"], 1);
2321 assert_eq!(stats["num_indices"], 1);
2322 let meta = get_meta(&dataset, "vec_idx").await;
2323 assert_eq!(meta.len(), 1);
2324 assert_eq!(get_bitmap(&meta[0]), vec![0]);
2325
2326 let reader =
2327 RecordBatchIterator::new(vec![record_batch].into_iter().map(Ok), schema.clone());
2328 dataset.append(reader, None).await.unwrap();
2329 let stats = get_stats(&dataset, "vec_idx").await;
2330 assert_eq!(stats["num_unindexed_rows"], 512);
2331 assert_eq!(stats["num_indexed_rows"], 512);
2332 assert_eq!(stats["num_indexed_fragments"], 1);
2333 assert_eq!(stats["num_unindexed_fragments"], 1);
2334 assert_eq!(stats["num_indices"], 1);
2335 let meta = get_meta(&dataset, "vec_idx").await;
2336 assert_eq!(meta.len(), 1);
2337 assert_eq!(get_bitmap(&meta[0]), vec![0]);
2338
2339 dataset
2340 .optimize_indices(&OptimizeOptions::append().index_names(vec![])) .await
2342 .unwrap();
2343 let stats = get_stats(&dataset, "vec_idx").await;
2344 assert_eq!(stats["num_unindexed_rows"], 512);
2345 assert_eq!(stats["num_indexed_rows"], 512);
2346 assert_eq!(stats["num_indexed_fragments"], 1);
2347 assert_eq!(stats["num_unindexed_fragments"], 1);
2348 assert_eq!(stats["num_indices"], 1);
2349 let meta = get_meta(&dataset, "vec_idx").await;
2350 assert_eq!(meta.len(), 1);
2351 assert_eq!(get_bitmap(&meta[0]), vec![0]);
2352
2353 dataset
2355 .optimize_indices(
2356 &OptimizeOptions::append().index_names(vec!["other_vec_idx".to_owned()]),
2357 )
2358 .await
2359 .unwrap();
2360 let stats = get_stats(&dataset, "vec_idx").await;
2361 assert_eq!(stats["num_unindexed_rows"], 512);
2362 assert_eq!(stats["num_indexed_rows"], 512);
2363 assert_eq!(stats["num_indexed_fragments"], 1);
2364 assert_eq!(stats["num_unindexed_fragments"], 1);
2365 assert_eq!(stats["num_indices"], 1);
2366 let meta = get_meta(&dataset, "vec_idx").await;
2367 assert_eq!(meta.len(), 1);
2368 assert_eq!(get_bitmap(&meta[0]), vec![0]);
2369
2370 let stats = get_stats(&dataset, "other_vec_idx").await;
2371 assert_eq!(stats["num_unindexed_rows"], 0);
2372 assert_eq!(stats["num_indexed_rows"], 1024);
2373 assert_eq!(stats["num_indexed_fragments"], 2);
2374 assert_eq!(stats["num_unindexed_fragments"], 0);
2375 assert_eq!(stats["num_indices"], 2);
2376 let meta = get_meta(&dataset, "other_vec_idx").await;
2377 assert_eq!(meta.len(), 2);
2378 assert_eq!(get_bitmap(&meta[0]), vec![0]);
2379 assert_eq!(get_bitmap(&meta[1]), vec![1]);
2380
2381 dataset
2382 .optimize_indices(&OptimizeOptions::retrain())
2383 .await
2384 .unwrap();
2385
2386 let stats = get_stats(&dataset, "vec_idx").await;
2387 assert_eq!(stats["num_unindexed_rows"], 0);
2388 assert_eq!(stats["num_indexed_rows"], 1024);
2389 assert_eq!(stats["num_indexed_fragments"], 2);
2390 assert_eq!(stats["num_unindexed_fragments"], 0);
2391 assert_eq!(stats["num_indices"], 1);
2392 let meta = get_meta(&dataset, "vec_idx").await;
2393 assert_eq!(meta.len(), 1);
2394 assert_eq!(get_bitmap(&meta[0]), vec![0, 1]);
2395
2396 dataset
2397 .optimize_indices(&OptimizeOptions::retrain())
2398 .await
2399 .unwrap();
2400 let stats = get_stats(&dataset, "other_vec_idx").await;
2401 assert_eq!(stats["num_unindexed_rows"], 0);
2402 assert_eq!(stats["num_indexed_rows"], 1024);
2403 assert_eq!(stats["num_indexed_fragments"], 2);
2404 assert_eq!(stats["num_unindexed_fragments"], 0);
2405 assert_eq!(stats["num_indices"], 1);
2406 let meta = get_meta(&dataset, "other_vec_idx").await;
2407 assert_eq!(meta.len(), 1);
2408 assert_eq!(get_bitmap(&meta[0]), vec![0, 1]);
2409 }
2410
2411 #[tokio::test]
2412 async fn test_optimize_ivf_hnsw_sq_delta_indices() {
2413 let test_dir = TempStrDir::default();
2414 let dimensions = 16;
2415 let column_name = "vec";
2416 let field = Field::new(
2417 column_name,
2418 DataType::FixedSizeList(
2419 Arc::new(Field::new("item", DataType::Float32, true)),
2420 dimensions,
2421 ),
2422 false,
2423 );
2424 let schema = Arc::new(Schema::new(vec![field]));
2425
2426 let float_arr = generate_random_array(512 * dimensions as usize);
2427
2428 let vectors =
2429 arrow_array::FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap();
2430
2431 let record_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(vectors)]).unwrap();
2432
2433 let reader = RecordBatchIterator::new(
2434 vec![record_batch.clone()].into_iter().map(Ok),
2435 schema.clone(),
2436 );
2437
2438 let test_uri = &test_dir;
2439 let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
2440
2441 let ivf_params = IvfBuildParams::default();
2442 let hnsw_params = HnswBuildParams::default();
2443 let sq_params = SQBuildParams::default();
2444 let params = VectorIndexParams::with_ivf_hnsw_sq_params(
2445 MetricType::L2,
2446 ivf_params,
2447 hnsw_params,
2448 sq_params,
2449 );
2450 dataset
2451 .create_index(
2452 &[column_name],
2453 IndexType::Vector,
2454 Some("vec_idx".into()),
2455 ¶ms,
2456 true,
2457 )
2458 .await
2459 .unwrap();
2460
2461 let stats: serde_json::Value =
2462 serde_json::from_str(&dataset.index_statistics("vec_idx").await.unwrap()).unwrap();
2463 assert_eq!(stats["num_unindexed_rows"], 0);
2464 assert_eq!(stats["num_indexed_rows"], 512);
2465 assert_eq!(stats["num_indexed_fragments"], 1);
2466 assert_eq!(stats["num_indices"], 1);
2467
2468 let reader =
2469 RecordBatchIterator::new(vec![record_batch].into_iter().map(Ok), schema.clone());
2470 dataset.append(reader, None).await.unwrap();
2471 let mut dataset = DatasetBuilder::from_uri(test_uri).load().await.unwrap();
2472 let stats: serde_json::Value =
2473 serde_json::from_str(&dataset.index_statistics("vec_idx").await.unwrap()).unwrap();
2474 assert_eq!(stats["num_unindexed_rows"], 512);
2475 assert_eq!(stats["num_indexed_rows"], 512);
2476 assert_eq!(stats["num_indexed_fragments"], 1);
2477 assert_eq!(stats["num_unindexed_fragments"], 1);
2478 assert_eq!(stats["num_indices"], 1);
2479
2480 dataset
2481 .optimize_indices(&OptimizeOptions::append())
2482 .await
2483 .unwrap();
2484
2485 let stats: serde_json::Value =
2486 serde_json::from_str(&dataset.index_statistics("vec_idx").await.unwrap()).unwrap();
2487 assert_eq!(stats["num_unindexed_rows"], 0);
2488 assert_eq!(stats["num_indexed_rows"], 1024);
2489 assert_eq!(stats["num_indexed_fragments"], 2);
2490 assert_eq!(stats["num_unindexed_fragments"], 0);
2491 assert_eq!(stats["num_indices"], 2);
2492
2493 dataset
2494 .optimize_indices(&OptimizeOptions::retrain())
2495 .await
2496 .unwrap();
2497 let stats: serde_json::Value =
2498 serde_json::from_str(&dataset.index_statistics("vec_idx").await.unwrap()).unwrap();
2499 assert_eq!(stats["num_unindexed_rows"], 0);
2500 assert_eq!(stats["num_indexed_rows"], 1024);
2501 assert_eq!(stats["num_indexed_fragments"], 2);
2502 assert_eq!(stats["num_unindexed_fragments"], 0);
2503 assert_eq!(stats["num_indices"], 1);
2504 }
2505
2506 #[rstest]
2507 #[tokio::test]
2508 async fn test_optimize_fts(#[values(false, true)] with_position: bool) {
2509 let words = ["apple", "banana", "cherry", "date"];
2510
2511 let dir = TempStrDir::default();
2512 let schema = Arc::new(Schema::new(vec![Field::new("text", DataType::Utf8, false)]));
2513 let data = StringArray::from_iter_values(words.iter().map(|s| s.to_string()));
2514 let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(data)]).unwrap();
2515 let batch_iterator = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
2516
2517 let mut dataset = Dataset::write(batch_iterator, &dir, None).await.unwrap();
2518
2519 let params = InvertedIndexParams::default()
2520 .lower_case(false)
2521 .with_position(with_position);
2522 dataset
2523 .create_index(&["text"], IndexType::Inverted, None, ¶ms, true)
2524 .await
2525 .unwrap();
2526
2527 async fn assert_indexed_rows(dataset: &Dataset, expected_indexed_rows: usize) {
2528 let stats = dataset.index_statistics("text_idx").await.unwrap();
2529 let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
2530 let indexed_rows = stats["num_indexed_rows"].as_u64().unwrap() as usize;
2531 let unindexed_rows = stats["num_unindexed_rows"].as_u64().unwrap() as usize;
2532 let num_rows = dataset.count_all_rows().await.unwrap();
2533 assert_eq!(indexed_rows, expected_indexed_rows);
2534 assert_eq!(unindexed_rows, num_rows - expected_indexed_rows);
2535 }
2536
2537 let num_rows = dataset.count_all_rows().await.unwrap();
2538 assert_indexed_rows(&dataset, num_rows).await;
2539
2540 let new_words = ["elephant", "fig", "grape", "honeydew"];
2541 let new_data = StringArray::from_iter_values(new_words.iter().map(|s| s.to_string()));
2542 let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(new_data)]).unwrap();
2543 let batch_iter = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
2544 dataset.append(batch_iter, None).await.unwrap();
2545 assert_indexed_rows(&dataset, num_rows).await;
2546
2547 dataset
2548 .optimize_indices(&OptimizeOptions::append())
2549 .await
2550 .unwrap();
2551 let num_rows = dataset.count_all_rows().await.unwrap();
2552 assert_indexed_rows(&dataset, num_rows).await;
2553
2554 for &word in words.iter().chain(new_words.iter()) {
2555 let query_result = dataset
2556 .scan()
2557 .project(&["text"])
2558 .unwrap()
2559 .full_text_search(FullTextSearchQuery::new(word.to_string()))
2560 .unwrap()
2561 .limit(Some(10), None)
2562 .unwrap()
2563 .try_into_batch()
2564 .await
2565 .unwrap();
2566
2567 let texts = query_result["text"]
2568 .as_string::<i32>()
2569 .iter()
2570 .map(|v| match v {
2571 None => "".to_string(),
2572 Some(v) => v.to_string(),
2573 })
2574 .collect::<Vec<String>>();
2575
2576 assert_eq!(texts.len(), 1);
2577 assert_eq!(texts[0], word);
2578 }
2579
2580 let uppercase_words = ["Apple", "Banana", "Cherry", "Date"];
2581 for &word in uppercase_words.iter() {
2582 let query_result = dataset
2583 .scan()
2584 .project(&["text"])
2585 .unwrap()
2586 .full_text_search(FullTextSearchQuery::new(word.to_string()))
2587 .unwrap()
2588 .limit(Some(10), None)
2589 .unwrap()
2590 .try_into_batch()
2591 .await
2592 .unwrap();
2593
2594 let texts = query_result["text"]
2595 .as_string::<i32>()
2596 .iter()
2597 .map(|v| match v {
2598 None => "".to_string(),
2599 Some(v) => v.to_string(),
2600 })
2601 .collect::<Vec<String>>();
2602
2603 assert_eq!(texts.len(), 0);
2604 }
2605 let new_data = StringArray::from_iter_values(uppercase_words.iter().map(|s| s.to_string()));
2606 let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(new_data)]).unwrap();
2607 let batch_iter = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
2608 dataset.append(batch_iter, None).await.unwrap();
2609 assert_indexed_rows(&dataset, num_rows).await;
2610
2611 for &word in uppercase_words.iter() {
2613 let query_result = dataset
2614 .scan()
2615 .project(&["text"])
2616 .unwrap()
2617 .full_text_search(FullTextSearchQuery::new(word.to_string()))
2618 .unwrap()
2619 .limit(Some(10), None)
2620 .unwrap()
2621 .try_into_batch()
2622 .await
2623 .unwrap();
2624
2625 let texts = query_result["text"]
2626 .as_string::<i32>()
2627 .iter()
2628 .map(|v| match v {
2629 None => "".to_string(),
2630 Some(v) => v.to_string(),
2631 })
2632 .collect::<Vec<String>>();
2633
2634 assert_eq!(texts.len(), 1, "query: {}, texts: {:?}", word, texts);
2635 assert_eq!(texts[0], word, "query: {}, texts: {:?}", word, texts);
2636 }
2637
2638 dataset
2639 .optimize_indices(&OptimizeOptions::append())
2640 .await
2641 .unwrap();
2642 let num_rows = dataset.count_all_rows().await.unwrap();
2643 assert_indexed_rows(&dataset, num_rows).await;
2644
2645 for &word in uppercase_words.iter() {
2647 let query_result = dataset
2648 .scan()
2649 .project(&["text"])
2650 .unwrap()
2651 .full_text_search(FullTextSearchQuery::new(word.to_string()))
2652 .unwrap()
2653 .limit(Some(10), None)
2654 .unwrap()
2655 .try_into_batch()
2656 .await
2657 .unwrap();
2658
2659 let texts = query_result["text"]
2660 .as_string::<i32>()
2661 .iter()
2662 .map(|v| match v {
2663 None => "".to_string(),
2664 Some(v) => v.to_string(),
2665 })
2666 .collect::<Vec<String>>();
2667
2668 assert_eq!(texts.len(), 1, "query: {}, texts: {:?}", word, texts);
2669 assert_eq!(texts[0], word, "query: {}, texts: {:?}", word, texts);
2670
2671 compact_files(&mut dataset, CompactionOptions::default(), None)
2673 .await
2674 .unwrap();
2675 for &word in uppercase_words.iter() {
2676 let query_result = dataset
2677 .scan()
2678 .project(&["text"])
2679 .unwrap()
2680 .full_text_search(FullTextSearchQuery::new(word.to_string()))
2681 .unwrap()
2682 .try_into_batch()
2683 .await
2684 .unwrap();
2685 let texts = query_result["text"]
2686 .as_string::<i32>()
2687 .iter()
2688 .map(|v| match v {
2689 None => "".to_string(),
2690 Some(v) => v.to_string(),
2691 })
2692 .collect::<Vec<String>>();
2693 assert_eq!(texts.len(), 1, "query: {}, texts: {:?}", word, texts);
2694 assert_eq!(texts[0], word, "query: {}, texts: {:?}", word, texts);
2695 }
2696 assert_indexed_rows(&dataset, num_rows).await;
2697 }
2698 }
2699
2700 #[tokio::test]
2701 async fn test_create_index_too_small_for_pq() {
2702 let test_dir = TempStrDir::default();
2703 let dimensions = 1536;
2704
2705 let field = Field::new(
2706 "vector",
2707 DataType::FixedSizeList(
2708 Arc::new(Field::new("item", DataType::Float32, true)),
2709 dimensions,
2710 ),
2711 false,
2712 );
2713
2714 let schema = Arc::new(Schema::new(vec![field]));
2715 let float_arr = generate_random_array(100 * dimensions as usize);
2716
2717 let vectors =
2718 arrow_array::FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap();
2719 let record_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(vectors)]).unwrap();
2720 let reader = RecordBatchIterator::new(
2721 vec![record_batch.clone()].into_iter().map(Ok),
2722 schema.clone(),
2723 );
2724
2725 let test_uri = &test_dir;
2726 let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
2727
2728 let params = VectorIndexParams::ivf_pq(1, 8, 96, DistanceType::L2, 1);
2729 let result = dataset
2730 .create_index(&["vector"], IndexType::Vector, None, ¶ms, false)
2731 .await;
2732
2733 assert!(matches!(result, Err(Error::Unprocessable { .. })));
2734 if let Error::Unprocessable { message, .. } = result.unwrap_err() {
2735 assert_eq!(
2736 message,
2737 "Not enough rows to train PQ. Requires 256 rows but only 100 available",
2738 )
2739 }
2740 }
2741
2742 #[tokio::test]
2743 async fn test_create_bitmap_index() {
2744 let test_dir = TempStrDir::default();
2745 let field = Field::new("tag", DataType::Utf8, false);
2746 let schema = Arc::new(Schema::new(vec![field]));
2747 let array = StringArray::from_iter_values((0..128).map(|i| ["a", "b", "c"][i % 3]));
2748 let record_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
2749 let reader = RecordBatchIterator::new(
2750 vec![record_batch.clone()].into_iter().map(Ok),
2751 schema.clone(),
2752 );
2753
2754 let test_uri = &test_dir;
2755 let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
2756 dataset
2757 .create_index(
2758 &["tag"],
2759 IndexType::Bitmap,
2760 None,
2761 &ScalarIndexParams::default(),
2762 false,
2763 )
2764 .await
2765 .unwrap();
2766 let indices = dataset.load_indices().await.unwrap();
2767 let index = dataset
2768 .open_generic_index("tag", &indices[0].uuid.to_string(), &NoOpMetricsCollector)
2769 .await
2770 .unwrap();
2771 assert_eq!(index.index_type(), IndexType::Bitmap);
2772 }
2773
2774 #[lance_test_macros::test(tokio::test)]
2776 async fn test_load_indices() {
2777 let session = Arc::new(Session::default());
2778 let write_params = WriteParams {
2779 session: Some(session.clone()),
2780 ..Default::default()
2781 };
2782
2783 let test_dir = TempStrDir::default();
2784 let field = Field::new("tag", DataType::Utf8, false);
2785 let schema = Arc::new(Schema::new(vec![field]));
2786 let array = StringArray::from_iter_values((0..128).map(|i| ["a", "b", "c"][i % 3]));
2787 let record_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
2788 let reader = RecordBatchIterator::new(
2789 vec![record_batch.clone()].into_iter().map(Ok),
2790 schema.clone(),
2791 );
2792
2793 let test_uri = &test_dir;
2794 let mut dataset = Dataset::write(reader, test_uri, Some(write_params))
2795 .await
2796 .unwrap();
2797 dataset
2798 .create_index(
2799 &["tag"],
2800 IndexType::Bitmap,
2801 None,
2802 &ScalarIndexParams::default(),
2803 false,
2804 )
2805 .await
2806 .unwrap();
2807 dataset.object_store().io_stats_incremental(); let indices = dataset.load_indices().await.unwrap();
2810 let stats = dataset.object_store().io_stats_incremental();
2811 assert_io_eq!(stats, read_iops, 0);
2813 assert_io_eq!(stats, read_bytes, 0);
2814 assert_eq!(indices.len(), 1);
2815
2816 session.index_cache.clear().await; let dataset2 = DatasetBuilder::from_uri(test_uri)
2819 .with_session(session.clone())
2820 .load()
2821 .await
2822 .unwrap();
2823 let stats = dataset2.object_store().io_stats_incremental(); assert_io_lt!(stats, read_bytes, 64 * 1024);
2825
2826 let indices2 = dataset2.load_indices().await.unwrap();
2829 let stats = dataset2.object_store().io_stats_incremental();
2830 assert_io_eq!(stats, read_iops, 0);
2831 assert_io_eq!(stats, read_bytes, 0);
2832 assert_eq!(indices2.len(), 1);
2833 }
2834
2835 #[tokio::test]
2836 async fn test_remap_empty() {
2837 let data = gen_batch()
2838 .col("int", array::step::<Int32Type>())
2839 .col(
2840 "vector",
2841 array::rand_vec::<Float32Type>(Dimension::from(16)),
2842 )
2843 .into_reader_rows(RowCount::from(256), BatchCount::from(1));
2844 let mut dataset = Dataset::write(data, "memory://", None).await.unwrap();
2845
2846 let params = VectorIndexParams::ivf_pq(1, 8, 1, DistanceType::L2, 1);
2847 dataset
2848 .create_index(&["vector"], IndexType::Vector, None, ¶ms, false)
2849 .await
2850 .unwrap();
2851
2852 let index_uuid = dataset.load_indices().await.unwrap()[0].uuid;
2853 let remap_to_empty = (0..dataset.count_all_rows().await.unwrap())
2854 .map(|i| (i as u64, None))
2855 .collect::<HashMap<_, _>>();
2856 let new_uuid = remap_index(&dataset, &index_uuid, &remap_to_empty)
2857 .await
2858 .unwrap();
2859 assert_eq!(new_uuid, RemapResult::Keep(index_uuid));
2860 }
2861
2862 #[tokio::test]
2863 async fn test_optimize_ivf_pq_up_to_date() {
2864 let nrows = 256;
2866 let dimensions = 16;
2867 let column_name = "vector";
2868 let schema = Arc::new(Schema::new(vec![
2869 Field::new("id", DataType::Int32, false),
2870 Field::new(
2871 column_name,
2872 DataType::FixedSizeList(
2873 Arc::new(Field::new("item", DataType::Float32, true)),
2874 dimensions,
2875 ),
2876 false,
2877 ),
2878 ]));
2879
2880 let float_arr = generate_random_array(nrows * dimensions as usize);
2881 let vectors =
2882 arrow_array::FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap();
2883 let record_batch = RecordBatch::try_new(
2884 schema.clone(),
2885 vec![
2886 Arc::new(arrow_array::Int32Array::from_iter_values(0..nrows as i32)),
2887 Arc::new(vectors),
2888 ],
2889 )
2890 .unwrap();
2891
2892 let reader = RecordBatchIterator::new(
2893 vec![record_batch.clone()].into_iter().map(Ok),
2894 schema.clone(),
2895 );
2896 let mut dataset = Dataset::write(reader, "memory://", None).await.unwrap();
2897
2898 let params = VectorIndexParams::ivf_pq(1, 8, 2, MetricType::L2, 2);
2899 dataset
2900 .create_index(&[column_name], IndexType::Vector, None, ¶ms, true)
2901 .await
2902 .unwrap();
2903
2904 let query_vector = generate_random_array(dimensions as usize);
2905
2906 let nearest = dataset
2907 .scan()
2908 .nearest(column_name, &query_vector, 5)
2909 .unwrap()
2910 .try_into_batch()
2911 .await
2912 .unwrap();
2913
2914 let ids = nearest["id"].as_primitive::<Int32Type>();
2915 let mut seen = HashSet::new();
2916 for id in ids.values() {
2917 assert!(seen.insert(*id), "Duplicate id found: {}", id);
2918 }
2919
2920 dataset
2921 .optimize_indices(&OptimizeOptions::default())
2922 .await
2923 .unwrap();
2924
2925 dataset.validate().await.unwrap();
2926
2927 let nearest_after = dataset
2928 .scan()
2929 .nearest(column_name, &query_vector, 5)
2930 .unwrap()
2931 .try_into_batch()
2932 .await
2933 .unwrap();
2934
2935 let ids = nearest_after["id"].as_primitive::<Int32Type>();
2936 let mut seen = HashSet::new();
2937 for id in ids.values() {
2938 assert!(seen.insert(*id), "Duplicate id found: {}", id);
2939 }
2940 }
2941
2942 #[tokio::test]
2943 async fn test_index_created_at_timestamp() {
2944 let schema = Arc::new(Schema::new(vec![
2946 Field::new("id", DataType::Int32, false),
2947 Field::new("values", DataType::Utf8, false),
2948 ]));
2949
2950 let values = StringArray::from_iter_values(["hello", "world", "foo", "bar"]);
2951 let record_batch = RecordBatch::try_new(
2952 schema.clone(),
2953 vec![
2954 Arc::new(Int32Array::from_iter_values(0..4)),
2955 Arc::new(values),
2956 ],
2957 )
2958 .unwrap();
2959
2960 let reader =
2961 RecordBatchIterator::new(vec![record_batch].into_iter().map(Ok), schema.clone());
2962
2963 let mut dataset = Dataset::write(reader, "memory://", None).await.unwrap();
2964
2965 let before_index = chrono::Utc::now();
2967
2968 dataset
2970 .create_index(
2971 &["values"],
2972 IndexType::Scalar,
2973 Some("test_idx".to_string()),
2974 &ScalarIndexParams::default(),
2975 false,
2976 )
2977 .await
2978 .unwrap();
2979
2980 let after_index = chrono::Utc::now();
2982
2983 let indices = dataset.load_indices().await.unwrap();
2985 let test_index = indices.iter().find(|idx| idx.name == "test_idx").unwrap();
2986
2987 assert!(test_index.created_at.is_some());
2989 let created_at = test_index.created_at.unwrap();
2990 assert!(created_at >= before_index);
2991 assert!(created_at <= after_index);
2992 }
2993
2994 #[tokio::test]
2995 async fn test_index_statistics_updated_at() {
2996 let schema = Arc::new(Schema::new(vec![
2998 Field::new("id", DataType::Int32, false),
2999 Field::new("values", DataType::Utf8, false),
3000 ]));
3001
3002 let values = StringArray::from_iter_values(["hello", "world", "foo", "bar"]);
3003 let record_batch = RecordBatch::try_new(
3004 schema.clone(),
3005 vec![
3006 Arc::new(Int32Array::from_iter_values(0..4)),
3007 Arc::new(values),
3008 ],
3009 )
3010 .unwrap();
3011
3012 let reader =
3013 RecordBatchIterator::new(vec![record_batch].into_iter().map(Ok), schema.clone());
3014
3015 let mut dataset = Dataset::write(reader, "memory://", None).await.unwrap();
3016
3017 dataset
3019 .create_index(
3020 &["values"],
3021 IndexType::Scalar,
3022 Some("test_idx".to_string()),
3023 &ScalarIndexParams::default(),
3024 false,
3025 )
3026 .await
3027 .unwrap();
3028
3029 let stats_str = dataset.index_statistics("test_idx").await.unwrap();
3031 let stats: serde_json::Value = serde_json::from_str(&stats_str).unwrap();
3032
3033 assert!(stats["updated_at_timestamp_ms"].is_number());
3035 let updated_at = stats["updated_at_timestamp_ms"].as_u64().unwrap();
3036
3037 let indices = dataset.load_indices().await.unwrap();
3039 let test_index = indices.iter().find(|idx| idx.name == "test_idx").unwrap();
3040 let created_at = test_index.created_at.unwrap().timestamp_millis() as u64;
3041
3042 assert_eq!(updated_at, created_at);
3044 }
3045
3046 #[tokio::test]
3047 async fn test_index_statistics_updated_at_multiple_deltas() {
3048 let schema = Arc::new(Schema::new(vec![
3050 Field::new("id", DataType::Int32, false),
3051 Field::new(
3052 "vector",
3053 DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), 4),
3054 false,
3055 ),
3056 ]));
3057
3058 let num_rows = 300;
3060 let float_arr = generate_random_array(4 * num_rows);
3061 let vectors = FixedSizeListArray::try_new_from_values(float_arr, 4).unwrap();
3062 let record_batch = RecordBatch::try_new(
3063 schema.clone(),
3064 vec![
3065 Arc::new(Int32Array::from_iter_values(0..num_rows as i32)),
3066 Arc::new(vectors),
3067 ],
3068 )
3069 .unwrap();
3070
3071 let reader =
3072 RecordBatchIterator::new(vec![record_batch].into_iter().map(Ok), schema.clone());
3073
3074 let mut dataset = Dataset::write(reader, "memory://", None).await.unwrap();
3075
3076 let params = VectorIndexParams::ivf_pq(1, 8, 2, MetricType::L2, 2);
3078 dataset
3079 .create_index(
3080 &["vector"],
3081 IndexType::Vector,
3082 Some("test_vec_idx".to_string()),
3083 ¶ms,
3084 false,
3085 )
3086 .await
3087 .unwrap();
3088
3089 let stats_str_1 = dataset.index_statistics("test_vec_idx").await.unwrap();
3091 let stats_1: serde_json::Value = serde_json::from_str(&stats_str_1).unwrap();
3092 let initial_updated_at = stats_1["updated_at_timestamp_ms"].as_u64().unwrap();
3093
3094 std::thread::sleep(std::time::Duration::from_millis(10)); let num_rows_2 = 50;
3098 let float_arr_2 = generate_random_array(4 * num_rows_2);
3099 let vectors_2 = FixedSizeListArray::try_new_from_values(float_arr_2, 4).unwrap();
3100 let record_batch_2 = RecordBatch::try_new(
3101 schema.clone(),
3102 vec![
3103 Arc::new(Int32Array::from_iter_values(
3104 num_rows as i32..(num_rows + num_rows_2) as i32,
3105 )),
3106 Arc::new(vectors_2),
3107 ],
3108 )
3109 .unwrap();
3110
3111 let reader_2 =
3112 RecordBatchIterator::new(vec![record_batch_2].into_iter().map(Ok), schema.clone());
3113
3114 dataset.append(reader_2, None).await.unwrap();
3115
3116 dataset
3118 .create_index(
3119 &["vector"],
3120 IndexType::Vector,
3121 Some("test_vec_idx".to_string()),
3122 ¶ms,
3123 true,
3124 )
3125 .await
3126 .unwrap();
3127
3128 let stats_str_2 = dataset.index_statistics("test_vec_idx").await.unwrap();
3130 let stats_2: serde_json::Value = serde_json::from_str(&stats_str_2).unwrap();
3131 let final_updated_at = stats_2["updated_at_timestamp_ms"].as_u64().unwrap();
3132
3133 assert!(final_updated_at >= initial_updated_at);
3135 }
3136
3137 #[tokio::test]
3138 async fn test_index_statistics_updated_at_none_when_no_created_at() {
3139 let test_dir =
3144 copy_test_data_to_tmp("v0.30.0_pre_created_at/index_without_created_at").unwrap();
3145 let test_uri = test_dir.path_str();
3146 let test_uri = &test_uri;
3147
3148 let dataset = Dataset::open(test_uri).await.unwrap();
3149
3150 let indices = dataset.load_indices().await.unwrap();
3152 assert!(!indices.is_empty(), "Test dataset should have indices");
3153
3154 for index in indices.iter() {
3156 assert!(
3157 index.created_at.is_none(),
3158 "Index from old version should have created_at = None"
3159 );
3160 }
3161
3162 let index_name = &indices[0].name;
3164 let stats_str = dataset.index_statistics(index_name).await.unwrap();
3165 let stats: serde_json::Value = serde_json::from_str(&stats_str).unwrap();
3166
3167 assert!(
3169 stats["updated_at_timestamp_ms"].is_null(),
3170 "updated_at_timestamp_ms should be null when no indices have created_at timestamps"
3171 );
3172 }
3173 #[rstest]
3174 #[case::btree("i", IndexType::BTree, Box::new(ScalarIndexParams::default()))]
3175 #[case::bitmap("i", IndexType::Bitmap, Box::new(ScalarIndexParams::default()))]
3176 #[case::inverted("text", IndexType::Inverted, Box::new(InvertedIndexParams::default()))]
3177 #[tokio::test]
3178 async fn test_create_empty_scalar_index(
3179 #[case] column_name: &str,
3180 #[case] index_type: IndexType,
3181 #[case] params: Box<dyn IndexParams>,
3182 ) {
3183 use lance_datagen::{array, BatchCount, ByteCount, RowCount};
3184
3185 let reader = lance_datagen::gen_batch()
3187 .col("i", array::step::<Int32Type>())
3188 .col("text", array::rand_utf8(ByteCount::from(10), false))
3189 .into_reader_rows(RowCount::from(100), BatchCount::from(1));
3190 let mut dataset = Dataset::write(reader, "memory://test", None).await.unwrap();
3191
3192 dataset
3195 .create_index_builder(&[column_name], index_type, params.as_ref())
3196 .name("index".to_string())
3197 .train(false)
3198 .await
3199 .unwrap();
3200
3201 let stats = dataset.index_statistics("index").await.unwrap();
3203 let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
3204 assert_eq!(
3205 stats["num_indexed_rows"], 0,
3206 "Empty index should have zero indexed rows"
3207 );
3208
3209 let append_reader = lance_datagen::gen_batch()
3211 .col("i", array::step::<Int32Type>())
3212 .col("text", array::rand_utf8(ByteCount::from(10), false))
3213 .into_reader_rows(RowCount::from(50), BatchCount::from(1));
3214
3215 dataset.append(append_reader, None).await.unwrap();
3216
3217 let indices_after_append = dataset.load_indices().await.unwrap();
3219 assert_eq!(
3220 indices_after_append.len(),
3221 1,
3222 "Index should be retained after append for index type {:?}",
3223 index_type
3224 );
3225
3226 let stats = dataset.index_statistics("index").await.unwrap();
3227 let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
3228 assert_eq!(
3229 stats["num_indexed_rows"], 0,
3230 "Empty index should still have zero indexed rows after append"
3231 );
3232
3233 dataset.optimize_indices(&Default::default()).await.unwrap();
3235
3236 let indices_after_optimize = dataset.load_indices().await.unwrap();
3238 assert_eq!(
3239 indices_after_optimize.len(),
3240 1,
3241 "Index should still exist after optimization"
3242 );
3243
3244 let stats = dataset.index_statistics("index").await.unwrap();
3246 let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
3247 assert_eq!(
3248 stats["num_unindexed_rows"], 0,
3249 "Empty index should indexed all rows"
3250 );
3251 }
3252
3253 fn assert_index_usage(plan: &str, column_name: &str, should_use_index: bool, context: &str) {
3255 let index_used = if column_name == "text" {
3256 plan.contains("MatchQuery")
3258 } else {
3259 plan.contains("ScalarIndexQuery")
3261 };
3262
3263 if should_use_index {
3264 assert!(
3265 index_used,
3266 "Query plan should use index {}: {}",
3267 context, plan
3268 );
3269 } else {
3270 assert!(
3271 !index_used,
3272 "Query plan should NOT use index {}: {}",
3273 context, plan
3274 );
3275 }
3276 }
3277
    /// Verifies that a scalar index survives `delete("true")` (delete-all):
    /// the index entry stays in the manifest, its statistics drop to zero
    /// rows/fragments, query planning behaves sensibly against the empty
    /// index, and a subsequent append + optimize brings the index back to
    /// covering all rows. Parameterized over BTree, Bitmap and Inverted
    /// index types via rstest cases.
    #[rstest]
    #[case::btree("i", IndexType::BTree, Box::new(ScalarIndexParams::default()))]
    #[case::bitmap("i", IndexType::Bitmap, Box::new(ScalarIndexParams::default()))]
    #[case::inverted("text", IndexType::Inverted, Box::new(InvertedIndexParams::default()))]
    #[tokio::test]
    async fn test_scalar_index_retained_after_delete_all(
        #[case] column_name: &str,
        #[case] index_type: IndexType,
        #[case] params: Box<dyn IndexParams>,
    ) {
        use lance_datagen::{array, BatchCount, ByteCount, RowCount};

        // 100 rows: a monotonically increasing "i" column plus a random
        // utf8 "text" column, written to an in-memory dataset.
        let reader = lance_datagen::gen_batch()
            .col("i", array::step::<Int32Type>())
            .col("text", array::rand_utf8(ByteCount::from(10), false))
            .into_reader_rows(RowCount::from(100), BatchCount::from(1));
        let mut dataset = Dataset::write(reader, "memory://test", None).await.unwrap();

        // Build and train the index under test on the case's column.
        dataset
            .create_index_builder(&[column_name], index_type, params.as_ref())
            .name("index".to_string())
            .train(true)
            .await
            .unwrap();

        // Freshly trained index must cover every row.
        let stats = dataset.index_statistics("index").await.unwrap();
        let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
        assert_eq!(
            stats["num_indexed_rows"], 100,
            "Index should have indexed all 100 rows"
        );

        // Sanity: the planner should pick the index up before any delete.
        let plan = if column_name == "text" {
            dataset
                .scan()
                .full_text_search(FullTextSearchQuery::new("test".to_string()))
                .unwrap()
                .explain_plan(false)
                .await
                .unwrap()
        } else {
            dataset
                .scan()
                .filter(format!("{} = 50", column_name).as_str())
                .unwrap()
                .explain_plan(false)
                .await
                .unwrap()
        };
        assert_index_usage(&plan, column_name, true, "before delete");

        // Snapshot the index metadata so we can compare bitmaps after delete.
        let indexes = dataset.load_indices().await.unwrap();
        let original_index = indexes[0].clone();

        // Delete every row in the table.
        dataset.delete("true").await.unwrap();

        let row_count = dataset.count_rows(None).await.unwrap();
        assert_eq!(row_count, 0, "Table should be empty after delete all");

        // The index entry itself must not be dropped by delete-all.
        let indices_after_delete = dataset.load_indices().await.unwrap();
        assert_eq!(
            indices_after_delete.len(),
            1,
            "Index should be retained after deleting all data"
        );
        assert_eq!(
            indices_after_delete[0].name, "index",
            "Index name should remain the same after delete"
        );

        // The stored fragment bitmap is unchanged, but intersecting it with
        // the dataset's (now empty) fragment set yields an empty effective
        // bitmap.
        let index_after_delete = &indices_after_delete[0];
        let effective_bitmap = index_after_delete
            .effective_fragment_bitmap(&dataset.fragment_bitmap)
            .unwrap();
        assert!(
            effective_bitmap.is_empty(),
            "Effective bitmap should be empty after deleting all data"
        );
        assert_eq!(
            index_after_delete.fragment_bitmap, original_index.fragment_bitmap,
            "Fragment bitmap should remain the same after delete"
        );

        // All statistics must read as zero against the empty table.
        let stats = dataset.index_statistics("index").await.unwrap();
        let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
        assert_eq!(
            stats["num_indexed_rows"], 0,
            "Index should now report zero indexed rows after delete all"
        );
        assert_eq!(
            stats["num_unindexed_rows"], 0,
            "Index should report zero unindexed rows after delete all"
        );
        assert_eq!(
            stats["num_indexed_fragments"], 0,
            "Index should report zero indexed fragments after delete all"
        );
        assert_eq!(
            stats["num_unindexed_fragments"], 0,
            "Index should report zero unindexed fragments after delete all"
        );

        // Planner behavior against the empty index: FTS still routes through
        // the (empty) inverted index, while scalar filters fall back to a scan.
        if column_name == "text" {
            let _plan_after_delete = dataset
                .scan()
                .project(&[column_name])
                .unwrap()
                .full_text_search(FullTextSearchQuery::new("test".to_string()))
                .unwrap()
                .explain_plan(false)
                .await
                .unwrap();
            assert_index_usage(
                &_plan_after_delete,
                column_name,
                true,
                "after delete (empty bitmap)",
            );
        } else {
            let _plan_after_delete = dataset
                .scan()
                .filter(format!("{} = 50", column_name).as_str())
                .unwrap()
                .explain_plan(false)
                .await
                .unwrap();
            assert_index_usage(
                &_plan_after_delete,
                column_name,
                false,
                "after delete (empty bitmap)",
            );
        }

        // Append 50 fresh rows to the now-empty table.
        let append_reader = lance_datagen::gen_batch()
            .col("i", array::step::<Int32Type>())
            .col("text", array::rand_utf8(ByteCount::from(10), false))
            .into_reader_rows(RowCount::from(50), BatchCount::from(1));

        dataset.append(append_reader, None).await.unwrap();

        // Appending does not retrain the index: it still reports zero
        // indexed rows until optimize runs.
        let indices_after_append = dataset.load_indices().await.unwrap();
        assert_eq!(
            indices_after_append.len(),
            1,
            "Index should still exist after appending to empty table"
        );
        let stats = dataset.index_statistics("index").await.unwrap();
        let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
        assert_eq!(
            stats["num_indexed_rows"], 0,
            "Index should now report zero indexed rows after data is added"
        );

        // Optimizing should fold the new rows into the retained index.
        dataset.optimize_indices(&Default::default()).await.unwrap();

        let indices_after_optimize = dataset.load_indices().await.unwrap();
        assert_eq!(
            indices_after_optimize.len(),
            1,
            "Index should still exist after optimization following delete all"
        );

        let stats = dataset.index_statistics("index").await.unwrap();
        let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
        assert_eq!(
            stats["num_indexed_rows"],
            dataset.count_rows(None).await.unwrap(),
            "Index should now cover all newly added rows after optimization"
        );
    }
3479
    /// Verifies that a scalar index survives an `UpdateBuilder` update that
    /// rewrites every row: the index entry is retained, its effective
    /// fragment bitmap becomes empty (all original fragments were replaced),
    /// statistics read as zero until optimize re-covers the data, and the
    /// planner degrades appropriately. Parameterized over BTree, Bitmap and
    /// Inverted index types via rstest cases.
    #[rstest]
    #[case::btree("i", IndexType::BTree, Box::new(ScalarIndexParams::default()))]
    #[case::bitmap("i", IndexType::Bitmap, Box::new(ScalarIndexParams::default()))]
    #[case::inverted("text", IndexType::Inverted, Box::new(InvertedIndexParams::default()))]
    #[tokio::test]
    async fn test_scalar_index_retained_after_update(
        #[case] column_name: &str,
        #[case] index_type: IndexType,
        #[case] params: Box<dyn IndexParams>,
    ) {
        use crate::dataset::UpdateBuilder;
        use lance_datagen::{array, BatchCount, ByteCount, RowCount};

        // 100 rows: step "i" column plus random utf8 "text", in memory.
        let reader = lance_datagen::gen_batch()
            .col("i", array::step::<Int32Type>())
            .col("text", array::rand_utf8(ByteCount::from(10), false))
            .into_reader_rows(RowCount::from(100), BatchCount::from(1));
        let mut dataset = Dataset::write(reader, "memory://test", None).await.unwrap();

        // Build and train the index under test.
        dataset
            .create_index_builder(&[column_name], index_type, params.as_ref())
            .name("index".to_string())
            .train(true)
            .await
            .unwrap();

        let stats = dataset.index_statistics("index").await.unwrap();
        let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
        assert_eq!(
            stats["num_indexed_rows"], 100,
            "Index should have indexed all 100 rows"
        );

        // Sanity: the planner picks the index up before the update.
        let plan = if column_name == "text" {
            dataset
                .scan()
                .project(&[column_name])
                .unwrap()
                .full_text_search(FullTextSearchQuery::new("test".to_string()))
                .unwrap()
                .explain_plan(false)
                .await
                .unwrap()
        } else {
            dataset
                .scan()
                .filter(format!("{} = 50", column_name).as_str())
                .unwrap()
                .explain_plan(false)
                .await
                .unwrap()
        };
        assert_index_usage(&plan, column_name, true, "before update");

        // Rewrite every row (no predicate), touching both indexed columns.
        let update_result = UpdateBuilder::new(Arc::new(dataset))
            .set("i", "i + 1000")
            .unwrap()
            .set("text", "'updated_' || text")
            .unwrap()
            .build()
            .unwrap()
            .execute()
            .await
            .unwrap();

        // Continue against the post-update version of the dataset.
        let mut dataset = update_result.new_dataset.as_ref().clone();

        let row_count = dataset.count_rows(None).await.unwrap();
        assert_eq!(row_count, 100, "Row count should remain 100 after update");

        // The index entry itself must survive the update.
        let indices_after_update = dataset.load_indices().await.unwrap();
        assert_eq!(
            indices_after_update.len(),
            1,
            "Index should be retained after updating rows"
        );

        // All original fragments were replaced by the update, so the
        // index's effective fragment bitmap is empty.
        let indices = dataset.load_indices().await.unwrap();
        let index = &indices[0];
        let effective_bitmap = index
            .effective_fragment_bitmap(&dataset.fragment_bitmap)
            .unwrap();
        assert!(
            effective_bitmap.is_empty(),
            "Effective fragment bitmap should be empty after updating all data"
        );

        let stats_after_update = dataset.index_statistics("index").await.unwrap();
        let stats_after_update: serde_json::Value =
            serde_json::from_str(&stats_after_update).unwrap();

        assert_eq!(
            stats_after_update["num_indexed_rows"], 0,
            "Index statistics should be zero after update, as it is not re-trained"
        );

        // Planner behavior with the empty effective bitmap: FTS still routes
        // through the inverted index, scalar filters fall back to a scan.
        if column_name == "text" {
            let _plan_after_update = dataset
                .scan()
                .project(&[column_name])
                .unwrap()
                .full_text_search(FullTextSearchQuery::new("test".to_string()))
                .unwrap()
                .explain_plan(false)
                .await
                .unwrap();
            assert_index_usage(
                &_plan_after_update,
                column_name,
                true, "after update (empty bitmap)",
            );
        } else {
            let _plan_after_update = dataset
                .scan()
                .filter(format!("{} = 50", column_name).as_str())
                .unwrap()
                .explain_plan(false)
                .await
                .unwrap();
            assert_index_usage(
                &_plan_after_update,
                column_name,
                false,
                "after update (empty effective bitmap)",
            );
        }

        // Optimizing should re-cover the rewritten rows.
        dataset.optimize_indices(&Default::default()).await.unwrap();

        let indices_after_optimize = dataset.load_indices().await.unwrap();
        assert_eq!(
            indices_after_optimize.len(),
            1,
            "Index should still exist after optimization following update"
        );

        let stats_after_optimization = dataset.index_statistics("index").await.unwrap();
        let stats_after_optimization: serde_json::Value =
            serde_json::from_str(&stats_after_optimization).unwrap();

        assert_eq!(
            stats_after_optimization["num_unindexed_rows"], 0,
            "Index should have zero unindexed rows after optimization"
        );
    }
3658
    /// Shared assertions used by `test_shallow_clone_with_index` after each
    /// clone round: the cloned dataset must expose both indices by name,
    /// hold the row count accumulated by all previous rounds, and answer
    /// both a vector search and an indexed scalar filter immediately after
    /// the clone (i.e. without re-optimizing).
    ///
    /// * `round` - 1-based clone round; each earlier round appended 50 rows.
    /// * `expected_scalar_rows` - rows expected for `category = 'category_0'`.
    /// * `dimensions` - vector column dimensionality, for the query vector.
    async fn validate_indices_after_clone(
        dataset: &Dataset,
        round: usize,
        expected_scalar_rows: usize,
        dimensions: u32,
    ) {
        let indices = dataset.load_indices().await.unwrap();
        assert_eq!(
            indices.len(),
            2,
            "Round {}: Cloned dataset should have 2 indices",
            round
        );
        let index_names: HashSet<String> = indices.iter().map(|idx| idx.name.clone()).collect();
        assert!(
            index_names.contains("vector_idx"),
            "Round {}: Should contain vector_idx",
            round
        );
        assert!(
            index_names.contains("category_idx"),
            "Round {}: Should contain category_idx",
            round
        );

        // 300 base rows plus 50 appended per completed round (round is
        // validated *before* this round's append happens).
        let expected_total_rows = 300 + (round - 1) * 50;
        let total_rows = dataset.count_rows(None).await.unwrap();
        assert_eq!(
            total_rows, expected_total_rows,
            "Round {}: Should have {} rows after clone (chain cloning accumulates data)",
            round, expected_total_rows
        );

        // Vector index must be queryable straight after the clone.
        let query_vector = generate_random_array(dimensions as usize);
        let search_results = dataset
            .scan()
            .nearest("vector", &query_vector, 5)
            .unwrap()
            .limit(Some(5), None)
            .unwrap()
            .try_into_batch()
            .await
            .unwrap();
        assert!(
            search_results.num_rows() > 0,
            "Round {}: Vector search should return results immediately after clone",
            round
        );

        // Scalar (BTree) index must be queryable too.
        let scalar_results = dataset
            .scan()
            .filter("category = 'category_0'")
            .unwrap()
            .try_into_batch()
            .await
            .unwrap();

        assert_eq!(
            expected_scalar_rows,
            scalar_results.num_rows(),
            "Round {}: Scalar query should return {} results",
            round,
            expected_scalar_rows
        );
    }
3732
    /// End-to-end test of shallow cloning a dataset that carries a vector
    /// index and a scalar (BTree) index, chained over three rounds. Each
    /// round: tag the current dataset, shallow-clone it to a new location,
    /// validate the clone's indices, append 50 new rows, optimize, and then
    /// verify that the newly-built index files live in the clone's own
    /// `_indices` directory (not the source's), that their `base_id` is
    /// cleared, and that both old and new data remain queryable.
    #[tokio::test]
    async fn test_shallow_clone_with_index() {
        let test_dir = TempStrDir::default();
        let test_uri = &test_dir;

        // Base dataset: 300 rows of (id, category, vector).
        let dimensions = 16u32;
        let data = gen_batch()
            .col("id", array::step::<Int32Type>())
            .col("category", array::fill_utf8("category_0".to_string()))
            .col(
                "vector",
                array::rand_vec::<Float32Type>(Dimension::from(dimensions)),
            )
            .into_reader_rows(RowCount::from(300), BatchCount::from(1));

        let mut dataset = Dataset::write(data, test_uri, None).await.unwrap();
        let vector_params = VectorIndexParams::ivf_pq(2, 8, 2, MetricType::L2, 10);
        dataset
            .create_index(
                &["vector"],
                IndexType::Vector,
                Some("vector_idx".to_string()),
                &vector_params,
                true,
            )
            .await
            .unwrap();

        dataset
            .create_index(
                &["category"],
                IndexType::BTree,
                Some("category_idx".to_string()),
                &ScalarIndexParams::default(),
                true,
            )
            .await
            .unwrap();

        // Sanity checks on the source before any cloning.
        let indices = dataset.load_indices().await.unwrap();
        assert_eq!(indices.len(), 2, "Should have 2 indices");
        let index_names: HashSet<String> = indices.iter().map(|idx| idx.name.clone()).collect();
        assert!(index_names.contains("vector_idx"));
        assert!(index_names.contains("category_idx"));

        let scalar_results = dataset
            .scan()
            .filter("category = 'category_0'")
            .unwrap()
            .try_into_batch()
            .await
            .unwrap();

        // Baseline row count for 'category_0'; every clone must match it.
        let source_scalar_query_rows = scalar_results.num_rows();
        assert!(
            scalar_results.num_rows() > 0,
            "Scalar query should return results"
        );

        let clone_rounds = 3;
        // Each round clones from the previous round's result (chain cloning).
        let mut current_dataset = dataset;

        for round in 1..=clone_rounds {
            let round_clone_dir = format!("{}/clone_round_{}", test_dir, round);
            let round_cloned_uri = &round_clone_dir;
            let tag_name = format!("shallow_clone_test_{}", round);

            // Shallow clone requires a tag/version reference on the source.
            let current_version = current_dataset.version().version;
            current_dataset
                .tags()
                .create(&tag_name, current_version)
                .await
                .unwrap();

            let mut round_cloned_dataset = current_dataset
                .shallow_clone(round_cloned_uri, tag_name.as_str(), None)
                .await
                .unwrap();

            // Clone must be fully queryable before we touch it.
            validate_indices_after_clone(
                &round_cloned_dataset,
                round,
                source_scalar_query_rows,
                dimensions,
            )
            .await;

            // Append 50 rows with a round-specific category so each round's
            // data is distinguishable.
            let new_data = gen_batch()
                .col(
                    "id",
                    array::step_custom::<Int32Type>(300 + (round * 50) as i32, 1),
                )
                .col("category", array::fill_utf8(format!("category_{}", round)))
                .col(
                    "vector",
                    array::rand_vec::<Float32Type>(Dimension::from(dimensions)),
                )
                .into_reader_rows(RowCount::from(50), BatchCount::from(1));

            round_cloned_dataset = Dataset::write(
                new_data,
                round_cloned_uri,
                Some(WriteParams {
                    mode: WriteMode::Append,
                    ..Default::default()
                }),
            )
            .await
            .unwrap();

            let expected_rows = 300 + round * 50;
            let total_rows = round_cloned_dataset.count_rows(None).await.unwrap();
            assert_eq!(
                total_rows, expected_rows,
                "Round {}: Should have {} rows after append",
                round, expected_rows
            );

            // Capture pre-optimize index UUIDs so we can prove optimize
            // produced new index builds.
            let indices_before_optimize = round_cloned_dataset.load_indices().await.unwrap();
            let vector_idx_before = indices_before_optimize
                .iter()
                .find(|idx| idx.name == "vector_idx")
                .unwrap();
            let category_idx_before = indices_before_optimize
                .iter()
                .find(|idx| idx.name == "category_idx")
                .unwrap();

            round_cloned_dataset
                .optimize_indices(&OptimizeOptions::merge(indices_before_optimize.len()))
                .await
                .unwrap();

            let optimized_indices = round_cloned_dataset.load_indices().await.unwrap();
            let new_vector_idx = optimized_indices
                .iter()
                .find(|idx| idx.name == "vector_idx")
                .unwrap();
            let new_category_idx = optimized_indices
                .iter()
                .find(|idx| idx.name == "category_idx")
                .unwrap();

            assert_ne!(
                new_vector_idx.uuid, vector_idx_before.uuid,
                "Round {}: Vector index should have a new UUID after optimization",
                round
            );
            assert_ne!(
                new_category_idx.uuid, category_idx_before.uuid,
                "Round {}: Category index should have a new UUID after optimization",
                round
            );

            // New index files must be written under the CLONE's _indices
            // directory...
            use std::path::PathBuf;
            let clone_indices_dir = PathBuf::from(round_cloned_uri).join("_indices");
            let vector_index_dir = clone_indices_dir.join(new_vector_idx.uuid.to_string());
            let category_index_dir = clone_indices_dir.join(new_category_idx.uuid.to_string());

            assert!(
                vector_index_dir.exists(),
                "Round {}: New vector index directory should exist in cloned dataset location: {:?}",
                round, vector_index_dir
            );
            assert!(
                category_index_dir.exists(),
                "Round {}: New category index directory should exist in cloned dataset location: {:?}",
                round, category_index_dir
            );

            // ...with no base_id, meaning they no longer point at the
            // source dataset's index storage.
            assert!(
                new_vector_idx.base_id.is_none(),
                "Round {}: New vector index should not have base_id after optimization in cloned dataset",
                round
            );
            assert!(
                new_category_idx.base_id.is_none(),
                "Round {}: New category index should not have base_id after optimization in cloned dataset",
                round
            );

            // ...and must NOT leak into the source dataset's _indices dir.
            let original_indices_dir = PathBuf::from(current_dataset.uri()).join("_indices");
            let wrong_vector_dir = original_indices_dir.join(new_vector_idx.uuid.to_string());
            let wrong_category_dir = original_indices_dir.join(new_category_idx.uuid.to_string());

            assert!(
                !wrong_vector_dir.exists(),
                "Round {}: New vector index should NOT be in original dataset location: {:?}",
                round,
                wrong_vector_dir
            );
            assert!(
                !wrong_category_dir.exists(),
                "Round {}: New category index should NOT be in original dataset location: {:?}",
                round,
                wrong_category_dir
            );

            // Both the inherited data and this round's appended data must be
            // reachable through the scalar index.
            let old_category_results = round_cloned_dataset
                .scan()
                .filter("category = 'category_0'")
                .unwrap()
                .try_into_batch()
                .await
                .unwrap();

            let new_category_results = round_cloned_dataset
                .scan()
                .filter(&format!("category = 'category_{}'", round))
                .unwrap()
                .try_into_batch()
                .await
                .unwrap();

            assert_eq!(
                source_scalar_query_rows,
                old_category_results.num_rows(),
                "Round {}: Should find old category data with {} rows",
                round,
                source_scalar_query_rows
            );
            assert!(
                new_category_results.num_rows() > 0,
                "Round {}: Should find new category data",
                round
            );

            // Vector search must still work post-optimization.
            let query_vector = generate_random_array(dimensions as usize);
            let search_results = round_cloned_dataset
                .scan()
                .nearest("vector", &query_vector, 10)
                .unwrap()
                .limit(Some(10), None)
                .unwrap()
                .try_into_batch()
                .await
                .unwrap();

            assert!(
                search_results.num_rows() > 0,
                "Round {}: Vector search should return results after optimization",
                round
            );

            // Both indices must now cover every row in the clone.
            let vector_stats: serde_json::Value = serde_json::from_str(
                &round_cloned_dataset
                    .index_statistics("vector_idx")
                    .await
                    .unwrap(),
            )
            .unwrap();
            let category_stats: serde_json::Value = serde_json::from_str(
                &round_cloned_dataset
                    .index_statistics("category_idx")
                    .await
                    .unwrap(),
            )
            .unwrap();

            assert_eq!(
                vector_stats["num_indexed_rows"].as_u64().unwrap(),
                expected_rows as u64,
                "Round {}: Vector index should have {} indexed rows",
                round,
                expected_rows
            );
            assert_eq!(
                category_stats["num_indexed_rows"].as_u64().unwrap(),
                expected_rows as u64,
                "Round {}: Category index should have {} indexed rows",
                round,
                expected_rows
            );

            // Next round clones from this round's result.
            current_dataset = round_cloned_dataset;
        }

        // Final sanity pass on the last clone in the chain.
        let final_cloned_dataset = current_dataset;

        let cloned_indices = final_cloned_dataset.load_indices().await.unwrap();
        assert_eq!(
            cloned_indices.len(),
            2,
            "Final cloned dataset should have 2 indices"
        );
        let cloned_index_names: HashSet<String> =
            cloned_indices.iter().map(|idx| idx.name.clone()).collect();
        assert!(cloned_index_names.contains("vector_idx"));
        assert!(cloned_index_names.contains("category_idx"));

        let query_vector = generate_random_array(dimensions as usize);
        let search_results = final_cloned_dataset
            .scan()
            .nearest("vector", &query_vector, 5)
            .unwrap()
            .limit(Some(5), None)
            .unwrap()
            .try_into_batch()
            .await
            .unwrap();

        assert!(
            search_results.num_rows() > 0,
            "Vector search should return results on final dataset"
        );

        let scalar_results = final_cloned_dataset
            .scan()
            .filter("category = 'category_0'")
            .unwrap()
            .try_into_batch()
            .await
            .unwrap();

        assert_eq!(
            source_scalar_query_rows,
            scalar_results.num_rows(),
            "Scalar query should return results on final dataset"
        );
    }
4080
    /// Verifies `initialize_indices`: copies all index definitions (vector,
    /// inverted/FTS, and BTree scalar) from a source dataset onto a target
    /// dataset with the same schema, then checks the target's indices are
    /// present by name and functional for both vector and scalar queries.
    #[tokio::test]
    async fn test_initialize_indices() {
        use crate::dataset::Dataset;
        use arrow_array::types::Float32Type;
        use lance_core::utils::tempfile::TempStrDir;
        use lance_datagen::{array, BatchCount, RowCount};
        use lance_index::scalar::{InvertedIndexParams, ScalarIndexParams};
        use lance_linalg::distance::MetricType;
        use std::collections::HashSet;

        let test_dir = TempStrDir::default();
        let source_uri = format!("{}/{}", test_dir, "source");
        let target_uri = format!("{}/{}", test_dir, "target");

        // Source: 300 rows of (vector, text, id).
        let source_reader = lance_datagen::gen_batch()
            .col("vector", array::rand_vec::<Float32Type>(8.into()))
            .col(
                "text",
                array::cycle_utf8_literals(&["hello world", "foo bar", "test data"]),
            )
            .col("id", array::step::<Int32Type>())
            .into_reader_rows(RowCount::from(300), BatchCount::from(1));

        let mut source_dataset = Dataset::write(source_reader, &source_uri, None)
            .await
            .unwrap();

        // One index of each kind on the source (all untrained on replace:
        // `replace = false`).
        let vector_params = VectorIndexParams::ivf_pq(4, 8, 2, MetricType::L2, 10);
        source_dataset
            .create_index(
                &["vector"],
                IndexType::Vector,
                Some("vec_idx".to_string()),
                &vector_params,
                false,
            )
            .await
            .unwrap();

        let fts_params = InvertedIndexParams::default();
        source_dataset
            .create_index(
                &["text"],
                IndexType::Inverted,
                Some("text_idx".to_string()),
                &fts_params,
                false,
            )
            .await
            .unwrap();

        let scalar_params = ScalarIndexParams::default();
        source_dataset
            .create_index(
                &["id"],
                IndexType::BTree,
                Some("id_idx".to_string()),
                &scalar_params,
                false,
            )
            .await
            .unwrap();

        // Re-open to get a fresh view of the committed manifest.
        let source_dataset = Dataset::open(&source_uri).await.unwrap();

        let source_indices = source_dataset.load_indices().await.unwrap();
        assert_eq!(
            source_indices.len(),
            3,
            "Source dataset should have 3 indices"
        );

        // Target: same schema, different values (rotated literals, shifted
        // ids starting at 100).
        let target_reader = lance_datagen::gen_batch()
            .col("vector", array::rand_vec::<Float32Type>(8.into()))
            .col(
                "text",
                array::cycle_utf8_literals(&["foo bar", "test data", "hello world"]),
            )
            .col("id", array::step_custom::<Int32Type>(100, 1))
            .into_reader_rows(RowCount::from(300), BatchCount::from(1));
        let mut target_dataset = Dataset::write(target_reader, &target_uri, None)
            .await
            .unwrap();

        target_dataset
            .initialize_indices(&source_dataset)
            .await
            .unwrap();

        let target_indices = target_dataset.load_indices().await.unwrap();
        assert_eq!(
            target_indices.len(),
            3,
            "Target dataset should have 3 indices after initialization"
        );

        let source_names: HashSet<String> =
            source_indices.iter().map(|idx| idx.name.clone()).collect();
        let target_names: HashSet<String> =
            target_indices.iter().map(|idx| idx.name.clone()).collect();
        assert_eq!(
            source_names, target_names,
            "Index names should match between source and target"
        );

        // The copied vector index must answer ANN queries on the target.
        let query_vector = generate_random_array(8);
        let search_results = target_dataset
            .scan()
            .nearest("vector", &query_vector, 5)
            .unwrap()
            .limit(Some(5), None)
            .unwrap()
            .try_into_batch()
            .await
            .unwrap();
        assert!(
            search_results.num_rows() > 0,
            "Vector index should be functional"
        );

        // The copied scalar index must find the single row with id = 125
        // (target ids are 100..400, each unique).
        let scalar_results = target_dataset
            .scan()
            .filter("id = 125")
            .unwrap()
            .try_into_batch()
            .await
            .unwrap();
        assert_eq!(
            scalar_results.num_rows(),
            1,
            "Scalar index should find exact match"
        );
    }
4230
4231 #[tokio::test]
4232 async fn test_initialize_indices_with_missing_field() {
4233 use crate::dataset::Dataset;
4234 use arrow_array::types::Int32Type;
4235 use lance_core::utils::tempfile::TempStrDir;
4236 use lance_datagen::{array, BatchCount, RowCount};
4237 use lance_index::scalar::ScalarIndexParams;
4238
4239 let test_dir = TempStrDir::default();
4241 let source_uri = format!("{}/{}", test_dir, "source");
4242 let target_uri = format!("{}/{}", test_dir, "target");
4243
4244 let source_reader = lance_datagen::gen_batch()
4246 .col("id", array::step::<Int32Type>())
4247 .col("extra", array::cycle_utf8_literals(&["test"]))
4248 .into_reader_rows(RowCount::from(10), BatchCount::from(1));
4249 let mut source_dataset = Dataset::write(source_reader, &source_uri, None)
4250 .await
4251 .unwrap();
4252
4253 source_dataset
4255 .create_index(
4256 &["extra"],
4257 IndexType::BTree,
4258 None,
4259 &ScalarIndexParams::default(),
4260 false,
4261 )
4262 .await
4263 .unwrap();
4264
4265 let target_reader = lance_datagen::gen_batch()
4267 .col("id", array::step_custom::<Int32Type>(10, 1))
4268 .into_reader_rows(RowCount::from(10), BatchCount::from(1));
4269 let mut target_dataset = Dataset::write(target_reader, &target_uri, None)
4270 .await
4271 .unwrap();
4272
4273 let result = target_dataset.initialize_indices(&source_dataset).await;
4275
4276 assert!(result.is_err(), "Should error when field is missing");
4278 assert!(result
4279 .unwrap_err()
4280 .to_string()
4281 .contains("not found in target dataset"));
4282 }
4283
    /// Verifies `initialize_index` (the single-index variant): copies one
    /// named index at a time from source to target, confirms indices are
    /// added incrementally, and checks that asking for a nonexistent index
    /// name returns a descriptive error.
    #[tokio::test]
    async fn test_initialize_single_index() {
        use crate::dataset::Dataset;
        use crate::index::vector::VectorIndexParams;
        use arrow_array::types::{Float32Type, Int32Type};
        use lance_core::utils::tempfile::TempStrDir;
        use lance_datagen::{array, BatchCount, RowCount};
        use lance_index::scalar::ScalarIndexParams;
        use lance_linalg::distance::MetricType;

        let test_dir = TempStrDir::default();
        let source_uri = format!("{}/{}", test_dir, "source");
        let target_uri = format!("{}/{}", test_dir, "target");

        // Source: 300 rows of (id, name, vector).
        let source_reader = lance_datagen::gen_batch()
            .col("id", array::step::<Int32Type>())
            .col("name", array::rand_utf8(4.into(), false))
            .col("vector", array::rand_vec::<Float32Type>(8.into()))
            .into_reader_rows(RowCount::from(300), BatchCount::from(1));
        let mut source_dataset = Dataset::write(source_reader, &source_uri, None)
            .await
            .unwrap();

        // Two source indices: a BTree on "id" and an IVF_PQ on "vector".
        let scalar_params = ScalarIndexParams::default();
        source_dataset
            .create_index(
                &["id"],
                IndexType::BTree,
                Some("id_index".to_string()),
                &scalar_params,
                false,
            )
            .await
            .unwrap();

        let vector_params = VectorIndexParams::ivf_pq(16, 8, 4, MetricType::L2, 50);
        source_dataset
            .create_index(
                &["vector"],
                IndexType::Vector,
                Some("vector_index".to_string()),
                &vector_params,
                false,
            )
            .await
            .unwrap();

        // Re-open to get a fresh view of the committed manifest.
        let source_dataset = Dataset::open(&source_uri).await.unwrap();

        // Target: same schema, freshly generated data.
        let target_reader = lance_datagen::gen_batch()
            .col("id", array::step::<Int32Type>())
            .col("name", array::rand_utf8(4.into(), false))
            .col("vector", array::rand_vec::<Float32Type>(8.into()))
            .into_reader_rows(RowCount::from(300), BatchCount::from(1));
        let mut target_dataset = Dataset::write(target_reader, &target_uri, None)
            .await
            .unwrap();

        // Copy only the vector index first.
        target_dataset
            .initialize_index(&source_dataset, "vector_index")
            .await
            .unwrap();

        let target_indices = target_dataset.load_indices().await.unwrap();
        assert_eq!(target_indices.len(), 1, "Should have only 1 index");
        assert_eq!(
            target_indices[0].name, "vector_index",
            "Should have the vector index"
        );

        // Then the scalar index; both should now be present.
        target_dataset
            .initialize_index(&source_dataset, "id_index")
            .await
            .unwrap();

        let target_indices = target_dataset.load_indices().await.unwrap();
        assert_eq!(target_indices.len(), 2, "Should have 2 indices");

        let index_names: HashSet<String> =
            target_indices.iter().map(|idx| idx.name.clone()).collect();
        assert!(
            index_names.contains("vector_index"),
            "Should have vector index"
        );
        assert!(index_names.contains("id_index"), "Should have id index");

        // Unknown index names must produce a descriptive error.
        let result = target_dataset
            .initialize_index(&source_dataset, "non_existent")
            .await;
        assert!(result.is_err(), "Should error for non-existent index");
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("not found in source dataset"));
    }
4388
    /// Verifies that vector indices can be created and used on nested
    /// struct fields whose names themselves contain dots ("vector.v1",
    /// "vector.v2"), addressed with the backtick-quoted path syntax
    /// `embedding_data.`vector.v1``. Checks index creation, field-path
    /// round-tripping, ANN plan usage, and search results for both fields.
    #[tokio::test]
    async fn test_vector_index_on_nested_field_with_dots() {
        let dimensions = 16;
        let num_rows = 256;

        // Struct column "embedding_data" holding two FixedSizeList vector
        // children with dotted names, plus a utf8 "metadata" child.
        let struct_field = Field::new(
            "embedding_data",
            DataType::Struct(
                vec![
                    Field::new(
                        "vector.v1", DataType::FixedSizeList(
                            Arc::new(Field::new("item", DataType::Float32, true)),
                            dimensions,
                        ),
                        false,
                    ),
                    Field::new(
                        "vector.v2", DataType::FixedSizeList(
                            Arc::new(Field::new("item", DataType::Float32, true)),
                            dimensions,
                        ),
                        false,
                    ),
                    Field::new("metadata", DataType::Utf8, false),
                ]
                .into(),
            ),
            false,
        );

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            struct_field,
        ]));

        // Random vector payloads for both dotted children.
        let float_arr_v1 = generate_random_array(num_rows * dimensions as usize);
        let vectors_v1 = FixedSizeListArray::try_new_from_values(float_arr_v1, dimensions).unwrap();

        let float_arr_v2 = generate_random_array(num_rows * dimensions as usize);
        let vectors_v2 = FixedSizeListArray::try_new_from_values(float_arr_v2, dimensions).unwrap();

        let ids = Int32Array::from_iter_values(0..num_rows as i32);
        let metadata = StringArray::from_iter_values((0..num_rows).map(|i| format!("meta_{}", i)));

        // Assemble the struct column; child field defs must match the schema.
        let struct_array = arrow_array::StructArray::from(vec![
            (
                Arc::new(Field::new(
                    "vector.v1",
                    DataType::FixedSizeList(
                        Arc::new(Field::new("item", DataType::Float32, true)),
                        dimensions,
                    ),
                    false,
                )),
                Arc::new(vectors_v1) as Arc<dyn arrow_array::Array>,
            ),
            (
                Arc::new(Field::new(
                    "vector.v2",
                    DataType::FixedSizeList(
                        Arc::new(Field::new("item", DataType::Float32, true)),
                        dimensions,
                    ),
                    false,
                )),
                Arc::new(vectors_v2) as Arc<dyn arrow_array::Array>,
            ),
            (
                Arc::new(Field::new("metadata", DataType::Utf8, false)),
                Arc::new(metadata) as Arc<dyn arrow_array::Array>,
            ),
        ]);

        let batch =
            RecordBatch::try_new(schema.clone(), vec![Arc::new(ids), Arc::new(struct_array)])
                .unwrap();

        let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone());

        let test_dir = TempStrDir::default();
        let test_uri = &test_dir;
        let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();

        // Backticks quote the dotted child name within the nested path.
        let nested_column_path_v1 = "embedding_data.`vector.v1`";
        let params = VectorIndexParams::ivf_pq(10, 8, 2, MetricType::L2, 10);

        dataset
            .create_index(
                &[nested_column_path_v1],
                IndexType::Vector,
                Some("vec_v1_idx".to_string()),
                &params,
                true,
            )
            .await
            .unwrap();

        let indices = dataset.load_indices().await.unwrap();
        assert_eq!(indices.len(), 1);
        assert_eq!(indices[0].name, "vec_v1_idx");

        // The schema must render the indexed field id back to the same
        // backtick-quoted path.
        let field_id = indices[0].fields[0];
        let field_path = dataset.schema().field_path(field_id).unwrap();
        assert_eq!(field_path, "embedding_data.`vector.v1`");

        // Second index on the sibling dotted field.
        let nested_column_path_v2 = "embedding_data.`vector.v2`";
        dataset
            .create_index(
                &[nested_column_path_v2],
                IndexType::Vector,
                Some("vec_v2_idx".to_string()),
                &params,
                true,
            )
            .await
            .unwrap();

        let indices = dataset.load_indices().await.unwrap();
        assert_eq!(indices.len(), 2);

        let query_vector = generate_random_array(dimensions as usize);

        // Plans for both fields must route through the ANN index.
        let plan_v1 = dataset
            .scan()
            .nearest(nested_column_path_v1, &query_vector, 5)
            .unwrap()
            .explain_plan(false)
            .await
            .unwrap();

        assert!(
            plan_v1.contains("ANNSubIndex") || plan_v1.contains("ANNIvfPartition"),
            "Query plan should use vector index for nested field with dots. Plan: {}",
            plan_v1
        );

        let search_results_v1 = dataset
            .scan()
            .nearest(nested_column_path_v1, &query_vector, 5)
            .unwrap()
            .try_into_batch()
            .await
            .unwrap();

        assert_eq!(search_results_v1.num_rows(), 5);

        let plan_v2 = dataset
            .scan()
            .nearest(nested_column_path_v2, &query_vector, 5)
            .unwrap()
            .explain_plan(false)
            .await
            .unwrap();

        assert!(
            plan_v2.contains("ANNSubIndex") || plan_v2.contains("ANNIvfPartition"),
            "Query plan should use vector index for second nested field with dots. Plan: {}",
            plan_v2
        );

        let search_results_v2 = dataset
            .scan()
            .nearest(nested_column_path_v2, &query_vector, 5)
            .unwrap()
            .try_into_batch()
            .await
            .unwrap();

        assert_eq!(search_results_v2.num_rows(), 5);
    }
4573
4574 #[tokio::test]
4575 async fn test_vector_index_on_simple_nested_field() {
4576 let dimensions = 16;
4579 let num_rows = 256;
4580
4581 let struct_field = Field::new(
4583 "data",
4584 DataType::Struct(
4585 vec![
4586 Field::new(
4587 "embedding",
4588 DataType::FixedSizeList(
4589 Arc::new(Field::new("item", DataType::Float32, true)),
4590 dimensions,
4591 ),
4592 false,
4593 ),
4594 Field::new("label", DataType::Utf8, false),
4595 ]
4596 .into(),
4597 ),
4598 false,
4599 );
4600
4601 let schema = Arc::new(Schema::new(vec![
4602 Field::new("id", DataType::Int32, false),
4603 struct_field,
4604 ]));
4605
4606 let float_arr = generate_random_array(num_rows * dimensions as usize);
4608 let vectors = FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap();
4609
4610 let ids = Int32Array::from_iter_values(0..num_rows as i32);
4611 let labels = StringArray::from_iter_values((0..num_rows).map(|i| format!("label_{}", i)));
4612
4613 let struct_array = arrow_array::StructArray::from(vec![
4614 (
4615 Arc::new(Field::new(
4616 "embedding",
4617 DataType::FixedSizeList(
4618 Arc::new(Field::new("item", DataType::Float32, true)),
4619 dimensions,
4620 ),
4621 false,
4622 )),
4623 Arc::new(vectors) as Arc<dyn arrow_array::Array>,
4624 ),
4625 (
4626 Arc::new(Field::new("label", DataType::Utf8, false)),
4627 Arc::new(labels) as Arc<dyn arrow_array::Array>,
4628 ),
4629 ]);
4630
4631 let batch =
4632 RecordBatch::try_new(schema.clone(), vec![Arc::new(ids), Arc::new(struct_array)])
4633 .unwrap();
4634
4635 let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone());
4636
4637 let test_dir = TempStrDir::default();
4638 let test_uri = &test_dir;
4639 let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
4640
4641 let nested_column_path = "data.embedding";
4643 let params = VectorIndexParams::ivf_pq(2, 8, 2, MetricType::L2, 10);
4644
4645 dataset
4646 .create_index(
4647 &[nested_column_path],
4648 IndexType::Vector,
4649 Some("vec_idx".to_string()),
4650 ¶ms,
4651 true,
4652 )
4653 .await
4654 .unwrap();
4655
4656 let indices = dataset.load_indices().await.unwrap();
4658 assert_eq!(indices.len(), 1);
4659 assert_eq!(indices[0].name, "vec_idx");
4660
4661 let field_id = indices[0].fields[0];
4663 let field_path = dataset.schema().field_path(field_id).unwrap();
4664 assert_eq!(field_path, "data.embedding");
4665
4666 let query_vector = generate_random_array(dimensions as usize);
4668
4669 let plan = dataset
4670 .scan()
4671 .nearest(nested_column_path, &query_vector, 5)
4672 .unwrap()
4673 .explain_plan(false)
4674 .await
4675 .unwrap();
4676
4677 assert!(
4679 plan.contains("ANNSubIndex") || plan.contains("ANNIvfPartition"),
4680 "Query plan should use vector index for nested field. Plan: {}",
4681 plan
4682 );
4683
4684 let search_results = dataset
4685 .scan()
4686 .nearest(nested_column_path, &query_vector, 5)
4687 .unwrap()
4688 .try_into_batch()
4689 .await
4690 .unwrap();
4691
4692 assert_eq!(search_results.num_rows(), 5);
4693 }
4694
4695 #[tokio::test]
4696 async fn test_btree_index_on_nested_field_with_dots() {
4697 let test_dir = TempStrDir::default();
4699 let test_uri = &test_dir;
4700
4701 let schema = Arc::new(Schema::new(vec![
4703 Field::new("id", DataType::Int32, false),
4704 Field::new(
4705 "data",
4706 DataType::Struct(
4707 vec![
4708 Field::new("value.v1", DataType::Int32, false),
4709 Field::new("value.v2", DataType::Float32, false),
4710 Field::new("text", DataType::Utf8, false),
4711 ]
4712 .into(),
4713 ),
4714 false,
4715 ),
4716 ]));
4717
4718 let num_rows = 1000;
4720 let ids = Int32Array::from_iter_values(0..num_rows);
4721 let values_v1 = Int32Array::from_iter_values((0..num_rows).map(|i| i % 100));
4722 let values_v2 = Float32Array::from_iter_values((0..num_rows).map(|i| (i as f32) * 0.1));
4723 let texts = StringArray::from_iter_values((0..num_rows).map(|i| format!("text_{}", i)));
4724
4725 let struct_array = arrow_array::StructArray::from(vec![
4726 (
4727 Arc::new(Field::new("value.v1", DataType::Int32, false)),
4728 Arc::new(values_v1) as Arc<dyn arrow_array::Array>,
4729 ),
4730 (
4731 Arc::new(Field::new("value.v2", DataType::Float32, false)),
4732 Arc::new(values_v2) as Arc<dyn arrow_array::Array>,
4733 ),
4734 (
4735 Arc::new(Field::new("text", DataType::Utf8, false)),
4736 Arc::new(texts) as Arc<dyn arrow_array::Array>,
4737 ),
4738 ]);
4739
4740 let batch =
4741 RecordBatch::try_new(schema.clone(), vec![Arc::new(ids), Arc::new(struct_array)])
4742 .unwrap();
4743
4744 let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone());
4745 let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
4746
4747 let nested_column_path = "data.`value.v1`";
4749 let params = ScalarIndexParams::default();
4750
4751 dataset
4752 .create_index(
4753 &[nested_column_path],
4754 IndexType::BTree,
4755 Some("btree_v1_idx".to_string()),
4756 ¶ms,
4757 true,
4758 )
4759 .await
4760 .unwrap();
4761
4762 dataset = Dataset::open(test_uri).await.unwrap();
4764
4765 let indices = dataset.load_indices().await.unwrap();
4767 assert_eq!(indices.len(), 1);
4768 assert_eq!(indices[0].name, "btree_v1_idx");
4769
4770 let field_id = indices[0].fields[0];
4772 let field_path = dataset.schema().field_path(field_id).unwrap();
4773 assert_eq!(field_path, "data.`value.v1`");
4774
4775 let plan = dataset
4777 .scan()
4778 .filter("data.`value.v1` = 42")
4779 .unwrap()
4780 .prefilter(true)
4781 .explain_plan(false)
4782 .await
4783 .unwrap();
4784
4785 assert!(
4788 plan.contains("ScalarIndexQuery"),
4789 "Query plan should show optimized read. Plan: {}",
4790 plan
4791 );
4792
4793 let results = dataset
4795 .scan()
4796 .filter("data.`value.v1` = 42")
4797 .unwrap()
4798 .prefilter(true)
4799 .try_into_batch()
4800 .await
4801 .unwrap();
4802
4803 assert!(results.num_rows() > 0);
4804 }
4805
4806 #[tokio::test]
4807 async fn test_bitmap_index_on_nested_field_with_dots() {
4808 let test_dir = TempStrDir::default();
4810 let test_uri = &test_dir;
4811
4812 let schema = Arc::new(Schema::new(vec![
4814 Field::new("id", DataType::Int32, false),
4815 Field::new(
4816 "metadata",
4817 DataType::Struct(
4818 vec![
4819 Field::new("status.code", DataType::Int32, false),
4820 Field::new("category.name", DataType::Utf8, false),
4821 ]
4822 .into(),
4823 ),
4824 false,
4825 ),
4826 ]));
4827
4828 let num_rows = 1000;
4830 let ids = Int32Array::from_iter_values(0..num_rows);
4831 let status_codes = Int32Array::from_iter_values((0..num_rows).map(|i| i % 10));
4833 let categories =
4835 StringArray::from_iter_values((0..num_rows).map(|i| format!("category_{}", i % 5)));
4836
4837 let struct_array = arrow_array::StructArray::from(vec![
4838 (
4839 Arc::new(Field::new("status.code", DataType::Int32, false)),
4840 Arc::new(status_codes) as Arc<dyn arrow_array::Array>,
4841 ),
4842 (
4843 Arc::new(Field::new("category.name", DataType::Utf8, false)),
4844 Arc::new(categories) as Arc<dyn arrow_array::Array>,
4845 ),
4846 ]);
4847
4848 let batch =
4849 RecordBatch::try_new(schema.clone(), vec![Arc::new(ids), Arc::new(struct_array)])
4850 .unwrap();
4851
4852 let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone());
4853 let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
4854
4855 let nested_column_path = "metadata.`status.code`";
4857 let params = ScalarIndexParams::default();
4858
4859 dataset
4860 .create_index(
4861 &[nested_column_path],
4862 IndexType::Bitmap,
4863 Some("bitmap_status_idx".to_string()),
4864 ¶ms,
4865 true,
4866 )
4867 .await
4868 .unwrap();
4869
4870 dataset = Dataset::open(test_uri).await.unwrap();
4872
4873 let indices = dataset.load_indices().await.unwrap();
4875 assert_eq!(indices.len(), 1);
4876 assert_eq!(indices[0].name, "bitmap_status_idx");
4877
4878 let field_id = indices[0].fields[0];
4880 let field_path = dataset.schema().field_path(field_id).unwrap();
4881 assert_eq!(field_path, "metadata.`status.code`");
4882
4883 let plan = dataset
4885 .scan()
4886 .filter("metadata.`status.code` = 5")
4887 .unwrap()
4888 .explain_plan(false)
4889 .await
4890 .unwrap();
4891
4892 assert!(
4895 plan.contains("ScalarIndexQuery"),
4896 "Query plan should show optimized read. Plan: {}",
4897 plan
4898 );
4899
4900 let results = dataset
4902 .scan()
4903 .filter("metadata.`status.code` = 5")
4904 .unwrap()
4905 .try_into_batch()
4906 .await
4907 .unwrap();
4908
4909 assert!(results.num_rows() > 0);
4911 assert_eq!(results.num_rows(), 100);
4912 }
4913
4914 #[tokio::test]
4915 async fn test_inverted_index_on_nested_field_with_dots() {
4916 use lance_index::scalar::inverted::tokenizer::InvertedIndexParams;
4917
4918 let test_dir = TempStrDir::default();
4920 let test_uri = &test_dir;
4921
4922 let schema = Arc::new(Schema::new(vec![
4924 Field::new("id", DataType::Int32, false),
4925 Field::new(
4926 "document",
4927 DataType::Struct(
4928 vec![
4929 Field::new("content.text", DataType::Utf8, false),
4930 Field::new("content.summary", DataType::Utf8, false),
4931 ]
4932 .into(),
4933 ),
4934 false,
4935 ),
4936 ]));
4937
4938 let num_rows = 100;
4940 let ids = Int32Array::from_iter_values(0..num_rows as i32);
4941 let content_texts = StringArray::from_iter_values((0..num_rows).map(|i| match i % 3 {
4942 0 => format!("The quick brown fox jumps over the lazy dog {}", i),
4943 1 => format!(
4944 "Machine learning and artificial intelligence document {}",
4945 i
4946 ),
4947 _ => format!("Data science and analytics content piece {}", i),
4948 }));
4949 let summaries = StringArray::from_iter_values(
4950 (0..num_rows).map(|i| format!("Summary of document {}", i)),
4951 );
4952
4953 let struct_array = arrow_array::StructArray::from(vec![
4954 (
4955 Arc::new(Field::new("content.text", DataType::Utf8, false)),
4956 Arc::new(content_texts) as Arc<dyn arrow_array::Array>,
4957 ),
4958 (
4959 Arc::new(Field::new("content.summary", DataType::Utf8, false)),
4960 Arc::new(summaries) as Arc<dyn arrow_array::Array>,
4961 ),
4962 ]);
4963
4964 let batch =
4965 RecordBatch::try_new(schema.clone(), vec![Arc::new(ids), Arc::new(struct_array)])
4966 .unwrap();
4967
4968 let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone());
4969 let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
4970
4971 let nested_column_path = "document.`content.text`";
4973 let params = InvertedIndexParams::default();
4974
4975 dataset
4976 .create_index(
4977 &[nested_column_path],
4978 IndexType::Inverted,
4979 Some("inverted_content_idx".to_string()),
4980 ¶ms,
4981 true,
4982 )
4983 .await
4984 .unwrap();
4985
4986 dataset = Dataset::open(test_uri).await.unwrap();
4988
4989 let indices = dataset.load_indices().await.unwrap();
4991 assert_eq!(indices.len(), 1);
4992 assert_eq!(indices[0].name, "inverted_content_idx");
4993
4994 let field_id = indices[0].fields[0];
4996 let field_path = dataset.schema().field_path(field_id).unwrap();
4997 assert_eq!(field_path, "document.`content.text`");
4998
4999 let query = FullTextSearchQuery::new("machine learning".to_string())
5002 .with_column(field_path.clone())
5003 .unwrap();
5004
5005 let plan = dataset
5007 .scan()
5008 .full_text_search(query.clone())
5009 .unwrap()
5010 .explain_plan(false)
5011 .await
5012 .unwrap();
5013
5014 assert!(
5016 plan.contains("MatchQuery") || plan.contains("PhraseQuery"),
5017 "Query plan should use inverted index for nested field with dots. Plan: {}",
5018 plan
5019 );
5020
5021 let results = dataset
5022 .scan()
5023 .full_text_search(query)
5024 .unwrap()
5025 .try_into_stream()
5026 .await
5027 .unwrap()
5028 .try_collect::<Vec<_>>()
5029 .await
5030 .unwrap();
5031
5032 assert!(
5034 !results.is_empty(),
5035 "Full-text search should return results"
5036 );
5037
5038 let mut found_count = 0;
5040 for batch in results {
5041 found_count += batch.num_rows();
5042 }
5043 assert!(
5045 found_count > 0,
5046 "Should find at least some documents with 'machine learning'"
5047 );
5048 assert!(found_count < num_rows, "Should not match all documents");
5049 }
5050
5051 #[tokio::test]
5052 async fn test_resolve_index_column() {
5053 use lance_datagen::{array, BatchCount, RowCount};
5054
5055 let test_dir = tempfile::tempdir().unwrap();
5057 let test_uri = test_dir.path().to_str().unwrap();
5058
5059 let reader = lance_datagen::gen_batch()
5060 .col("id", array::step::<arrow_array::types::Int32Type>())
5061 .col(
5062 "vector",
5063 array::rand_vec::<arrow_array::types::Float32Type>(32.into()),
5064 )
5065 .into_reader_rows(RowCount::from(100), BatchCount::from(1));
5066
5067 let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
5068
5069 let params = crate::index::vector::VectorIndexParams::ivf_flat(
5071 4,
5072 lance_linalg::distance::MetricType::L2,
5073 );
5074 dataset
5075 .create_index(
5076 &["vector"],
5077 IndexType::Vector,
5078 Some("my_vector_index".to_string()),
5079 ¶ms,
5080 false,
5081 )
5082 .await
5083 .unwrap();
5084
5085 let dataset = Dataset::open(test_uri).await.unwrap();
5087 let indices = dataset.load_indices().await.unwrap();
5088 assert_eq!(indices.len(), 1);
5089 let index_meta = &indices[0];
5090
5091 let (field_path, field) =
5093 resolve_index_column(dataset.schema(), index_meta, "vector").unwrap();
5094 assert_eq!(field_path, "vector");
5095 assert_eq!(field.name, "vector");
5096
5097 let (field_path2, field2) =
5099 resolve_index_column(dataset.schema(), index_meta, "my_vector_index").unwrap();
5100 assert_eq!(field_path2, "vector");
5101 assert_eq!(field2.name, "vector");
5102
5103 let result = resolve_index_column(dataset.schema(), index_meta, "nonexistent");
5105 assert!(result.is_err());
5106 assert!(result
5107 .unwrap_err()
5108 .to_string()
5109 .contains("does not exist in the schema"));
5110 }
5111
5112 #[tokio::test]
5113 async fn test_resolve_index_column_error_cases() {
5114 use lance_datagen::{array, BatchCount, RowCount};
5115
5116 let test_dir = tempfile::tempdir().unwrap();
5118 let test_uri = test_dir.path().to_str().unwrap();
5119
5120 let reader = lance_datagen::gen_batch()
5121 .col("id", array::step::<arrow_array::types::Int32Type>())
5122 .col(
5123 "vector",
5124 array::rand_vec::<arrow_array::types::Float32Type>(32.into()),
5125 )
5126 .into_reader_rows(RowCount::from(100), BatchCount::from(1));
5127
5128 let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
5129
5130 let params = crate::index::vector::VectorIndexParams::ivf_flat(
5132 4,
5133 lance_linalg::distance::MetricType::L2,
5134 );
5135 dataset
5136 .create_index(
5137 &["vector"],
5138 IndexType::Vector,
5139 Some("my_index".to_string()),
5140 ¶ms,
5141 false,
5142 )
5143 .await
5144 .unwrap();
5145
5146 let dataset = Dataset::open(test_uri).await.unwrap();
5148 let indices = dataset.load_indices().await.unwrap();
5149 let index_meta = &indices[0];
5150
5151 let result = resolve_index_column(dataset.schema(), index_meta, "nonexistent_column");
5153 assert!(result.is_err());
5154 let err_msg = result.unwrap_err().to_string();
5155 assert!(
5156 err_msg.contains("does not exist in the schema"),
5157 "Error message should mention column doesn't exist, got: {}",
5158 err_msg
5159 );
5160 }
5161
5162 #[tokio::test]
5163 async fn test_resolve_index_column_nested_field() {
5164 use arrow_array::{RecordBatch, StructArray};
5165 use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema};
5166
5167 let test_dir = tempfile::tempdir().unwrap();
5169 let test_uri = test_dir.path().to_str().unwrap();
5170
5171 let vector_field = ArrowField::new(
5173 "vector",
5174 DataType::FixedSizeList(
5175 Arc::new(ArrowField::new("item", DataType::Float32, true)),
5176 8,
5177 ),
5178 false,
5179 );
5180 let struct_field = ArrowField::new(
5181 "data",
5182 DataType::Struct(vec![vector_field.clone()].into()),
5183 false,
5184 );
5185 let schema = Arc::new(ArrowSchema::new(vec![
5186 ArrowField::new("id", DataType::Int32, false),
5187 struct_field,
5188 ]));
5189
5190 let id_array = arrow_array::Int32Array::from(vec![1, 2, 3, 4, 5]);
5192
5193 let mut vector_values = Vec::new();
5195 for _ in 0..5 {
5196 for _ in 0..8 {
5197 vector_values.push(rand::random::<f32>());
5198 }
5199 }
5200 let vector_array = arrow_array::FixedSizeListArray::try_new_from_values(
5201 arrow_array::Float32Array::from(vector_values),
5202 8,
5203 )
5204 .unwrap();
5205
5206 let struct_array = StructArray::from(vec![(
5207 Arc::new(vector_field),
5208 Arc::new(vector_array) as arrow_array::ArrayRef,
5209 )]);
5210
5211 let batch = RecordBatch::try_new(
5212 schema.clone(),
5213 vec![Arc::new(id_array), Arc::new(struct_array)],
5214 )
5215 .unwrap();
5216
5217 let reader = Box::new(arrow_array::RecordBatchIterator::new(
5218 vec![Ok(batch)],
5219 schema,
5220 ));
5221
5222 let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
5223
5224 let params = crate::index::vector::VectorIndexParams::ivf_flat(
5226 2,
5227 lance_linalg::distance::MetricType::L2,
5228 );
5229 dataset
5230 .create_index(
5231 &["data.vector"],
5232 IndexType::Vector,
5233 Some("nested_vector_index".to_string()),
5234 ¶ms,
5235 false,
5236 )
5237 .await
5238 .unwrap();
5239
5240 let dataset = Dataset::open(test_uri).await.unwrap();
5242 let indices = dataset.load_indices().await.unwrap();
5243 assert_eq!(indices.len(), 1);
5244 let index_meta = &indices[0];
5245
5246 let (field_path, field) =
5248 resolve_index_column(dataset.schema(), index_meta, "data.vector").unwrap();
5249 assert_eq!(field_path, "data.vector");
5250 assert_eq!(field.name, "vector");
5251
5252 let (field_path2, field2) =
5254 resolve_index_column(dataset.schema(), index_meta, "nested_vector_index").unwrap();
5255 assert_eq!(field_path2, "data.vector");
5256 assert_eq!(field2.name, "vector");
5257
5258 assert!(
5260 field_path2.contains('.'),
5261 "Field path should contain '.' for nested field"
5262 );
5263 }
5264}