use std::collections::{HashMap, HashSet};
use std::sync::{Arc, OnceLock};

use arrow_schema::{DataType, Schema};
use async_trait::async_trait;
use datafusion::execution::SendableRecordBatchStream;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use futures::{stream, StreamExt, TryStreamExt};
use itertools::Itertools;
use lance_core::cache::{CacheKey, UnsizedCacheKey};
use lance_core::utils::address::RowAddress;
use lance_core::utils::parse::str_is_truthy;
use lance_core::utils::tracing::{
    IO_TYPE_OPEN_FRAG_REUSE, IO_TYPE_OPEN_MEM_WAL, IO_TYPE_OPEN_SCALAR, IO_TYPE_OPEN_VECTOR,
    TRACE_IO_EVENTS,
};
use lance_file::reader::FileReader;
use lance_file::v2;
use lance_file::v2::reader::FileReaderOptions;
use lance_index::frag_reuse::{FragReuseIndex, FRAG_REUSE_INDEX_NAME};
use lance_index::mem_wal::{MemWalIndex, MEM_WAL_INDEX_NAME};
use lance_index::optimize::OptimizeOptions;
use lance_index::pb::index::Implementation;
use lance_index::scalar::expression::{
    IndexInformationProvider, MultiQueryParser, ScalarQueryParser,
};
use lance_index::scalar::inverted::InvertedIndexPlugin;
use lance_index::scalar::lance_format::LanceIndexStore;
use lance_index::scalar::registry::{TrainingCriteria, TrainingOrdering};
use lance_index::scalar::{CreatedIndex, ScalarIndex};
use lance_index::vector::bq::builder::RabitQuantizer;
use lance_index::vector::flat::index::{FlatBinQuantizer, FlatIndex, FlatQuantizer};
use lance_index::vector::hnsw::HNSW;
use lance_index::vector::pq::ProductQuantizer;
use lance_index::vector::sq::ScalarQuantizer;
pub use lance_index::IndexParams;
use lance_index::{
    is_system_index,
    metrics::{MetricsCollector, NoOpMetricsCollector},
    ScalarIndexCriteria,
};
use lance_index::{pb, vector::VectorIndex, Index, IndexType, INDEX_FILE_NAME};
use lance_index::{DatasetIndexExt, INDEX_METADATA_SCHEMA_KEY, VECTOR_INDEX_VERSION};
use lance_io::scheduler::{ScanScheduler, SchedulerConfig};
use lance_io::traits::Reader;
use lance_io::utils::{
    read_last_block, read_message, read_message_from_buf, read_metadata_offset, read_version,
    CachedFileSize,
};
use lance_table::format::IndexMetadata;
use lance_table::format::{Fragment, SelfDescribingFileReader};
use lance_table::io::manifest::read_manifest_indexes;
use roaring::RoaringBitmap;
use scalar::index_matches_criteria;
use serde_json::json;
use snafu::location;
use tracing::{info, instrument};
use uuid::Uuid;
use vector::ivf::v2::IVFIndex;
use vector::utils::get_vector_type;

pub(crate) mod append;
mod create;
pub mod frag_reuse;
pub mod mem_wal;
pub mod prefilter;
pub mod scalar;
pub mod vector;

use self::append::merge_indices;
use self::vector::remap_vector_index;
use crate::dataset::index::LanceIndexStoreExt;
use crate::dataset::optimize::remapping::RemapResult;
use crate::dataset::optimize::RemappedIndex;
use crate::dataset::transaction::{Operation, Transaction};
use crate::index::frag_reuse::{load_frag_reuse_index_details, open_frag_reuse_index};
use crate::index::mem_wal::open_mem_wal_index;
pub use crate::index::prefilter::{FilterLoader, PreFilter};
use crate::index::scalar::{fetch_index_details, load_training_data, IndexDetails};
use crate::session::index_caches::{FragReuseIndexKey, IndexMetadataKey};
use crate::{dataset::Dataset, Error, Result};
pub use create::CreateIndexBuilder;

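/// Cache key for an opened scalar index.
///
/// The key combines the index UUID with the UUID of the active fragment reuse
/// index (if any), e.g. `"<index_uuid>-<fri_uuid>"`, so cached entries are
/// naturally invalidated whenever compaction installs a new fragment reuse
/// index. The sibling `VectorIndexCacheKey`, `FragReuseIndexCacheKey`, and
/// `MemWalCacheKey` below follow the same scheme.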
#[derive(Debug, Clone)]
pub struct ScalarIndexCacheKey<'a> {
    pub uuid: &'a str,
    pub fri_uuid: Option<&'a Uuid>,
}

impl<'a> ScalarIndexCacheKey<'a> {
    pub fn new(uuid: &'a str, fri_uuid: Option<&'a Uuid>) -> Self {
        Self { uuid, fri_uuid }
    }
}

impl UnsizedCacheKey for ScalarIndexCacheKey<'_> {
    type ValueType = dyn ScalarIndex;

    fn key(&self) -> std::borrow::Cow<'_, str> {
        if let Some(fri_uuid) = self.fri_uuid {
            format!("{}-{}", self.uuid, fri_uuid).into()
        } else {
            self.uuid.into()
        }
    }
}

#[derive(Debug, Clone)]
pub struct VectorIndexCacheKey<'a> {
    pub uuid: &'a str,
    pub fri_uuid: Option<&'a Uuid>,
}

impl<'a> VectorIndexCacheKey<'a> {
    pub fn new(uuid: &'a str, fri_uuid: Option<&'a Uuid>) -> Self {
        Self { uuid, fri_uuid }
    }
}

impl UnsizedCacheKey for VectorIndexCacheKey<'_> {
    type ValueType = dyn VectorIndex;

    fn key(&self) -> std::borrow::Cow<'_, str> {
        if let Some(fri_uuid) = self.fri_uuid {
            format!("{}-{}", self.uuid, fri_uuid).into()
        } else {
            self.uuid.into()
        }
    }
}

#[derive(Debug, Clone)]
pub struct FragReuseIndexCacheKey<'a> {
    pub uuid: &'a str,
    pub fri_uuid: Option<&'a Uuid>,
}

impl<'a> FragReuseIndexCacheKey<'a> {
    pub fn new(uuid: &'a str, fri_uuid: Option<&'a Uuid>) -> Self {
        Self { uuid, fri_uuid }
    }
}

impl CacheKey for FragReuseIndexCacheKey<'_> {
    type ValueType = FragReuseIndex;

    fn key(&self) -> std::borrow::Cow<'_, str> {
        if let Some(fri_uuid) = self.fri_uuid {
            format!("{}-{}", self.uuid, fri_uuid).into()
        } else {
            self.uuid.into()
        }
    }
}

#[derive(Debug, Clone)]
pub struct MemWalCacheKey<'a> {
    pub uuid: &'a Uuid,
    pub fri_uuid: Option<&'a Uuid>,
}

impl<'a> MemWalCacheKey<'a> {
    pub fn new(uuid: &'a Uuid, fri_uuid: Option<&'a Uuid>) -> Self {
        Self { uuid, fri_uuid }
    }
}

impl CacheKey for MemWalCacheKey<'_> {
    type ValueType = MemWalIndex;

    fn key(&self) -> std::borrow::Cow<'_, str> {
        if let Some(fri_uuid) = self.fri_uuid {
            format!("{}-{}", self.uuid, fri_uuid).into()
        } else {
            self.uuid.to_string().into()
        }
    }
}

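/// Whether to auto-migrate datasets with corrupt or out-of-date fragment
/// metadata. Controlled by the `LANCE_AUTO_MIGRATION` environment variable
/// (truthy values enable it); defaults to `true` when the variable is unset.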
fn auto_migrate_corruption() -> bool {
    static LANCE_AUTO_MIGRATION: OnceLock<bool> = OnceLock::new();
    *LANCE_AUTO_MIGRATION.get_or_init(|| {
        std::env::var("LANCE_AUTO_MIGRATION")
            .ok()
            .map(|s| str_is_truthy(&s))
            .unwrap_or(true)
    })
}

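/// Builds an index of the type reported by [`IndexBuilder::index_type`].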
#[async_trait]
pub trait IndexBuilder {
    fn index_type() -> IndexType;

    async fn build(&self) -> Result<()>;
}

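/// Remap an index to a new UUID, rewriting row addresses according to
/// `row_id_map` (a value of `None` marks the row as deleted).
///
/// Returns `RemapResult::Keep` when the remap is a no-op for this index,
/// `RemapResult::Drop` when the index cannot be remapped and must be rebuilt,
/// and `RemapResult::Remapped` with the new index's details otherwise.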
pub(crate) async fn remap_index(
    dataset: &Dataset,
    index_id: &Uuid,
    row_id_map: &HashMap<u64, Option<u64>>,
) -> Result<RemapResult> {
    let indices = dataset.load_indices().await?;
    let matched = indices
        .iter()
        .find(|i| i.uuid == *index_id)
        .ok_or_else(|| Error::Index {
            message: format!("Index with id {} does not exist", index_id),
            location: location!(),
        })?;

    if matched.fields.len() > 1 {
        return Err(Error::Index {
            message: "Remapping indices with multiple fields is not supported".to_string(),
            location: location!(),
        });
    }

    if row_id_map.values().all(|v| v.is_none()) {
        let deleted_bitmap = RoaringBitmap::from_iter(
            row_id_map
                .keys()
                .map(|row_id| RowAddress::new_from_u64(*row_id))
                .map(|addr| addr.fragment_id()),
        );
        if Some(deleted_bitmap) == matched.fragment_bitmap {
            return Ok(RemapResult::Keep(*index_id));
        }
    }

    let field_id = matched
        .fields
        .first()
        .expect("An index existed with no fields");
    let field_path = dataset.schema().field_path(*field_id)?;

    let new_id = Uuid::new_v4();

    let generic = dataset
        .open_generic_index(&field_path, &index_id.to_string(), &NoOpMetricsCollector)
        .await?;

    let created_index = match generic.index_type() {
        it if it.is_scalar() => {
            let new_store = LanceIndexStore::from_dataset_for_new(dataset, &new_id.to_string())?;

            let scalar_index = dataset
                .open_scalar_index(&field_path, &index_id.to_string(), &NoOpMetricsCollector)
                .await?;
            if !scalar_index.can_remap() {
                return Ok(RemapResult::Drop);
            }

            match scalar_index.index_type() {
                IndexType::Inverted => {
                    let inverted_index = scalar_index
                        .as_any()
                        .downcast_ref::<lance_index::scalar::inverted::InvertedIndex>()
                        .ok_or(Error::Index {
                            message: "expected inverted index".to_string(),
                            location: location!(),
                        })?;
                    if inverted_index.is_legacy() {
                        log::warn!(
                            "reindex because of legacy format, index_type: {}, index_id: {}, field: {}",
                            scalar_index.index_type(),
                            index_id,
                            field_path
                        );
                        let training_data = load_training_data(
                            dataset,
                            &field_path,
                            &TrainingCriteria::new(TrainingOrdering::None),
                            None,
                            true,
                            None,
                        )
                        .await?;
                        InvertedIndexPlugin::train_inverted_index(
                            training_data,
                            &new_store,
                            inverted_index.params().clone(),
                            None,
                        )
                        .await?
                    } else {
                        scalar_index.remap(row_id_map, &new_store).await?
                    }
                }
                _ => scalar_index.remap(row_id_map, &new_store).await?,
            }
        }
        it if it.is_vector() => {
            remap_vector_index(
                Arc::new(dataset.clone()),
                &field_path,
                index_id,
                &new_id,
                matched,
                row_id_map,
            )
            .await?;
            CreatedIndex {
                index_details: prost_types::Any::from_msg(
                    &lance_table::format::pb::VectorIndexDetails::default(),
                )
                .unwrap(),
                index_version: VECTOR_INDEX_VERSION,
            }
        }
        _ => {
            return Err(Error::Index {
                message: format!("Index type {} is not supported", generic.index_type()),
                location: location!(),
            });
        }
    };

    Ok(RemapResult::Remapped(RemappedIndex {
        old_id: *index_id,
        new_id,
        index_details: created_index.index_details,
        index_version: created_index.index_version,
    }))
}

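/// Per-column information about available scalar indices, used when planning
/// which filter expressions can be pushed down into an index.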
#[derive(Debug)]
pub struct ScalarIndexInfo {
    indexed_columns: HashMap<String, (DataType, Box<MultiQueryParser>)>,
}

impl IndexInformationProvider for ScalarIndexInfo {
    fn get_index(&self, col: &str) -> Option<(&DataType, &dyn ScalarQueryParser)> {
        self.indexed_columns
            .get(col)
            .map(|(ty, parser)| (ty, parser.as_ref() as &dyn ScalarQueryParser))
    }
}

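// The index protobuf lives at the end of the index file. We read the last
// block, decode the metadata offset from it, and either slice the message out
// of the bytes we already have or issue one more read when the message starts
// earlier in the file.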
async fn open_index_proto(reader: &dyn Reader) -> Result<pb::Index> {
    let file_size = reader.size().await?;
    let tail_bytes = read_last_block(reader).await?;
    let metadata_pos = read_metadata_offset(&tail_bytes)?;
    let proto: pb::Index = if metadata_pos < file_size - tail_bytes.len() {
        read_message(reader, metadata_pos).await?
    } else {
        let offset = tail_bytes.len() - (file_size - metadata_pos);
        read_message_from_buf(&tail_bytes.slice(offset..))?
    };
    Ok(proto)
}

fn vector_index_details() -> prost_types::Any {
    let details = lance_table::format::pb::VectorIndexDetails::default();
    prost_types::Any::from_msg(&details).unwrap()
}

#[async_trait]
impl DatasetIndexExt for Dataset {
    type IndexBuilder<'a> = CreateIndexBuilder<'a>;

    fn create_index_builder<'a>(
        &'a mut self,
        columns: &[&str],
        index_type: IndexType,
        params: &'a dyn IndexParams,
    ) -> CreateIndexBuilder<'a> {
        CreateIndexBuilder::new(self, columns, index_type, params)
    }

    #[instrument(skip_all)]
    async fn create_index(
        &mut self,
        columns: &[&str],
        index_type: IndexType,
        name: Option<String>,
        params: &dyn IndexParams,
        replace: bool,
    ) -> Result<()> {
        let mut builder = self.create_index_builder(columns, index_type, params);

        if let Some(name) = name {
            builder = builder.name(name);
        }

        builder.replace(replace).await
    }

    async fn drop_index(&mut self, name: &str) -> Result<()> {
        let indices = self.load_indices_by_name(name).await?;
        if indices.is_empty() {
            return Err(Error::IndexNotFound {
                identity: format!("name={}", name),
                location: location!(),
            });
        }

        let transaction = Transaction::new(
            self.manifest.version,
            Operation::CreateIndex {
                new_indices: vec![],
                removed_indices: indices.clone(),
            },
            None,
            None,
        );

        self.apply_commit(transaction, &Default::default(), &Default::default())
            .await?;

        Ok(())
    }

    async fn prewarm_index(&self, name: &str) -> Result<()> {
        let indices = self.load_indices_by_name(name).await?;
        if indices.is_empty() {
            return Err(Error::IndexNotFound {
                identity: format!("name={}", name),
                location: location!(),
            });
        }

        let index = self
            .open_generic_index(name, &indices[0].uuid.to_string(), &NoOpMetricsCollector)
            .await?;
        index.prewarm().await?;

        Ok(())
    }

    async fn load_indices(&self) -> Result<Arc<Vec<IndexMetadata>>> {
        let metadata_key = IndexMetadataKey {
            version: self.version().version,
        };
        let indices = match self.index_cache.get_with_key(&metadata_key).await {
            Some(indices) => indices,
            None => {
                let mut loaded_indices = read_manifest_indexes(
                    &self.object_store,
                    &self.manifest_location,
                    &self.manifest,
                )
                .await?;
                retain_supported_indices(&mut loaded_indices);
                let loaded_indices = Arc::new(loaded_indices);
                self.index_cache
                    .insert_with_key(&metadata_key, loaded_indices.clone())
                    .await;
                loaded_indices
            }
        };

        if let Some(frag_reuse_index_meta) =
            indices.iter().find(|idx| idx.name == FRAG_REUSE_INDEX_NAME)
        {
            let uuid = frag_reuse_index_meta.uuid.to_string();
            let fri_key = FragReuseIndexKey { uuid: &uuid };
            let frag_reuse_index = self
                .index_cache
                .get_or_insert_with_key(fri_key, || async move {
                    let index_details =
                        load_frag_reuse_index_details(self, frag_reuse_index_meta).await?;
                    open_frag_reuse_index(frag_reuse_index_meta.uuid, index_details.as_ref()).await
                })
                .await?;
            let mut indices = indices.as_ref().clone();
            indices.iter_mut().for_each(|idx| {
                if let Some(bitmap) = idx.fragment_bitmap.as_mut() {
                    frag_reuse_index.remap_fragment_bitmap(bitmap).unwrap();
                }
            });
            Ok(Arc::new(indices))
        } else {
            Ok(indices)
        }
    }

    async fn commit_existing_index(
        &mut self,
        index_name: &str,
        column: &str,
        index_id: Uuid,
    ) -> Result<()> {
        let Some(field) = self.schema().field(column) else {
            return Err(Error::Index {
                message: format!("CreateIndex: column '{column}' does not exist"),
                location: location!(),
            });
        };

        let new_idx = IndexMetadata {
            uuid: index_id,
            name: index_name.to_string(),
            fields: vec![field.id],
            dataset_version: self.manifest.version,
            fragment_bitmap: Some(self.get_fragments().iter().map(|f| f.id() as u32).collect()),
            index_details: None,
            index_version: 0,
            created_at: Some(chrono::Utc::now()),
            base_id: None,
        };

        let transaction = Transaction::new(
            self.manifest.version,
            Operation::CreateIndex {
                new_indices: vec![new_idx],
                removed_indices: vec![],
            },
            None,
            None,
        );

        self.apply_commit(transaction, &Default::default(), &Default::default())
            .await?;

        Ok(())
    }

    async fn load_scalar_index<'a, 'b>(
        &'a self,
        criteria: ScalarIndexCriteria<'b>,
    ) -> Result<Option<IndexMetadata>> {
        let indices = self.load_indices().await?;

        let mut indices = indices
            .iter()
            .filter(|idx| {
                if idx.fields.is_empty() {
                    if idx.name != FRAG_REUSE_INDEX_NAME {
                        log::error!("Index {} has no fields", idx.name);
                    }
                    false
                } else {
                    true
                }
            })
            .collect::<Vec<_>>();
        indices.sort_by_key(|idx| idx.fields[0]);
        let indices_by_field = indices.into_iter().chunk_by(|idx| idx.fields[0]);
        for (field_id, indices) in &indices_by_field {
            let indices = indices.collect::<Vec<_>>();
            let has_multiple = indices.len() > 1;
            for idx in indices {
                let field = self.schema().field_by_id(field_id);
                if let Some(field) = field {
                    if index_matches_criteria(idx, &criteria, field, has_multiple, self.schema())? {
                        let non_empty = idx.fragment_bitmap.as_ref().is_some_and(|bitmap| {
                            bitmap.intersection_len(self.fragment_bitmap.as_ref()) > 0
                        });
                        let is_fts_index = if let Some(details) = &idx.index_details {
                            IndexDetails(details.clone()).supports_fts()
                        } else {
                            false
                        };
                        if non_empty || is_fts_index {
                            return Ok(Some(idx.clone()));
                        }
                    }
                }
            }
        }
        Ok(None)
    }

    #[instrument(skip_all)]
    async fn optimize_indices(&mut self, options: &OptimizeOptions) -> Result<()> {
        let dataset = Arc::new(self.clone());
        let indices = self.load_indices().await?;

        let indices_to_optimize = options
            .index_names
            .as_ref()
            .map(|names| names.iter().collect::<HashSet<_>>());
        let name_to_indices = indices
            .iter()
            .filter(|idx| {
                indices_to_optimize
                    .as_ref()
                    .is_none_or(|names| names.contains(&idx.name))
                    && !is_system_index(idx)
            })
            .map(|idx| (idx.name.clone(), idx))
            .into_group_map();

        let mut new_indices = vec![];
        let mut removed_indices = vec![];
        for deltas in name_to_indices.values() {
            let Some(res) = merge_indices(dataset.clone(), deltas.as_slice(), options).await?
            else {
                continue;
            };

            let last_idx = deltas.last().expect("Delta indices should not be empty");
            let new_idx = IndexMetadata {
                uuid: res.new_uuid,
                name: last_idx.name.clone(),
                fields: last_idx.fields.clone(),
                dataset_version: self.manifest.version,
                fragment_bitmap: Some(res.new_fragment_bitmap),
                index_details: Some(Arc::new(res.new_index_details)),
                index_version: res.new_index_version,
                created_at: Some(chrono::Utc::now()),
                base_id: None,
            };
            removed_indices.extend(res.removed_indices.iter().map(|&idx| idx.clone()));
            // Keep any older deltas that this merge did not consume; compare
            // against this merge's removed set, not the running total across
            // all index names.
            if deltas.len() > res.removed_indices.len() {
                new_indices.extend(
                    deltas[0..(deltas.len() - res.removed_indices.len())]
                        .iter()
                        .map(|&idx| idx.clone()),
                );
            }
            new_indices.push(new_idx);
        }

        if new_indices.is_empty() {
            return Ok(());
        }

        let transaction = Transaction::new(
            self.manifest.version,
            Operation::CreateIndex {
                new_indices,
                removed_indices,
            },
            None,
            None,
        );

        self.apply_commit(transaction, &Default::default(), &Default::default())
            .await?;

        Ok(())
    }

    async fn index_statistics(&self, index_name: &str) -> Result<String> {
        let metadatas = self.load_indices_by_name(index_name).await?;
        if metadatas.is_empty() {
            return Err(Error::IndexNotFound {
                identity: format!("name={}", index_name),
                location: location!(),
            });
        }

        if index_name == FRAG_REUSE_INDEX_NAME {
            let index = self
                .open_frag_reuse_index(&NoOpMetricsCollector)
                .await?
                .expect("FragmentReuse index does not exist");
            return serde_json::to_string(&index.statistics()?).map_err(|e| Error::Index {
                message: format!("Failed to serialize index statistics: {}", e),
                location: location!(),
            });
        }

        if index_name == MEM_WAL_INDEX_NAME {
            let index = self
                .open_mem_wal_index(&NoOpMetricsCollector)
                .await?
                .expect("MemWal index does not exist");
            return serde_json::to_string(&index.statistics()?).map_err(|e| Error::Index {
                message: format!("Failed to serialize index statistics: {}", e),
                location: location!(),
            });
        }

        let field_id = metadatas[0].fields[0];
        let field_path = self.schema().field_path(field_id)?;

        let indices = stream::iter(metadatas.iter())
            .then(|m| {
                let field_path = field_path.clone();
                async move {
                    self.open_generic_index(&field_path, &m.uuid.to_string(), &NoOpMetricsCollector)
                        .await
                }
            })
            .try_collect::<Vec<_>>()
            .await?;

        let indices_stats = indices
            .iter()
            .map(|idx| idx.statistics())
            .collect::<Result<Vec<_>>>()?;

        let index_type = indices[0].index_type().to_string();

        let indexed_fragments_per_delta = self.indexed_fragments(index_name).await?;

        let res = indexed_fragments_per_delta
            .iter()
            .map(|frags| {
                let mut sum = 0;
                for frag in frags.iter() {
                    sum += frag.num_rows().ok_or_else(|| Error::Internal {
                        message: "Fragment should have row counts, please upgrade lance and \
                                  trigger a single write to fix this"
                            .to_string(),
                        location: location!(),
                    })?;
                }
                Ok(sum)
            })
            .collect::<Result<Vec<_>>>();

        async fn migrate_and_recompute(ds: &Dataset, index_name: &str) -> Result<String> {
            let mut ds = ds.clone();
            log::warn!(
                "Detected out-of-date fragment metadata, migrating dataset. \
                 To disable migration, set LANCE_AUTO_MIGRATION=false"
            );
            // A no-op delete commits a new version with rewritten fragment
            // metadata, which migrates the dataset.
            ds.delete("false").await.map_err(|err| Error::Execution {
                message: format!(
                    "Failed to migrate dataset while calculating index statistics. \
                     To disable migration, set LANCE_AUTO_MIGRATION=false. Original error: {}",
                    err
                ),
                location: location!(),
            })?;
            ds.index_statistics(index_name).await
        }

        let num_indexed_rows_per_delta = match res {
            Ok(rows) => rows,
            Err(Error::Internal { message, .. })
                if auto_migrate_corruption() && message.contains("trigger a single write") =>
            {
                return migrate_and_recompute(self, index_name).await;
            }
            Err(e) => return Err(e),
        };

        let mut fragment_ids = HashSet::new();
        for frags in indexed_fragments_per_delta.iter() {
            for frag in frags.iter() {
                if !fragment_ids.insert(frag.id) {
                    if auto_migrate_corruption() {
                        return migrate_and_recompute(self, index_name).await;
                    } else {
                        return Err(Error::Internal {
                            message:
                                "Overlap in indexed fragments. Please upgrade to lance >= 0.23.0 \
                                 and trigger a single write to fix this"
                                    .to_string(),
                            location: location!(),
                        });
                    }
                }
            }
        }
        let num_indexed_fragments = fragment_ids.len();

        let num_unindexed_fragments = self.fragments().len() - num_indexed_fragments;
        let num_indexed_rows: usize = num_indexed_rows_per_delta.iter().cloned().sum();
        let num_unindexed_rows = self.count_rows(None).await? - num_indexed_rows;

        let updated_at = metadatas
            .iter()
            .filter_map(|m| m.created_at)
            .max()
            .map(|dt| dt.timestamp_millis() as u64);

        let stats = json!({
            "index_type": index_type,
            "name": index_name,
            "num_indices": metadatas.len(),
            "indices": indices_stats,
            "num_indexed_fragments": num_indexed_fragments,
            "num_indexed_rows": num_indexed_rows,
            "num_unindexed_fragments": num_unindexed_fragments,
            "num_unindexed_rows": num_unindexed_rows,
            "num_indexed_rows_per_delta": num_indexed_rows_per_delta,
            "updated_at_timestamp_ms": updated_at,
        });

        serde_json::to_string(&stats).map_err(|e| Error::Index {
            message: format!("Failed to serialize index statistics: {}", e),
            location: location!(),
        })
    }

    async fn read_index_partition(
        &self,
        index_name: &str,
        partition_id: usize,
        with_vector: bool,
    ) -> Result<SendableRecordBatchStream> {
        let indices = self.load_indices_by_name(index_name).await?;
        if indices.is_empty() {
            return Err(Error::IndexNotFound {
                identity: format!("name={}", index_name),
                location: location!(),
            });
        }
        let column = self.schema().field_by_id(indices[0].fields[0]).unwrap();

        let mut schema: Option<Arc<Schema>> = None;
        let mut partition_streams = Vec::with_capacity(indices.len());
        for index in indices {
            let index = self
                .open_vector_index(&column.name, &index.uuid.to_string(), &NoOpMetricsCollector)
                .await?;

            let stream = index
                .partition_reader(partition_id, with_vector, &NoOpMetricsCollector)
                .await?;
            if schema.is_none() {
                schema = Some(stream.schema());
            }
            partition_streams.push(stream);
        }

        match schema {
            Some(schema) => {
                let merged = stream::select_all(partition_streams);
                let stream = RecordBatchStreamAdapter::new(schema, merged);
                Ok(Box::pin(stream))
            }
            None => Ok(Box::pin(RecordBatchStreamAdapter::new(
                Arc::new(Schema::empty()),
                stream::empty(),
            ))),
        }
    }
}

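/// Drop index metadata entries written with a newer index format version than
/// this version of lance can read, logging a warning for each ignored index.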
fn retain_supported_indices(indices: &mut Vec<IndexMetadata>) {
    indices.retain(|idx| {
        let max_supported_version = idx
            .index_details
            .as_ref()
            .map(|details| {
                IndexDetails(details.clone())
                    .index_version()
                    .unwrap_or(i32::MAX as u32)
            })
            .unwrap_or_default();
        let is_valid = idx.index_version <= max_supported_version as i32;
        if !is_valid {
            log::warn!(
                "Index {} has version {}, which is not supported (<={}), ignoring it",
                idx.name,
                idx.index_version,
                max_supported_version,
            );
        }
        is_valid
    })
}

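/// Internal extensions to [DatasetIndexExt] for opening concrete index
/// implementations (generic, scalar, vector, fragment reuse, MemWAL) and for
/// inspecting which fragments each index covers.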
#[async_trait]
pub trait DatasetIndexInternalExt: DatasetIndexExt {
    async fn open_generic_index(
        &self,
        column: &str,
        uuid: &str,
        metrics: &dyn MetricsCollector,
    ) -> Result<Arc<dyn Index>>;
    async fn open_scalar_index(
        &self,
        column: &str,
        uuid: &str,
        metrics: &dyn MetricsCollector,
    ) -> Result<Arc<dyn ScalarIndex>>;
    async fn open_vector_index(
        &self,
        column: &str,
        uuid: &str,
        metrics: &dyn MetricsCollector,
    ) -> Result<Arc<dyn VectorIndex>>;

    async fn open_frag_reuse_index(
        &self,
        metrics: &dyn MetricsCollector,
    ) -> Result<Option<Arc<FragReuseIndex>>>;

    async fn open_mem_wal_index(
        &self,
        metrics: &dyn MetricsCollector,
    ) -> Result<Option<Arc<MemWalIndex>>>;

    async fn frag_reuse_index_uuid(&self) -> Option<Uuid>;

    async fn scalar_index_info(&self) -> Result<ScalarIndexInfo>;

    async fn unindexed_fragments(&self, idx_name: &str) -> Result<Vec<Fragment>>;

    async fn indexed_fragments(&self, idx_name: &str) -> Result<Vec<Vec<Fragment>>>;

    async fn initialize_index(&mut self, source_dataset: &Dataset, index_name: &str) -> Result<()>;

    async fn initialize_indices(&mut self, source_dataset: &Dataset) -> Result<()>;
}

#[async_trait]
impl DatasetIndexInternalExt for Dataset {
    async fn open_generic_index(
        &self,
        column: &str,
        uuid: &str,
        metrics: &dyn MetricsCollector,
    ) -> Result<Arc<dyn Index>> {
        let frag_reuse_uuid = self.frag_reuse_index_uuid().await;
        let cache_key = ScalarIndexCacheKey::new(uuid, frag_reuse_uuid.as_ref());
        if let Some(index) = self.index_cache.get_unsized_with_key(&cache_key).await {
            return Ok(index.as_index());
        }

        let vector_cache_key = VectorIndexCacheKey::new(uuid, frag_reuse_uuid.as_ref());
        if let Some(index) = self
            .index_cache
            .get_unsized_with_key(&vector_cache_key)
            .await
        {
            return Ok(index.as_index());
        }

        let frag_reuse_cache_key = FragReuseIndexCacheKey::new(uuid, frag_reuse_uuid.as_ref());
        if let Some(index) = self.index_cache.get_with_key(&frag_reuse_cache_key).await {
            return Ok(index.as_index());
        }

        let index_meta = self.load_index(uuid).await?.ok_or_else(|| Error::Index {
            message: format!("Index with id {} does not exist", uuid),
            location: location!(),
        })?;
        let index_dir = self.indice_files_dir(&index_meta)?;
        let index_file = index_dir.child(uuid).child(INDEX_FILE_NAME);
        if self.object_store.exists(&index_file).await? {
            let index = self.open_vector_index(column, uuid, metrics).await?;
            Ok(index.as_index())
        } else {
            let index = self.open_scalar_index(column, uuid, metrics).await?;
            Ok(index.as_index())
        }
    }

    async fn open_scalar_index(
        &self,
        column: &str,
        uuid: &str,
        metrics: &dyn MetricsCollector,
    ) -> Result<Arc<dyn ScalarIndex>> {
        let frag_reuse_uuid = self.frag_reuse_index_uuid().await;
        let cache_key = ScalarIndexCacheKey::new(uuid, frag_reuse_uuid.as_ref());
        if let Some(index) = self.index_cache.get_unsized_with_key(&cache_key).await {
            return Ok(index);
        }

        let index_meta = self.load_index(uuid).await?.ok_or_else(|| Error::Index {
            message: format!("Index with id {} does not exist", uuid),
            location: location!(),
        })?;

        let index = scalar::open_scalar_index(self, column, &index_meta, metrics).await?;

        info!(target: TRACE_IO_EVENTS, index_uuid=uuid, r#type=IO_TYPE_OPEN_SCALAR, index_type=index.index_type().to_string());
        metrics.record_index_load();

        self.index_cache
            .insert_unsized_with_key(&cache_key, index.clone())
            .await;
        Ok(index)
    }

    async fn open_vector_index(
        &self,
        column: &str,
        uuid: &str,
        metrics: &dyn MetricsCollector,
    ) -> Result<Arc<dyn VectorIndex>> {
        let frag_reuse_uuid = self.frag_reuse_index_uuid().await;
        let cache_key = VectorIndexCacheKey::new(uuid, frag_reuse_uuid.as_ref());

        if let Some(index) = self.index_cache.get_unsized_with_key(&cache_key).await {
            log::debug!("Found vector index in cache uuid: {}", uuid);
            return Ok(index);
        }

        let frag_reuse_index = self.open_frag_reuse_index(metrics).await?;
        let index_meta = self.load_index(uuid).await?.ok_or_else(|| Error::Index {
            message: format!("Index with id {} does not exist", uuid),
            location: location!(),
        })?;
        let index_dir = self.indice_files_dir(&index_meta)?;
        let index_file = index_dir.child(uuid).child(INDEX_FILE_NAME);
        let reader: Arc<dyn Reader> = self.object_store.open(&index_file).await?.into();

        let tailing_bytes = read_last_block(reader.as_ref()).await?;
        let (major_version, minor_version) = read_version(&tailing_bytes)?;

        let index_cache = self.index_cache.with_key_prefix(&cache_key.key());

        let index = match (major_version, minor_version) {
            (0, 1) | (0, 0) => {
                info!(target: TRACE_IO_EVENTS, index_uuid=uuid, r#type=IO_TYPE_OPEN_VECTOR, version="0.1", index_type="IVF_PQ");
                let proto = open_index_proto(reader.as_ref()).await?;
                match &proto.implementation {
                    Some(Implementation::VectorIndex(vector_index)) => {
                        let dataset = Arc::new(self.clone());
                        vector::open_vector_index(
                            dataset,
                            uuid,
                            vector_index,
                            reader,
                            frag_reuse_index,
                        )
                        .await
                    }
                    None => Err(Error::Internal {
                        message: "Index proto was missing implementation field".into(),
                        location: location!(),
                    }),
                }
            }

            (0, 2) => {
                info!(target: TRACE_IO_EVENTS, index_uuid=uuid, r#type=IO_TYPE_OPEN_VECTOR, version="0.2", index_type="IVF_PQ");
                let reader = FileReader::try_new_self_described_from_reader(
                    reader.clone(),
                    Some(&self.metadata_cache.file_metadata_cache(&index_file)),
                )
                .await?;
                vector::open_vector_index_v2(
                    Arc::new(self.clone()),
                    column,
                    uuid,
                    reader,
                    frag_reuse_index,
                )
                .await
            }

            (0, 3) | (2, _) => {
                let scheduler = ScanScheduler::new(
                    self.object_store.clone(),
                    SchedulerConfig::max_bandwidth(&self.object_store),
                );
                let file = scheduler
                    .open_file(&index_file, &CachedFileSize::unknown())
                    .await?;
                let reader = v2::reader::FileReader::try_open(
                    file,
                    None,
                    Default::default(),
                    &self.metadata_cache.file_metadata_cache(&index_file),
                    FileReaderOptions::default(),
                )
                .await?;
                let index_metadata = reader
                    .schema()
                    .metadata
                    .get(INDEX_METADATA_SCHEMA_KEY)
                    .ok_or(Error::Index {
                        message: "Index Metadata not found".to_owned(),
                        location: location!(),
                    })?;
                let index_metadata: lance_index::IndexMetadata =
                    serde_json::from_str(index_metadata)?;
                let field = self.schema().field(column).ok_or_else(|| Error::Index {
                    message: format!("Column {} does not exist in the schema", column),
                    location: location!(),
                })?;

                let (_, element_type) = get_vector_type(self.schema(), column)?;

                info!(target: TRACE_IO_EVENTS, index_uuid=uuid, r#type=IO_TYPE_OPEN_VECTOR, version="0.3", index_type=index_metadata.index_type);

                match index_metadata.index_type.as_str() {
                    "IVF_FLAT" => match element_type {
                        DataType::Float16 | DataType::Float32 | DataType::Float64 => {
                            let ivf = IVFIndex::<FlatIndex, FlatQuantizer>::try_new(
                                self.object_store.clone(),
                                index_dir,
                                uuid.to_owned(),
                                frag_reuse_index,
                                self.metadata_cache.as_ref(),
                                index_cache,
                            )
                            .await?;
                            Ok(Arc::new(ivf) as Arc<dyn VectorIndex>)
                        }
                        DataType::UInt8 => {
                            let ivf = IVFIndex::<FlatIndex, FlatBinQuantizer>::try_new(
                                self.object_store.clone(),
                                index_dir,
                                uuid.to_owned(),
                                frag_reuse_index,
                                self.metadata_cache.as_ref(),
                                index_cache,
                            )
                            .await?;
                            Ok(Arc::new(ivf) as Arc<dyn VectorIndex>)
                        }
                        _ => Err(Error::Index {
                            message: format!(
                                "the field type {} is not supported for FLAT index",
                                field.data_type()
                            ),
                            location: location!(),
                        }),
                    },

                    "IVF_PQ" => {
                        let ivf = IVFIndex::<FlatIndex, ProductQuantizer>::try_new(
                            self.object_store.clone(),
                            index_dir,
                            uuid.to_owned(),
                            frag_reuse_index,
                            self.metadata_cache.as_ref(),
                            index_cache,
                        )
                        .await?;
                        Ok(Arc::new(ivf) as Arc<dyn VectorIndex>)
                    }

                    "IVF_SQ" => {
                        let ivf = IVFIndex::<FlatIndex, ScalarQuantizer>::try_new(
                            self.object_store.clone(),
                            index_dir,
                            uuid.to_owned(),
                            frag_reuse_index,
                            self.metadata_cache.as_ref(),
                            index_cache,
                        )
                        .await?;
                        Ok(Arc::new(ivf) as Arc<dyn VectorIndex>)
                    }

                    "IVF_RQ" => {
                        let ivf = IVFIndex::<FlatIndex, RabitQuantizer>::try_new(
                            self.object_store.clone(),
                            self.indices_dir(),
                            uuid.to_owned(),
                            frag_reuse_index,
                            self.metadata_cache.as_ref(),
                            index_cache,
                        )
                        .await?;
                        Ok(Arc::new(ivf) as Arc<dyn VectorIndex>)
                    }

                    "IVF_HNSW_FLAT" => {
                        let uri = index_dir.child(uuid).child("index.pb");
                        let file_metadata_cache =
                            self.session.metadata_cache.file_metadata_cache(&uri);
                        let ivf = IVFIndex::<HNSW, FlatQuantizer>::try_new(
                            self.object_store.clone(),
                            index_dir,
                            uuid.to_owned(),
                            frag_reuse_index,
                            &file_metadata_cache,
                            index_cache,
                        )
                        .await?;
                        Ok(Arc::new(ivf) as Arc<dyn VectorIndex>)
                    }

                    "IVF_HNSW_SQ" => {
                        let ivf = IVFIndex::<HNSW, ScalarQuantizer>::try_new(
                            self.object_store.clone(),
                            index_dir,
                            uuid.to_owned(),
                            frag_reuse_index,
                            self.metadata_cache.as_ref(),
                            index_cache,
                        )
                        .await?;
                        Ok(Arc::new(ivf) as Arc<dyn VectorIndex>)
                    }

                    "IVF_HNSW_PQ" => {
                        let ivf = IVFIndex::<HNSW, ProductQuantizer>::try_new(
                            self.object_store.clone(),
                            index_dir,
                            uuid.to_owned(),
                            frag_reuse_index,
                            self.metadata_cache.as_ref(),
                            index_cache,
                        )
                        .await?;
                        Ok(Arc::new(ivf) as Arc<dyn VectorIndex>)
                    }

                    _ => Err(Error::Index {
                        message: format!("Unsupported index type: {}", index_metadata.index_type),
                        location: location!(),
                    }),
                }
            }

            _ => Err(Error::Index {
                message: "unsupported index version (maybe need to upgrade your lance version)"
                    .to_owned(),
                location: location!(),
            }),
        };
        let index = index?;
        metrics.record_index_load();
        self.index_cache
            .insert_unsized_with_key(&cache_key, index.clone())
            .await;
        Ok(index)
    }

    async fn open_frag_reuse_index(
        &self,
        metrics: &dyn MetricsCollector,
    ) -> Result<Option<Arc<FragReuseIndex>>> {
        if let Some(frag_reuse_index_meta) = self.load_index_by_name(FRAG_REUSE_INDEX_NAME).await? {
            let uuid = frag_reuse_index_meta.uuid.to_string();
            let frag_reuse_key = FragReuseIndexKey { uuid: &uuid };
            let uuid_clone = uuid.clone();

            let index = self
                .index_cache
                .get_or_insert_with_key(frag_reuse_key, || async move {
                    let index_meta =
                        self.load_index(&uuid_clone)
                            .await?
                            .ok_or_else(|| Error::Index {
                                message: format!("Index with id {} does not exist", uuid_clone),
                                location: location!(),
                            })?;
                    let index_details = load_frag_reuse_index_details(self, &index_meta).await?;
                    let index =
                        open_frag_reuse_index(frag_reuse_index_meta.uuid, index_details.as_ref())
                            .await?;

                    info!(target: TRACE_IO_EVENTS, index_uuid=uuid_clone, r#type=IO_TYPE_OPEN_FRAG_REUSE);
                    metrics.record_index_load();

                    Ok(index)
                })
                .await?;

            Ok(Some(index))
        } else {
            Ok(None)
        }
    }

    async fn open_mem_wal_index(
        &self,
        metrics: &dyn MetricsCollector,
    ) -> Result<Option<Arc<MemWalIndex>>> {
        let Some(mem_wal_meta) = self.load_index_by_name(MEM_WAL_INDEX_NAME).await? else {
            return Ok(None);
        };

        let frag_reuse_uuid = self.frag_reuse_index_uuid().await;
        let cache_key = MemWalCacheKey::new(&mem_wal_meta.uuid, frag_reuse_uuid.as_ref());
        if let Some(index) = self.index_cache.get_with_key(&cache_key).await {
            log::debug!("Found MemWAL index in cache uuid: {}", mem_wal_meta.uuid);
            return Ok(Some(index));
        }

        let uuid = mem_wal_meta.uuid.to_string();

        let index_meta = self.load_index(&uuid).await?.ok_or_else(|| Error::Index {
            message: format!("Index with id {} does not exist", uuid),
            location: location!(),
        })?;
        let index = open_mem_wal_index(index_meta)?;

        info!(target: TRACE_IO_EVENTS, index_uuid=uuid, r#type=IO_TYPE_OPEN_MEM_WAL);
        metrics.record_index_load();

        self.index_cache
            .insert_with_key(&cache_key, index.clone())
            .await;
        Ok(Some(index))
    }

    async fn frag_reuse_index_uuid(&self) -> Option<Uuid> {
        if let Ok(indices) = self.load_indices().await {
            indices
                .iter()
                .find(|idx| idx.name == FRAG_REUSE_INDEX_NAME)
                .map(|idx| idx.uuid)
        } else {
            None
        }
    }

    #[instrument(level = "trace", skip_all)]
    async fn scalar_index_info(&self) -> Result<ScalarIndexInfo> {
        let indices = self.load_indices().await?;
        let schema = self.schema();
        let mut indexed_fields = Vec::new();
        for index in indices.iter().filter(|idx| {
            let idx_schema = schema.project_by_ids(idx.fields.as_slice(), true);
            let is_vector_index = idx_schema
                .fields
                .iter()
                .any(|f| is_vector_field(f.data_type()));

            let is_fts_index = if let Some(details) = &idx.index_details {
                IndexDetails(details.clone()).supports_fts()
            } else {
                false
            };

            let has_non_empty_bitmap = idx.fragment_bitmap.as_ref().is_some_and(|bitmap| {
                !bitmap.is_empty() && !(bitmap & self.fragment_bitmap.as_ref()).is_empty()
            });

            idx.fields.len() == 1 && !is_vector_index && (has_non_empty_bitmap || is_fts_index)
        }) {
            let field = index.fields[0];
            let field = schema.field_by_id(field).ok_or_else(|| Error::Internal {
                message: format!(
                    "Index referenced a field with id {field} which did not exist in the schema"
                ),
                location: location!(),
            })?;

            let field_path = if let Some(ancestors) = schema.field_ancestry_by_id(field.id) {
                let field_refs: Vec<&str> = ancestors.iter().map(|f| f.name.as_str()).collect();
                lance_core::datatypes::format_field_path(&field_refs)
            } else {
                field.name.clone()
            };

            let index_details = IndexDetails(fetch_index_details(self, &field.name, index).await?);
            let plugin = index_details.get_plugin()?;
            let query_parser = plugin.new_query_parser(index.name.clone(), &index_details.0);

            if let Some(query_parser) = query_parser {
                indexed_fields.push((field_path, (field.data_type(), query_parser)));
            }
        }
        let mut index_info_map = HashMap::with_capacity(indexed_fields.len());
        for indexed_field in indexed_fields {
            let mut parser = Some(indexed_field.1 .1);
            let parser = &mut parser;
            index_info_map
                .entry(indexed_field.0)
                .and_modify(|existing: &mut (DataType, Box<MultiQueryParser>)| {
                    debug_assert_eq!(existing.0, indexed_field.1 .0);

                    existing.1.add(parser.take().unwrap());
                })
                .or_insert_with(|| {
                    (
                        indexed_field.1 .0,
                        Box::new(MultiQueryParser::single(parser.take().unwrap())),
                    )
                });
        }
        Ok(ScalarIndexInfo {
            indexed_columns: index_info_map,
        })
    }

    async fn unindexed_fragments(&self, name: &str) -> Result<Vec<Fragment>> {
        let indices = self.load_indices_by_name(name).await?;
        let mut total_fragment_bitmap = RoaringBitmap::new();
        for idx in indices.iter() {
            total_fragment_bitmap |= idx.fragment_bitmap.as_ref().ok_or(Error::Index {
                message: "Please upgrade lance to 0.8+ to use this function".to_string(),
                location: location!(),
            })?;
        }
        Ok(self
            .fragments()
            .iter()
            .filter(|f| !total_fragment_bitmap.contains(f.id as u32))
            .cloned()
            .collect())
    }

    async fn indexed_fragments(&self, name: &str) -> Result<Vec<Vec<Fragment>>> {
        let indices = self.load_indices_by_name(name).await?;
        indices
            .iter()
            .map(|index| {
                let fragment_bitmap = index.fragment_bitmap.as_ref().ok_or(Error::Index {
                    message: "Please upgrade lance to 0.8+ to use this function".to_string(),
                    location: location!(),
                })?;
                let mut indexed_frags = Vec::with_capacity(fragment_bitmap.len() as usize);
                for frag in self.fragments().iter() {
                    if fragment_bitmap.contains(frag.id as u32) {
                        indexed_frags.push(frag.clone());
                    }
                }
                Ok(indexed_frags)
            })
            .collect()
    }

    async fn initialize_index(&mut self, source_dataset: &Dataset, index_name: &str) -> Result<()> {
        let source_indices = source_dataset.load_indices_by_name(index_name).await?;

        if source_indices.is_empty() {
            return Err(Error::Index {
                message: format!("Index '{}' not found in source dataset", index_name),
                location: location!(),
            });
        }

        let source_index = source_indices
            .iter()
            .min_by_key(|idx| idx.created_at)
            .ok_or_else(|| Error::Index {
                message: format!("Could not determine oldest index for '{}'", index_name),
                location: location!(),
            })?;

        let mut field_names = Vec::new();
        for field_id in source_index.fields.iter() {
            let source_field = source_dataset
                .schema()
                .field_by_id(*field_id)
                .ok_or_else(|| Error::Index {
                    message: format!("Field with id {} not found in source dataset", field_id),
                    location: location!(),
                })?;

            let target_field =
                self.schema()
                    .field(&source_field.name)
                    .ok_or_else(|| Error::Index {
                        message: format!(
                            "Field '{}' required by index '{}' not found in target dataset",
                            source_field.name, index_name
                        ),
                        location: location!(),
                    })?;

            if source_field.data_type() != target_field.data_type() {
                return Err(Error::Index {
                    message: format!(
                        "Field '{}' has different types in source ({:?}) and target ({:?}) datasets",
                        source_field.name,
                        source_field.data_type(),
                        target_field.data_type()
                    ),
                    location: location!(),
                });
            }

            field_names.push(source_field.name.as_str());
        }

        if field_names.is_empty() {
            return Err(Error::Index {
                message: format!("Index '{}' has no fields", index_name),
                location: location!(),
            });
        }

        if let Some(index_details) = &source_index.index_details {
            let index_details_wrapper = IndexDetails(index_details.clone());

            if index_details_wrapper.is_vector() {
                vector::initialize_vector_index(self, source_dataset, source_index, &field_names)
                    .await?;
            } else {
                scalar::initialize_scalar_index(self, source_dataset, source_index, &field_names)
                    .await?;
            }
        } else {
            log::warn!(
                "Index '{}' has no index_details, skipping",
                source_index.name
            );
        }

        Ok(())
    }

    async fn initialize_indices(&mut self, source_dataset: &Dataset) -> Result<()> {
        let source_indices = source_dataset.load_indices().await?;
        let non_system_indices: Vec<_> = source_indices
            .iter()
            .filter(|idx| !lance_index::is_system_index(idx))
            .collect();

        if non_system_indices.is_empty() {
            return Ok(());
        }

        let mut unique_index_names = HashSet::new();
        for index in non_system_indices.iter() {
            unique_index_names.insert(index.name.clone());
        }

        for index_name in unique_index_names {
            self.initialize_index(source_dataset, &index_name).await?;
        }

        Ok(())
    }
}

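// A vector column is either a fixed-size list (one vector per row) or a list
// of fixed-size lists (multiple vectors per row).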
fn is_vector_field(data_type: DataType) -> bool {
    match data_type {
        DataType::FixedSizeList(_, _) => true,
        DataType::List(inner) => {
            matches!(inner.data_type(), DataType::FixedSizeList(_, _))
        }
        _ => false,
    }
}

#[cfg(test)]
mod tests {
    use crate::dataset::builder::DatasetBuilder;
    use crate::dataset::optimize::{compact_files, CompactionOptions};
    use crate::dataset::{ReadParams, WriteMode, WriteParams};
    use crate::index::vector::VectorIndexParams;
    use crate::session::Session;
    use crate::utils::test::{
        copy_test_data_to_tmp, DatagenExt, FragmentCount, FragmentRowCount, StatsHolder,
    };
    use arrow_array::Int32Array;

    use super::*;

    use arrow::array::AsArray;
    use arrow::datatypes::{Float32Type, Int32Type};
    use arrow_array::{
        FixedSizeListArray, Float32Array, RecordBatch, RecordBatchIterator, StringArray,
    };
    use arrow_schema::{Field, Schema};
    use lance_arrow::*;
    use lance_core::utils::tempfile::TempStrDir;
    use lance_datagen::gen_batch;
    use lance_datagen::{array, BatchCount, Dimension, RowCount};
    use lance_index::scalar::{FullTextSearchQuery, InvertedIndexParams, ScalarIndexParams};
    use lance_index::vector::{
        hnsw::builder::HnswBuildParams, ivf::IvfBuildParams, sq::builder::SQBuildParams,
    };
    use lance_io::object_store::ObjectStoreParams;
    use lance_linalg::distance::{DistanceType, MetricType};
    use lance_testing::datagen::generate_random_array;
    use rstest::rstest;
    use std::collections::HashSet;

    #[tokio::test]
    async fn test_recreate_index() {
        const DIM: i32 = 8;
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "v",
                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), DIM),
                true,
            ),
            Field::new(
                "o",
                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), DIM),
                true,
            ),
        ]));
        let data = generate_random_array(2048 * DIM as usize);
        let batches: Vec<RecordBatch> = vec![RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(FixedSizeListArray::try_new_from_values(data.clone(), DIM).unwrap()),
                Arc::new(FixedSizeListArray::try_new_from_values(data, DIM).unwrap()),
            ],
        )
        .unwrap()];

        let test_dir = TempStrDir::default();
        let test_uri = &test_dir;
        let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
        let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();

        let params = VectorIndexParams::ivf_pq(2, 8, 2, MetricType::L2, 2);
        dataset
            .create_index(&["v"], IndexType::Vector, None, &params, true)
            .await
            .unwrap();
        dataset
            .create_index(&["o"], IndexType::Vector, None, &params, true)
            .await
            .unwrap();

        dataset
            .create_index(&["v"], IndexType::Vector, None, &params, true)
            .await
            .unwrap();

        assert!(dataset
            .create_index(
                &["v"],
                IndexType::Vector,
                Some("o_idx".to_string()),
                &params,
                true,
            )
            .await
            .is_err());
    }

    fn sample_vector_field() -> Field {
        let dimensions = 16;
        let column_name = "vec";
        Field::new(
            column_name,
            DataType::FixedSizeList(
                Arc::new(Field::new("item", DataType::Float32, true)),
                dimensions,
            ),
            false,
        )
    }

    #[tokio::test]
    async fn test_drop_index() {
        let test_dir = TempStrDir::default();
        let schema = Schema::new(vec![
            sample_vector_field(),
            Field::new("ints", DataType::Int32, false),
        ]);
        let mut dataset = lance_datagen::rand(&schema)
            .into_dataset(
                &test_dir,
                FragmentCount::from(1),
                FragmentRowCount::from(256),
            )
            .await
            .unwrap();

        let idx_name = "name".to_string();
        dataset
            .create_index(
                &["vec"],
                IndexType::Vector,
                Some(idx_name.clone()),
                &VectorIndexParams::ivf_pq(2, 8, 2, MetricType::L2, 10),
                true,
            )
            .await
            .unwrap();
        dataset
            .create_index(
                &["ints"],
                IndexType::BTree,
                None,
                &ScalarIndexParams::default(),
                true,
            )
            .await
            .unwrap();

        assert_eq!(dataset.load_indices().await.unwrap().len(), 2);

        dataset.drop_index(&idx_name).await.unwrap();

        assert_eq!(dataset.load_indices().await.unwrap().len(), 1);

        let scalar_idx_name = &dataset.load_indices().await.unwrap()[0].name;
        dataset.drop_index(scalar_idx_name).await.unwrap();

        assert_eq!(dataset.load_indices().await.unwrap().len(), 0);

        assert!(dataset.drop_index(scalar_idx_name).await.is_err());
    }

    #[tokio::test]
    async fn test_count_index_rows() {
        let test_dir = TempStrDir::default();
        let dimensions = 16;
        let column_name = "vec";
        let field = sample_vector_field();
        let schema = Arc::new(Schema::new(vec![field]));

        let float_arr = generate_random_array(512 * dimensions as usize);

        let vectors =
            arrow_array::FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap();

        let record_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(vectors)]).unwrap();

        let reader =
            RecordBatchIterator::new(vec![record_batch].into_iter().map(Ok), schema.clone());
        let test_uri = &test_dir;
        let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
        dataset.validate().await.unwrap();

        assert!(dataset.index_statistics("bad_id").await.is_err());

        let params = VectorIndexParams::ivf_pq(10, 8, 2, MetricType::L2, 10);
        dataset
            .create_index(
                &[column_name],
                IndexType::Vector,
                Some("vec_idx".into()),
                &params,
                true,
            )
            .await
            .unwrap();

        let stats: serde_json::Value =
            serde_json::from_str(&dataset.index_statistics("vec_idx").await.unwrap()).unwrap();
        assert_eq!(stats["num_unindexed_rows"], 0);
        assert_eq!(stats["num_indexed_rows"], 512);

        let float_arr = generate_random_array(512 * dimensions as usize);
        let vectors =
            arrow_array::FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap();

        let record_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(vectors)]).unwrap();

        let reader = RecordBatchIterator::new(vec![record_batch].into_iter().map(Ok), schema);
        dataset.append(reader, None).await.unwrap();

        let stats: serde_json::Value =
            serde_json::from_str(&dataset.index_statistics("vec_idx").await.unwrap()).unwrap();
        assert_eq!(stats["num_unindexed_rows"], 512);
        assert_eq!(stats["num_indexed_rows"], 512);
    }

    #[tokio::test]
    async fn test_optimize_delta_indices() {
        let dimensions = 16;
        let column_name = "vec";
        let vec_field = Field::new(
            column_name,
            DataType::FixedSizeList(
                Arc::new(Field::new("item", DataType::Float32, true)),
                dimensions,
            ),
            false,
        );
        let other_column_name = "other_vec";
        let other_vec_field = Field::new(
            other_column_name,
            DataType::FixedSizeList(
                Arc::new(Field::new("item", DataType::Float32, true)),
                dimensions,
            ),
            false,
        );
        let schema = Arc::new(Schema::new(vec![vec_field, other_vec_field]));

        let float_arr = generate_random_array(512 * dimensions as usize);

        let vectors = Arc::new(
            arrow_array::FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap(),
        );

        let record_batch =
            RecordBatch::try_new(schema.clone(), vec![vectors.clone(), vectors.clone()]).unwrap();

        let reader = RecordBatchIterator::new(
            vec![record_batch.clone()].into_iter().map(Ok),
            schema.clone(),
        );

        let mut dataset = Dataset::write(reader, "memory://", None).await.unwrap();
        let params = VectorIndexParams::ivf_pq(10, 8, 2, MetricType::L2, 10);
        dataset
            .create_index(
                &[column_name],
                IndexType::Vector,
                Some("vec_idx".into()),
                &params,
                true,
            )
            .await
            .unwrap();
        dataset
            .create_index(
                &[other_column_name],
                IndexType::Vector,
                Some("other_vec_idx".into()),
                &params,
                true,
            )
            .await
            .unwrap();

        async fn get_stats(dataset: &Dataset, name: &str) -> serde_json::Value {
            serde_json::from_str(&dataset.index_statistics(name).await.unwrap()).unwrap()
        }
        async fn get_meta(dataset: &Dataset, name: &str) -> Vec<IndexMetadata> {
            dataset
                .load_indices()
                .await
                .unwrap()
                .iter()
                .filter(|m| m.name == name)
                .cloned()
                .collect()
        }
        fn get_bitmap(meta: &IndexMetadata) -> Vec<u32> {
            meta.fragment_bitmap.as_ref().unwrap().iter().collect()
        }

        let stats = get_stats(&dataset, "vec_idx").await;
        assert_eq!(stats["num_unindexed_rows"], 0);
        assert_eq!(stats["num_indexed_rows"], 512);
        assert_eq!(stats["num_indexed_fragments"], 1);
        assert_eq!(stats["num_indices"], 1);
        let meta = get_meta(&dataset, "vec_idx").await;
        assert_eq!(meta.len(), 1);
        assert_eq!(get_bitmap(&meta[0]), vec![0]);

        let reader =
            RecordBatchIterator::new(vec![record_batch].into_iter().map(Ok), schema.clone());
        dataset.append(reader, None).await.unwrap();
        let stats = get_stats(&dataset, "vec_idx").await;
        assert_eq!(stats["num_unindexed_rows"], 512);
        assert_eq!(stats["num_indexed_rows"], 512);
        assert_eq!(stats["num_indexed_fragments"], 1);
        assert_eq!(stats["num_unindexed_fragments"], 1);
        assert_eq!(stats["num_indices"], 1);
        let meta = get_meta(&dataset, "vec_idx").await;
        assert_eq!(meta.len(), 1);
        assert_eq!(get_bitmap(&meta[0]), vec![0]);

        dataset
            .optimize_indices(&OptimizeOptions::append().index_names(vec![]))
            .await
            .unwrap();
        let stats = get_stats(&dataset, "vec_idx").await;
        assert_eq!(stats["num_unindexed_rows"], 512);
        assert_eq!(stats["num_indexed_rows"], 512);
        assert_eq!(stats["num_indexed_fragments"], 1);
        assert_eq!(stats["num_unindexed_fragments"], 1);
        assert_eq!(stats["num_indices"], 1);
        let meta = get_meta(&dataset, "vec_idx").await;
        assert_eq!(meta.len(), 1);
        assert_eq!(get_bitmap(&meta[0]), vec![0]);

        dataset
            .optimize_indices(
                &OptimizeOptions::append().index_names(vec!["other_vec_idx".to_owned()]),
            )
            .await
            .unwrap();
        let stats = get_stats(&dataset, "vec_idx").await;
        assert_eq!(stats["num_unindexed_rows"], 512);
        assert_eq!(stats["num_indexed_rows"], 512);
        assert_eq!(stats["num_indexed_fragments"], 1);
        assert_eq!(stats["num_unindexed_fragments"], 1);
        assert_eq!(stats["num_indices"], 1);
        let meta = get_meta(&dataset, "vec_idx").await;
        assert_eq!(meta.len(), 1);
        assert_eq!(get_bitmap(&meta[0]), vec![0]);

        let stats = get_stats(&dataset, "other_vec_idx").await;
        assert_eq!(stats["num_unindexed_rows"], 0);
        assert_eq!(stats["num_indexed_rows"], 1024);
        assert_eq!(stats["num_indexed_fragments"], 2);
        assert_eq!(stats["num_unindexed_fragments"], 0);
        assert_eq!(stats["num_indices"], 2);
        let meta = get_meta(&dataset, "other_vec_idx").await;
        assert_eq!(meta.len(), 2);
        assert_eq!(get_bitmap(&meta[0]), vec![0]);
        assert_eq!(get_bitmap(&meta[1]), vec![1]);

        dataset
            .optimize_indices(&OptimizeOptions {
                num_indices_to_merge: 1,
                ..Default::default()
            })
            .await
            .unwrap();

        let stats = get_stats(&dataset, "vec_idx").await;
        assert_eq!(stats["num_unindexed_rows"], 0);
        assert_eq!(stats["num_indexed_rows"], 1024);
        assert_eq!(stats["num_indexed_fragments"], 2);
        assert_eq!(stats["num_unindexed_fragments"], 0);
        assert_eq!(stats["num_indices"], 1);
        let meta = get_meta(&dataset, "vec_idx").await;
        assert_eq!(meta.len(), 1);
        assert_eq!(get_bitmap(&meta[0]), vec![0, 1]);

        dataset
            .optimize_indices(&OptimizeOptions {
                num_indices_to_merge: 2,
                ..Default::default()
            })
            .await
            .unwrap();
        let stats = get_stats(&dataset, "other_vec_idx").await;
        assert_eq!(stats["num_unindexed_rows"], 0);
        assert_eq!(stats["num_indexed_rows"], 1024);
        assert_eq!(stats["num_indexed_fragments"], 2);
        assert_eq!(stats["num_unindexed_fragments"], 0);
        assert_eq!(stats["num_indices"], 1);
        let meta = get_meta(&dataset, "other_vec_idx").await;
        assert_eq!(meta.len(), 1);
        assert_eq!(get_bitmap(&meta[0]), vec![0, 1]);
    }

    #[tokio::test]
    async fn test_optimize_ivf_hnsw_sq_delta_indices() {
        let test_dir = TempStrDir::default();
        let dimensions = 16;
        let column_name = "vec";
        let field = Field::new(
            column_name,
            DataType::FixedSizeList(
                Arc::new(Field::new("item", DataType::Float32, true)),
                dimensions,
            ),
            false,
        );
        let schema = Arc::new(Schema::new(vec![field]));

        let float_arr = generate_random_array(512 * dimensions as usize);

        let vectors =
            arrow_array::FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap();

        let record_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(vectors)]).unwrap();

        let reader = RecordBatchIterator::new(
            vec![record_batch.clone()].into_iter().map(Ok),
            schema.clone(),
        );

        let test_uri = &test_dir;
        let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();

        let ivf_params = IvfBuildParams::default();
        let hnsw_params = HnswBuildParams::default();
        let sq_params = SQBuildParams::default();
        let params = VectorIndexParams::with_ivf_hnsw_sq_params(
            MetricType::L2,
            ivf_params,
            hnsw_params,
            sq_params,
        );
        dataset
            .create_index(
                &[column_name],
                IndexType::Vector,
                Some("vec_idx".into()),
                &params,
                true,
            )
            .await
            .unwrap();

        let stats: serde_json::Value =
            serde_json::from_str(&dataset.index_statistics("vec_idx").await.unwrap()).unwrap();
        assert_eq!(stats["num_unindexed_rows"], 0);
        assert_eq!(stats["num_indexed_rows"], 512);
        assert_eq!(stats["num_indexed_fragments"], 1);
        assert_eq!(stats["num_indices"], 1);

        let reader =
            RecordBatchIterator::new(vec![record_batch].into_iter().map(Ok), schema.clone());
        dataset.append(reader, None).await.unwrap();
        let mut dataset = DatasetBuilder::from_uri(test_uri).load().await.unwrap();
        let stats: serde_json::Value =
            serde_json::from_str(&dataset.index_statistics("vec_idx").await.unwrap()).unwrap();
        assert_eq!(stats["num_unindexed_rows"], 512);
        assert_eq!(stats["num_indexed_rows"], 512);
        assert_eq!(stats["num_indexed_fragments"], 1);
        assert_eq!(stats["num_unindexed_fragments"], 1);
        assert_eq!(stats["num_indices"], 1);

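        // num_indices_to_merge: 0 appends a new delta index instead of merging.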
        dataset
            .optimize_indices(&OptimizeOptions {
                num_indices_to_merge: 0,
                ..Default::default()
            })
            .await
            .unwrap();

        let stats: serde_json::Value =
            serde_json::from_str(&dataset.index_statistics("vec_idx").await.unwrap()).unwrap();
        assert_eq!(stats["num_unindexed_rows"], 0);
        assert_eq!(stats["num_indexed_rows"], 1024);
        assert_eq!(stats["num_indexed_fragments"], 2);
        assert_eq!(stats["num_unindexed_fragments"], 0);
        assert_eq!(stats["num_indices"], 2);

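        // Merging the two deltas yields a single index again.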
        dataset
            .optimize_indices(&OptimizeOptions {
                num_indices_to_merge: 2,
                ..Default::default()
            })
            .await
            .unwrap();
        let stats: serde_json::Value =
            serde_json::from_str(&dataset.index_statistics("vec_idx").await.unwrap()).unwrap();
        assert_eq!(stats["num_unindexed_rows"], 0);
        assert_eq!(stats["num_indexed_rows"], 1024);
        assert_eq!(stats["num_indexed_fragments"], 2);
        assert_eq!(stats["num_unindexed_fragments"], 0);
        assert_eq!(stats["num_indices"], 1);
    }

    #[rstest]
    #[tokio::test]
    async fn test_optimize_fts(#[values(false, true)] with_position: bool) {
        let words = ["apple", "banana", "cherry", "date"];

        let dir = TempStrDir::default();
        let schema = Arc::new(Schema::new(vec![Field::new("text", DataType::Utf8, false)]));
        let data = StringArray::from_iter_values(words.iter().map(|s| s.to_string()));
        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(data)]).unwrap();
        let batch_iterator = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());

        let mut dataset = Dataset::write(batch_iterator, &dir, None).await.unwrap();

        let params = InvertedIndexParams::default()
            .lower_case(false)
            .with_position(with_position);
        dataset
            .create_index(&["text"], IndexType::Inverted, None, &params, true)
            .await
            .unwrap();

        async fn assert_indexed_rows(dataset: &Dataset, expected_indexed_rows: usize) {
            let stats = dataset.index_statistics("text_idx").await.unwrap();
            let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
            let indexed_rows = stats["num_indexed_rows"].as_u64().unwrap() as usize;
            let unindexed_rows = stats["num_unindexed_rows"].as_u64().unwrap() as usize;
            let num_rows = dataset.count_all_rows().await.unwrap();
            assert_eq!(indexed_rows, expected_indexed_rows);
            assert_eq!(unindexed_rows, num_rows - expected_indexed_rows);
        }

        let num_rows = dataset.count_all_rows().await.unwrap();
        assert_indexed_rows(&dataset, num_rows).await;

        let new_words = ["elephant", "fig", "grape", "honeydew"];
        let new_data = StringArray::from_iter_values(new_words.iter().map(|s| s.to_string()));
        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(new_data)]).unwrap();
        let batch_iter = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
        dataset.append(batch_iter, None).await.unwrap();
        assert_indexed_rows(&dataset, num_rows).await;

        dataset
            .optimize_indices(&OptimizeOptions::append())
            .await
            .unwrap();
        let num_rows = dataset.count_all_rows().await.unwrap();
        assert_indexed_rows(&dataset, num_rows).await;

        for &word in words.iter().chain(new_words.iter()) {
            let query_result = dataset
                .scan()
                .project(&["text"])
                .unwrap()
                .full_text_search(FullTextSearchQuery::new(word.to_string()))
                .unwrap()
                .limit(Some(10), None)
                .unwrap()
                .try_into_batch()
                .await
                .unwrap();

            let texts = query_result["text"]
                .as_string::<i32>()
                .iter()
                .map(|v| match v {
                    None => "".to_string(),
                    Some(v) => v.to_string(),
                })
                .collect::<Vec<String>>();

            assert_eq!(texts.len(), 1);
            assert_eq!(texts[0], word);
        }

        let uppercase_words = ["Apple", "Banana", "Cherry", "Date"];
        for &word in uppercase_words.iter() {
            let query_result = dataset
                .scan()
                .project(&["text"])
                .unwrap()
                .full_text_search(FullTextSearchQuery::new(word.to_string()))
                .unwrap()
                .limit(Some(10), None)
                .unwrap()
                .try_into_batch()
                .await
                .unwrap();

            let texts = query_result["text"]
                .as_string::<i32>()
                .iter()
                .map(|v| match v {
                    None => "".to_string(),
                    Some(v) => v.to_string(),
                })
                .collect::<Vec<String>>();

            assert_eq!(texts.len(), 0);
        }
        let new_data =
            StringArray::from_iter_values(uppercase_words.iter().map(|s| s.to_string()));
        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(new_data)]).unwrap();
        let batch_iter = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
        dataset.append(batch_iter, None).await.unwrap();
        assert_indexed_rows(&dataset, num_rows).await;

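        // Uppercase rows should be searchable via the unindexed-data path before optimization.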
        for &word in uppercase_words.iter() {
            let query_result = dataset
                .scan()
                .project(&["text"])
                .unwrap()
                .full_text_search(FullTextSearchQuery::new(word.to_string()))
                .unwrap()
                .limit(Some(10), None)
                .unwrap()
                .try_into_batch()
                .await
                .unwrap();

            let texts = query_result["text"]
                .as_string::<i32>()
                .iter()
                .map(|v| match v {
                    None => "".to_string(),
                    Some(v) => v.to_string(),
                })
                .collect::<Vec<String>>();

            assert_eq!(texts.len(), 1, "query: {}, texts: {:?}", word, texts);
            assert_eq!(texts[0], word, "query: {}, texts: {:?}", word, texts);
        }

        dataset
            .optimize_indices(&OptimizeOptions::append())
            .await
            .unwrap();
        let num_rows = dataset.count_all_rows().await.unwrap();
        assert_indexed_rows(&dataset, num_rows).await;

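        // After optimization the uppercase rows are served from the index.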
        for &word in uppercase_words.iter() {
            let query_result = dataset
                .scan()
                .project(&["text"])
                .unwrap()
                .full_text_search(FullTextSearchQuery::new(word.to_string()))
                .unwrap()
                .limit(Some(10), None)
                .unwrap()
                .try_into_batch()
                .await
                .unwrap();

            let texts = query_result["text"]
                .as_string::<i32>()
                .iter()
                .map(|v| match v {
                    None => "".to_string(),
                    Some(v) => v.to_string(),
                })
                .collect::<Vec<String>>();

            assert_eq!(texts.len(), 1, "query: {}, texts: {:?}", word, texts);
            assert_eq!(texts[0], word, "query: {}, texts: {:?}", word, texts);

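            // Compaction rewrites fragments; FTS results must survive the remap.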
            compact_files(&mut dataset, CompactionOptions::default(), None)
                .await
                .unwrap();
            for &word in uppercase_words.iter() {
                let query_result = dataset
                    .scan()
                    .project(&["text"])
                    .unwrap()
                    .full_text_search(FullTextSearchQuery::new(word.to_string()))
                    .unwrap()
                    .try_into_batch()
                    .await
                    .unwrap();
                let texts = query_result["text"]
                    .as_string::<i32>()
                    .iter()
                    .map(|v| match v {
                        None => "".to_string(),
                        Some(v) => v.to_string(),
                    })
                    .collect::<Vec<String>>();
                assert_eq!(texts.len(), 1, "query: {}, texts: {:?}", word, texts);
                assert_eq!(texts[0], word, "query: {}, texts: {:?}", word, texts);
            }
            assert_indexed_rows(&dataset, num_rows).await;
        }
    }

    #[tokio::test]
    async fn test_create_index_too_small_for_pq() {
        let test_dir = TempStrDir::default();
        let dimensions = 1536;

        let field = Field::new(
            "vector",
            DataType::FixedSizeList(
                Arc::new(Field::new("item", DataType::Float32, true)),
                dimensions,
            ),
            false,
        );

        let schema = Arc::new(Schema::new(vec![field]));
        let float_arr = generate_random_array(100 * dimensions as usize);

        let vectors =
            arrow_array::FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap();
        let record_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(vectors)]).unwrap();
        let reader = RecordBatchIterator::new(
            vec![record_batch.clone()].into_iter().map(Ok),
            schema.clone(),
        );

        let test_uri = &test_dir;
        let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();

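        // PQ codebooks hold 256 centroids, so training needs at least 256 rows; only 100 exist.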
        let params = VectorIndexParams::ivf_pq(1, 8, 96, DistanceType::L2, 1);
        let result = dataset
            .create_index(&["vector"], IndexType::Vector, None, &params, false)
            .await;

        assert!(matches!(result, Err(Error::Index { .. })));
        if let Error::Index { message, .. } = result.unwrap_err() {
            assert_eq!(
                message,
                "Not enough rows to train PQ. Requires 256 rows but only 100 available",
            )
        }
    }

    #[tokio::test]
    async fn test_create_bitmap_index() {
        let test_dir = TempStrDir::default();
        let field = Field::new("tag", DataType::Utf8, false);
        let schema = Arc::new(Schema::new(vec![field]));
        let array = StringArray::from_iter_values((0..128).map(|i| ["a", "b", "c"][i % 3]));
        let record_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
        let reader = RecordBatchIterator::new(
            vec![record_batch.clone()].into_iter().map(Ok),
            schema.clone(),
        );

        let test_uri = &test_dir;
        let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
        dataset
            .create_index(
                &["tag"],
                IndexType::Bitmap,
                None,
                &ScalarIndexParams::default(),
                false,
            )
            .await
            .unwrap();
        let indices = dataset.load_indices().await.unwrap();
        let index = dataset
            .open_generic_index("tag", &indices[0].uuid.to_string(), &NoOpMetricsCollector)
            .await
            .unwrap();
        assert_eq!(index.index_type(), IndexType::Bitmap);
    }

    #[lance_test_macros::test(tokio::test)]
    async fn test_load_indices() {
        let session = Arc::new(Session::default());
        let io_stats = Arc::new(StatsHolder::default());
        let write_params = WriteParams {
            store_params: Some(ObjectStoreParams {
                object_store_wrapper: Some(io_stats.clone()),
                ..Default::default()
            }),
            session: Some(session.clone()),
            ..Default::default()
        };

        let test_dir = TempStrDir::default();
        let field = Field::new("tag", DataType::Utf8, false);
        let schema = Arc::new(Schema::new(vec![field]));
        let array = StringArray::from_iter_values((0..128).map(|i| ["a", "b", "c"][i % 3]));
        let record_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
        let reader = RecordBatchIterator::new(
            vec![record_batch.clone()].into_iter().map(Ok),
            schema.clone(),
        );

        let test_uri = &test_dir;
        let mut dataset = Dataset::write(reader, test_uri, Some(write_params))
            .await
            .unwrap();
        dataset
            .create_index(
                &["tag"],
                IndexType::Bitmap,
                None,
                &ScalarIndexParams::default(),
                false,
            )
            .await
            .unwrap();
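        // Drain the I/O counters; loading indices again should be served from cache.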
        io_stats.incremental_stats();
        let indices = dataset.load_indices().await.unwrap();
        let stats = io_stats.incremental_stats();
        assert_eq!(
            stats.read_iops, 0,
            "Read IOPS should be 0. Saw requests: {:?}",
            stats.requests
        );
        assert_eq!(stats.read_bytes, 0);
        assert_eq!(indices.len(), 1);

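        // Clear the index cache and re-open the dataset from scratch.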
        session.index_cache.clear().await;
        let dataset2 = DatasetBuilder::from_uri(test_uri)
            .with_session(session.clone())
            .with_read_params(ReadParams {
                store_options: Some(ObjectStoreParams {
                    object_store_wrapper: Some(io_stats.clone()),
                    ..Default::default()
                }),
                session: Some(session.clone()),
                ..Default::default()
            })
            .load()
            .await
            .unwrap();
        let stats = io_stats.incremental_stats();
        assert!(stats.read_bytes < 64 * 1024);

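        // Loading indices on the freshly opened dataset should not require additional reads.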
        let indices2 = dataset2.load_indices().await.unwrap();
        let stats = io_stats.incremental_stats();
        assert_eq!(stats.read_iops, 0);
        assert_eq!(stats.read_bytes, 0);
        assert_eq!(indices2.len(), 1);
    }

    #[tokio::test]
    async fn test_remap_empty() {
        let data = gen_batch()
            .col("int", array::step::<Int32Type>())
            .col(
                "vector",
                array::rand_vec::<Float32Type>(Dimension::from(16)),
            )
            .into_reader_rows(RowCount::from(256), BatchCount::from(1));
        let mut dataset = Dataset::write(data, "memory://", None).await.unwrap();

        let params = VectorIndexParams::ivf_pq(1, 8, 1, DistanceType::L2, 1);
        dataset
            .create_index(&["vector"], IndexType::Vector, None, &params, false)
            .await
            .unwrap();

        let index_uuid = dataset.load_indices().await.unwrap()[0].uuid;
        let remap_to_empty = (0..dataset.count_all_rows().await.unwrap())
            .map(|i| (i as u64, None))
            .collect::<HashMap<_, _>>();
        let new_uuid = remap_index(&dataset, &index_uuid, &remap_to_empty)
            .await
            .unwrap();
        assert_eq!(new_uuid, RemapResult::Keep(index_uuid));
    }

    #[tokio::test]
    async fn test_optimize_ivf_pq_up_to_date() {
        let nrows = 256;
        let dimensions = 16;
        let column_name = "vector";
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new(
                column_name,
                DataType::FixedSizeList(
                    Arc::new(Field::new("item", DataType::Float32, true)),
                    dimensions,
                ),
                false,
            ),
        ]));

        let float_arr = generate_random_array(nrows * dimensions as usize);
        let vectors =
            arrow_array::FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap();
        let record_batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(arrow_array::Int32Array::from_iter_values(0..nrows as i32)),
                Arc::new(vectors),
            ],
        )
        .unwrap();

        let reader = RecordBatchIterator::new(
            vec![record_batch.clone()].into_iter().map(Ok),
            schema.clone(),
        );
        let mut dataset = Dataset::write(reader, "memory://", None).await.unwrap();

        let params = VectorIndexParams::ivf_pq(1, 8, 2, MetricType::L2, 2);
        dataset
            .create_index(&[column_name], IndexType::Vector, None, &params, true)
            .await
            .unwrap();

        let query_vector = generate_random_array(dimensions as usize);

        let nearest = dataset
            .scan()
            .nearest(column_name, &query_vector, 5)
            .unwrap()
            .try_into_batch()
            .await
            .unwrap();

        let ids = nearest["id"].as_primitive::<Int32Type>();
        let mut seen = HashSet::new();
        for id in ids.values() {
            assert!(seen.insert(*id), "Duplicate id found: {}", id);
        }

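        // Optimizing an already up-to-date index must not introduce duplicate rows.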
        dataset
            .optimize_indices(&OptimizeOptions::default())
            .await
            .unwrap();

        dataset.validate().await.unwrap();

        let nearest_after = dataset
            .scan()
            .nearest(column_name, &query_vector, 5)
            .unwrap()
            .try_into_batch()
            .await
            .unwrap();

        let ids = nearest_after["id"].as_primitive::<Int32Type>();
        let mut seen = HashSet::new();
        for id in ids.values() {
            assert!(seen.insert(*id), "Duplicate id found: {}", id);
        }
    }

    #[tokio::test]
    async fn test_index_created_at_timestamp() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("values", DataType::Utf8, false),
        ]));

        let values = StringArray::from_iter_values(["hello", "world", "foo", "bar"]);
        let record_batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int32Array::from_iter_values(0..4)),
                Arc::new(values),
            ],
        )
        .unwrap();

        let reader =
            RecordBatchIterator::new(vec![record_batch].into_iter().map(Ok), schema.clone());

        let mut dataset = Dataset::write(reader, "memory://", None).await.unwrap();

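        // Bound created_at between timestamps taken just before and after creation.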
        let before_index = chrono::Utc::now();

        dataset
            .create_index(
                &["values"],
                IndexType::Scalar,
                Some("test_idx".to_string()),
                &ScalarIndexParams::default(),
                false,
            )
            .await
            .unwrap();

        let after_index = chrono::Utc::now();

        let indices = dataset.load_indices().await.unwrap();
        let test_index = indices.iter().find(|idx| idx.name == "test_idx").unwrap();

        assert!(test_index.created_at.is_some());
        let created_at = test_index.created_at.unwrap();
        assert!(created_at >= before_index);
        assert!(created_at <= after_index);
    }

    #[tokio::test]
    async fn test_index_statistics_updated_at() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("values", DataType::Utf8, false),
        ]));

        let values = StringArray::from_iter_values(["hello", "world", "foo", "bar"]);
        let record_batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int32Array::from_iter_values(0..4)),
                Arc::new(values),
            ],
        )
        .unwrap();

        let reader =
            RecordBatchIterator::new(vec![record_batch].into_iter().map(Ok), schema.clone());

        let mut dataset = Dataset::write(reader, "memory://", None).await.unwrap();

        dataset
            .create_index(
                &["values"],
                IndexType::Scalar,
                Some("test_idx".to_string()),
                &ScalarIndexParams::default(),
                false,
            )
            .await
            .unwrap();

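        // A freshly created index should report updated_at equal to created_at.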
        let stats_str = dataset.index_statistics("test_idx").await.unwrap();
        let stats: serde_json::Value = serde_json::from_str(&stats_str).unwrap();

        assert!(stats["updated_at_timestamp_ms"].is_number());
        let updated_at = stats["updated_at_timestamp_ms"].as_u64().unwrap();

        let indices = dataset.load_indices().await.unwrap();
        let test_index = indices.iter().find(|idx| idx.name == "test_idx").unwrap();
        let created_at = test_index.created_at.unwrap().timestamp_millis() as u64;

        assert_eq!(updated_at, created_at);
    }

    #[tokio::test]
    async fn test_index_statistics_updated_at_multiple_deltas() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new(
                "vector",
                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), 4),
                false,
            ),
        ]));

        let num_rows = 300;
        let float_arr = generate_random_array(4 * num_rows);
        let vectors = FixedSizeListArray::try_new_from_values(float_arr, 4).unwrap();
        let record_batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int32Array::from_iter_values(0..num_rows as i32)),
                Arc::new(vectors),
            ],
        )
        .unwrap();

        let reader =
            RecordBatchIterator::new(vec![record_batch].into_iter().map(Ok), schema.clone());

        let mut dataset = Dataset::write(reader, "memory://", None).await.unwrap();

        let params = VectorIndexParams::ivf_pq(1, 8, 2, MetricType::L2, 2);
        dataset
            .create_index(
                &["vector"],
                IndexType::Vector,
                Some("test_vec_idx".to_string()),
                &params,
                false,
            )
            .await
            .unwrap();

        let stats_str_1 = dataset.index_statistics("test_vec_idx").await.unwrap();
        let stats_1: serde_json::Value = serde_json::from_str(&stats_str_1).unwrap();
        let initial_updated_at = stats_1["updated_at_timestamp_ms"].as_u64().unwrap();

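        // Sleep briefly so any new delta gets a strictly later timestamp.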
        std::thread::sleep(std::time::Duration::from_millis(10));

        let num_rows_2 = 50;
        let float_arr_2 = generate_random_array(4 * num_rows_2);
        let vectors_2 = FixedSizeListArray::try_new_from_values(float_arr_2, 4).unwrap();
        let record_batch_2 = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int32Array::from_iter_values(
                    num_rows as i32..(num_rows + num_rows_2) as i32,
                )),
                Arc::new(vectors_2),
            ],
        )
        .unwrap();

        let reader_2 =
            RecordBatchIterator::new(vec![record_batch_2].into_iter().map(Ok), schema.clone());

        dataset.append(reader_2, None).await.unwrap();

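        // Re-creating the index over the appended data must not move updated_at backwards.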
        dataset
            .create_index(
                &["vector"],
                IndexType::Vector,
                Some("test_vec_idx".to_string()),
                &params,
                true,
            )
            .await
            .unwrap();

        let stats_str_2 = dataset.index_statistics("test_vec_idx").await.unwrap();
        let stats_2: serde_json::Value = serde_json::from_str(&stats_str_2).unwrap();
        let final_updated_at = stats_2["updated_at_timestamp_ms"].as_u64().unwrap();

        assert!(final_updated_at >= initial_updated_at);
    }

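    // Uses a dataset written before Lance started recording created_at (v0.30.0).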
    #[tokio::test]
    async fn test_index_statistics_updated_at_none_when_no_created_at() {
        let test_dir =
            copy_test_data_to_tmp("v0.30.0_pre_created_at/index_without_created_at").unwrap();
        let test_uri = test_dir.path_str();
        let test_uri = &test_uri;

        let dataset = Dataset::open(test_uri).await.unwrap();

        let indices = dataset.load_indices().await.unwrap();
        assert!(!indices.is_empty(), "Test dataset should have indices");

        for index in indices.iter() {
            assert!(
                index.created_at.is_none(),
                "Index from old version should have created_at = None"
            );
        }

        let index_name = &indices[0].name;
        let stats_str = dataset.index_statistics(index_name).await.unwrap();
        let stats: serde_json::Value = serde_json::from_str(&stats_str).unwrap();

        assert!(
            stats["updated_at_timestamp_ms"].is_null(),
            "updated_at_timestamp_ms should be null when no indices have created_at timestamps"
        );
    }

    #[rstest]
    #[case::btree("i", IndexType::BTree, Box::new(ScalarIndexParams::default()))]
    #[case::bitmap("i", IndexType::Bitmap, Box::new(ScalarIndexParams::default()))]
    #[case::inverted("text", IndexType::Inverted, Box::new(InvertedIndexParams::default()))]
    #[tokio::test]
    async fn test_create_empty_scalar_index(
        #[case] column_name: &str,
        #[case] index_type: IndexType,
        #[case] params: Box<dyn IndexParams>,
    ) {
        use lance_datagen::{array, BatchCount, ByteCount, RowCount};

        let reader = lance_datagen::gen_batch()
            .col("i", array::step::<Int32Type>())
            .col("text", array::rand_utf8(ByteCount::from(10), false))
            .into_reader_rows(RowCount::from(100), BatchCount::from(1));
        let mut dataset = Dataset::write(reader, "memory://test", None).await.unwrap();

        dataset
            .create_index_builder(&[column_name], index_type, params.as_ref())
            .name("index".to_string())
            .train(false)
            .await
            .unwrap();

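        // An index created with train(false) starts out empty.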
        let stats = dataset.index_statistics("index").await.unwrap();
        let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
        assert_eq!(
            stats["num_indexed_rows"], 0,
            "Empty index should have zero indexed rows"
        );

        let append_reader = lance_datagen::gen_batch()
            .col("i", array::step::<Int32Type>())
            .col("text", array::rand_utf8(ByteCount::from(10), false))
            .into_reader_rows(RowCount::from(50), BatchCount::from(1));

        dataset.append(append_reader, None).await.unwrap();

        let indices_after_append = dataset.load_indices().await.unwrap();
        assert_eq!(
            indices_after_append.len(),
            1,
            "Index should be retained after append for index type {:?}",
            index_type
        );

        let stats = dataset.index_statistics("index").await.unwrap();
        let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
        assert_eq!(
            stats["num_indexed_rows"], 0,
            "Empty index should still have zero indexed rows after append"
        );

        dataset.optimize_indices(&Default::default()).await.unwrap();

        let indices_after_optimize = dataset.load_indices().await.unwrap();
        assert_eq!(
            indices_after_optimize.len(),
            1,
            "Index should still exist after optimization"
        );

        let stats = dataset.index_statistics("index").await.unwrap();
        let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
        assert_eq!(
            stats["num_unindexed_rows"], 0,
            "Index should have indexed all rows after optimization"
        );
    }

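    // FTS plans surface as MatchQuery; scalar index plans as ScalarIndexQuery.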
    fn assert_index_usage(plan: &str, column_name: &str, should_use_index: bool, context: &str) {
        let index_used = if column_name == "text" {
            plan.contains("MatchQuery")
        } else {
            plan.contains("ScalarIndexQuery")
        };

        if should_use_index {
            assert!(
                index_used,
                "Query plan should use index {}: {}",
                context, plan
            );
        } else {
            assert!(
                !index_used,
                "Query plan should NOT use index {}: {}",
                context, plan
            );
        }
    }

    #[rstest]
    #[case::btree("i", IndexType::BTree, Box::new(ScalarIndexParams::default()))]
    #[case::bitmap("i", IndexType::Bitmap, Box::new(ScalarIndexParams::default()))]
    #[case::inverted("text", IndexType::Inverted, Box::new(InvertedIndexParams::default()))]
    #[tokio::test]
    async fn test_scalar_index_retained_after_delete_all(
        #[case] column_name: &str,
        #[case] index_type: IndexType,
        #[case] params: Box<dyn IndexParams>,
    ) {
        use lance_datagen::{array, BatchCount, ByteCount, RowCount};

        let reader = lance_datagen::gen_batch()
            .col("i", array::step::<Int32Type>())
            .col("text", array::rand_utf8(ByteCount::from(10), false))
            .into_reader_rows(RowCount::from(100), BatchCount::from(1));
        let mut dataset = Dataset::write(reader, "memory://test", None).await.unwrap();

        dataset
            .create_index_builder(&[column_name], index_type, params.as_ref())
            .name("index".to_string())
            .train(true)
            .await
            .unwrap();

        let stats = dataset.index_statistics("index").await.unwrap();
        let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
        assert_eq!(
            stats["num_indexed_rows"], 100,
            "Index should have indexed all 100 rows"
        );

        let plan = if column_name == "text" {
            dataset
                .scan()
                .full_text_search(FullTextSearchQuery::new("test".to_string()))
                .unwrap()
                .explain_plan(false)
                .await
                .unwrap()
        } else {
            dataset
                .scan()
                .filter(format!("{} = 50", column_name).as_str())
                .unwrap()
                .explain_plan(false)
                .await
                .unwrap()
        };
        assert_index_usage(&plan, column_name, true, "before delete");

        let indexes = dataset.load_indices().await.unwrap();
        let original_index = indexes[0].clone();

        dataset.delete("true").await.unwrap();

        let row_count = dataset.count_rows(None).await.unwrap();
        assert_eq!(row_count, 0, "Table should be empty after delete all");

        let indices_after_delete = dataset.load_indices().await.unwrap();
        assert_eq!(
            indices_after_delete.len(),
            1,
            "Index should be retained after deleting all data"
        );
        assert_eq!(
            indices_after_delete[0].name, "index",
            "Index name should remain the same after delete"
        );

        let index_after_delete = &indices_after_delete[0];
        let effective_bitmap = index_after_delete
            .effective_fragment_bitmap(&dataset.fragment_bitmap)
            .unwrap();
        assert!(
            effective_bitmap.is_empty(),
            "Effective bitmap should be empty after deleting all data"
        );
        assert_eq!(
            index_after_delete.fragment_bitmap, original_index.fragment_bitmap,
            "Fragment bitmap should remain the same after delete"
        );

        let stats = dataset.index_statistics("index").await.unwrap();
        let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
        assert_eq!(
            stats["num_indexed_rows"], 0,
            "Index should now report zero indexed rows after delete all"
        );
        assert_eq!(
            stats["num_unindexed_rows"], 0,
            "Index should report zero unindexed rows after delete all"
        );
        assert_eq!(
            stats["num_indexed_fragments"], 0,
            "Index should report zero indexed fragments after delete all"
        );
        assert_eq!(
            stats["num_unindexed_fragments"], 0,
            "Index should report zero unindexed fragments after delete all"
        );

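        // With an empty effective bitmap, FTS still plans a MatchQuery, while
        // scalar filters fall back to a plain scan.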
        if column_name == "text" {
            let _plan_after_delete = dataset
                .scan()
                .project(&[column_name])
                .unwrap()
                .full_text_search(FullTextSearchQuery::new("test".to_string()))
                .unwrap()
                .explain_plan(false)
                .await
                .unwrap();
            assert_index_usage(
                &_plan_after_delete,
                column_name,
                true,
                "after delete (empty bitmap)",
            );
        } else {
            let _plan_after_delete = dataset
                .scan()
                .filter(format!("{} = 50", column_name).as_str())
                .unwrap()
                .explain_plan(false)
                .await
                .unwrap();
            assert_index_usage(
                &_plan_after_delete,
                column_name,
                false,
                "after delete (empty bitmap)",
            );
        }

        let append_reader = lance_datagen::gen_batch()
            .col("i", array::step::<Int32Type>())
            .col("text", array::rand_utf8(ByteCount::from(10), false))
            .into_reader_rows(RowCount::from(50), BatchCount::from(1));

        dataset.append(append_reader, None).await.unwrap();

        let indices_after_append = dataset.load_indices().await.unwrap();
        assert_eq!(
            indices_after_append.len(),
            1,
            "Index should still exist after appending to empty table"
        );
        let stats = dataset.index_statistics("index").await.unwrap();
        let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
        assert_eq!(
            stats["num_indexed_rows"], 0,
            "Index should still report zero indexed rows after data is added"
        );

        dataset.optimize_indices(&Default::default()).await.unwrap();

        let indices_after_optimize = dataset.load_indices().await.unwrap();
        assert_eq!(
            indices_after_optimize.len(),
            1,
            "Index should still exist after optimization following delete all"
        );

        let stats = dataset.index_statistics("index").await.unwrap();
        let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
        assert_eq!(
            stats["num_indexed_rows"],
            dataset.count_rows(None).await.unwrap(),
            "Index should now cover all newly added rows after optimization"
        );
    }

    #[rstest]
    #[case::btree("i", IndexType::BTree, Box::new(ScalarIndexParams::default()))]
    #[case::bitmap("i", IndexType::Bitmap, Box::new(ScalarIndexParams::default()))]
    #[case::inverted("text", IndexType::Inverted, Box::new(InvertedIndexParams::default()))]
    #[tokio::test]
    async fn test_scalar_index_retained_after_update(
        #[case] column_name: &str,
        #[case] index_type: IndexType,
        #[case] params: Box<dyn IndexParams>,
    ) {
        use crate::dataset::UpdateBuilder;
        use lance_datagen::{array, BatchCount, ByteCount, RowCount};

        let reader = lance_datagen::gen_batch()
            .col("i", array::step::<Int32Type>())
            .col("text", array::rand_utf8(ByteCount::from(10), false))
            .into_reader_rows(RowCount::from(100), BatchCount::from(1));
        let mut dataset = Dataset::write(reader, "memory://test", None).await.unwrap();

        dataset
            .create_index_builder(&[column_name], index_type, params.as_ref())
            .name("index".to_string())
            .train(true)
            .await
            .unwrap();

        let stats = dataset.index_statistics("index").await.unwrap();
        let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
        assert_eq!(
            stats["num_indexed_rows"], 100,
            "Index should have indexed all 100 rows"
        );

        let plan = if column_name == "text" {
            dataset
                .scan()
                .project(&[column_name])
                .unwrap()
                .full_text_search(FullTextSearchQuery::new("test".to_string()))
                .unwrap()
                .explain_plan(false)
                .await
                .unwrap()
        } else {
            dataset
                .scan()
                .filter(format!("{} = 50", column_name).as_str())
                .unwrap()
                .explain_plan(false)
                .await
                .unwrap()
        };
        assert_index_usage(&plan, column_name, true, "before update");

        let update_result = UpdateBuilder::new(Arc::new(dataset))
            .set("i", "i + 1000")
            .unwrap()
            .set("text", "'updated_' || text")
            .unwrap()
            .build()
            .unwrap()
            .execute()
            .await
            .unwrap();

        let mut dataset = update_result.new_dataset.as_ref().clone();

        let row_count = dataset.count_rows(None).await.unwrap();
        assert_eq!(row_count, 100, "Row count should remain 100 after update");

        let indices_after_update = dataset.load_indices().await.unwrap();
        assert_eq!(
            indices_after_update.len(),
            1,
            "Index should be retained after updating rows"
        );

        let indices = dataset.load_indices().await.unwrap();
        let index = &indices[0];
        let effective_bitmap = index
            .effective_fragment_bitmap(&dataset.fragment_bitmap)
            .unwrap();
        assert!(
            effective_bitmap.is_empty(),
            "Effective fragment bitmap should be empty after updating all data"
        );

        let stats_after_update = dataset.index_statistics("index").await.unwrap();
        let stats_after_update: serde_json::Value =
            serde_json::from_str(&stats_after_update).unwrap();

        assert_eq!(
            stats_after_update["num_indexed_rows"], 0,
            "Index statistics should be zero after update, as it is not re-trained"
        );

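        // As after delete-all: FTS still plans a MatchQuery, scalar filters fall back to a scan.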
        if column_name == "text" {
            let _plan_after_update = dataset
                .scan()
                .project(&[column_name])
                .unwrap()
                .full_text_search(FullTextSearchQuery::new("test".to_string()))
                .unwrap()
                .explain_plan(false)
                .await
                .unwrap();
            assert_index_usage(
                &_plan_after_update,
                column_name,
                true,
                "after update (empty bitmap)",
            );
        } else {
            let _plan_after_update = dataset
                .scan()
                .filter(format!("{} = 50", column_name).as_str())
                .unwrap()
                .explain_plan(false)
                .await
                .unwrap();
            assert_index_usage(
                &_plan_after_update,
                column_name,
                false,
                "after update (empty effective bitmap)",
            );
        }

        dataset.optimize_indices(&Default::default()).await.unwrap();

        let indices_after_optimize = dataset.load_indices().await.unwrap();
        assert_eq!(
            indices_after_optimize.len(),
            1,
            "Index should still exist after optimization following update"
        );

        let stats_after_optimization = dataset.index_statistics("index").await.unwrap();
        let stats_after_optimization: serde_json::Value =
            serde_json::from_str(&stats_after_optimization).unwrap();

        assert_eq!(
            stats_after_optimization["num_unindexed_rows"], 0,
            "Index should have zero unindexed rows after optimization"
        );
    }

    async fn validate_indices_after_clone(
        dataset: &Dataset,
        round: usize,
        expected_scalar_rows: usize,
        dimensions: u32,
    ) {
        let indices = dataset.load_indices().await.unwrap();
        assert_eq!(
            indices.len(),
            2,
            "Round {}: Cloned dataset should have 2 indices",
            round
        );
        let index_names: HashSet<String> = indices.iter().map(|idx| idx.name.clone()).collect();
        assert!(
            index_names.contains("vector_idx"),
            "Round {}: Should contain vector_idx",
            round
        );
        assert!(
            index_names.contains("category_idx"),
            "Round {}: Should contain category_idx",
            round
        );

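        // The source starts with 300 rows; each completed round appends 50 more.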
        let expected_total_rows = 300 + (round - 1) * 50;
        let total_rows = dataset.count_rows(None).await.unwrap();
        assert_eq!(
            total_rows, expected_total_rows,
            "Round {}: Should have {} rows after clone (chain cloning accumulates data)",
            round, expected_total_rows
        );

        let query_vector = generate_random_array(dimensions as usize);
        let search_results = dataset
            .scan()
            .nearest("vector", &query_vector, 5)
            .unwrap()
            .limit(Some(5), None)
            .unwrap()
            .try_into_batch()
            .await
            .unwrap();
        assert!(
            search_results.num_rows() > 0,
            "Round {}: Vector search should return results immediately after clone",
            round
        );

        let scalar_results = dataset
            .scan()
            .filter("category = 'category_0'")
            .unwrap()
            .try_into_batch()
            .await
            .unwrap();

        assert_eq!(
            expected_scalar_rows,
            scalar_results.num_rows(),
            "Round {}: Scalar query should return {} results",
            round,
            expected_scalar_rows
        );
    }

    #[tokio::test]
    async fn test_shallow_clone_with_index() {
        let test_dir = TempStrDir::default();
        let test_uri = &test_dir;

        let dimensions = 16u32;
        let data = gen_batch()
            .col("id", array::step::<Int32Type>())
            .col("category", array::fill_utf8("category_0".to_string()))
            .col(
                "vector",
                array::rand_vec::<Float32Type>(Dimension::from(dimensions)),
            )
            .into_reader_rows(RowCount::from(300), BatchCount::from(1));

        let mut dataset = Dataset::write(data, test_uri, None).await.unwrap();
        let vector_params = VectorIndexParams::ivf_pq(2, 8, 2, MetricType::L2, 10);
        dataset
            .create_index(
                &["vector"],
                IndexType::Vector,
                Some("vector_idx".to_string()),
                &vector_params,
                true,
            )
            .await
            .unwrap();

        dataset
            .create_index(
                &["category"],
                IndexType::BTree,
                Some("category_idx".to_string()),
                &ScalarIndexParams::default(),
                true,
            )
            .await
            .unwrap();

        let indices = dataset.load_indices().await.unwrap();
        assert_eq!(indices.len(), 2, "Should have 2 indices");
        let index_names: HashSet<String> = indices.iter().map(|idx| idx.name.clone()).collect();
        assert!(index_names.contains("vector_idx"));
        assert!(index_names.contains("category_idx"));

        let scalar_results = dataset
            .scan()
            .filter("category = 'category_0'")
            .unwrap()
            .try_into_batch()
            .await
            .unwrap();

        let source_scalar_query_rows = scalar_results.num_rows();
        assert!(
            scalar_results.num_rows() > 0,
            "Scalar query should return results"
        );

        let clone_rounds = 3;
        let mut current_dataset = dataset;

        for round in 1..=clone_rounds {
            let round_clone_dir = format!("{}/clone_round_{}", test_dir, round);
            let round_cloned_uri = &round_clone_dir;
            let tag_name = format!("shallow_clone_test_{}", round);

            let current_version = current_dataset.version().version;
            current_dataset
                .tags()
                .create(&tag_name, current_version)
                .await
                .unwrap();

            let mut round_cloned_dataset = current_dataset
                .shallow_clone(round_cloned_uri, tag_name.as_str(), None)
                .await
                .unwrap();

            validate_indices_after_clone(
                &round_cloned_dataset,
                round,
                source_scalar_query_rows,
                dimensions,
            )
            .await;

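            // Append fresh rows to the clone so its indices have new, unindexed data.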
            let new_data = gen_batch()
                .col(
                    "id",
                    array::step_custom::<Int32Type>(300 + (round * 50) as i32, 1),
                )
                .col("category", array::fill_utf8(format!("category_{}", round)))
                .col(
                    "vector",
                    array::rand_vec::<Float32Type>(Dimension::from(dimensions)),
                )
                .into_reader_rows(RowCount::from(50), BatchCount::from(1));

            round_cloned_dataset = Dataset::write(
                new_data,
                round_cloned_uri,
                Some(WriteParams {
                    mode: WriteMode::Append,
                    ..Default::default()
                }),
            )
            .await
            .unwrap();

            let expected_rows = 300 + round * 50;
            let total_rows = round_cloned_dataset.count_rows(None).await.unwrap();
            assert_eq!(
                total_rows, expected_rows,
                "Round {}: Should have {} rows after append",
                round, expected_rows
            );

            let indices_before_optimize = round_cloned_dataset.load_indices().await.unwrap();
            let vector_idx_before = indices_before_optimize
                .iter()
                .find(|idx| idx.name == "vector_idx")
                .unwrap();
            let category_idx_before = indices_before_optimize
                .iter()
                .find(|idx| idx.name == "category_idx")
                .unwrap();

            round_cloned_dataset
                .optimize_indices(&OptimizeOptions::default())
                .await
                .unwrap();

            let optimized_indices = round_cloned_dataset.load_indices().await.unwrap();
            let new_vector_idx = optimized_indices
                .iter()
                .find(|idx| idx.name == "vector_idx")
                .unwrap();
            let new_category_idx = optimized_indices
                .iter()
                .find(|idx| idx.name == "category_idx")
                .unwrap();

            assert_ne!(
                new_vector_idx.uuid, vector_idx_before.uuid,
                "Round {}: Vector index should have a new UUID after optimization",
                round
            );
            assert_ne!(
                new_category_idx.uuid, category_idx_before.uuid,
                "Round {}: Category index should have a new UUID after optimization",
                round
            );

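            // New index files must land under the clone's own _indices directory...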
            use std::path::PathBuf;
            let clone_indices_dir = PathBuf::from(round_cloned_uri).join("_indices");
            let vector_index_dir = clone_indices_dir.join(new_vector_idx.uuid.to_string());
            let category_index_dir = clone_indices_dir.join(new_category_idx.uuid.to_string());

            assert!(
                vector_index_dir.exists(),
                "Round {}: New vector index directory should exist in cloned dataset location: {:?}",
                round, vector_index_dir
            );
            assert!(
                category_index_dir.exists(),
                "Round {}: New category index directory should exist in cloned dataset location: {:?}",
                round, category_index_dir
            );

            assert!(
                new_vector_idx.base_id.is_none(),
                "Round {}: New vector index should not have base_id after optimization in cloned dataset",
                round
            );
            assert!(
                new_category_idx.base_id.is_none(),
                "Round {}: New category index should not have base_id after optimization in cloned dataset",
                round
            );

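            // ...and must not leak into the source dataset's _indices directory.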
            let original_indices_dir = PathBuf::from(current_dataset.uri()).join("_indices");
            let wrong_vector_dir = original_indices_dir.join(new_vector_idx.uuid.to_string());
            let wrong_category_dir = original_indices_dir.join(new_category_idx.uuid.to_string());

            assert!(
                !wrong_vector_dir.exists(),
                "Round {}: New vector index should NOT be in original dataset location: {:?}",
                round,
                wrong_vector_dir
            );
            assert!(
                !wrong_category_dir.exists(),
                "Round {}: New category index should NOT be in original dataset location: {:?}",
                round,
                wrong_category_dir
            );

            let old_category_results = round_cloned_dataset
                .scan()
                .filter("category = 'category_0'")
                .unwrap()
                .try_into_batch()
                .await
                .unwrap();

            let new_category_results = round_cloned_dataset
                .scan()
                .filter(&format!("category = 'category_{}'", round))
                .unwrap()
                .try_into_batch()
                .await
                .unwrap();

            assert_eq!(
                source_scalar_query_rows,
                old_category_results.num_rows(),
                "Round {}: Should find old category data with {} rows",
                round,
                source_scalar_query_rows
            );
            assert!(
                new_category_results.num_rows() > 0,
                "Round {}: Should find new category data",
                round
            );

            let query_vector = generate_random_array(dimensions as usize);
            let search_results = round_cloned_dataset
                .scan()
                .nearest("vector", &query_vector, 10)
                .unwrap()
                .limit(Some(10), None)
                .unwrap()
                .try_into_batch()
                .await
                .unwrap();

            assert!(
                search_results.num_rows() > 0,
                "Round {}: Vector search should return results after optimization",
                round
            );

            let vector_stats: serde_json::Value = serde_json::from_str(
                &round_cloned_dataset
                    .index_statistics("vector_idx")
                    .await
                    .unwrap(),
            )
            .unwrap();
            let category_stats: serde_json::Value = serde_json::from_str(
                &round_cloned_dataset
                    .index_statistics("category_idx")
                    .await
                    .unwrap(),
            )
            .unwrap();

            assert_eq!(
                vector_stats["num_indexed_rows"].as_u64().unwrap(),
                expected_rows as u64,
                "Round {}: Vector index should have {} indexed rows",
                round,
                expected_rows
            );
            assert_eq!(
                category_stats["num_indexed_rows"].as_u64().unwrap(),
                expected_rows as u64,
                "Round {}: Category index should have {} indexed rows",
                round,
                expected_rows
            );

            current_dataset = round_cloned_dataset;
        }

        let final_cloned_dataset = current_dataset;

        let cloned_indices = final_cloned_dataset.load_indices().await.unwrap();
        assert_eq!(
            cloned_indices.len(),
            2,
            "Final cloned dataset should have 2 indices"
        );
        let cloned_index_names: HashSet<String> =
            cloned_indices.iter().map(|idx| idx.name.clone()).collect();
        assert!(cloned_index_names.contains("vector_idx"));
        assert!(cloned_index_names.contains("category_idx"));

        let query_vector = generate_random_array(dimensions as usize);
        let search_results = final_cloned_dataset
            .scan()
            .nearest("vector", &query_vector, 5)
            .unwrap()
            .limit(Some(5), None)
            .unwrap()
            .try_into_batch()
            .await
            .unwrap();

        assert!(
            search_results.num_rows() > 0,
            "Vector search should return results on final dataset"
        );

        let scalar_results = final_cloned_dataset
            .scan()
            .filter("category = 'category_0'")
            .unwrap()
            .try_into_batch()
            .await
            .unwrap();

        assert_eq!(
            source_scalar_query_rows,
            scalar_results.num_rows(),
            "Scalar query should return results on final dataset"
        );
    }

    #[tokio::test]
    async fn test_initialize_indices() {
        use crate::dataset::Dataset;
        use arrow_array::types::Float32Type;
        use lance_core::utils::tempfile::TempStrDir;
        use lance_datagen::{array, BatchCount, RowCount};
        use lance_index::scalar::{InvertedIndexParams, ScalarIndexParams};
        use lance_linalg::distance::MetricType;
        use std::collections::HashSet;

        let test_dir = TempStrDir::default();
        let source_uri = format!("{}/{}", test_dir, "source");
        let target_uri = format!("{}/{}", test_dir, "target");

        let source_reader = lance_datagen::gen_batch()
            .col("vector", array::rand_vec::<Float32Type>(8.into()))
            .col(
                "text",
                array::cycle_utf8_literals(&["hello world", "foo bar", "test data"]),
            )
            .col("id", array::step::<Int32Type>())
            .into_reader_rows(RowCount::from(300), BatchCount::from(1));

        let mut source_dataset = Dataset::write(source_reader, &source_uri, None)
            .await
            .unwrap();

        let vector_params = VectorIndexParams::ivf_pq(4, 8, 2, MetricType::L2, 10);
        source_dataset
            .create_index(
                &["vector"],
                IndexType::Vector,
                Some("vec_idx".to_string()),
                &vector_params,
                false,
            )
            .await
            .unwrap();

        let fts_params = InvertedIndexParams::default();
        source_dataset
            .create_index(
                &["text"],
                IndexType::Inverted,
                Some("text_idx".to_string()),
                &fts_params,
                false,
            )
            .await
            .unwrap();

        let scalar_params = ScalarIndexParams::default();
        source_dataset
            .create_index(
                &["id"],
                IndexType::BTree,
                Some("id_idx".to_string()),
                &scalar_params,
                false,
            )
            .await
            .unwrap();

        let source_dataset = Dataset::open(&source_uri).await.unwrap();

        let source_indices = source_dataset.load_indices().await.unwrap();
        assert_eq!(
            source_indices.len(),
            3,
            "Source dataset should have 3 indices"
        );

        let target_reader = lance_datagen::gen_batch()
            .col("vector", array::rand_vec::<Float32Type>(8.into()))
            .col(
                "text",
                array::cycle_utf8_literals(&["foo bar", "test data", "hello world"]),
            )
            .col("id", array::step_custom::<Int32Type>(100, 1))
            .into_reader_rows(RowCount::from(300), BatchCount::from(1));
        let mut target_dataset = Dataset::write(target_reader, &target_uri, None)
            .await
            .unwrap();

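        // Initialize the target's indices from the source dataset's index definitions.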
        target_dataset
            .initialize_indices(&source_dataset)
            .await
            .unwrap();

        let target_indices = target_dataset.load_indices().await.unwrap();
        assert_eq!(
            target_indices.len(),
            3,
            "Target dataset should have 3 indices after initialization"
        );

        let source_names: HashSet<String> =
            source_indices.iter().map(|idx| idx.name.clone()).collect();
        let target_names: HashSet<String> =
            target_indices.iter().map(|idx| idx.name.clone()).collect();
        assert_eq!(
            source_names, target_names,
            "Index names should match between source and target"
        );

        let query_vector = generate_random_array(8);
        let search_results = target_dataset
            .scan()
            .nearest("vector", &query_vector, 5)
            .unwrap()
            .limit(Some(5), None)
            .unwrap()
            .try_into_batch()
            .await
            .unwrap();
        assert!(
            search_results.num_rows() > 0,
            "Vector index should be functional"
        );

        let scalar_results = target_dataset
            .scan()
            .filter("id = 125")
            .unwrap()
            .try_into_batch()
            .await
            .unwrap();
        assert_eq!(
            scalar_results.num_rows(),
            1,
            "Scalar index should find exact match"
        );
    }

    #[tokio::test]
    async fn test_initialize_indices_with_missing_field() {
        use crate::dataset::Dataset;
        use arrow_array::types::Int32Type;
        use lance_core::utils::tempfile::TempStrDir;
        use lance_datagen::{array, BatchCount, RowCount};
        use lance_index::scalar::ScalarIndexParams;

        let test_dir = TempStrDir::default();
        let source_uri = format!("{}/{}", test_dir, "source");
        let target_uri = format!("{}/{}", test_dir, "target");

        let source_reader = lance_datagen::gen_batch()
            .col("id", array::step::<Int32Type>())
            .col("extra", array::cycle_utf8_literals(&["test"]))
            .into_reader_rows(RowCount::from(10), BatchCount::from(1));
        let mut source_dataset = Dataset::write(source_reader, &source_uri, None)
            .await
            .unwrap();

        source_dataset
            .create_index(
                &["extra"],
                IndexType::BTree,
                None,
                &ScalarIndexParams::default(),
                false,
            )
            .await
            .unwrap();

        let target_reader = lance_datagen::gen_batch()
            .col("id", array::step_custom::<Int32Type>(10, 1))
            .into_reader_rows(RowCount::from(10), BatchCount::from(1));
        let mut target_dataset = Dataset::write(target_reader, &target_uri, None)
            .await
            .unwrap();

        let result = target_dataset.initialize_indices(&source_dataset).await;

        assert!(result.is_err(), "Should error when field is missing");
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("not found in target dataset"));
    }

    #[tokio::test]
    async fn test_initialize_single_index() {
        use crate::dataset::Dataset;
        use crate::index::vector::VectorIndexParams;
        use arrow_array::types::{Float32Type, Int32Type};
        use lance_core::utils::tempfile::TempStrDir;
        use lance_datagen::{array, BatchCount, RowCount};
        use lance_index::scalar::ScalarIndexParams;
        use lance_linalg::distance::MetricType;

        let test_dir = TempStrDir::default();
        let source_uri = format!("{}/{}", test_dir, "source");
        let target_uri = format!("{}/{}", test_dir, "target");

        let source_reader = lance_datagen::gen_batch()
            .col("id", array::step::<Int32Type>())
            .col("name", array::rand_utf8(4.into(), false))
            .col("vector", array::rand_vec::<Float32Type>(8.into()))
            .into_reader_rows(RowCount::from(300), BatchCount::from(1));
        let mut source_dataset = Dataset::write(source_reader, &source_uri, None)
            .await
            .unwrap();

        let scalar_params = ScalarIndexParams::default();
        source_dataset
            .create_index(
                &["id"],
                IndexType::BTree,
                Some("id_index".to_string()),
                &scalar_params,
                false,
            )
            .await
            .unwrap();

        let vector_params = VectorIndexParams::ivf_pq(16, 8, 4, MetricType::L2, 50);
        source_dataset
            .create_index(
                &["vector"],
                IndexType::Vector,
                Some("vector_index".to_string()),
                &vector_params,
                false,
            )
            .await
            .unwrap();

        let source_dataset = Dataset::open(&source_uri).await.unwrap();

        let target_reader = lance_datagen::gen_batch()
            .col("id", array::step::<Int32Type>())
            .col("name", array::rand_utf8(4.into(), false))
            .col("vector", array::rand_vec::<Float32Type>(8.into()))
            .into_reader_rows(RowCount::from(300), BatchCount::from(1));
        let mut target_dataset = Dataset::write(target_reader, &target_uri, None)
            .await
            .unwrap();

        target_dataset
            .initialize_index(&source_dataset, "vector_index")
            .await
            .unwrap();

        let target_indices = target_dataset.load_indices().await.unwrap();
        assert_eq!(target_indices.len(), 1, "Should have only 1 index");
        assert_eq!(
            target_indices[0].name, "vector_index",
            "Should have the vector index"
        );

        target_dataset
            .initialize_index(&source_dataset, "id_index")
            .await
            .unwrap();

        let target_indices = target_dataset.load_indices().await.unwrap();
        assert_eq!(target_indices.len(), 2, "Should have 2 indices");

        let index_names: HashSet<String> =
            target_indices.iter().map(|idx| idx.name.clone()).collect();
        assert!(
            index_names.contains("vector_index"),
            "Should have vector index"
        );
        assert!(index_names.contains("id_index"), "Should have id index");

        let result = target_dataset
            .initialize_index(&source_dataset, "non_existent")
            .await;
        assert!(result.is_err(), "Should error for non-existent index");
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("not found in source dataset"));
    }

    #[tokio::test]
    async fn test_vector_index_on_nested_field_with_dots() {
        let dimensions = 16;
        let num_rows = 256;

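        // Column names containing literal dots must be backtick-escaped in index paths.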
4006 let struct_field = Field::new(
4008 "embedding_data",
4009 DataType::Struct(
4010 vec![
4011 Field::new(
4012 "vector.v1", DataType::FixedSizeList(
4014 Arc::new(Field::new("item", DataType::Float32, true)),
4015 dimensions,
4016 ),
4017 false,
4018 ),
4019 Field::new(
4020 "vector.v2", DataType::FixedSizeList(
4022 Arc::new(Field::new("item", DataType::Float32, true)),
4023 dimensions,
4024 ),
4025 false,
4026 ),
4027 Field::new("metadata", DataType::Utf8, false),
4028 ]
4029 .into(),
4030 ),
4031 false,
4032 );
4033
4034 let schema = Arc::new(Schema::new(vec![
4035 Field::new("id", DataType::Int32, false),
4036 struct_field,
4037 ]));
4038
4039 let float_arr_v1 = generate_random_array(num_rows * dimensions as usize);
4041 let vectors_v1 = FixedSizeListArray::try_new_from_values(float_arr_v1, dimensions).unwrap();
4042
4043 let float_arr_v2 = generate_random_array(num_rows * dimensions as usize);
4044 let vectors_v2 = FixedSizeListArray::try_new_from_values(float_arr_v2, dimensions).unwrap();
4045
4046 let ids = Int32Array::from_iter_values(0..num_rows as i32);
4047 let metadata = StringArray::from_iter_values((0..num_rows).map(|i| format!("meta_{}", i)));
4048
4049 let struct_array = arrow_array::StructArray::from(vec![
4050 (
4051 Arc::new(Field::new(
4052 "vector.v1",
4053 DataType::FixedSizeList(
4054 Arc::new(Field::new("item", DataType::Float32, true)),
4055 dimensions,
4056 ),
4057 false,
4058 )),
4059 Arc::new(vectors_v1) as Arc<dyn arrow_array::Array>,
4060 ),
4061 (
4062 Arc::new(Field::new(
4063 "vector.v2",
4064 DataType::FixedSizeList(
4065 Arc::new(Field::new("item", DataType::Float32, true)),
4066 dimensions,
4067 ),
4068 false,
4069 )),
4070 Arc::new(vectors_v2) as Arc<dyn arrow_array::Array>,
4071 ),
4072 (
4073 Arc::new(Field::new("metadata", DataType::Utf8, false)),
4074 Arc::new(metadata) as Arc<dyn arrow_array::Array>,
4075 ),
4076 ]);
4077
4078 let batch =
4079 RecordBatch::try_new(schema.clone(), vec![Arc::new(ids), Arc::new(struct_array)])
4080 .unwrap();
4081
4082 let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone());
4083
4084 let test_dir = TempStrDir::default();
4085 let test_uri = &test_dir;
4086 let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
4087
4088 let nested_column_path_v1 = "embedding_data.`vector.v1`";
4090 let params = VectorIndexParams::ivf_pq(10, 8, 2, MetricType::L2, 10);
4091
4092 dataset
4093 .create_index(
4094 &[nested_column_path_v1],
4095 IndexType::Vector,
4096 Some("vec_v1_idx".to_string()),
4097 ¶ms,
4098 true,
4099 )
4100 .await
4101 .unwrap();
4102
4103 let indices = dataset.load_indices().await.unwrap();
4105 assert_eq!(indices.len(), 1);
4106 assert_eq!(indices[0].name, "vec_v1_idx");
4107
4108 let field_id = indices[0].fields[0];
4110 let field_path = dataset.schema().field_path(field_id).unwrap();
4111 assert_eq!(field_path, "embedding_data.`vector.v1`");
4112
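        // A second index on the sibling dotted field exercises path resolution
        // when multiple children share the "vector." prefix.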
        let nested_column_path_v2 = "embedding_data.`vector.v2`";
        dataset
            .create_index(
                &[nested_column_path_v2],
                IndexType::Vector,
                Some("vec_v2_idx".to_string()),
                &params,
                true,
            )
            .await
            .unwrap();

        let indices = dataset.load_indices().await.unwrap();
        assert_eq!(indices.len(), 2);

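        // Confirm the indices are actually used at query time: the explain
        // plan should contain an ANN node rather than a flat KNN scan.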
        let query_vector = generate_random_array(dimensions as usize);

        let plan_v1 = dataset
            .scan()
            .nearest(nested_column_path_v1, &query_vector, 5)
            .unwrap()
            .explain_plan(false)
            .await
            .unwrap();

        assert!(
            plan_v1.contains("ANNSubIndex") || plan_v1.contains("ANNIvfPartition"),
            "Query plan should use vector index for nested field with dots. Plan: {}",
            plan_v1
        );

        let search_results_v1 = dataset
            .scan()
            .nearest(nested_column_path_v1, &query_vector, 5)
            .unwrap()
            .try_into_batch()
            .await
            .unwrap();

        assert_eq!(search_results_v1.num_rows(), 5);

        let plan_v2 = dataset
            .scan()
            .nearest(nested_column_path_v2, &query_vector, 5)
            .unwrap()
            .explain_plan(false)
            .await
            .unwrap();

        assert!(
            plan_v2.contains("ANNSubIndex") || plan_v2.contains("ANNIvfPartition"),
            "Query plan should use vector index for second nested field with dots. Plan: {}",
            plan_v2
        );

        let search_results_v2 = dataset
            .scan()
            .nearest(nested_column_path_v2, &query_vector, 5)
            .unwrap()
            .try_into_batch()
            .await
            .unwrap();

        assert_eq!(search_results_v2.num_rows(), 5);
    }

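    // Same dotted-field scenario with a BTree scalar index: creation must
    // succeed and the filter planner must actually pick the index up.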
    #[tokio::test]
    async fn test_btree_index_on_nested_field_with_dots() {
        let test_dir = TempStrDir::default();
        let test_uri = &test_dir;

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new(
                "data",
                DataType::Struct(
                    vec![
                        Field::new("value.v1", DataType::Int32, false),
                        Field::new("value.v2", DataType::Float32, false),
                        Field::new("text", DataType::Utf8, false),
                    ]
                    .into(),
                ),
                false,
            ),
        ]));

        let num_rows = 1000;
        let ids = Int32Array::from_iter_values(0..num_rows);
        let values_v1 = Int32Array::from_iter_values((0..num_rows).map(|i| i % 100));
        let values_v2 = Float32Array::from_iter_values((0..num_rows).map(|i| (i as f32) * 0.1));
        let texts = StringArray::from_iter_values((0..num_rows).map(|i| format!("text_{}", i)));

        let struct_array = arrow_array::StructArray::from(vec![
            (
                Arc::new(Field::new("value.v1", DataType::Int32, false)),
                Arc::new(values_v1) as Arc<dyn arrow_array::Array>,
            ),
            (
                Arc::new(Field::new("value.v2", DataType::Float32, false)),
                Arc::new(values_v2) as Arc<dyn arrow_array::Array>,
            ),
            (
                Arc::new(Field::new("text", DataType::Utf8, false)),
                Arc::new(texts) as Arc<dyn arrow_array::Array>,
            ),
        ]);

        let batch =
            RecordBatch::try_new(schema.clone(), vec![Arc::new(ids), Arc::new(struct_array)])
                .unwrap();

        let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone());
        let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();

        let nested_column_path = "data.`value.v1`";
        let params = ScalarIndexParams::default();

        dataset
            .create_index(
                &[nested_column_path],
                IndexType::BTree,
                Some("btree_v1_idx".to_string()),
                &params,
                true,
            )
            .await
            .unwrap();

        dataset = Dataset::open(test_uri).await.unwrap();

        let indices = dataset.load_indices().await.unwrap();
        assert_eq!(indices.len(), 1);
        assert_eq!(indices[0].name, "btree_v1_idx");

        let field_id = indices[0].fields[0];
        let field_path = dataset.schema().field_path(field_id).unwrap();
        assert_eq!(field_path, "data.`value.v1`");

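        // The equality filter should be answered through the BTree index
        // (a ScalarIndexQuery node) rather than a full scan with a post-filter.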
        let plan = dataset
            .scan()
            .filter("data.`value.v1` = 42")
            .unwrap()
            .prefilter(true)
            .explain_plan(false)
            .await
            .unwrap();

        assert!(
            plan.contains("ScalarIndexQuery"),
            "Query plan should show optimized read. Plan: {}",
            plan
        );

        let results = dataset
            .scan()
            .filter("data.`value.v1` = 42")
            .unwrap()
            .prefilter(true)
            .try_into_batch()
            .await
            .unwrap();

        assert!(results.num_rows() > 0);
    }

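    // Bitmap variant of the dotted-field test, using low-cardinality columns
    // (10 distinct status codes, 5 categories), the typical fit for a bitmap index.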
    #[tokio::test]
    async fn test_bitmap_index_on_nested_field_with_dots() {
        let test_dir = TempStrDir::default();
        let test_uri = &test_dir;

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new(
                "metadata",
                DataType::Struct(
                    vec![
                        Field::new("status.code", DataType::Int32, false),
                        Field::new("category.name", DataType::Utf8, false),
                    ]
                    .into(),
                ),
                false,
            ),
        ]));

        let num_rows = 1000;
        let ids = Int32Array::from_iter_values(0..num_rows);
        let status_codes = Int32Array::from_iter_values((0..num_rows).map(|i| i % 10));
        let categories =
            StringArray::from_iter_values((0..num_rows).map(|i| format!("category_{}", i % 5)));

        let struct_array = arrow_array::StructArray::from(vec![
            (
                Arc::new(Field::new("status.code", DataType::Int32, false)),
                Arc::new(status_codes) as Arc<dyn arrow_array::Array>,
            ),
            (
                Arc::new(Field::new("category.name", DataType::Utf8, false)),
                Arc::new(categories) as Arc<dyn arrow_array::Array>,
            ),
        ]);

        let batch =
            RecordBatch::try_new(schema.clone(), vec![Arc::new(ids), Arc::new(struct_array)])
                .unwrap();

        let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone());
        let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();

        let nested_column_path = "metadata.`status.code`";
        let params = ScalarIndexParams::default();

        dataset
            .create_index(
                &[nested_column_path],
                IndexType::Bitmap,
                Some("bitmap_status_idx".to_string()),
                &params,
                true,
            )
            .await
            .unwrap();

        dataset = Dataset::open(test_uri).await.unwrap();

        let indices = dataset.load_indices().await.unwrap();
        assert_eq!(indices.len(), 1);
        assert_eq!(indices[0].name, "bitmap_status_idx");

        let field_id = indices[0].fields[0];
        let field_path = dataset.schema().field_path(field_id).unwrap();
        assert_eq!(field_path, "metadata.`status.code`");

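        // As in the BTree test, the filter should resolve through the bitmap
        // index via a ScalarIndexQuery node.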
        let plan = dataset
            .scan()
            .filter("metadata.`status.code` = 5")
            .unwrap()
            .explain_plan(false)
            .await
            .unwrap();

        assert!(
            plan.contains("ScalarIndexQuery"),
            "Query plan should show optimized read. Plan: {}",
            plan
        );

        let results = dataset
            .scan()
            .filter("metadata.`status.code` = 5")
            .unwrap()
            .try_into_batch()
            .await
            .unwrap();

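        // 1000 rows cycle through 10 status codes, so exactly 100 rows match code 5.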
        assert!(results.num_rows() > 0);
        assert_eq!(results.num_rows(), 100);
    }

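    // Inverted (full-text) variant: build an FTS index on a dotted nested text
    // field and check that both planning and search go through it.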
    #[tokio::test]
    async fn test_inverted_index_on_nested_field_with_dots() {
        use lance_index::scalar::inverted::tokenizer::InvertedIndexParams;

        let test_dir = TempStrDir::default();
        let test_uri = &test_dir;

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new(
                "document",
                DataType::Struct(
                    vec![
                        Field::new("content.text", DataType::Utf8, false),
                        Field::new("content.summary", DataType::Utf8, false),
                    ]
                    .into(),
                ),
                false,
            ),
        ]));

        let num_rows = 100;
        let ids = Int32Array::from_iter_values(0..num_rows as i32);
        let content_texts = StringArray::from_iter_values((0..num_rows).map(|i| match i % 3 {
            0 => format!("The quick brown fox jumps over the lazy dog {}", i),
            1 => format!(
                "Machine learning and artificial intelligence document {}",
                i
            ),
            _ => format!("Data science and analytics content piece {}", i),
        }));
        let summaries = StringArray::from_iter_values(
            (0..num_rows).map(|i| format!("Summary of document {}", i)),
        );

        let struct_array = arrow_array::StructArray::from(vec![
            (
                Arc::new(Field::new("content.text", DataType::Utf8, false)),
                Arc::new(content_texts) as Arc<dyn arrow_array::Array>,
            ),
            (
                Arc::new(Field::new("content.summary", DataType::Utf8, false)),
                Arc::new(summaries) as Arc<dyn arrow_array::Array>,
            ),
        ]);

        let batch =
            RecordBatch::try_new(schema.clone(), vec![Arc::new(ids), Arc::new(struct_array)])
                .unwrap();

        let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone());
        let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();

        let nested_column_path = "document.`content.text`";
        let params = InvertedIndexParams::default();

        dataset
            .create_index(
                &[nested_column_path],
                IndexType::Inverted,
                Some("inverted_content_idx".to_string()),
                &params,
                true,
            )
            .await
            .unwrap();

        dataset = Dataset::open(test_uri).await.unwrap();

        let indices = dataset.load_indices().await.unwrap();
        assert_eq!(indices.len(), 1);
        assert_eq!(indices[0].name, "inverted_content_idx");

        let field_id = indices[0].fields[0];
        let field_path = dataset.schema().field_path(field_id).unwrap();
        assert_eq!(field_path, "document.`content.text`");

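        // Search for a phrase that appears in roughly a third of the rows,
        // targeting the dotted nested column explicitly via with_column.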
        let query = FullTextSearchQuery::new("machine learning".to_string())
            .with_column(field_path.clone())
            .unwrap();

        let plan = dataset
            .scan()
            .full_text_search(query.clone())
            .unwrap()
            .explain_plan(false)
            .await
            .unwrap();

        assert!(
            plan.contains("MatchQuery") || plan.contains("PhraseQuery"),
            "Query plan should use inverted index for nested field with dots. Plan: {}",
            plan
        );

        let results = dataset
            .scan()
            .full_text_search(query)
            .unwrap()
            .try_into_stream()
            .await
            .unwrap()
            .try_collect::<Vec<_>>()
            .await
            .unwrap();

        assert!(
            !results.is_empty(),
            "Full-text search should return results"
        );

        let mut found_count = 0;
        for batch in results {
            found_count += batch.num_rows();
        }
        assert!(
            found_count > 0,
            "Should find at least some documents with 'machine learning'"
        );
        assert!(found_count < num_rows, "Should not match all documents");
    }
}