lance/
index.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Secondary Index
5//!
6
7use std::collections::{HashMap, HashSet};
8use std::sync::{Arc, OnceLock};
9
10use arrow_schema::{DataType, Schema};
11use async_trait::async_trait;
12use datafusion::execution::SendableRecordBatchStream;
13use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
14use futures::{stream, StreamExt, TryStreamExt};
15use itertools::Itertools;
16use lance_core::cache::{CacheKey, UnsizedCacheKey};
17use lance_core::utils::address::RowAddress;
18use lance_core::utils::parse::str_is_truthy;
19use lance_core::utils::tracing::{
20    IO_TYPE_OPEN_FRAG_REUSE, IO_TYPE_OPEN_MEM_WAL, IO_TYPE_OPEN_SCALAR, IO_TYPE_OPEN_VECTOR,
21    TRACE_IO_EVENTS,
22};
23use lance_file::reader::FileReader;
24use lance_file::v2;
25use lance_file::v2::reader::FileReaderOptions;
26use lance_index::frag_reuse::{FragReuseIndex, FRAG_REUSE_INDEX_NAME};
27use lance_index::mem_wal::{MemWalIndex, MEM_WAL_INDEX_NAME};
28use lance_index::optimize::OptimizeOptions;
29use lance_index::pb::index::Implementation;
30use lance_index::scalar::expression::{
31    IndexInformationProvider, MultiQueryParser, ScalarQueryParser,
32};
33use lance_index::scalar::inverted::InvertedIndexPlugin;
34use lance_index::scalar::lance_format::LanceIndexStore;
35use lance_index::scalar::registry::{TrainingCriteria, TrainingOrdering};
36use lance_index::scalar::{CreatedIndex, ScalarIndex};
37use lance_index::vector::bq::builder::RabitQuantizer;
38use lance_index::vector::flat::index::{FlatBinQuantizer, FlatIndex, FlatQuantizer};
39use lance_index::vector::hnsw::HNSW;
40use lance_index::vector::pq::ProductQuantizer;
41use lance_index::vector::sq::ScalarQuantizer;
42pub use lance_index::IndexParams;
43use lance_index::{
44    is_system_index,
45    metrics::{MetricsCollector, NoOpMetricsCollector},
46    ScalarIndexCriteria,
47};
48use lance_index::{pb, vector::VectorIndex, Index, IndexType, INDEX_FILE_NAME};
49use lance_index::{DatasetIndexExt, INDEX_METADATA_SCHEMA_KEY, VECTOR_INDEX_VERSION};
50use lance_io::scheduler::{ScanScheduler, SchedulerConfig};
51use lance_io::traits::Reader;
52use lance_io::utils::{
53    read_last_block, read_message, read_message_from_buf, read_metadata_offset, read_version,
54    CachedFileSize,
55};
56use lance_table::format::IndexMetadata;
57use lance_table::format::{Fragment, SelfDescribingFileReader};
58use lance_table::io::manifest::read_manifest_indexes;
59use roaring::RoaringBitmap;
60use scalar::index_matches_criteria;
61use serde_json::json;
62use snafu::location;
63use tracing::{info, instrument};
64use uuid::Uuid;
65use vector::ivf::v2::IVFIndex;
66use vector::utils::get_vector_type;
67
68pub(crate) mod append;
69mod create;
70pub mod frag_reuse;
71pub mod mem_wal;
72pub mod prefilter;
73pub mod scalar;
74pub mod vector;
75
76use self::append::merge_indices;
77use self::vector::remap_vector_index;
78use crate::dataset::index::LanceIndexStoreExt;
79use crate::dataset::optimize::remapping::RemapResult;
80use crate::dataset::optimize::RemappedIndex;
81use crate::dataset::transaction::{Operation, Transaction};
82use crate::index::frag_reuse::{load_frag_reuse_index_details, open_frag_reuse_index};
83use crate::index::mem_wal::open_mem_wal_index;
84pub use crate::index::prefilter::{FilterLoader, PreFilter};
85use crate::index::scalar::{fetch_index_details, load_training_data, IndexDetails};
86use crate::session::index_caches::{FragReuseIndexKey, IndexMetadataKey};
87use crate::{dataset::Dataset, Error, Result};
88pub use create::CreateIndexBuilder;
89
90// Cache keys for different index types
91#[derive(Debug, Clone)]
92pub struct ScalarIndexCacheKey<'a> {
93    pub uuid: &'a str,
94    pub fri_uuid: Option<&'a Uuid>,
95}
96
97impl<'a> ScalarIndexCacheKey<'a> {
98    pub fn new(uuid: &'a str, fri_uuid: Option<&'a Uuid>) -> Self {
99        Self { uuid, fri_uuid }
100    }
101}
102
103impl UnsizedCacheKey for ScalarIndexCacheKey<'_> {
104    type ValueType = dyn ScalarIndex;
105
106    fn key(&self) -> std::borrow::Cow<'_, str> {
107        if let Some(fri_uuid) = self.fri_uuid {
108            format!("{}-{}", self.uuid, fri_uuid).into()
109        } else {
110            self.uuid.into()
111        }
112    }
113}
114
115#[derive(Debug, Clone)]
116pub struct VectorIndexCacheKey<'a> {
117    pub uuid: &'a str,
118    pub fri_uuid: Option<&'a Uuid>,
119}
120
121impl<'a> VectorIndexCacheKey<'a> {
122    pub fn new(uuid: &'a str, fri_uuid: Option<&'a Uuid>) -> Self {
123        Self { uuid, fri_uuid }
124    }
125}
126
127impl UnsizedCacheKey for VectorIndexCacheKey<'_> {
128    type ValueType = dyn VectorIndex;
129
130    fn key(&self) -> std::borrow::Cow<'_, str> {
131        if let Some(fri_uuid) = self.fri_uuid {
132            format!("{}-{}", self.uuid, fri_uuid).into()
133        } else {
134            self.uuid.into()
135        }
136    }
137}
138
139#[derive(Debug, Clone)]
140pub struct FragReuseIndexCacheKey<'a> {
141    pub uuid: &'a str,
142    pub fri_uuid: Option<&'a Uuid>,
143}
144
145impl<'a> FragReuseIndexCacheKey<'a> {
146    pub fn new(uuid: &'a str, fri_uuid: Option<&'a Uuid>) -> Self {
147        Self { uuid, fri_uuid }
148    }
149}
150
151impl CacheKey for FragReuseIndexCacheKey<'_> {
152    type ValueType = FragReuseIndex;
153
154    fn key(&self) -> std::borrow::Cow<'_, str> {
155        if let Some(fri_uuid) = self.fri_uuid {
156            format!("{}-{}", self.uuid, fri_uuid).into()
157        } else {
158            self.uuid.into()
159        }
160    }
161}
162
163#[derive(Debug, Clone)]
164pub struct MemWalCacheKey<'a> {
165    pub uuid: &'a Uuid,
166    pub fri_uuid: Option<&'a Uuid>,
167}
168
169impl<'a> MemWalCacheKey<'a> {
170    pub fn new(uuid: &'a Uuid, fri_uuid: Option<&'a Uuid>) -> Self {
171        Self { uuid, fri_uuid }
172    }
173}
174
175impl CacheKey for MemWalCacheKey<'_> {
176    type ValueType = MemWalIndex;
177
178    fn key(&self) -> std::borrow::Cow<'_, str> {
179        if let Some(fri_uuid) = self.fri_uuid {
180            format!("{}-{}", self.uuid, fri_uuid).into()
181        } else {
182            self.uuid.to_string().into()
183        }
184    }
185}
186
187// Whether to auto-migrate a dataset when we encounter corruption.
188fn auto_migrate_corruption() -> bool {
189    static LANCE_AUTO_MIGRATION: OnceLock<bool> = OnceLock::new();
190    *LANCE_AUTO_MIGRATION.get_or_init(|| {
191        std::env::var("LANCE_AUTO_MIGRATION")
192            .ok()
193            .map(|s| str_is_truthy(&s))
194            .unwrap_or(true)
195    })
196}
197
/// Builds index.
///
/// An implementor reports which kind of index it produces and exposes an
/// asynchronous build step.
#[async_trait]
pub trait IndexBuilder {
    /// Returns the [`IndexType`] this builder produces.
    ///
    /// This is an associated function: it is answered by the implementing
    /// type, not by a particular builder value.
    fn index_type() -> IndexType;

    /// Runs the build, returning an error if it fails.
    async fn build(&self) -> Result<()>;
}
205
206pub(crate) async fn remap_index(
207    dataset: &Dataset,
208    index_id: &Uuid,
209    row_id_map: &HashMap<u64, Option<u64>>,
210) -> Result<RemapResult> {
211    // Load indices from the disk.
212    let indices = dataset.load_indices().await?;
213    let matched = indices
214        .iter()
215        .find(|i| i.uuid == *index_id)
216        .ok_or_else(|| Error::Index {
217            message: format!("Index with id {} does not exist", index_id),
218            location: location!(),
219        })?;
220
221    if matched.fields.len() > 1 {
222        return Err(Error::Index {
223            message: "Remapping indices with multiple fields is not supported".to_string(),
224            location: location!(),
225        });
226    }
227
228    if row_id_map.values().all(|v| v.is_none()) {
229        let deleted_bitmap = RoaringBitmap::from_iter(
230            row_id_map
231                .keys()
232                .map(|row_id| RowAddress::new_from_u64(*row_id))
233                .map(|addr| addr.fragment_id()),
234        );
235        if Some(deleted_bitmap) == matched.fragment_bitmap {
236            // If remap deleted all rows, we can just return the same index ID.
237            // This can happen if there is a bug where the index is covering empty
238            // fragment that haven't been cleaned up. They should be cleaned up
239            // outside of this function.
240            return Ok(RemapResult::Keep(*index_id));
241        }
242    }
243
244    let field_id = matched
245        .fields
246        .first()
247        .expect("An index existed with no fields");
248    let field_path = dataset.schema().field_path(*field_id)?;
249
250    let new_id = Uuid::new_v4();
251
252    let generic = dataset
253        .open_generic_index(&field_path, &index_id.to_string(), &NoOpMetricsCollector)
254        .await?;
255
256    let created_index = match generic.index_type() {
257        it if it.is_scalar() => {
258            let new_store = LanceIndexStore::from_dataset_for_new(dataset, &new_id.to_string())?;
259
260            let scalar_index = dataset
261                .open_scalar_index(&field_path, &index_id.to_string(), &NoOpMetricsCollector)
262                .await?;
263            if !scalar_index.can_remap() {
264                return Ok(RemapResult::Drop);
265            }
266
267            match scalar_index.index_type() {
268                IndexType::Inverted => {
269                    let inverted_index = scalar_index
270                        .as_any()
271                        .downcast_ref::<lance_index::scalar::inverted::InvertedIndex>()
272                        .ok_or(Error::Index {
273                            message: "expected inverted index".to_string(),
274                            location: location!(),
275                        })?;
276                    if inverted_index.is_legacy() {
277                        log::warn!("reindex because of legacy format, index_type: {}, index_id: {}, field: {}",
278                            scalar_index.index_type(),
279                            index_id,
280                            field_path
281                        );
282                        let training_data = load_training_data(
283                            dataset,
284                            &field_path,
285                            &TrainingCriteria::new(TrainingOrdering::None),
286                            None,
287                            true, // Legacy reindexing should always train
288                            None,
289                        )
290                        .await?;
291                        InvertedIndexPlugin::train_inverted_index(
292                            training_data,
293                            &new_store,
294                            inverted_index.params().clone(),
295                            None,
296                        )
297                        .await?
298                    } else {
299                        scalar_index.remap(row_id_map, &new_store).await?
300                    }
301                }
302                _ => scalar_index.remap(row_id_map, &new_store).await?,
303            }
304        }
305        it if it.is_vector() => {
306            remap_vector_index(
307                Arc::new(dataset.clone()),
308                &field_path,
309                index_id,
310                &new_id,
311                matched,
312                row_id_map,
313            )
314            .await?;
315            CreatedIndex {
316                index_details: prost_types::Any::from_msg(
317                    &lance_table::format::pb::VectorIndexDetails::default(),
318                )
319                .unwrap(),
320                index_version: VECTOR_INDEX_VERSION,
321            }
322        }
323        _ => {
324            return Err(Error::Index {
325                message: format!("Index type {} is not supported", generic.index_type()),
326                location: location!(),
327            });
328        }
329    };
330
331    Ok(RemapResult::Remapped(RemappedIndex {
332        old_id: *index_id,
333        new_id,
334        index_details: created_index.index_details,
335        index_version: created_index.index_version,
336    }))
337}
338
339#[derive(Debug)]
340pub struct ScalarIndexInfo {
341    indexed_columns: HashMap<String, (DataType, Box<MultiQueryParser>)>,
342}
343
344impl IndexInformationProvider for ScalarIndexInfo {
345    fn get_index(&self, col: &str) -> Option<(&DataType, &dyn ScalarQueryParser)> {
346        self.indexed_columns
347            .get(col)
348            .map(|(ty, parser)| (ty, parser.as_ref() as &dyn ScalarQueryParser))
349    }
350}
351
352async fn open_index_proto(reader: &dyn Reader) -> Result<pb::Index> {
353    let file_size = reader.size().await?;
354    let tail_bytes = read_last_block(reader).await?;
355    let metadata_pos = read_metadata_offset(&tail_bytes)?;
356    let proto: pb::Index = if metadata_pos < file_size - tail_bytes.len() {
357        // We have not read the metadata bytes yet.
358        read_message(reader, metadata_pos).await?
359    } else {
360        let offset = tail_bytes.len() - (file_size - metadata_pos);
361        read_message_from_buf(&tail_bytes.slice(offset..))?
362    };
363    Ok(proto)
364}
365
366fn vector_index_details() -> prost_types::Any {
367    let details = lance_table::format::pb::VectorIndexDetails::default();
368    prost_types::Any::from_msg(&details).unwrap()
369}
370
371#[async_trait]
372impl DatasetIndexExt for Dataset {
373    type IndexBuilder<'a> = CreateIndexBuilder<'a>;
374
    /// Create a builder for creating an index on columns.
    ///
    /// This returns a builder that can be configured with additional options
    /// before awaiting to execute. The builder mutably borrows the dataset and
    /// borrows `params` for its whole lifetime.
    ///
    /// # Arguments
    ///
    /// * `columns` - names of the columns to index.
    /// * `index_type` - the kind of index to create.
    /// * `params` - index parameters, passed through to the builder.
    ///
    /// # Examples
    ///
    /// Create a scalar BTREE index:
    /// ```
    /// # use lance::{Dataset, Result};
    /// # use lance_index::{DatasetIndexExt, IndexType, scalar::ScalarIndexParams};
    /// # async fn example(dataset: &mut Dataset) -> Result<()> {
    /// let params = ScalarIndexParams::default();
    /// dataset
    ///     .create_index_builder(&["id"], IndexType::BTree, &params)
    ///     .name("id_index".to_string())
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// Create an empty index that will be populated later:
    /// ```
    /// # use lance::{Dataset, Result};
    /// # use lance_index::{DatasetIndexExt, IndexType, scalar::ScalarIndexParams};
    /// # async fn example(dataset: &mut Dataset) -> Result<()> {
    /// let params = ScalarIndexParams::default();
    /// dataset
    ///     .create_index_builder(&["category"], IndexType::Bitmap, &params)
    ///     .train(false)  // Create empty index
    ///     .replace(true)  // Replace if exists
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```
    fn create_index_builder<'a>(
        &'a mut self,
        columns: &[&str],
        index_type: IndexType,
        params: &'a dyn IndexParams,
    ) -> CreateIndexBuilder<'a> {
        // All configuration and the actual build live in `CreateIndexBuilder`.
        CreateIndexBuilder::new(self, columns, index_type, params)
    }
418
419    #[instrument(skip_all)]
420    async fn create_index(
421        &mut self,
422        columns: &[&str],
423        index_type: IndexType,
424        name: Option<String>,
425        params: &dyn IndexParams,
426        replace: bool,
427    ) -> Result<()> {
428        // Use the builder pattern with default train=true for backward compatibility
429        let mut builder = self.create_index_builder(columns, index_type, params);
430
431        if let Some(name) = name {
432            builder = builder.name(name);
433        }
434
435        builder.replace(replace).await
436    }
437
438    async fn drop_index(&mut self, name: &str) -> Result<()> {
439        let indices = self.load_indices_by_name(name).await?;
440        if indices.is_empty() {
441            return Err(Error::IndexNotFound {
442                identity: format!("name={}", name),
443                location: location!(),
444            });
445        }
446
447        let transaction = Transaction::new(
448            self.manifest.version,
449            Operation::CreateIndex {
450                new_indices: vec![],
451                removed_indices: indices.clone(),
452            },
453            /*blobs_op= */ None,
454            None,
455        );
456
457        self.apply_commit(transaction, &Default::default(), &Default::default())
458            .await?;
459
460        Ok(())
461    }
462
463    async fn prewarm_index(&self, name: &str) -> Result<()> {
464        let indices = self.load_indices_by_name(name).await?;
465        if indices.is_empty() {
466            return Err(Error::IndexNotFound {
467                identity: format!("name={}", name),
468                location: location!(),
469            });
470        }
471
472        let index = self
473            .open_generic_index(name, &indices[0].uuid.to_string(), &NoOpMetricsCollector)
474            .await?;
475        index.prewarm().await?;
476
477        Ok(())
478    }
479
480    async fn load_indices(&self) -> Result<Arc<Vec<IndexMetadata>>> {
481        let metadata_key = IndexMetadataKey {
482            version: self.version().version,
483        };
484        let indices = match self.index_cache.get_with_key(&metadata_key).await {
485            Some(indices) => indices,
486            None => {
487                let mut loaded_indices = read_manifest_indexes(
488                    &self.object_store,
489                    &self.manifest_location,
490                    &self.manifest,
491                )
492                .await?;
493                retain_supported_indices(&mut loaded_indices);
494                let loaded_indices = Arc::new(loaded_indices);
495                self.index_cache
496                    .insert_with_key(&metadata_key, loaded_indices.clone())
497                    .await;
498                loaded_indices
499            }
500        };
501
502        if let Some(frag_reuse_index_meta) =
503            indices.iter().find(|idx| idx.name == FRAG_REUSE_INDEX_NAME)
504        {
505            let uuid = frag_reuse_index_meta.uuid.to_string();
506            let fri_key = FragReuseIndexKey { uuid: &uuid };
507            let frag_reuse_index = self
508                .index_cache
509                .get_or_insert_with_key(fri_key, || async move {
510                    let index_details =
511                        load_frag_reuse_index_details(self, frag_reuse_index_meta).await?;
512                    open_frag_reuse_index(frag_reuse_index_meta.uuid, index_details.as_ref()).await
513                })
514                .await?;
515            let mut indices = indices.as_ref().clone();
516            indices.iter_mut().for_each(|idx| {
517                if let Some(bitmap) = idx.fragment_bitmap.as_mut() {
518                    frag_reuse_index.remap_fragment_bitmap(bitmap).unwrap();
519                }
520            });
521            Ok(Arc::new(indices))
522        } else {
523            Ok(indices)
524        }
525    }
526
527    async fn commit_existing_index(
528        &mut self,
529        index_name: &str,
530        column: &str,
531        index_id: Uuid,
532    ) -> Result<()> {
533        let Some(field) = self.schema().field(column) else {
534            return Err(Error::Index {
535                message: format!("CreateIndex: column '{column}' does not exist"),
536                location: location!(),
537            });
538        };
539
540        // TODO: We will need some way to determine the index details here.  Perhaps
541        // we can load the index itself and get the details that way.
542
543        let new_idx = IndexMetadata {
544            uuid: index_id,
545            name: index_name.to_string(),
546            fields: vec![field.id],
547            dataset_version: self.manifest.version,
548            fragment_bitmap: Some(self.get_fragments().iter().map(|f| f.id() as u32).collect()),
549            index_details: None,
550            index_version: 0,
551            created_at: Some(chrono::Utc::now()),
552            base_id: None, // New indices don't have base_id (they're not from shallow clone)
553        };
554
555        let transaction = Transaction::new(
556            self.manifest.version,
557            Operation::CreateIndex {
558                new_indices: vec![new_idx],
559                removed_indices: vec![],
560            },
561            /*blobs_op= */ None,
562            None,
563        );
564
565        self.apply_commit(transaction, &Default::default(), &Default::default())
566            .await?;
567
568        Ok(())
569    }
570
571    async fn load_scalar_index<'a, 'b>(
572        &'a self,
573        criteria: ScalarIndexCriteria<'b>,
574    ) -> Result<Option<IndexMetadata>> {
575        let indices = self.load_indices().await?;
576
577        let mut indices = indices
578            .iter()
579            .filter(|idx| {
580                // We shouldn't have any indices with empty fields, but just in case, log an error
581                // but don't fail the operation (we might not be using that index)
582                if idx.fields.is_empty() {
583                    if idx.name != FRAG_REUSE_INDEX_NAME {
584                        log::error!("Index {} has no fields", idx.name);
585                    }
586                    false
587                } else {
588                    true
589                }
590            })
591            .collect::<Vec<_>>();
592        // This sorting & chunking is only needed to provide some backwards compatibility behavior for
593        // old versions of Lance that don't write index details.
594        //
595        // TODO: At some point we should just fail if the index details are missing and ask the user to
596        // retrain the index.
597        indices.sort_by_key(|idx| idx.fields[0]);
598        let indice_by_field = indices.into_iter().chunk_by(|idx| idx.fields[0]);
599        for (field_id, indices) in &indice_by_field {
600            let indices = indices.collect::<Vec<_>>();
601            let has_multiple = indices.len() > 1;
602            for idx in indices {
603                let field = self.schema().field_by_id(field_id);
604                if let Some(field) = field {
605                    if index_matches_criteria(idx, &criteria, field, has_multiple, self.schema())? {
606                        let non_empty = idx.fragment_bitmap.as_ref().is_some_and(|bitmap| {
607                            bitmap.intersection_len(self.fragment_bitmap.as_ref()) > 0
608                        });
609                        let is_fts_index = if let Some(details) = &idx.index_details {
610                            IndexDetails(details.clone()).supports_fts()
611                        } else {
612                            false
613                        };
614                        // FTS indices must always be returned even if empty, because FTS queries
615                        // require an index to exist. The query execution will handle the empty
616                        // bitmap appropriately and fall back to scanning unindexed data.
617                        // Other index types can be skipped if empty since they're optional optimizations.
618                        if non_empty || is_fts_index {
619                            return Ok(Some(idx.clone()));
620                        }
621                    }
622                }
623            }
624        }
625        return Ok(None);
626    }
627
628    #[instrument(skip_all)]
629    async fn optimize_indices(&mut self, options: &OptimizeOptions) -> Result<()> {
630        let dataset = Arc::new(self.clone());
631        let indices = self.load_indices().await?;
632
633        let indices_to_optimize = options
634            .index_names
635            .as_ref()
636            .map(|names| names.iter().collect::<HashSet<_>>());
637        let name_to_indices = indices
638            .iter()
639            .filter(|idx| {
640                indices_to_optimize
641                    .as_ref()
642                    .is_none_or(|names| names.contains(&idx.name))
643                    && !is_system_index(idx)
644            })
645            .map(|idx| (idx.name.clone(), idx))
646            .into_group_map();
647
648        let mut new_indices = vec![];
649        let mut removed_indices = vec![];
650        for deltas in name_to_indices.values() {
651            let Some(res) = merge_indices(dataset.clone(), deltas.as_slice(), options).await?
652            else {
653                continue;
654            };
655
656            let last_idx = deltas.last().expect("Delta indices should not be empty");
657            let new_idx = IndexMetadata {
658                uuid: res.new_uuid,
659                name: last_idx.name.clone(), // Keep the same name
660                fields: last_idx.fields.clone(),
661                dataset_version: self.manifest.version,
662                fragment_bitmap: Some(res.new_fragment_bitmap),
663                index_details: Some(Arc::new(res.new_index_details)),
664                index_version: res.new_index_version,
665                created_at: Some(chrono::Utc::now()),
666                base_id: None, // Mew merged index file locates in the cloned dataset.
667            };
668            removed_indices.extend(res.removed_indices.iter().map(|&idx| idx.clone()));
669            if deltas.len() > removed_indices.len() {
670                new_indices.extend(
671                    deltas[0..(deltas.len() - res.removed_indices.len())]
672                        .iter()
673                        .map(|&idx| idx.clone()),
674                );
675            }
676            new_indices.push(new_idx);
677        }
678
679        if new_indices.is_empty() {
680            return Ok(());
681        }
682
683        let transaction = Transaction::new(
684            self.manifest.version,
685            Operation::CreateIndex {
686                new_indices,
687                removed_indices,
688            },
689            /*blobs_op= */ None,
690            None,
691        );
692
693        self.apply_commit(transaction, &Default::default(), &Default::default())
694            .await?;
695
696        Ok(())
697    }
698
    /// Returns statistics about the index called `index_name` as a JSON string.
    ///
    /// For the fragment reuse and MemWAL indices the statistics reported by
    /// the index itself are serialized directly. For every other index the
    /// JSON object contains `index_type`, `name`, `num_indices`, `indices`
    /// (the statistics of each delta), `num_indexed_fragments`,
    /// `num_indexed_rows`, `num_unindexed_fragments`, `num_unindexed_rows`,
    /// `num_indexed_rows_per_delta` and `updated_at_timestamp_ms`.
    ///
    /// When a fragment lacks a row count, or two deltas cover the same
    /// fragment, and auto-migration is enabled (see `LANCE_AUTO_MIGRATION`),
    /// a clone of the dataset is migrated and the statistics are recomputed
    /// on that clone.
    ///
    /// # Errors
    ///
    /// Returns [`Error::IndexNotFound`] when no index is called `index_name`,
    /// [`Error::Internal`] for the two corruption cases above when
    /// auto-migration is disabled, and [`Error::Index`] when serialization
    /// fails. Other errors are propagated.
    async fn index_statistics(&self, index_name: &str) -> Result<String> {
        let metadatas = self.load_indices_by_name(index_name).await?;
        if metadatas.is_empty() {
            return Err(Error::IndexNotFound {
                identity: format!("name={}", index_name),
                location: location!(),
            });
        }

        // System indices report their own statistics and have no per-field deltas.
        if index_name == FRAG_REUSE_INDEX_NAME {
            let index = self
                .open_frag_reuse_index(&NoOpMetricsCollector)
                .await?
                .expect("FragmentReuse index does not exist");
            return serde_json::to_string(&index.statistics()?).map_err(|e| Error::Index {
                message: format!("Failed to serialize index statistics: {}", e),
                location: location!(),
            });
        }

        if index_name == MEM_WAL_INDEX_NAME {
            let index = self
                .open_mem_wal_index(&NoOpMetricsCollector)
                .await?
                .expect("MemWal index does not exist");
            return serde_json::to_string(&index.statistics()?).map_err(|e| Error::Index {
                message: format!("Failed to serialize index statistics: {}", e),
                location: location!(),
            });
        }

        // The field path is taken from the first delta's first field.
        let field_id = metadatas[0].fields[0];
        let field_path = self.schema().field_path(field_id)?;

        // Open all delta indices
        let indices = stream::iter(metadatas.iter())
            .then(|m| {
                let field_path = field_path.clone();
                async move {
                    self.open_generic_index(&field_path, &m.uuid.to_string(), &NoOpMetricsCollector)
                        .await
                }
            })
            .try_collect::<Vec<_>>()
            .await?;

        // Statistics for each delta index.
        let indices_stats = indices
            .iter()
            .map(|idx| idx.statistics())
            .collect::<Result<Vec<_>>>()?;

        // The reported index type is that of the first delta.
        let index_type = indices[0].index_type().to_string();

        let indexed_fragments_per_delta = self.indexed_fragments(index_name).await?;

        // Sum the row counts of the fragments covered by each delta. The result
        // is kept as a `Result` so the migration fallback below can inspect the
        // error.
        let res = indexed_fragments_per_delta
            .iter()
            .map(|frags| {
                let mut sum = 0;
                for frag in frags.iter() {
                    sum += frag.num_rows().ok_or_else(|| Error::Internal {
                        message: "Fragment should have row counts, please upgrade lance and \
                                      trigger a single write to fix this"
                            .to_string(),
                        location: location!(),
                    })?;
                }
                Ok(sum)
            })
            .collect::<Result<Vec<_>>>();

        // Runs `delete("false")` on a clone of the dataset and recomputes the
        // statistics on that clone. Presumably the delete matches no rows and
        // only serves as the write that migrates the metadata -- confirm.
        async fn migrate_and_recompute(ds: &Dataset, index_name: &str) -> Result<String> {
            let mut ds = ds.clone();
            log::warn!(
                "Detecting out-dated fragment metadata, migrating dataset. \
                        To disable migration, set LANCE_AUTO_MIGRATION=false"
            );
            ds.delete("false").await.map_err(|err| {
                Error::Execution {
                    message: format!("Failed to migrate dataset while calculating index statistics. \
                            To disable migration, set LANCE_AUTO_MIGRATION=false. Original error: {}", err),
                    location: location!(),
                }
            })?;
            ds.index_statistics(index_name).await
        }

        let num_indexed_rows_per_delta = match res {
            Ok(rows) => rows,
            // The missing-row-count error is recognised by its message text.
            Err(Error::Internal { message, .. })
                if auto_migrate_corruption() && message.contains("trigger a single write") =>
            {
                return migrate_and_recompute(self, index_name).await;
            }
            Err(e) => return Err(e),
        };

        // Collect the distinct indexed fragment ids; a fragment seen twice means
        // two deltas overlap.
        let mut fragment_ids = HashSet::new();
        for frags in indexed_fragments_per_delta.iter() {
            for frag in frags.iter() {
                if !fragment_ids.insert(frag.id) {
                    if auto_migrate_corruption() {
                        return migrate_and_recompute(self, index_name).await;
                    } else {
                        return Err(Error::Internal {
                            message:
                                "Overlap in indexed fragments. Please upgrade to lance >= 0.23.0 \
                                  and trigger a single write to fix this"
                                    .to_string(),
                            location: location!(),
                        });
                    }
                }
            }
        }
        let num_indexed_fragments = fragment_ids.len();

        // NOTE(review): both subtractions assume the indexed counts never exceed
        // the dataset totals -- confirm.
        let num_unindexed_fragments = self.fragments().len() - num_indexed_fragments;
        let num_indexed_rows: usize = num_indexed_rows_per_delta.iter().cloned().sum();
        let num_unindexed_rows = self.count_rows(None).await? - num_indexed_rows;

        // Calculate updated_at as max(created_at) from all index metadata
        let updated_at = metadatas
            .iter()
            .filter_map(|m| m.created_at)
            .max()
            .map(|dt| dt.timestamp_millis() as u64);

        let stats = json!({
            "index_type": index_type,
            "name": index_name,
            "num_indices": metadatas.len(),
            "indices": indices_stats,
            "num_indexed_fragments": num_indexed_fragments,
            "num_indexed_rows": num_indexed_rows,
            "num_unindexed_fragments": num_unindexed_fragments,
            "num_unindexed_rows": num_unindexed_rows,
            "num_indexed_rows_per_delta": num_indexed_rows_per_delta,
            "updated_at_timestamp_ms": updated_at,
        });

        serde_json::to_string(&stats).map_err(|e| Error::Index {
            message: format!("Failed to serialize index statistics: {}", e),
            location: location!(),
        })
    }
846
847    async fn read_index_partition(
848        &self,
849        index_name: &str,
850        partition_id: usize,
851        with_vector: bool,
852    ) -> Result<SendableRecordBatchStream> {
853        let indices = self.load_indices_by_name(index_name).await?;
854        if indices.is_empty() {
855            return Err(Error::IndexNotFound {
856                identity: format!("name={}", index_name),
857                location: location!(),
858            });
859        }
860        let column = self.schema().field_by_id(indices[0].fields[0]).unwrap();
861
862        let mut schema: Option<Arc<Schema>> = None;
863        let mut partition_streams = Vec::with_capacity(indices.len());
864        for index in indices {
865            let index = self
866                .open_vector_index(&column.name, &index.uuid.to_string(), &NoOpMetricsCollector)
867                .await?;
868
869            let stream = index
870                .partition_reader(partition_id, with_vector, &NoOpMetricsCollector)
871                .await?;
872            if schema.is_none() {
873                schema = Some(stream.schema());
874            }
875            partition_streams.push(stream);
876        }
877
878        match schema {
879            Some(schema) => {
880                let merged = stream::select_all(partition_streams);
881                let stream = RecordBatchStreamAdapter::new(schema, merged);
882                Ok(Box::pin(stream))
883            }
884            None => Ok(Box::pin(RecordBatchStreamAdapter::new(
885                Arc::new(Schema::empty()),
886                stream::empty(),
887            ))),
888        }
889    }
890}
891
892fn retain_supported_indices(indices: &mut Vec<IndexMetadata>) {
893    indices.retain(|idx| {
894        let max_supported_version = idx
895            .index_details
896            .as_ref()
897            .map(|details| {
898                IndexDetails(details.clone())
899                    .index_version()
900                    // If we don't know how to read the index, it isn't supported
901                    .unwrap_or(i32::MAX as u32)
902            })
903            .unwrap_or_default();
904        let is_valid = idx.index_version <= max_supported_version as i32;
905        if !is_valid {
906            log::warn!(
907                "Index {} has version {}, which is not supported (<={}), ignoring it",
908                idx.name,
909                idx.index_version,
910                max_supported_version,
911            );
912        }
913        is_valid
914    })
915}
916
/// A trait for internal dataset utilities
///
/// Internal use only. No API stability guarantees.
///
/// The methods fall into three groups: opening indices (`open_*`), inspecting
/// which fragments an index covers, and copying index definitions from another
/// dataset (`initialize_*`).
#[async_trait]
pub trait DatasetIndexInternalExt: DatasetIndexExt {
    /// Opens an index (scalar or vector) as a generic index
    ///
    /// `column` is the name of the indexed column, `uuid` is the id of the index
    /// and `metrics` is notified when an index has to be loaded.
    async fn open_generic_index(
        &self,
        column: &str,
        uuid: &str,
        metrics: &dyn MetricsCollector,
    ) -> Result<Arc<dyn Index>>;
    /// Opens the requested scalar index
    ///
    /// Takes the same arguments as `open_generic_index`.
    async fn open_scalar_index(
        &self,
        column: &str,
        uuid: &str,
        metrics: &dyn MetricsCollector,
    ) -> Result<Arc<dyn ScalarIndex>>;
    /// Opens the requested vector index
    ///
    /// Takes the same arguments as `open_generic_index`.
    async fn open_vector_index(
        &self,
        column: &str,
        uuid: &str,
        metrics: &dyn MetricsCollector,
    ) -> Result<Arc<dyn VectorIndex>>;

    /// Opens the fragment reuse index
    ///
    /// The `Dataset` implementation returns `Ok(None)` when the dataset has no
    /// fragment reuse index.
    async fn open_frag_reuse_index(
        &self,
        metrics: &dyn MetricsCollector,
    ) -> Result<Option<Arc<FragReuseIndex>>>;

    /// Opens the MemWAL index
    ///
    /// The `Dataset` implementation returns `Ok(None)` when the dataset has no
    /// MemWAL index.
    async fn open_mem_wal_index(
        &self,
        metrics: &dyn MetricsCollector,
    ) -> Result<Option<Arc<MemWalIndex>>>;

    /// Gets the fragment reuse index UUID from the current manifest, if it exists
    ///
    /// The `Dataset` implementation also returns `None` when the list of indices
    /// cannot be loaded.
    async fn frag_reuse_index_uuid(&self) -> Option<Uuid>;

    /// Loads information about all the available scalar indices on the dataset
    async fn scalar_index_info(&self) -> Result<ScalarIndexInfo>;

    /// Return the fragments that are not covered by any of the deltas of the index.
    ///
    /// The `Dataset` implementation fails if any delta has no fragment bitmap.
    async fn unindexed_fragments(&self, idx_name: &str) -> Result<Vec<Fragment>>;

    /// Return the fragments that are covered by each of the deltas of the index.
    ///
    /// The outer vector has one entry per delta. The `Dataset` implementation fails
    /// if any delta has no fragment bitmap.
    async fn indexed_fragments(&self, idx_name: &str) -> Result<Vec<Vec<Fragment>>>;

    /// Initialize a specific index on this dataset based on an index from a source dataset.
    ///
    /// `index_name` is looked up in `source_dataset`; the fields it refers to must
    /// also exist in this dataset.
    async fn initialize_index(&mut self, source_dataset: &Dataset, index_name: &str) -> Result<()>;

    /// Initialize all indices on this dataset based on indices from a source dataset.
    /// This will call `initialize_index` for each non-system index in the source dataset.
    async fn initialize_indices(&mut self, source_dataset: &Dataset) -> Result<()>;
}
975
976#[async_trait]
977impl DatasetIndexInternalExt for Dataset {
978    async fn open_generic_index(
979        &self,
980        column: &str,
981        uuid: &str,
982        metrics: &dyn MetricsCollector,
983    ) -> Result<Arc<dyn Index>> {
984        // Checking for cache existence is cheap so we just check both scalar and vector caches
985        let frag_reuse_uuid = self.frag_reuse_index_uuid().await;
986        let cache_key = ScalarIndexCacheKey::new(uuid, frag_reuse_uuid.as_ref());
987        if let Some(index) = self.index_cache.get_unsized_with_key(&cache_key).await {
988            return Ok(index.as_index());
989        }
990
991        let vector_cache_key = VectorIndexCacheKey::new(uuid, frag_reuse_uuid.as_ref());
992        if let Some(index) = self
993            .index_cache
994            .get_unsized_with_key(&vector_cache_key)
995            .await
996        {
997            return Ok(index.as_index());
998        }
999
1000        let frag_reuse_cache_key = FragReuseIndexCacheKey::new(uuid, frag_reuse_uuid.as_ref());
1001        if let Some(index) = self.index_cache.get_with_key(&frag_reuse_cache_key).await {
1002            return Ok(index.as_index());
1003        }
1004
1005        // Sometimes we want to open an index and we don't care if it is a scalar or vector index.
1006        // For example, we might want to get statistics for an index, regardless of type.
1007        //
1008        // Currently, we solve this problem by checking for the existence of INDEX_FILE_NAME since
1009        // only vector indices have this file.  In the future, once we support multiple kinds of
1010        // scalar indices, we may start having this file with scalar indices too.  Once that happens
1011        // we can just read this file and look at the `implementation` or `index_type` fields to
1012        // determine what kind of index it is.
1013        let index_meta = self.load_index(uuid).await?.ok_or_else(|| Error::Index {
1014            message: format!("Index with id {} does not exist", uuid),
1015            location: location!(),
1016        })?;
1017        let index_dir = self.indice_files_dir(&index_meta)?;
1018        let index_file = index_dir.child(uuid).child(INDEX_FILE_NAME);
1019        if self.object_store.exists(&index_file).await? {
1020            let index = self.open_vector_index(column, uuid, metrics).await?;
1021            Ok(index.as_index())
1022        } else {
1023            let index = self.open_scalar_index(column, uuid, metrics).await?;
1024            Ok(index.as_index())
1025        }
1026    }
1027
1028    async fn open_scalar_index(
1029        &self,
1030        column: &str,
1031        uuid: &str,
1032        metrics: &dyn MetricsCollector,
1033    ) -> Result<Arc<dyn ScalarIndex>> {
1034        let frag_reuse_uuid = self.frag_reuse_index_uuid().await;
1035        let cache_key = ScalarIndexCacheKey::new(uuid, frag_reuse_uuid.as_ref());
1036        if let Some(index) = self.index_cache.get_unsized_with_key(&cache_key).await {
1037            return Ok(index);
1038        }
1039
1040        let index_meta = self.load_index(uuid).await?.ok_or_else(|| Error::Index {
1041            message: format!("Index with id {} does not exist", uuid),
1042            location: location!(),
1043        })?;
1044
1045        let index = scalar::open_scalar_index(self, column, &index_meta, metrics).await?;
1046
1047        info!(target: TRACE_IO_EVENTS, index_uuid=uuid, r#type=IO_TYPE_OPEN_SCALAR, index_type=index.index_type().to_string());
1048        metrics.record_index_load();
1049
1050        self.index_cache
1051            .insert_unsized_with_key(&cache_key, index.clone())
1052            .await;
1053        Ok(index)
1054    }
1055
1056    async fn open_vector_index(
1057        &self,
1058        column: &str,
1059        uuid: &str,
1060        metrics: &dyn MetricsCollector,
1061    ) -> Result<Arc<dyn VectorIndex>> {
1062        let frag_reuse_uuid = self.frag_reuse_index_uuid().await;
1063        let cache_key = VectorIndexCacheKey::new(uuid, frag_reuse_uuid.as_ref());
1064
1065        if let Some(index) = self.index_cache.get_unsized_with_key(&cache_key).await {
1066            log::debug!("Found vector index in cache uuid: {}", uuid);
1067            return Ok(index);
1068        }
1069
1070        let frag_reuse_index = self.open_frag_reuse_index(metrics).await?;
1071        let index_meta = self.load_index(uuid).await?.ok_or_else(|| Error::Index {
1072            message: format!("Index with id {} does not exist", uuid),
1073            location: location!(),
1074        })?;
1075        let index_dir = self.indice_files_dir(&index_meta)?;
1076        let index_file = index_dir.child(uuid).child(INDEX_FILE_NAME);
1077        let reader: Arc<dyn Reader> = self.object_store.open(&index_file).await?.into();
1078
1079        let tailing_bytes = read_last_block(reader.as_ref()).await?;
1080        let (major_version, minor_version) = read_version(&tailing_bytes)?;
1081
1082        // Namespace the index cache by the UUID of the index.
1083        let index_cache = self.index_cache.with_key_prefix(&cache_key.key());
1084
1085        // the index file is in lance format since version (0,2)
1086        // TODO: we need to change the legacy IVF_PQ to be in lance format
1087        let index = match (major_version, minor_version) {
1088            (0, 1) | (0, 0) => {
1089                info!(target: TRACE_IO_EVENTS, index_uuid=uuid, r#type=IO_TYPE_OPEN_VECTOR, version="0.1", index_type="IVF_PQ");
1090                let proto = open_index_proto(reader.as_ref()).await?;
1091                match &proto.implementation {
1092                    Some(Implementation::VectorIndex(vector_index)) => {
1093                        let dataset = Arc::new(self.clone());
1094                        vector::open_vector_index(
1095                            dataset,
1096                            uuid,
1097                            vector_index,
1098                            reader,
1099                            frag_reuse_index,
1100                        )
1101                        .await
1102                    }
1103                    None => Err(Error::Internal {
1104                        message: "Index proto was missing implementation field".into(),
1105                        location: location!(),
1106                    }),
1107                }
1108            }
1109
1110            (0, 2) => {
1111                info!(target: TRACE_IO_EVENTS, index_uuid=uuid, r#type=IO_TYPE_OPEN_VECTOR, version="0.2", index_type="IVF_PQ");
1112                let reader = FileReader::try_new_self_described_from_reader(
1113                    reader.clone(),
1114                    Some(&self.metadata_cache.file_metadata_cache(&index_file)),
1115                )
1116                .await?;
1117                vector::open_vector_index_v2(
1118                    Arc::new(self.clone()),
1119                    column,
1120                    uuid,
1121                    reader,
1122                    frag_reuse_index,
1123                )
1124                .await
1125            }
1126
1127            (0, 3) | (2, _) => {
1128                let scheduler = ScanScheduler::new(
1129                    self.object_store.clone(),
1130                    SchedulerConfig::max_bandwidth(&self.object_store),
1131                );
1132                let file = scheduler
1133                    .open_file(&index_file, &CachedFileSize::unknown())
1134                    .await?;
1135                let reader = v2::reader::FileReader::try_open(
1136                    file,
1137                    None,
1138                    Default::default(),
1139                    &self.metadata_cache.file_metadata_cache(&index_file),
1140                    FileReaderOptions::default(),
1141                )
1142                .await?;
1143                let index_metadata = reader
1144                    .schema()
1145                    .metadata
1146                    .get(INDEX_METADATA_SCHEMA_KEY)
1147                    .ok_or(Error::Index {
1148                        message: "Index Metadata not found".to_owned(),
1149                        location: location!(),
1150                    })?;
1151                let index_metadata: lance_index::IndexMetadata =
1152                    serde_json::from_str(index_metadata)?;
1153                let field = self.schema().field(column).ok_or_else(|| Error::Index {
1154                    message: format!("Column {} does not exist in the schema", column),
1155                    location: location!(),
1156                })?;
1157
1158                let (_, element_type) = get_vector_type(self.schema(), column)?;
1159
1160                info!(target: TRACE_IO_EVENTS, index_uuid=uuid, r#type=IO_TYPE_OPEN_VECTOR, version="0.3", index_type=index_metadata.index_type);
1161
1162                match index_metadata.index_type.as_str() {
1163                    "IVF_FLAT" => match element_type {
1164                        DataType::Float16 | DataType::Float32 | DataType::Float64 => {
1165                            let ivf = IVFIndex::<FlatIndex, FlatQuantizer>::try_new(
1166                                self.object_store.clone(),
1167                                index_dir,
1168                                uuid.to_owned(),
1169                                frag_reuse_index,
1170                                self.metadata_cache.as_ref(),
1171                                index_cache,
1172                            )
1173                            .await?;
1174                            Ok(Arc::new(ivf) as Arc<dyn VectorIndex>)
1175                        }
1176                        DataType::UInt8 => {
1177                            let ivf = IVFIndex::<FlatIndex, FlatBinQuantizer>::try_new(
1178                                self.object_store.clone(),
1179                                index_dir,
1180                                uuid.to_owned(),
1181                                frag_reuse_index,
1182                                self.metadata_cache.as_ref(),
1183                                index_cache,
1184                            )
1185                            .await?;
1186                            Ok(Arc::new(ivf) as Arc<dyn VectorIndex>)
1187                        }
1188                        _ => Err(Error::Index {
1189                            message: format!(
1190                                "the field type {} is not supported for FLAT index",
1191                                field.data_type()
1192                            ),
1193                            location: location!(),
1194                        }),
1195                    },
1196
1197                    "IVF_PQ" => {
1198                        let ivf = IVFIndex::<FlatIndex, ProductQuantizer>::try_new(
1199                            self.object_store.clone(),
1200                            index_dir,
1201                            uuid.to_owned(),
1202                            frag_reuse_index,
1203                            self.metadata_cache.as_ref(),
1204                            index_cache,
1205                        )
1206                        .await?;
1207                        Ok(Arc::new(ivf) as Arc<dyn VectorIndex>)
1208                    }
1209
1210                    "IVF_SQ" => {
1211                        let ivf = IVFIndex::<FlatIndex, ScalarQuantizer>::try_new(
1212                            self.object_store.clone(),
1213                            index_dir,
1214                            uuid.to_owned(),
1215                            frag_reuse_index,
1216                            self.metadata_cache.as_ref(),
1217                            index_cache,
1218                        )
1219                        .await?;
1220                        Ok(Arc::new(ivf) as Arc<dyn VectorIndex>)
1221                    }
1222
1223                    "IVF_RQ" => {
1224                        let ivf = IVFIndex::<FlatIndex, RabitQuantizer>::try_new(
1225                            self.object_store.clone(),
1226                            self.indices_dir(),
1227                            uuid.to_owned(),
1228                            frag_reuse_index,
1229                            self.metadata_cache.as_ref(),
1230                            index_cache,
1231                        )
1232                        .await?;
1233                        Ok(Arc::new(ivf) as Arc<dyn VectorIndex>)
1234                    }
1235
1236                    "IVF_HNSW_FLAT" => {
1237                        let uri = index_dir.child(uuid).child("index.pb");
1238                        let file_metadata_cache =
1239                            self.session.metadata_cache.file_metadata_cache(&uri);
1240                        let ivf = IVFIndex::<HNSW, FlatQuantizer>::try_new(
1241                            self.object_store.clone(),
1242                            index_dir,
1243                            uuid.to_owned(),
1244                            frag_reuse_index,
1245                            &file_metadata_cache,
1246                            index_cache,
1247                        )
1248                        .await?;
1249                        Ok(Arc::new(ivf) as Arc<dyn VectorIndex>)
1250                    }
1251
1252                    "IVF_HNSW_SQ" => {
1253                        let ivf = IVFIndex::<HNSW, ScalarQuantizer>::try_new(
1254                            self.object_store.clone(),
1255                            index_dir,
1256                            uuid.to_owned(),
1257                            frag_reuse_index,
1258                            self.metadata_cache.as_ref(),
1259                            index_cache,
1260                        )
1261                        .await?;
1262                        Ok(Arc::new(ivf) as Arc<dyn VectorIndex>)
1263                    }
1264
1265                    "IVF_HNSW_PQ" => {
1266                        let ivf = IVFIndex::<HNSW, ProductQuantizer>::try_new(
1267                            self.object_store.clone(),
1268                            index_dir,
1269                            uuid.to_owned(),
1270                            frag_reuse_index,
1271                            self.metadata_cache.as_ref(),
1272                            index_cache,
1273                        )
1274                        .await?;
1275                        Ok(Arc::new(ivf) as Arc<dyn VectorIndex>)
1276                    }
1277
1278                    _ => Err(Error::Index {
1279                        message: format!("Unsupported index type: {}", index_metadata.index_type),
1280                        location: location!(),
1281                    }),
1282                }
1283            }
1284
1285            _ => Err(Error::Index {
1286                message: "unsupported index version (maybe need to upgrade your lance version)"
1287                    .to_owned(),
1288                location: location!(),
1289            }),
1290        };
1291        let index = index?;
1292        metrics.record_index_load();
1293        self.index_cache
1294            .insert_unsized_with_key(&cache_key, index.clone())
1295            .await;
1296        Ok(index)
1297    }
1298
1299    async fn open_frag_reuse_index(
1300        &self,
1301        metrics: &dyn MetricsCollector,
1302    ) -> Result<Option<Arc<FragReuseIndex>>> {
1303        if let Some(frag_reuse_index_meta) = self.load_index_by_name(FRAG_REUSE_INDEX_NAME).await? {
1304            let uuid = frag_reuse_index_meta.uuid.to_string();
1305            let frag_reuse_key = FragReuseIndexKey { uuid: &uuid };
1306            let uuid_clone = uuid.clone();
1307
1308            let index = self
1309                .index_cache
1310                .get_or_insert_with_key(frag_reuse_key, || async move {
1311                    let index_meta = self.load_index(&uuid_clone).await?.ok_or_else(|| Error::Index {
1312                        message: format!("Index with id {} does not exist", uuid_clone),
1313                        location: location!(),
1314                    })?;
1315                    let index_details = load_frag_reuse_index_details(self, &index_meta).await?;
1316                    let index =
1317                        open_frag_reuse_index(frag_reuse_index_meta.uuid, index_details.as_ref()).await?;
1318
1319                    info!(target: TRACE_IO_EVENTS, index_uuid=uuid_clone, r#type=IO_TYPE_OPEN_FRAG_REUSE);
1320                    metrics.record_index_load();
1321
1322                    Ok(index)
1323                })
1324                .await?;
1325
1326            Ok(Some(index))
1327        } else {
1328            Ok(None)
1329        }
1330    }
1331
1332    async fn open_mem_wal_index(
1333        &self,
1334        metrics: &dyn MetricsCollector,
1335    ) -> Result<Option<Arc<MemWalIndex>>> {
1336        let Some(mem_wal_meta) = self.load_index_by_name(MEM_WAL_INDEX_NAME).await? else {
1337            return Ok(None);
1338        };
1339
1340        let frag_reuse_uuid = self.frag_reuse_index_uuid().await;
1341        let cache_key = MemWalCacheKey::new(&mem_wal_meta.uuid, frag_reuse_uuid.as_ref());
1342        if let Some(index) = self.index_cache.get_with_key(&cache_key).await {
1343            log::debug!("Found MemWAL index in cache uuid: {}", mem_wal_meta.uuid);
1344            return Ok(Some(index));
1345        }
1346
1347        let uuid = mem_wal_meta.uuid.to_string();
1348
1349        let index_meta = self.load_index(&uuid).await?.ok_or_else(|| Error::Index {
1350            message: format!("Index with id {} does not exist", uuid),
1351            location: location!(),
1352        })?;
1353        let index = open_mem_wal_index(index_meta)?;
1354
1355        info!(target: TRACE_IO_EVENTS, index_uuid=uuid, r#type=IO_TYPE_OPEN_MEM_WAL);
1356        metrics.record_index_load();
1357
1358        self.index_cache
1359            .insert_with_key(&cache_key, index.clone())
1360            .await;
1361        Ok(Some(index))
1362    }
1363
1364    async fn frag_reuse_index_uuid(&self) -> Option<Uuid> {
1365        if let Ok(indices) = self.load_indices().await {
1366            indices
1367                .iter()
1368                .find(|idx| idx.name == FRAG_REUSE_INDEX_NAME)
1369                .map(|idx| idx.uuid)
1370        } else {
1371            None
1372        }
1373    }
1374
1375    #[instrument(level = "trace", skip_all)]
1376    async fn scalar_index_info(&self) -> Result<ScalarIndexInfo> {
1377        let indices = self.load_indices().await?;
1378        let schema = self.schema();
1379        let mut indexed_fields = Vec::new();
1380        for index in indices.iter().filter(|idx| {
1381            let idx_schema = schema.project_by_ids(idx.fields.as_slice(), true);
1382            let is_vector_index = idx_schema
1383                .fields
1384                .iter()
1385                .any(|f| is_vector_field(f.data_type()));
1386
1387            // Check if this is an FTS index by looking at index details
1388            let is_fts_index = if let Some(details) = &idx.index_details {
1389                IndexDetails(details.clone()).supports_fts()
1390            } else {
1391                false
1392            };
1393
1394            // Only include indices with non-empty fragment bitmaps, except for FTS indices
1395            // which need to be discoverable even when empty
1396            let has_non_empty_bitmap = idx.fragment_bitmap.as_ref().is_some_and(|bitmap| {
1397                !bitmap.is_empty() && !(bitmap & self.fragment_bitmap.as_ref()).is_empty()
1398            });
1399
1400            idx.fields.len() == 1 && !is_vector_index && (has_non_empty_bitmap || is_fts_index)
1401        }) {
1402            let field = index.fields[0];
1403            let field = schema.field_by_id(field).ok_or_else(|| Error::Internal {
1404                message: format!(
1405                    "Index referenced a field with id {field} which did not exist in the schema"
1406                ),
1407                location: location!(),
1408            })?;
1409
1410            // Build the full field path for nested fields
1411            let field_path = if let Some(ancestors) = schema.field_ancestry_by_id(field.id) {
1412                let field_refs: Vec<&str> = ancestors.iter().map(|f| f.name.as_str()).collect();
1413                lance_core::datatypes::format_field_path(&field_refs)
1414            } else {
1415                field.name.clone()
1416            };
1417
1418            let index_details = IndexDetails(fetch_index_details(self, &field.name, index).await?);
1419            let plugin = index_details.get_plugin()?;
1420            let query_parser = plugin.new_query_parser(index.name.clone(), &index_details.0);
1421
1422            if let Some(query_parser) = query_parser {
1423                indexed_fields.push((field_path, (field.data_type(), query_parser)));
1424            }
1425        }
1426        let mut index_info_map = HashMap::with_capacity(indexed_fields.len());
1427        for indexed_field in indexed_fields {
1428            // Need to wrap in an option here because we know that only one of and_modify and or_insert will be called
1429            // but the rust compiler does not.
1430            let mut parser = Some(indexed_field.1 .1);
1431            let parser = &mut parser;
1432            index_info_map
1433                .entry(indexed_field.0)
1434                .and_modify(|existing: &mut (DataType, Box<MultiQueryParser>)| {
1435                    // If there are two indices on the same column, they must have the same type
1436                    debug_assert_eq!(existing.0, indexed_field.1 .0);
1437
1438                    existing.1.add(parser.take().unwrap());
1439                })
1440                .or_insert_with(|| {
1441                    (
1442                        indexed_field.1 .0,
1443                        Box::new(MultiQueryParser::single(parser.take().unwrap())),
1444                    )
1445                });
1446        }
1447        Ok(ScalarIndexInfo {
1448            indexed_columns: index_info_map,
1449        })
1450    }
1451
1452    async fn unindexed_fragments(&self, name: &str) -> Result<Vec<Fragment>> {
1453        let indices = self.load_indices_by_name(name).await?;
1454        let mut total_fragment_bitmap = RoaringBitmap::new();
1455        for idx in indices.iter() {
1456            total_fragment_bitmap |= idx.fragment_bitmap.as_ref().ok_or(Error::Index {
1457                message: "Please upgrade lance to 0.8+ to use this function".to_string(),
1458                location: location!(),
1459            })?;
1460        }
1461        Ok(self
1462            .fragments()
1463            .iter()
1464            .filter(|f| !total_fragment_bitmap.contains(f.id as u32))
1465            .cloned()
1466            .collect())
1467    }
1468
1469    async fn indexed_fragments(&self, name: &str) -> Result<Vec<Vec<Fragment>>> {
1470        let indices = self.load_indices_by_name(name).await?;
1471        indices
1472            .iter()
1473            .map(|index| {
1474                let fragment_bitmap = index.fragment_bitmap.as_ref().ok_or(Error::Index {
1475                    message: "Please upgrade lance to 0.8+ to use this function".to_string(),
1476                    location: location!(),
1477                })?;
1478                let mut indexed_frags = Vec::with_capacity(fragment_bitmap.len() as usize);
1479                for frag in self.fragments().iter() {
1480                    if fragment_bitmap.contains(frag.id as u32) {
1481                        indexed_frags.push(frag.clone());
1482                    }
1483                }
1484                Ok(indexed_frags)
1485            })
1486            .collect()
1487    }
1488
1489    async fn initialize_index(&mut self, source_dataset: &Dataset, index_name: &str) -> Result<()> {
1490        let source_indices = source_dataset.load_indices_by_name(index_name).await?;
1491
1492        if source_indices.is_empty() {
1493            return Err(Error::Index {
1494                message: format!("Index '{}' not found in source dataset", index_name),
1495                location: location!(),
1496            });
1497        }
1498
1499        let source_index = source_indices
1500            .iter()
1501            .min_by_key(|idx| idx.created_at)
1502            .ok_or_else(|| Error::Index {
1503                message: format!("Could not determine oldest index for '{}'", index_name),
1504                location: location!(),
1505            })?;
1506
1507        let mut field_names = Vec::new();
1508        for field_id in source_index.fields.iter() {
1509            let source_field = source_dataset
1510                .schema()
1511                .field_by_id(*field_id)
1512                .ok_or_else(|| Error::Index {
1513                    message: format!("Field with id {} not found in source dataset", field_id),
1514                    location: location!(),
1515                })?;
1516
1517            let target_field =
1518                self.schema()
1519                    .field(&source_field.name)
1520                    .ok_or_else(|| Error::Index {
1521                        message: format!(
1522                            "Field '{}' required by index '{}' not found in target dataset",
1523                            source_field.name, index_name
1524                        ),
1525                        location: location!(),
1526                    })?;
1527
1528            if source_field.data_type() != target_field.data_type() {
1529                return Err(Error::Index {
1530                    message: format!(
1531                        "Field '{}' has different types in source ({:?}) and target ({:?}) datasets",
1532                        source_field.name,
1533                        source_field.data_type(),
1534                        target_field.data_type()
1535                    ),
1536                    location: location!(),
1537                });
1538            }
1539
1540            field_names.push(source_field.name.as_str());
1541        }
1542
1543        if field_names.is_empty() {
1544            return Err(Error::Index {
1545                message: format!("Index '{}' has no fields", index_name),
1546                location: location!(),
1547            });
1548        }
1549
1550        if let Some(index_details) = &source_index.index_details {
1551            let index_details_wrapper = IndexDetails(index_details.clone());
1552
1553            if index_details_wrapper.is_vector() {
1554                vector::initialize_vector_index(self, source_dataset, source_index, &field_names)
1555                    .await?;
1556            } else {
1557                scalar::initialize_scalar_index(self, source_dataset, source_index, &field_names)
1558                    .await?;
1559            }
1560        } else {
1561            log::warn!(
1562                "Index '{}' has no index_details, skipping",
1563                source_index.name
1564            );
1565        }
1566
1567        Ok(())
1568    }
1569
1570    async fn initialize_indices(&mut self, source_dataset: &Dataset) -> Result<()> {
1571        let source_indices = source_dataset.load_indices().await?;
1572        let non_system_indices: Vec<_> = source_indices
1573            .iter()
1574            .filter(|idx| !lance_index::is_system_index(idx))
1575            .collect();
1576
1577        if non_system_indices.is_empty() {
1578            return Ok(());
1579        }
1580
1581        let mut unique_index_names = HashSet::new();
1582        for index in non_system_indices.iter() {
1583            unique_index_names.insert(index.name.clone());
1584        }
1585
1586        for index_name in unique_index_names {
1587            self.initialize_index(source_dataset, &index_name).await?;
1588        }
1589
1590        Ok(())
1591    }
1592}
1593
1594fn is_vector_field(data_type: DataType) -> bool {
1595    match data_type {
1596        DataType::FixedSizeList(_, _) => true,
1597        DataType::List(inner) => {
1598            // If the inner type is a fixed size list, then it is a multivector field
1599            matches!(inner.data_type(), DataType::FixedSizeList(_, _))
1600        }
1601        _ => false,
1602    }
1603}
1604
1605#[cfg(test)]
1606mod tests {
1607    use crate::dataset::builder::DatasetBuilder;
1608    use crate::dataset::optimize::{compact_files, CompactionOptions};
1609    use crate::dataset::{ReadParams, WriteMode, WriteParams};
1610    use crate::index::vector::VectorIndexParams;
1611    use crate::session::Session;
1612    use crate::utils::test::{
1613        copy_test_data_to_tmp, DatagenExt, FragmentCount, FragmentRowCount, StatsHolder,
1614    };
1615    use arrow_array::Int32Array;
1616
1617    use super::*;
1618
1619    use arrow::array::AsArray;
1620    use arrow::datatypes::{Float32Type, Int32Type};
1621    use arrow_array::{
1622        FixedSizeListArray, Float32Array, RecordBatch, RecordBatchIterator, StringArray,
1623    };
1624    use arrow_schema::{Field, Schema};
1625    use lance_arrow::*;
1626    use lance_core::utils::tempfile::TempStrDir;
1627    use lance_datagen::gen_batch;
1628    use lance_datagen::{array, BatchCount, Dimension, RowCount};
1629    use lance_index::scalar::{FullTextSearchQuery, InvertedIndexParams, ScalarIndexParams};
1630    use lance_index::vector::{
1631        hnsw::builder::HnswBuildParams, ivf::IvfBuildParams, sq::builder::SQBuildParams,
1632    };
1633    use lance_io::object_store::ObjectStoreParams;
1634    use lance_linalg::distance::{DistanceType, MetricType};
1635    use lance_testing::datagen::generate_random_array;
1636    use rstest::rstest;
1637    use std::collections::HashSet;
1638
1639    #[tokio::test]
1640    async fn test_recreate_index() {
1641        const DIM: i32 = 8;
1642        let schema = Arc::new(Schema::new(vec![
1643            Field::new(
1644                "v",
1645                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), DIM),
1646                true,
1647            ),
1648            Field::new(
1649                "o",
1650                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), DIM),
1651                true,
1652            ),
1653        ]));
1654        let data = generate_random_array(2048 * DIM as usize);
1655        let batches: Vec<RecordBatch> = vec![RecordBatch::try_new(
1656            schema.clone(),
1657            vec![
1658                Arc::new(FixedSizeListArray::try_new_from_values(data.clone(), DIM).unwrap()),
1659                Arc::new(FixedSizeListArray::try_new_from_values(data, DIM).unwrap()),
1660            ],
1661        )
1662        .unwrap()];
1663
1664        let test_dir = TempStrDir::default();
1665        let test_uri = &test_dir;
1666        let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
1667        let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
1668
1669        let params = VectorIndexParams::ivf_pq(2, 8, 2, MetricType::L2, 2);
1670        dataset
1671            .create_index(&["v"], IndexType::Vector, None, &params, true)
1672            .await
1673            .unwrap();
1674        dataset
1675            .create_index(&["o"], IndexType::Vector, None, &params, true)
1676            .await
1677            .unwrap();
1678
1679        // Create index again
1680        dataset
1681            .create_index(&["v"], IndexType::Vector, None, &params, true)
1682            .await
1683            .unwrap();
1684
1685        // Can not overwrite an index on different columns.
1686        assert!(dataset
1687            .create_index(
1688                &["v"],
1689                IndexType::Vector,
1690                Some("o_idx".to_string()),
1691                &params,
1692                true,
1693            )
1694            .await
1695            .is_err());
1696    }
1697
1698    fn sample_vector_field() -> Field {
1699        let dimensions = 16;
1700        let column_name = "vec";
1701        Field::new(
1702            column_name,
1703            DataType::FixedSizeList(
1704                Arc::new(Field::new("item", DataType::Float32, true)),
1705                dimensions,
1706            ),
1707            false,
1708        )
1709    }
1710
1711    #[tokio::test]
1712    async fn test_drop_index() {
1713        let test_dir = TempStrDir::default();
1714        let schema = Schema::new(vec![
1715            sample_vector_field(),
1716            Field::new("ints", DataType::Int32, false),
1717        ]);
1718        let mut dataset = lance_datagen::rand(&schema)
1719            .into_dataset(
1720                &test_dir,
1721                FragmentCount::from(1),
1722                FragmentRowCount::from(256),
1723            )
1724            .await
1725            .unwrap();
1726
1727        let idx_name = "name".to_string();
1728        dataset
1729            .create_index(
1730                &["vec"],
1731                IndexType::Vector,
1732                Some(idx_name.clone()),
1733                &VectorIndexParams::ivf_pq(2, 8, 2, MetricType::L2, 10),
1734                true,
1735            )
1736            .await
1737            .unwrap();
1738        dataset
1739            .create_index(
1740                &["ints"],
1741                IndexType::BTree,
1742                None,
1743                &ScalarIndexParams::default(),
1744                true,
1745            )
1746            .await
1747            .unwrap();
1748
1749        assert_eq!(dataset.load_indices().await.unwrap().len(), 2);
1750
1751        dataset.drop_index(&idx_name).await.unwrap();
1752
1753        assert_eq!(dataset.load_indices().await.unwrap().len(), 1);
1754
1755        // Even though we didn't give the scalar index a name it still has an auto-generated one we can use
1756        let scalar_idx_name = &dataset.load_indices().await.unwrap()[0].name;
1757        dataset.drop_index(scalar_idx_name).await.unwrap();
1758
1759        assert_eq!(dataset.load_indices().await.unwrap().len(), 0);
1760
1761        // Make sure it returns an error if the index doesn't exist
1762        assert!(dataset.drop_index(scalar_idx_name).await.is_err());
1763    }
1764
1765    #[tokio::test]
1766    async fn test_count_index_rows() {
1767        let test_dir = TempStrDir::default();
1768        let dimensions = 16;
1769        let column_name = "vec";
1770        let field = sample_vector_field();
1771        let schema = Arc::new(Schema::new(vec![field]));
1772
1773        let float_arr = generate_random_array(512 * dimensions as usize);
1774
1775        let vectors =
1776            arrow_array::FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap();
1777
1778        let record_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(vectors)]).unwrap();
1779
1780        let reader =
1781            RecordBatchIterator::new(vec![record_batch].into_iter().map(Ok), schema.clone());
1782        let test_uri = &test_dir;
1783        let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
1784        dataset.validate().await.unwrap();
1785
1786        // Make sure it returns None if there's no index with the passed identifier
1787        assert!(dataset.index_statistics("bad_id").await.is_err());
1788        // Create an index
1789        let params = VectorIndexParams::ivf_pq(10, 8, 2, MetricType::L2, 10);
1790        dataset
1791            .create_index(
1792                &[column_name],
1793                IndexType::Vector,
1794                Some("vec_idx".into()),
1795                &params,
1796                true,
1797            )
1798            .await
1799            .unwrap();
1800
1801        let stats: serde_json::Value =
1802            serde_json::from_str(&dataset.index_statistics("vec_idx").await.unwrap()).unwrap();
1803        assert_eq!(stats["num_unindexed_rows"], 0);
1804        assert_eq!(stats["num_indexed_rows"], 512);
1805
1806        // Now we'll append some rows which shouldn't be indexed and see the
1807        // count change
1808        let float_arr = generate_random_array(512 * dimensions as usize);
1809        let vectors =
1810            arrow_array::FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap();
1811
1812        let record_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(vectors)]).unwrap();
1813
1814        let reader = RecordBatchIterator::new(vec![record_batch].into_iter().map(Ok), schema);
1815        dataset.append(reader, None).await.unwrap();
1816
1817        let stats: serde_json::Value =
1818            serde_json::from_str(&dataset.index_statistics("vec_idx").await.unwrap()).unwrap();
1819        assert_eq!(stats["num_unindexed_rows"], 512);
1820        assert_eq!(stats["num_indexed_rows"], 512);
1821    }
1822
    /// Walks two vector indices (`vec_idx`, `other_vec_idx`) through a series
    /// of `optimize_indices` calls and checks, after each step, the reported
    /// statistics and the fragment bitmap of every index entry.
    ///
    /// The steps are strictly order dependent: each one mutates the dataset
    /// that the following assertions inspect.
    #[tokio::test]
    async fn test_optimize_delta_indices() {
        let dimensions = 16;
        let column_name = "vec";
        let vec_field = Field::new(
            column_name,
            DataType::FixedSizeList(
                Arc::new(Field::new("item", DataType::Float32, true)),
                dimensions,
            ),
            false,
        );
        let other_column_name = "other_vec";
        let other_vec_field = Field::new(
            other_column_name,
            DataType::FixedSizeList(
                Arc::new(Field::new("item", DataType::Float32, true)),
                dimensions,
            ),
            false,
        );
        let schema = Arc::new(Schema::new(vec![vec_field, other_vec_field]));

        // 512 random vectors, shared by both columns.
        let float_arr = generate_random_array(512 * dimensions as usize);

        let vectors = Arc::new(
            arrow_array::FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap(),
        );

        let record_batch =
            RecordBatch::try_new(schema.clone(), vec![vectors.clone(), vectors.clone()]).unwrap();

        let reader = RecordBatchIterator::new(
            vec![record_batch.clone()].into_iter().map(Ok),
            schema.clone(),
        );

        // Write the first fragment and build one index per column.
        let mut dataset = Dataset::write(reader, "memory://", None).await.unwrap();
        let params = VectorIndexParams::ivf_pq(10, 8, 2, MetricType::L2, 10);
        dataset
            .create_index(
                &[column_name],
                IndexType::Vector,
                Some("vec_idx".into()),
                &params,
                true,
            )
            .await
            .unwrap();
        dataset
            .create_index(
                &[other_column_name],
                IndexType::Vector,
                Some("other_vec_idx".into()),
                &params,
                true,
            )
            .await
            .unwrap();

        // Parsed JSON statistics of the index called `name`.
        async fn get_stats(dataset: &Dataset, name: &str) -> serde_json::Value {
            serde_json::from_str(&dataset.index_statistics(name).await.unwrap()).unwrap()
        }
        // All index metadata entries whose name equals `name`.
        async fn get_meta(dataset: &Dataset, name: &str) -> Vec<IndexMetadata> {
            dataset
                .load_indices()
                .await
                .unwrap()
                .iter()
                .filter(|m| m.name == name)
                .cloned()
                .collect()
        }
        // Fragment ids recorded in the entry's fragment bitmap; panics if the
        // entry has no bitmap.
        fn get_bitmap(meta: &IndexMetadata) -> Vec<u32> {
            meta.fragment_bitmap.as_ref().unwrap().iter().collect()
        }

        // Freshly created index: everything is indexed, one entry on fragment 0.
        let stats = get_stats(&dataset, "vec_idx").await;
        assert_eq!(stats["num_unindexed_rows"], 0);
        assert_eq!(stats["num_indexed_rows"], 512);
        assert_eq!(stats["num_indexed_fragments"], 1);
        assert_eq!(stats["num_indices"], 1);
        let meta = get_meta(&dataset, "vec_idx").await;
        assert_eq!(meta.len(), 1);
        assert_eq!(get_bitmap(&meta[0]), vec![0]);

        // Append a second batch: its 512 rows are expected to be unindexed.
        let reader =
            RecordBatchIterator::new(vec![record_batch].into_iter().map(Ok), schema.clone());
        dataset.append(reader, None).await.unwrap();
        let stats = get_stats(&dataset, "vec_idx").await;
        assert_eq!(stats["num_unindexed_rows"], 512);
        assert_eq!(stats["num_indexed_rows"], 512);
        assert_eq!(stats["num_indexed_fragments"], 1);
        assert_eq!(stats["num_unindexed_fragments"], 1);
        assert_eq!(stats["num_indices"], 1);
        let meta = get_meta(&dataset, "vec_idx").await;
        assert_eq!(meta.len(), 1);
        assert_eq!(get_bitmap(&meta[0]), vec![0]);

        // Optimizing with an empty list of index names must leave `vec_idx` untouched.
        dataset
            .optimize_indices(&OptimizeOptions::append().index_names(vec![])) // Does nothing because no index name is passed
            .await
            .unwrap();
        let stats = get_stats(&dataset, "vec_idx").await;
        assert_eq!(stats["num_unindexed_rows"], 512);
        assert_eq!(stats["num_indexed_rows"], 512);
        assert_eq!(stats["num_indexed_fragments"], 1);
        assert_eq!(stats["num_unindexed_fragments"], 1);
        assert_eq!(stats["num_indices"], 1);
        let meta = get_meta(&dataset, "vec_idx").await;
        assert_eq!(meta.len(), 1);
        assert_eq!(get_bitmap(&meta[0]), vec![0]);

        // optimize the other index
        dataset
            .optimize_indices(
                &OptimizeOptions::append().index_names(vec!["other_vec_idx".to_owned()]),
            )
            .await
            .unwrap();
        // `vec_idx` was not named in the call, so it is expected to be unchanged.
        let stats = get_stats(&dataset, "vec_idx").await;
        assert_eq!(stats["num_unindexed_rows"], 512);
        assert_eq!(stats["num_indexed_rows"], 512);
        assert_eq!(stats["num_indexed_fragments"], 1);
        assert_eq!(stats["num_unindexed_fragments"], 1);
        assert_eq!(stats["num_indices"], 1);
        let meta = get_meta(&dataset, "vec_idx").await;
        assert_eq!(meta.len(), 1);
        assert_eq!(get_bitmap(&meta[0]), vec![0]);

        // `other_vec_idx` now has a second (delta) entry covering fragment 1.
        let stats = get_stats(&dataset, "other_vec_idx").await;
        assert_eq!(stats["num_unindexed_rows"], 0);
        assert_eq!(stats["num_indexed_rows"], 1024);
        assert_eq!(stats["num_indexed_fragments"], 2);
        assert_eq!(stats["num_unindexed_fragments"], 0);
        assert_eq!(stats["num_indices"], 2);
        let meta = get_meta(&dataset, "other_vec_idx").await;
        assert_eq!(meta.len(), 2);
        assert_eq!(get_bitmap(&meta[0]), vec![0]);
        assert_eq!(get_bitmap(&meta[1]), vec![1]);

        // Optimize without naming an index, merging one existing entry with
        // the new data.
        dataset
            .optimize_indices(&OptimizeOptions {
                num_indices_to_merge: 1, // merge the index with new data
                ..Default::default()
            })
            .await
            .unwrap();

        // `vec_idx` ends up as a single entry covering both fragments.
        let stats = get_stats(&dataset, "vec_idx").await;
        assert_eq!(stats["num_unindexed_rows"], 0);
        assert_eq!(stats["num_indexed_rows"], 1024);
        assert_eq!(stats["num_indexed_fragments"], 2);
        assert_eq!(stats["num_unindexed_fragments"], 0);
        assert_eq!(stats["num_indices"], 1);
        let meta = get_meta(&dataset, "vec_idx").await;
        assert_eq!(meta.len(), 1);
        assert_eq!(get_bitmap(&meta[0]), vec![0, 1]);

        // Merging two entries collapses the two `other_vec_idx` entries into one.
        dataset
            .optimize_indices(&OptimizeOptions {
                num_indices_to_merge: 2,
                ..Default::default()
            })
            .await
            .unwrap();
        let stats = get_stats(&dataset, "other_vec_idx").await;
        assert_eq!(stats["num_unindexed_rows"], 0);
        assert_eq!(stats["num_indexed_rows"], 1024);
        assert_eq!(stats["num_indexed_fragments"], 2);
        assert_eq!(stats["num_unindexed_fragments"], 0);
        assert_eq!(stats["num_indices"], 1);
        let meta = get_meta(&dataset, "other_vec_idx").await;
        assert_eq!(meta.len(), 1);
        assert_eq!(get_bitmap(&meta[0]), vec![0, 1]);
    }
1999
2000    #[tokio::test]
2001    async fn test_optimize_ivf_hnsw_sq_delta_indices() {
2002        let test_dir = TempStrDir::default();
2003        let dimensions = 16;
2004        let column_name = "vec";
2005        let field = Field::new(
2006            column_name,
2007            DataType::FixedSizeList(
2008                Arc::new(Field::new("item", DataType::Float32, true)),
2009                dimensions,
2010            ),
2011            false,
2012        );
2013        let schema = Arc::new(Schema::new(vec![field]));
2014
2015        let float_arr = generate_random_array(512 * dimensions as usize);
2016
2017        let vectors =
2018            arrow_array::FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap();
2019
2020        let record_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(vectors)]).unwrap();
2021
2022        let reader = RecordBatchIterator::new(
2023            vec![record_batch.clone()].into_iter().map(Ok),
2024            schema.clone(),
2025        );
2026
2027        let test_uri = &test_dir;
2028        let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
2029
2030        let ivf_params = IvfBuildParams::default();
2031        let hnsw_params = HnswBuildParams::default();
2032        let sq_params = SQBuildParams::default();
2033        let params = VectorIndexParams::with_ivf_hnsw_sq_params(
2034            MetricType::L2,
2035            ivf_params,
2036            hnsw_params,
2037            sq_params,
2038        );
2039        dataset
2040            .create_index(
2041                &[column_name],
2042                IndexType::Vector,
2043                Some("vec_idx".into()),
2044                &params,
2045                true,
2046            )
2047            .await
2048            .unwrap();
2049
2050        let stats: serde_json::Value =
2051            serde_json::from_str(&dataset.index_statistics("vec_idx").await.unwrap()).unwrap();
2052        assert_eq!(stats["num_unindexed_rows"], 0);
2053        assert_eq!(stats["num_indexed_rows"], 512);
2054        assert_eq!(stats["num_indexed_fragments"], 1);
2055        assert_eq!(stats["num_indices"], 1);
2056
2057        let reader =
2058            RecordBatchIterator::new(vec![record_batch].into_iter().map(Ok), schema.clone());
2059        dataset.append(reader, None).await.unwrap();
2060        let mut dataset = DatasetBuilder::from_uri(test_uri).load().await.unwrap();
2061        let stats: serde_json::Value =
2062            serde_json::from_str(&dataset.index_statistics("vec_idx").await.unwrap()).unwrap();
2063        assert_eq!(stats["num_unindexed_rows"], 512);
2064        assert_eq!(stats["num_indexed_rows"], 512);
2065        assert_eq!(stats["num_indexed_fragments"], 1);
2066        assert_eq!(stats["num_unindexed_fragments"], 1);
2067        assert_eq!(stats["num_indices"], 1);
2068
2069        dataset
2070            .optimize_indices(&OptimizeOptions {
2071                num_indices_to_merge: 0, // Just create index for delta
2072                ..Default::default()
2073            })
2074            .await
2075            .unwrap();
2076
2077        let stats: serde_json::Value =
2078            serde_json::from_str(&dataset.index_statistics("vec_idx").await.unwrap()).unwrap();
2079        assert_eq!(stats["num_unindexed_rows"], 0);
2080        assert_eq!(stats["num_indexed_rows"], 1024);
2081        assert_eq!(stats["num_indexed_fragments"], 2);
2082        assert_eq!(stats["num_unindexed_fragments"], 0);
2083        assert_eq!(stats["num_indices"], 2);
2084
2085        dataset
2086            .optimize_indices(&OptimizeOptions {
2087                num_indices_to_merge: 2,
2088                ..Default::default()
2089            })
2090            .await
2091            .unwrap();
2092        let stats: serde_json::Value =
2093            serde_json::from_str(&dataset.index_statistics("vec_idx").await.unwrap()).unwrap();
2094        assert_eq!(stats["num_unindexed_rows"], 0);
2095        assert_eq!(stats["num_indexed_rows"], 1024);
2096        assert_eq!(stats["num_indexed_fragments"], 2);
2097        assert_eq!(stats["num_unindexed_fragments"], 0);
2098        assert_eq!(stats["num_indices"], 1);
2099    }
2100
2101    #[rstest]
2102    #[tokio::test]
2103    async fn test_optimize_fts(#[values(false, true)] with_position: bool) {
2104        let words = ["apple", "banana", "cherry", "date"];
2105
2106        let dir = TempStrDir::default();
2107        let schema = Arc::new(Schema::new(vec![Field::new("text", DataType::Utf8, false)]));
2108        let data = StringArray::from_iter_values(words.iter().map(|s| s.to_string()));
2109        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(data)]).unwrap();
2110        let batch_iterator = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
2111
2112        let mut dataset = Dataset::write(batch_iterator, &dir, None).await.unwrap();
2113
2114        let params = InvertedIndexParams::default()
2115            .lower_case(false)
2116            .with_position(with_position);
2117        dataset
2118            .create_index(&["text"], IndexType::Inverted, None, &params, true)
2119            .await
2120            .unwrap();
2121
2122        async fn assert_indexed_rows(dataset: &Dataset, expected_indexed_rows: usize) {
2123            let stats = dataset.index_statistics("text_idx").await.unwrap();
2124            let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
2125            let indexed_rows = stats["num_indexed_rows"].as_u64().unwrap() as usize;
2126            let unindexed_rows = stats["num_unindexed_rows"].as_u64().unwrap() as usize;
2127            let num_rows = dataset.count_all_rows().await.unwrap();
2128            assert_eq!(indexed_rows, expected_indexed_rows);
2129            assert_eq!(unindexed_rows, num_rows - expected_indexed_rows);
2130        }
2131
2132        let num_rows = dataset.count_all_rows().await.unwrap();
2133        assert_indexed_rows(&dataset, num_rows).await;
2134
2135        let new_words = ["elephant", "fig", "grape", "honeydew"];
2136        let new_data = StringArray::from_iter_values(new_words.iter().map(|s| s.to_string()));
2137        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(new_data)]).unwrap();
2138        let batch_iter = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
2139        dataset.append(batch_iter, None).await.unwrap();
2140        assert_indexed_rows(&dataset, num_rows).await;
2141
2142        dataset
2143            .optimize_indices(&OptimizeOptions::append())
2144            .await
2145            .unwrap();
2146        let num_rows = dataset.count_all_rows().await.unwrap();
2147        assert_indexed_rows(&dataset, num_rows).await;
2148
2149        for &word in words.iter().chain(new_words.iter()) {
2150            let query_result = dataset
2151                .scan()
2152                .project(&["text"])
2153                .unwrap()
2154                .full_text_search(FullTextSearchQuery::new(word.to_string()))
2155                .unwrap()
2156                .limit(Some(10), None)
2157                .unwrap()
2158                .try_into_batch()
2159                .await
2160                .unwrap();
2161
2162            let texts = query_result["text"]
2163                .as_string::<i32>()
2164                .iter()
2165                .map(|v| match v {
2166                    None => "".to_string(),
2167                    Some(v) => v.to_string(),
2168                })
2169                .collect::<Vec<String>>();
2170
2171            assert_eq!(texts.len(), 1);
2172            assert_eq!(texts[0], word);
2173        }
2174
2175        let uppercase_words = ["Apple", "Banana", "Cherry", "Date"];
2176        for &word in uppercase_words.iter() {
2177            let query_result = dataset
2178                .scan()
2179                .project(&["text"])
2180                .unwrap()
2181                .full_text_search(FullTextSearchQuery::new(word.to_string()))
2182                .unwrap()
2183                .limit(Some(10), None)
2184                .unwrap()
2185                .try_into_batch()
2186                .await
2187                .unwrap();
2188
2189            let texts = query_result["text"]
2190                .as_string::<i32>()
2191                .iter()
2192                .map(|v| match v {
2193                    None => "".to_string(),
2194                    Some(v) => v.to_string(),
2195                })
2196                .collect::<Vec<String>>();
2197
2198            assert_eq!(texts.len(), 0);
2199        }
2200        let new_data = StringArray::from_iter_values(uppercase_words.iter().map(|s| s.to_string()));
2201        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(new_data)]).unwrap();
2202        let batch_iter = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
2203        dataset.append(batch_iter, None).await.unwrap();
2204        assert_indexed_rows(&dataset, num_rows).await;
2205
2206        // we should be able to query the new words
2207        for &word in uppercase_words.iter() {
2208            let query_result = dataset
2209                .scan()
2210                .project(&["text"])
2211                .unwrap()
2212                .full_text_search(FullTextSearchQuery::new(word.to_string()))
2213                .unwrap()
2214                .limit(Some(10), None)
2215                .unwrap()
2216                .try_into_batch()
2217                .await
2218                .unwrap();
2219
2220            let texts = query_result["text"]
2221                .as_string::<i32>()
2222                .iter()
2223                .map(|v| match v {
2224                    None => "".to_string(),
2225                    Some(v) => v.to_string(),
2226                })
2227                .collect::<Vec<String>>();
2228
2229            assert_eq!(texts.len(), 1, "query: {}, texts: {:?}", word, texts);
2230            assert_eq!(texts[0], word, "query: {}, texts: {:?}", word, texts);
2231        }
2232
2233        dataset
2234            .optimize_indices(&OptimizeOptions::append())
2235            .await
2236            .unwrap();
2237        let num_rows = dataset.count_all_rows().await.unwrap();
2238        assert_indexed_rows(&dataset, num_rows).await;
2239
2240        // we should be able to query the new words after optimization
2241        for &word in uppercase_words.iter() {
2242            let query_result = dataset
2243                .scan()
2244                .project(&["text"])
2245                .unwrap()
2246                .full_text_search(FullTextSearchQuery::new(word.to_string()))
2247                .unwrap()
2248                .limit(Some(10), None)
2249                .unwrap()
2250                .try_into_batch()
2251                .await
2252                .unwrap();
2253
2254            let texts = query_result["text"]
2255                .as_string::<i32>()
2256                .iter()
2257                .map(|v| match v {
2258                    None => "".to_string(),
2259                    Some(v) => v.to_string(),
2260                })
2261                .collect::<Vec<String>>();
2262
2263            assert_eq!(texts.len(), 1, "query: {}, texts: {:?}", word, texts);
2264            assert_eq!(texts[0], word, "query: {}, texts: {:?}", word, texts);
2265
2266            // we should be able to query the new words after compaction
2267            compact_files(&mut dataset, CompactionOptions::default(), None)
2268                .await
2269                .unwrap();
2270            for &word in uppercase_words.iter() {
2271                let query_result = dataset
2272                    .scan()
2273                    .project(&["text"])
2274                    .unwrap()
2275                    .full_text_search(FullTextSearchQuery::new(word.to_string()))
2276                    .unwrap()
2277                    .try_into_batch()
2278                    .await
2279                    .unwrap();
2280                let texts = query_result["text"]
2281                    .as_string::<i32>()
2282                    .iter()
2283                    .map(|v| match v {
2284                        None => "".to_string(),
2285                        Some(v) => v.to_string(),
2286                    })
2287                    .collect::<Vec<String>>();
2288                assert_eq!(texts.len(), 1, "query: {}, texts: {:?}", word, texts);
2289                assert_eq!(texts[0], word, "query: {}, texts: {:?}", word, texts);
2290            }
2291            assert_indexed_rows(&dataset, num_rows).await;
2292        }
2293    }
2294
2295    #[tokio::test]
2296    async fn test_create_index_too_small_for_pq() {
2297        let test_dir = TempStrDir::default();
2298        let dimensions = 1536;
2299
2300        let field = Field::new(
2301            "vector",
2302            DataType::FixedSizeList(
2303                Arc::new(Field::new("item", DataType::Float32, true)),
2304                dimensions,
2305            ),
2306            false,
2307        );
2308
2309        let schema = Arc::new(Schema::new(vec![field]));
2310        let float_arr = generate_random_array(100 * dimensions as usize);
2311
2312        let vectors =
2313            arrow_array::FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap();
2314        let record_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(vectors)]).unwrap();
2315        let reader = RecordBatchIterator::new(
2316            vec![record_batch.clone()].into_iter().map(Ok),
2317            schema.clone(),
2318        );
2319
2320        let test_uri = &test_dir;
2321        let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
2322
2323        let params = VectorIndexParams::ivf_pq(1, 8, 96, DistanceType::L2, 1);
2324        let result = dataset
2325            .create_index(&["vector"], IndexType::Vector, None, &params, false)
2326            .await;
2327
2328        assert!(matches!(result, Err(Error::Index { .. })));
2329        if let Error::Index { message, .. } = result.unwrap_err() {
2330            assert_eq!(
2331                message,
2332                "Not enough rows to train PQ. Requires 256 rows but only 100 available",
2333            )
2334        }
2335    }
2336
2337    #[tokio::test]
2338    async fn test_create_bitmap_index() {
2339        let test_dir = TempStrDir::default();
2340        let field = Field::new("tag", DataType::Utf8, false);
2341        let schema = Arc::new(Schema::new(vec![field]));
2342        let array = StringArray::from_iter_values((0..128).map(|i| ["a", "b", "c"][i % 3]));
2343        let record_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
2344        let reader = RecordBatchIterator::new(
2345            vec![record_batch.clone()].into_iter().map(Ok),
2346            schema.clone(),
2347        );
2348
2349        let test_uri = &test_dir;
2350        let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
2351        dataset
2352            .create_index(
2353                &["tag"],
2354                IndexType::Bitmap,
2355                None,
2356                &ScalarIndexParams::default(),
2357                false,
2358            )
2359            .await
2360            .unwrap();
2361        let indices = dataset.load_indices().await.unwrap();
2362        let index = dataset
2363            .open_generic_index("tag", &indices[0].uuid.to_string(), &NoOpMetricsCollector)
2364            .await
2365            .unwrap();
2366        assert_eq!(index.index_type(), IndexType::Bitmap);
2367    }
2368
2369    // #[tokio::test]
2370    #[lance_test_macros::test(tokio::test)]
2371    async fn test_load_indices() {
2372        let session = Arc::new(Session::default());
2373        let io_stats = Arc::new(StatsHolder::default());
2374        let write_params = WriteParams {
2375            store_params: Some(ObjectStoreParams {
2376                object_store_wrapper: Some(io_stats.clone()),
2377                ..Default::default()
2378            }),
2379            session: Some(session.clone()),
2380            ..Default::default()
2381        };
2382
2383        let test_dir = TempStrDir::default();
2384        let field = Field::new("tag", DataType::Utf8, false);
2385        let schema = Arc::new(Schema::new(vec![field]));
2386        let array = StringArray::from_iter_values((0..128).map(|i| ["a", "b", "c"][i % 3]));
2387        let record_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
2388        let reader = RecordBatchIterator::new(
2389            vec![record_batch.clone()].into_iter().map(Ok),
2390            schema.clone(),
2391        );
2392
2393        let test_uri = &test_dir;
2394        let mut dataset = Dataset::write(reader, test_uri, Some(write_params))
2395            .await
2396            .unwrap();
2397        dataset
2398            .create_index(
2399                &["tag"],
2400                IndexType::Bitmap,
2401                None,
2402                &ScalarIndexParams::default(),
2403                false,
2404            )
2405            .await
2406            .unwrap();
2407        io_stats.incremental_stats(); // Reset
2408
2409        let indices = dataset.load_indices().await.unwrap();
2410        let stats = io_stats.incremental_stats();
2411        // We should already have this cached since we just wrote it.
2412        assert_eq!(
2413            stats.read_iops, 0,
2414            "Read IOPS should be 0. Saw requests: {:?}",
2415            stats.requests
2416        );
2417        assert_eq!(stats.read_bytes, 0);
2418        assert_eq!(indices.len(), 1);
2419
2420        session.index_cache.clear().await; // Clear the cache
2421
2422        let dataset2 = DatasetBuilder::from_uri(test_uri)
2423            .with_session(session.clone())
2424            .with_read_params(ReadParams {
2425                store_options: Some(ObjectStoreParams {
2426                    object_store_wrapper: Some(io_stats.clone()),
2427                    ..Default::default()
2428                }),
2429                session: Some(session.clone()),
2430                ..Default::default()
2431            })
2432            .load()
2433            .await
2434            .unwrap();
2435        let stats = io_stats.incremental_stats(); // Reset
2436        assert!(stats.read_bytes < 64 * 1024);
2437
2438        // Because the manifest is so small, we should have opportunistically
2439        // cached the indices in memory already.
2440        let indices2 = dataset2.load_indices().await.unwrap();
2441        let stats = io_stats.incremental_stats();
2442        assert_eq!(stats.read_iops, 0);
2443        assert_eq!(stats.read_bytes, 0);
2444        assert_eq!(indices2.len(), 1);
2445    }
2446
2447    #[tokio::test]
2448    async fn test_remap_empty() {
2449        let data = gen_batch()
2450            .col("int", array::step::<Int32Type>())
2451            .col(
2452                "vector",
2453                array::rand_vec::<Float32Type>(Dimension::from(16)),
2454            )
2455            .into_reader_rows(RowCount::from(256), BatchCount::from(1));
2456        let mut dataset = Dataset::write(data, "memory://", None).await.unwrap();
2457
2458        let params = VectorIndexParams::ivf_pq(1, 8, 1, DistanceType::L2, 1);
2459        dataset
2460            .create_index(&["vector"], IndexType::Vector, None, &params, false)
2461            .await
2462            .unwrap();
2463
2464        let index_uuid = dataset.load_indices().await.unwrap()[0].uuid;
2465        let remap_to_empty = (0..dataset.count_all_rows().await.unwrap())
2466            .map(|i| (i as u64, None))
2467            .collect::<HashMap<_, _>>();
2468        let new_uuid = remap_index(&dataset, &index_uuid, &remap_to_empty)
2469            .await
2470            .unwrap();
2471        assert_eq!(new_uuid, RemapResult::Keep(index_uuid));
2472    }
2473
2474    #[tokio::test]
2475    async fn test_optimize_ivf_pq_up_to_date() {
2476        // https://github.com/lancedb/lance/issues/4016
2477        let nrows = 256;
2478        let dimensions = 16;
2479        let column_name = "vector";
2480        let schema = Arc::new(Schema::new(vec![
2481            Field::new("id", DataType::Int32, false),
2482            Field::new(
2483                column_name,
2484                DataType::FixedSizeList(
2485                    Arc::new(Field::new("item", DataType::Float32, true)),
2486                    dimensions,
2487                ),
2488                false,
2489            ),
2490        ]));
2491
2492        let float_arr = generate_random_array(nrows * dimensions as usize);
2493        let vectors =
2494            arrow_array::FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap();
2495        let record_batch = RecordBatch::try_new(
2496            schema.clone(),
2497            vec![
2498                Arc::new(arrow_array::Int32Array::from_iter_values(0..nrows as i32)),
2499                Arc::new(vectors),
2500            ],
2501        )
2502        .unwrap();
2503
2504        let reader = RecordBatchIterator::new(
2505            vec![record_batch.clone()].into_iter().map(Ok),
2506            schema.clone(),
2507        );
2508        let mut dataset = Dataset::write(reader, "memory://", None).await.unwrap();
2509
2510        let params = VectorIndexParams::ivf_pq(1, 8, 2, MetricType::L2, 2);
2511        dataset
2512            .create_index(&[column_name], IndexType::Vector, None, &params, true)
2513            .await
2514            .unwrap();
2515
2516        let query_vector = generate_random_array(dimensions as usize);
2517
2518        let nearest = dataset
2519            .scan()
2520            .nearest(column_name, &query_vector, 5)
2521            .unwrap()
2522            .try_into_batch()
2523            .await
2524            .unwrap();
2525
2526        let ids = nearest["id"].as_primitive::<Int32Type>();
2527        let mut seen = HashSet::new();
2528        for id in ids.values() {
2529            assert!(seen.insert(*id), "Duplicate id found: {}", id);
2530        }
2531
2532        dataset
2533            .optimize_indices(&OptimizeOptions::default())
2534            .await
2535            .unwrap();
2536
2537        dataset.validate().await.unwrap();
2538
2539        let nearest_after = dataset
2540            .scan()
2541            .nearest(column_name, &query_vector, 5)
2542            .unwrap()
2543            .try_into_batch()
2544            .await
2545            .unwrap();
2546
2547        let ids = nearest_after["id"].as_primitive::<Int32Type>();
2548        let mut seen = HashSet::new();
2549        for id in ids.values() {
2550            assert!(seen.insert(*id), "Duplicate id found: {}", id);
2551        }
2552    }
2553
2554    #[tokio::test]
2555    async fn test_index_created_at_timestamp() {
2556        // Test that created_at is set when creating an index
2557        let schema = Arc::new(Schema::new(vec![
2558            Field::new("id", DataType::Int32, false),
2559            Field::new("values", DataType::Utf8, false),
2560        ]));
2561
2562        let values = StringArray::from_iter_values(["hello", "world", "foo", "bar"]);
2563        let record_batch = RecordBatch::try_new(
2564            schema.clone(),
2565            vec![
2566                Arc::new(Int32Array::from_iter_values(0..4)),
2567                Arc::new(values),
2568            ],
2569        )
2570        .unwrap();
2571
2572        let reader =
2573            RecordBatchIterator::new(vec![record_batch].into_iter().map(Ok), schema.clone());
2574
2575        let mut dataset = Dataset::write(reader, "memory://", None).await.unwrap();
2576
2577        // Record time before creating index
2578        let before_index = chrono::Utc::now();
2579
2580        // Create a scalar index
2581        dataset
2582            .create_index(
2583                &["values"],
2584                IndexType::Scalar,
2585                Some("test_idx".to_string()),
2586                &ScalarIndexParams::default(),
2587                false,
2588            )
2589            .await
2590            .unwrap();
2591
2592        // Record time after creating index
2593        let after_index = chrono::Utc::now();
2594
2595        // Get index metadata
2596        let indices = dataset.load_indices().await.unwrap();
2597        let test_index = indices.iter().find(|idx| idx.name == "test_idx").unwrap();
2598
2599        // Verify created_at is set and within reasonable bounds
2600        assert!(test_index.created_at.is_some());
2601        let created_at = test_index.created_at.unwrap();
2602        assert!(created_at >= before_index);
2603        assert!(created_at <= after_index);
2604    }
2605
2606    #[tokio::test]
2607    async fn test_index_statistics_updated_at() {
2608        // Test that updated_at appears in index statistics
2609        let schema = Arc::new(Schema::new(vec![
2610            Field::new("id", DataType::Int32, false),
2611            Field::new("values", DataType::Utf8, false),
2612        ]));
2613
2614        let values = StringArray::from_iter_values(["hello", "world", "foo", "bar"]);
2615        let record_batch = RecordBatch::try_new(
2616            schema.clone(),
2617            vec![
2618                Arc::new(Int32Array::from_iter_values(0..4)),
2619                Arc::new(values),
2620            ],
2621        )
2622        .unwrap();
2623
2624        let reader =
2625            RecordBatchIterator::new(vec![record_batch].into_iter().map(Ok), schema.clone());
2626
2627        let mut dataset = Dataset::write(reader, "memory://", None).await.unwrap();
2628
2629        // Create a scalar index
2630        dataset
2631            .create_index(
2632                &["values"],
2633                IndexType::Scalar,
2634                Some("test_idx".to_string()),
2635                &ScalarIndexParams::default(),
2636                false,
2637            )
2638            .await
2639            .unwrap();
2640
2641        // Get index statistics
2642        let stats_str = dataset.index_statistics("test_idx").await.unwrap();
2643        let stats: serde_json::Value = serde_json::from_str(&stats_str).unwrap();
2644
2645        // Verify updated_at_timestamp_ms field exists in statistics
2646        assert!(stats["updated_at_timestamp_ms"].is_number());
2647        let updated_at = stats["updated_at_timestamp_ms"].as_u64().unwrap();
2648
2649        // Get the index metadata to compare with created_at
2650        let indices = dataset.load_indices().await.unwrap();
2651        let test_index = indices.iter().find(|idx| idx.name == "test_idx").unwrap();
2652        let created_at = test_index.created_at.unwrap().timestamp_millis() as u64;
2653
2654        // For a single index, updated_at should equal created_at
2655        assert_eq!(updated_at, created_at);
2656    }
2657
2658    #[tokio::test]
2659    async fn test_index_statistics_updated_at_multiple_deltas() {
2660        // Test that updated_at reflects max(created_at) across multiple delta indices
2661        let schema = Arc::new(Schema::new(vec![
2662            Field::new("id", DataType::Int32, false),
2663            Field::new(
2664                "vector",
2665                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), 4),
2666                false,
2667            ),
2668        ]));
2669
2670        // Create initial dataset (need more rows for PQ training)
2671        let num_rows = 300;
2672        let float_arr = generate_random_array(4 * num_rows);
2673        let vectors = FixedSizeListArray::try_new_from_values(float_arr, 4).unwrap();
2674        let record_batch = RecordBatch::try_new(
2675            schema.clone(),
2676            vec![
2677                Arc::new(Int32Array::from_iter_values(0..num_rows as i32)),
2678                Arc::new(vectors),
2679            ],
2680        )
2681        .unwrap();
2682
2683        let reader =
2684            RecordBatchIterator::new(vec![record_batch].into_iter().map(Ok), schema.clone());
2685
2686        let mut dataset = Dataset::write(reader, "memory://", None).await.unwrap();
2687
2688        // Create vector index
2689        let params = VectorIndexParams::ivf_pq(1, 8, 2, MetricType::L2, 2);
2690        dataset
2691            .create_index(
2692                &["vector"],
2693                IndexType::Vector,
2694                Some("test_vec_idx".to_string()),
2695                &params,
2696                false,
2697            )
2698            .await
2699            .unwrap();
2700
2701        // Get initial statistics
2702        let stats_str_1 = dataset.index_statistics("test_vec_idx").await.unwrap();
2703        let stats_1: serde_json::Value = serde_json::from_str(&stats_str_1).unwrap();
2704        let initial_updated_at = stats_1["updated_at_timestamp_ms"].as_u64().unwrap();
2705
2706        // Add more data to create additional delta indices
2707        std::thread::sleep(std::time::Duration::from_millis(10)); // Ensure different timestamp
2708
2709        let num_rows_2 = 50;
2710        let float_arr_2 = generate_random_array(4 * num_rows_2);
2711        let vectors_2 = FixedSizeListArray::try_new_from_values(float_arr_2, 4).unwrap();
2712        let record_batch_2 = RecordBatch::try_new(
2713            schema.clone(),
2714            vec![
2715                Arc::new(Int32Array::from_iter_values(
2716                    num_rows as i32..(num_rows + num_rows_2) as i32,
2717                )),
2718                Arc::new(vectors_2),
2719            ],
2720        )
2721        .unwrap();
2722
2723        let reader_2 =
2724            RecordBatchIterator::new(vec![record_batch_2].into_iter().map(Ok), schema.clone());
2725
2726        dataset.append(reader_2, None).await.unwrap();
2727
2728        // Update the index
2729        dataset
2730            .create_index(
2731                &["vector"],
2732                IndexType::Vector,
2733                Some("test_vec_idx".to_string()),
2734                &params,
2735                true,
2736            )
2737            .await
2738            .unwrap();
2739
2740        // Get updated statistics
2741        let stats_str_2 = dataset.index_statistics("test_vec_idx").await.unwrap();
2742        let stats_2: serde_json::Value = serde_json::from_str(&stats_str_2).unwrap();
2743        let final_updated_at = stats_2["updated_at_timestamp_ms"].as_u64().unwrap();
2744
2745        // The updated_at should be newer than the initial one
2746        assert!(final_updated_at >= initial_updated_at);
2747    }
2748
2749    #[tokio::test]
2750    async fn test_index_statistics_updated_at_none_when_no_created_at() {
2751        // Test backward compatibility: when indices were created before the created_at field,
2752        // updated_at_timestamp_ms should be null
2753
2754        // Use test data created with Lance 0.29.0 (before created_at field was added)
2755        let test_dir =
2756            copy_test_data_to_tmp("v0.30.0_pre_created_at/index_without_created_at").unwrap();
2757        let test_uri = test_dir.path_str();
2758        let test_uri = &test_uri;
2759
2760        let dataset = Dataset::open(test_uri).await.unwrap();
2761
2762        // Get the index metadata to verify created_at is None
2763        let indices = dataset.load_indices().await.unwrap();
2764        assert!(!indices.is_empty(), "Test dataset should have indices");
2765
2766        // Verify that the index created with old version has no created_at
2767        for index in indices.iter() {
2768            assert!(
2769                index.created_at.is_none(),
2770                "Index from old version should have created_at = None"
2771            );
2772        }
2773
2774        // Get index statistics - should work even with old indices
2775        let index_name = &indices[0].name;
2776        let stats_str = dataset.index_statistics(index_name).await.unwrap();
2777        let stats: serde_json::Value = serde_json::from_str(&stats_str).unwrap();
2778
2779        // Verify updated_at_timestamp_ms field is null when no indices have created_at
2780        assert!(
2781            stats["updated_at_timestamp_ms"].is_null(),
2782            "updated_at_timestamp_ms should be null when no indices have created_at timestamps"
2783        );
2784    }
2785    #[rstest]
2786    #[case::btree("i", IndexType::BTree, Box::new(ScalarIndexParams::default()))]
2787    #[case::bitmap("i", IndexType::Bitmap, Box::new(ScalarIndexParams::default()))]
2788    #[case::inverted("text", IndexType::Inverted, Box::new(InvertedIndexParams::default()))]
2789    #[tokio::test]
2790    async fn test_create_empty_scalar_index(
2791        #[case] column_name: &str,
2792        #[case] index_type: IndexType,
2793        #[case] params: Box<dyn IndexParams>,
2794    ) {
2795        use lance_datagen::{array, BatchCount, ByteCount, RowCount};
2796
2797        // Create dataset with scalar and text columns (no vector column needed)
2798        let reader = lance_datagen::gen_batch()
2799            .col("i", array::step::<Int32Type>())
2800            .col("text", array::rand_utf8(ByteCount::from(10), false))
2801            .into_reader_rows(RowCount::from(100), BatchCount::from(1));
2802        let mut dataset = Dataset::write(reader, "memory://test", None).await.unwrap();
2803
2804        // Create an empty index with train=false
2805        // Test using IntoFuture - can await directly without calling .execute()
2806        dataset
2807            .create_index_builder(&[column_name], index_type, params.as_ref())
2808            .name("index".to_string())
2809            .train(false)
2810            .await
2811            .unwrap();
2812
2813        // Verify we can get index statistics
2814        let stats = dataset.index_statistics("index").await.unwrap();
2815        let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
2816        assert_eq!(
2817            stats["num_indexed_rows"], 0,
2818            "Empty index should have zero indexed rows"
2819        );
2820
2821        // Append new data using lance_datagen
2822        let append_reader = lance_datagen::gen_batch()
2823            .col("i", array::step::<Int32Type>())
2824            .col("text", array::rand_utf8(ByteCount::from(10), false))
2825            .into_reader_rows(RowCount::from(50), BatchCount::from(1));
2826
2827        dataset.append(append_reader, None).await.unwrap();
2828
2829        // Critical test: Verify the empty index is still present after append
2830        let indices_after_append = dataset.load_indices().await.unwrap();
2831        assert_eq!(
2832            indices_after_append.len(),
2833            1,
2834            "Index should be retained after append for index type {:?}",
2835            index_type
2836        );
2837
2838        let stats = dataset.index_statistics("index").await.unwrap();
2839        let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
2840        assert_eq!(
2841            stats["num_indexed_rows"], 0,
2842            "Empty index should still have zero indexed rows after append"
2843        );
2844
2845        // Test optimize_indices with empty index
2846        dataset.optimize_indices(&Default::default()).await.unwrap();
2847
2848        // Verify the index still exists after optimization
2849        let indices_after_optimize = dataset.load_indices().await.unwrap();
2850        assert_eq!(
2851            indices_after_optimize.len(),
2852            1,
2853            "Index should still exist after optimization"
2854        );
2855
2856        // Check index statistics after optimization
2857        let stats = dataset.index_statistics("index").await.unwrap();
2858        let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
2859        assert_eq!(
2860            stats["num_unindexed_rows"], 0,
2861            "Empty index should indexed all rows"
2862        );
2863    }
2864
2865    /// Helper function to check if an index is being used in a query plan
2866    fn assert_index_usage(plan: &str, column_name: &str, should_use_index: bool, context: &str) {
2867        let index_used = if column_name == "text" {
2868            // For inverted index, look for MatchQuery which indicates FTS index usage
2869            plan.contains("MatchQuery")
2870        } else {
2871            // For btree/bitmap, look for MaterializeIndex which indicates scalar index usage
2872            plan.contains("ScalarIndexQuery")
2873        };
2874
2875        if should_use_index {
2876            assert!(
2877                index_used,
2878                "Query plan should use index {}: {}",
2879                context, plan
2880            );
2881        } else {
2882            assert!(
2883                !index_used,
2884                "Query plan should NOT use index {}: {}",
2885                context, plan
2886            );
2887        }
2888    }
2889
2890    /// Test that scalar indices are retained after deleting all data from a table.
2891    ///
2892    /// This test verifies that when we:
2893    /// 1. Create a table with data
2894    /// 2. Add a scalar index with train=true
2895    /// 3. Delete all data in the table
2896    /// The index remains available on the table.
2897    #[rstest]
2898    #[case::btree("i", IndexType::BTree, Box::new(ScalarIndexParams::default()))]
2899    #[case::bitmap("i", IndexType::Bitmap, Box::new(ScalarIndexParams::default()))]
2900    #[case::inverted("text", IndexType::Inverted, Box::new(InvertedIndexParams::default()))]
2901    #[tokio::test]
2902    async fn test_scalar_index_retained_after_delete_all(
2903        #[case] column_name: &str,
2904        #[case] index_type: IndexType,
2905        #[case] params: Box<dyn IndexParams>,
2906    ) {
2907        use lance_datagen::{array, BatchCount, ByteCount, RowCount};
2908
2909        // Create dataset with initial data
2910        let reader = lance_datagen::gen_batch()
2911            .col("i", array::step::<Int32Type>())
2912            .col("text", array::rand_utf8(ByteCount::from(10), false))
2913            .into_reader_rows(RowCount::from(100), BatchCount::from(1));
2914        let mut dataset = Dataset::write(reader, "memory://test", None).await.unwrap();
2915
2916        // Create index with train=true (normal index with data)
2917        dataset
2918            .create_index_builder(&[column_name], index_type, params.as_ref())
2919            .name("index".to_string())
2920            .train(true)
2921            .await
2922            .unwrap();
2923
2924        // Verify index was created and has indexed rows
2925        let stats = dataset.index_statistics("index").await.unwrap();
2926        let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
2927        assert_eq!(
2928            stats["num_indexed_rows"], 100,
2929            "Index should have indexed all 100 rows"
2930        );
2931
2932        // Verify index is being used in queries before delete
2933        let plan = if column_name == "text" {
2934            // Use full-text search for inverted index
2935            dataset
2936                .scan()
2937                .full_text_search(FullTextSearchQuery::new("test".to_string()))
2938                .unwrap()
2939                .explain_plan(false)
2940                .await
2941                .unwrap()
2942        } else {
2943            // Use equality filter for btree/bitmap indices
2944            dataset
2945                .scan()
2946                .filter(format!("{} = 50", column_name).as_str())
2947                .unwrap()
2948                .explain_plan(false)
2949                .await
2950                .unwrap()
2951        };
2952        // Verify index is being used before delete
2953        assert_index_usage(&plan, column_name, true, "before delete");
2954
2955        let indexes = dataset.load_indices().await.unwrap();
2956        let original_index = indexes[0].clone();
2957
2958        // Delete all rows from the table
2959        dataset.delete("true").await.unwrap();
2960
2961        // Verify table is empty
2962        let row_count = dataset.count_rows(None).await.unwrap();
2963        assert_eq!(row_count, 0, "Table should be empty after delete all");
2964
2965        // Critical test: Verify the index still exists after deleting all data
2966        let indices_after_delete = dataset.load_indices().await.unwrap();
2967        assert_eq!(
2968            indices_after_delete.len(),
2969            1,
2970            "Index should be retained after deleting all data"
2971        );
2972        assert_eq!(
2973            indices_after_delete[0].name, "index",
2974            "Index name should remain the same after delete"
2975        );
2976
2977        // Critical test: Verify the fragment bitmap is empty after delete
2978        let index_after_delete = &indices_after_delete[0];
2979        let effective_bitmap = index_after_delete
2980            .effective_fragment_bitmap(&dataset.fragment_bitmap)
2981            .unwrap();
2982        assert!(
2983            effective_bitmap.is_empty(),
2984            "Effective bitmap should be empty after deleting all data"
2985        );
2986        assert_eq!(
2987            index_after_delete.fragment_bitmap, original_index.fragment_bitmap,
2988            "Fragment bitmap should remain the same after delete"
2989        );
2990
2991        // Verify we can still get index statistics
2992        let stats = dataset.index_statistics("index").await.unwrap();
2993        let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
2994        assert_eq!(
2995            stats["num_indexed_rows"], 0,
2996            "Index should now report zero indexed rows after delete all"
2997        );
2998        assert_eq!(
2999            stats["num_unindexed_rows"], 0,
3000            "Index should report zero unindexed rows after delete all"
3001        );
3002        assert_eq!(
3003            stats["num_indexed_fragments"], 0,
3004            "Index should report zero indexed fragments after delete all"
3005        );
3006        assert_eq!(
3007            stats["num_unindexed_fragments"], 0,
3008            "Index should report zero unindexed fragments after delete all"
3009        );
3010
3011        // Verify index is NOT being used in queries after delete (empty bitmap)
3012        if column_name == "text" {
3013            // Inverted indexes will still appear to be used in FTS queries.
3014            // TODO: once metrics are working on FTS queries, we can check the
3015            // analyze plan output instead for index usage.
3016            let _plan_after_delete = dataset
3017                .scan()
3018                .project(&[column_name])
3019                .unwrap()
3020                .full_text_search(FullTextSearchQuery::new("test".to_string()))
3021                .unwrap()
3022                .explain_plan(false)
3023                .await
3024                .unwrap();
3025            assert_index_usage(
3026                &_plan_after_delete,
3027                column_name,
3028                true,
3029                "after delete (empty bitmap)",
3030            );
3031        } else {
3032            // Use equality filter for btree/bitmap indices
3033            let _plan_after_delete = dataset
3034                .scan()
3035                .filter(format!("{} = 50", column_name).as_str())
3036                .unwrap()
3037                .explain_plan(false)
3038                .await
3039                .unwrap();
3040            // Verify index is NOT being used after delete (empty bitmap)
3041            assert_index_usage(
3042                &_plan_after_delete,
3043                column_name,
3044                false,
3045                "after delete (empty bitmap)",
3046            );
3047        }
3048
3049        // Test that we can append new data and the index is still there
3050        let append_reader = lance_datagen::gen_batch()
3051            .col("i", array::step::<Int32Type>())
3052            .col("text", array::rand_utf8(ByteCount::from(10), false))
3053            .into_reader_rows(RowCount::from(50), BatchCount::from(1));
3054
3055        dataset.append(append_reader, None).await.unwrap();
3056
3057        // Verify index still exists after append
3058        let indices_after_append = dataset.load_indices().await.unwrap();
3059        assert_eq!(
3060            indices_after_append.len(),
3061            1,
3062            "Index should still exist after appending to empty table"
3063        );
3064        let stats = dataset.index_statistics("index").await.unwrap();
3065        let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
3066        assert_eq!(
3067            stats["num_indexed_rows"], 0,
3068            "Index should now report zero indexed rows after data is added"
3069        );
3070
3071        // Test optimize_indices after delete all
3072        dataset.optimize_indices(&Default::default()).await.unwrap();
3073
3074        // Verify index still exists after optimization
3075        let indices_after_optimize = dataset.load_indices().await.unwrap();
3076        assert_eq!(
3077            indices_after_optimize.len(),
3078            1,
3079            "Index should still exist after optimization following delete all"
3080        );
3081
3082        // Verify we can still get index statistics after optimization
3083        let stats = dataset.index_statistics("index").await.unwrap();
3084        let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
3085        assert_eq!(
3086            stats["num_indexed_rows"],
3087            dataset.count_rows(None).await.unwrap(),
3088            "Index should now cover all newly added rows after optimization"
3089        );
3090    }
3091
3092    /// Test that scalar indices are retained after updating rows in a table.
3093    ///
3094    /// This test verifies that when we:
3095    /// 1. Create a table with data
3096    /// 2. Add a scalar index with train=true
3097    /// 3. Update rows in the table
3098    /// The index remains available on the table.
3099    #[rstest]
3100    #[case::btree("i", IndexType::BTree, Box::new(ScalarIndexParams::default()))]
3101    #[case::bitmap("i", IndexType::Bitmap, Box::new(ScalarIndexParams::default()))]
3102    #[case::inverted("text", IndexType::Inverted, Box::new(InvertedIndexParams::default()))]
3103    #[tokio::test]
3104    async fn test_scalar_index_retained_after_update(
3105        #[case] column_name: &str,
3106        #[case] index_type: IndexType,
3107        #[case] params: Box<dyn IndexParams>,
3108    ) {
3109        use crate::dataset::UpdateBuilder;
3110        use lance_datagen::{array, BatchCount, ByteCount, RowCount};
3111
3112        // Create dataset with initial data
3113        let reader = lance_datagen::gen_batch()
3114            .col("i", array::step::<Int32Type>())
3115            .col("text", array::rand_utf8(ByteCount::from(10), false))
3116            .into_reader_rows(RowCount::from(100), BatchCount::from(1));
3117        let mut dataset = Dataset::write(reader, "memory://test", None).await.unwrap();
3118
3119        // Create index with train=true (normal index with data)
3120        dataset
3121            .create_index_builder(&[column_name], index_type, params.as_ref())
3122            .name("index".to_string())
3123            .train(true)
3124            .await
3125            .unwrap();
3126
3127        // Verify index was created and has indexed rows
3128        let stats = dataset.index_statistics("index").await.unwrap();
3129        let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
3130        assert_eq!(
3131            stats["num_indexed_rows"], 100,
3132            "Index should have indexed all 100 rows"
3133        );
3134
3135        // Verify index is being used in queries before update
3136        let plan = if column_name == "text" {
3137            // Use full-text search for inverted index
3138            dataset
3139                .scan()
3140                .project(&[column_name])
3141                .unwrap()
3142                .full_text_search(FullTextSearchQuery::new("test".to_string()))
3143                .unwrap()
3144                .explain_plan(false)
3145                .await
3146                .unwrap()
3147        } else {
3148            // Use equality filter for btree/bitmap indices
3149            dataset
3150                .scan()
3151                .filter(format!("{} = 50", column_name).as_str())
3152                .unwrap()
3153                .explain_plan(false)
3154                .await
3155                .unwrap()
3156        };
3157        // Verify index is being used before update
3158        assert_index_usage(&plan, column_name, true, "before update");
3159
3160        // Update some rows - update first 50 rows
3161        let update_result = UpdateBuilder::new(Arc::new(dataset))
3162            .set("i", "i + 1000")
3163            .unwrap()
3164            .set("text", "'updated_' || text")
3165            .unwrap()
3166            .build()
3167            .unwrap()
3168            .execute()
3169            .await
3170            .unwrap();
3171
3172        let mut dataset = update_result.new_dataset.as_ref().clone();
3173
3174        // Verify row count remains the same
3175        let row_count = dataset.count_rows(None).await.unwrap();
3176        assert_eq!(row_count, 100, "Row count should remain 100 after update");
3177
3178        // Critical test: Verify the index still exists after updating data
3179        let indices_after_update = dataset.load_indices().await.unwrap();
3180        assert_eq!(
3181            indices_after_update.len(),
3182            1,
3183            "Index should be retained after updating rows"
3184        );
3185
3186        // Critical test: Verify the effective fragment bitmap is empty after update
3187        let indices = dataset.load_indices().await.unwrap();
3188        let index = &indices[0];
3189        let effective_bitmap = index
3190            .effective_fragment_bitmap(&dataset.fragment_bitmap)
3191            .unwrap();
3192        assert!(
3193            effective_bitmap.is_empty(),
3194            "Effective fragment bitmap should be empty after updating all data"
3195        );
3196
3197        // Verify we can still get index statistics
3198        let stats_after_update = dataset.index_statistics("index").await.unwrap();
3199        let stats_after_update: serde_json::Value =
3200            serde_json::from_str(&stats_after_update).unwrap();
3201
3202        // The index should still be available
3203        assert_eq!(
3204            stats_after_update["num_indexed_rows"], 0,
3205            "Index statistics should be zero after update, as it is not re-trained"
3206        );
3207
3208        // Verify index behavior in queries after update (empty bitmap)
3209        if column_name == "text" {
3210            // Inverted indexes will still appear to be used in FTS queries even with empty bitmaps.
3211            // This is because FTS queries require an index to exist, and the query execution
3212            // will handle the empty bitmap appropriately by falling back to scanning unindexed data.
3213            // TODO: once metrics are working on FTS queries, we can check the
3214            // analyze plan output instead for actual index usage statistics.
3215            let _plan_after_update = dataset
3216                .scan()
3217                .project(&[column_name])
3218                .unwrap()
3219                .full_text_search(FullTextSearchQuery::new("test".to_string()))
3220                .unwrap()
3221                .explain_plan(false)
3222                .await
3223                .unwrap();
3224            assert_index_usage(
3225                &_plan_after_update,
3226                column_name,
3227                true, // FTS indices always appear in the plan, even with empty bitmaps
3228                "after update (empty bitmap)",
3229            );
3230        } else {
3231            // Use equality filter for btree/bitmap indices
3232            let _plan_after_update = dataset
3233                .scan()
3234                .filter(format!("{} = 50", column_name).as_str())
3235                .unwrap()
3236                .explain_plan(false)
3237                .await
3238                .unwrap();
3239            // With immutable bitmaps, index is still used even with empty effective bitmap
3240            // The prefilter will handle non-existent fragments
3241            assert_index_usage(
3242                &_plan_after_update,
3243                column_name,
3244                false,
3245                "after update (empty effective bitmap)",
3246            );
3247        }
3248
3249        // Test that we can optimize indices after update
3250        dataset.optimize_indices(&Default::default()).await.unwrap();
3251
3252        // Verify index still exists after optimization
3253        let indices_after_optimize = dataset.load_indices().await.unwrap();
3254        assert_eq!(
3255            indices_after_optimize.len(),
3256            1,
3257            "Index should still exist after optimization following update"
3258        );
3259
3260        let stats_after_optimization = dataset.index_statistics("index").await.unwrap();
3261        let stats_after_optimization: serde_json::Value =
3262            serde_json::from_str(&stats_after_optimization).unwrap();
3263
3264        // The index should still be available
3265        assert_eq!(
3266            stats_after_optimization["num_unindexed_rows"], 0,
3267            "Index should have zero unindexed rows after optimization"
3268        );
3269    }
3270
3271    // Helper function to validate indices after each clone iteration
3272    async fn validate_indices_after_clone(
3273        dataset: &Dataset,
3274        round: usize,
3275        expected_scalar_rows: usize,
3276        dimensions: u32,
3277    ) {
3278        // Verify cloned dataset has indices
3279        let indices = dataset.load_indices().await.unwrap();
3280        assert_eq!(
3281            indices.len(),
3282            2,
3283            "Round {}: Cloned dataset should have 2 indices",
3284            round
3285        );
3286        let index_names: HashSet<String> = indices.iter().map(|idx| idx.name.clone()).collect();
3287        assert!(
3288            index_names.contains("vector_idx"),
3289            "Round {}: Should contain vector_idx",
3290            round
3291        );
3292        assert!(
3293            index_names.contains("category_idx"),
3294            "Round {}: Should contain category_idx",
3295            round
3296        );
3297
3298        // Test basic data access without using indices for now
3299        // This ensures the dataset is accessible and data is intact
3300        // In chain cloning, each round adds 50 rows to the previous dataset
3301        // Round 1: 300 (original), Round 2: 350 (300 + 50 from round 1), Round 3: 400 (350 + 50 from round 2)
3302        let expected_total_rows = 300 + (round - 1) * 50;
3303        let total_rows = dataset.count_rows(None).await.unwrap();
3304        assert_eq!(
3305            total_rows, expected_total_rows,
3306            "Round {}: Should have {} rows after clone (chain cloning accumulates data)",
3307            round, expected_total_rows
3308        );
3309
3310        // Verify vector search
3311        let query_vector = generate_random_array(dimensions as usize);
3312        let search_results = dataset
3313            .scan()
3314            .nearest("vector", &query_vector, 5)
3315            .unwrap()
3316            .limit(Some(5), None)
3317            .unwrap()
3318            .try_into_batch()
3319            .await
3320            .unwrap();
3321        assert!(
3322            search_results.num_rows() > 0,
3323            "Round {}: Vector search should return results immediately after clone",
3324            round
3325        );
3326
3327        // Test basic scalar query to verify data integrity
3328        let scalar_results = dataset
3329            .scan()
3330            .filter("category = 'category_0'")
3331            .unwrap()
3332            .try_into_batch()
3333            .await
3334            .unwrap();
3335
3336        assert_eq!(
3337            expected_scalar_rows,
3338            scalar_results.num_rows(),
3339            "Round {}: Scalar query should return {} results",
3340            round,
3341            expected_scalar_rows
3342        );
3343    }
3344
3345    #[tokio::test]
3346    async fn test_shallow_clone_with_index() {
3347        let test_dir = TempStrDir::default();
3348        let test_uri = &test_dir;
3349
3350        // Create a schema with both vector and scalar columns
3351        let dimensions = 16u32;
3352        // Generate test data using lance_datagen (300 rows to satisfy PQ training requirements)
3353        let data = gen_batch()
3354            .col("id", array::step::<Int32Type>())
3355            .col("category", array::fill_utf8("category_0".to_string()))
3356            .col(
3357                "vector",
3358                array::rand_vec::<Float32Type>(Dimension::from(dimensions)),
3359            )
3360            .into_reader_rows(RowCount::from(300), BatchCount::from(1));
3361
3362        // Create initial dataset
3363        let mut dataset = Dataset::write(data, test_uri, None).await.unwrap();
3364        // Create vector index (IVF_PQ)
3365        let vector_params = VectorIndexParams::ivf_pq(2, 8, 2, MetricType::L2, 10);
3366        dataset
3367            .create_index(
3368                &["vector"],
3369                IndexType::Vector,
3370                Some("vector_idx".to_string()),
3371                &vector_params,
3372                true,
3373            )
3374            .await
3375            .unwrap();
3376
3377        // Create scalar index (BTree)
3378        dataset
3379            .create_index(
3380                &["category"],
3381                IndexType::BTree,
3382                Some("category_idx".to_string()),
3383                &ScalarIndexParams::default(),
3384                true,
3385            )
3386            .await
3387            .unwrap();
3388
3389        // Verify indices were created
3390        let indices = dataset.load_indices().await.unwrap();
3391        assert_eq!(indices.len(), 2, "Should have 2 indices");
3392        let index_names: HashSet<String> = indices.iter().map(|idx| idx.name.clone()).collect();
3393        assert!(index_names.contains("vector_idx"));
3394        assert!(index_names.contains("category_idx"));
3395
3396        // Test scalar query on source dataset
3397        let scalar_results = dataset
3398            .scan()
3399            .filter("category = 'category_0'")
3400            .unwrap()
3401            .try_into_batch()
3402            .await
3403            .unwrap();
3404
3405        let source_scalar_query_rows = scalar_results.num_rows();
3406        assert!(
3407            scalar_results.num_rows() > 0,
3408            "Scalar query should return results"
3409        );
3410
3411        // Multiple shallow clone iterations test with chain cloning
3412        let clone_rounds = 3;
3413        let mut current_dataset = dataset;
3414
3415        for round in 1..=clone_rounds {
3416            let round_clone_dir = format!("{}/clone_round_{}", test_dir, round);
3417            let round_cloned_uri = &round_clone_dir;
3418            let tag_name = format!("shallow_clone_test_{}", round);
3419
3420            // Create tag for this round (use current dataset for chain cloning)
3421            let current_version = current_dataset.version().version;
3422            current_dataset
3423                .tags()
3424                .create(&tag_name, current_version)
3425                .await
3426                .unwrap();
3427
3428            // Perform shallow clone for this round (chain cloning from current dataset)
3429            let mut round_cloned_dataset = current_dataset
3430                .shallow_clone(round_cloned_uri, tag_name.as_str(), None)
3431                .await
3432                .unwrap();
3433
3434            // Immediately validate indices after each clone
3435            validate_indices_after_clone(
3436                &round_cloned_dataset,
3437                round,
3438                source_scalar_query_rows,
3439                dimensions,
3440            )
3441            .await;
3442
3443            // Complete validation cycle after each clone: append data, optimize index, validate
3444            // Append new data to the cloned dataset
3445            let new_data = gen_batch()
3446                .col(
3447                    "id",
3448                    array::step_custom::<Int32Type>(300 + (round * 50) as i32, 1),
3449                )
3450                .col("category", array::fill_utf8(format!("category_{}", round)))
3451                .col(
3452                    "vector",
3453                    array::rand_vec::<Float32Type>(Dimension::from(dimensions)),
3454                )
3455                .into_reader_rows(RowCount::from(50), BatchCount::from(1));
3456
3457            round_cloned_dataset = Dataset::write(
3458                new_data,
3459                round_cloned_uri,
3460                Some(WriteParams {
3461                    mode: WriteMode::Append,
3462                    ..Default::default()
3463                }),
3464            )
3465            .await
3466            .unwrap();
3467
3468            // Verify row count increased
3469            let expected_rows = 300 + round * 50;
3470            let total_rows = round_cloned_dataset.count_rows(None).await.unwrap();
3471            assert_eq!(
3472                total_rows, expected_rows,
3473                "Round {}: Should have {} rows after append",
3474                round, expected_rows
3475            );
3476
3477            let indices_before_optimize = round_cloned_dataset.load_indices().await.unwrap();
3478            let vector_idx_before = indices_before_optimize
3479                .iter()
3480                .find(|idx| idx.name == "vector_idx")
3481                .unwrap();
3482            let category_idx_before = indices_before_optimize
3483                .iter()
3484                .find(|idx| idx.name == "category_idx")
3485                .unwrap();
3486
3487            // Optimize indices
3488            round_cloned_dataset
3489                .optimize_indices(&OptimizeOptions::default())
3490                .await
3491                .unwrap();
3492
3493            // Verify index UUID has changed
3494            let optimized_indices = round_cloned_dataset.load_indices().await.unwrap();
3495            let new_vector_idx = optimized_indices
3496                .iter()
3497                .find(|idx| idx.name == "vector_idx")
3498                .unwrap();
3499            let new_category_idx = optimized_indices
3500                .iter()
3501                .find(|idx| idx.name == "category_idx")
3502                .unwrap();
3503
3504            assert_ne!(
3505                new_vector_idx.uuid, vector_idx_before.uuid,
3506                "Round {}: Vector index should have a new UUID after optimization",
3507                round
3508            );
3509            assert_ne!(
3510                new_category_idx.uuid, category_idx_before.uuid,
3511                "Round {}: Category index should have a new UUID after optimization",
3512                round
3513            );
3514
3515            // Verify the location of index files
3516            use std::path::PathBuf;
3517            let clone_indices_dir = PathBuf::from(round_cloned_uri).join("_indices");
3518            let vector_index_dir = clone_indices_dir.join(new_vector_idx.uuid.to_string());
3519            let category_index_dir = clone_indices_dir.join(new_category_idx.uuid.to_string());
3520
3521            assert!(
3522                vector_index_dir.exists(),
3523                "Round {}: New vector index directory should exist in cloned dataset location: {:?}",
3524                round, vector_index_dir
3525            );
3526            assert!(
3527                category_index_dir.exists(),
3528                "Round {}: New category index directory should exist in cloned dataset location: {:?}",
3529                round, category_index_dir
3530            );
3531
3532            // Verify base id
3533            assert!(
3534                new_vector_idx.base_id.is_none(),
3535                "Round {}: New vector index should not have base_id after optimization in cloned dataset",
3536                round
3537            );
3538            assert!(
3539                new_category_idx.base_id.is_none(),
3540                "Round {}: New category index should not have base_id after optimization in cloned dataset",
3541                round
3542            );
3543
3544            // Verify the source location does NOT contain new data
3545            let original_indices_dir = PathBuf::from(current_dataset.uri()).join("_indices");
3546            let wrong_vector_dir = original_indices_dir.join(new_vector_idx.uuid.to_string());
3547            let wrong_category_dir = original_indices_dir.join(new_category_idx.uuid.to_string());
3548
3549            assert!(
3550                !wrong_vector_dir.exists(),
3551                "Round {}: New vector index should NOT be in original dataset location: {:?}",
3552                round,
3553                wrong_vector_dir
3554            );
3555            assert!(
3556                !wrong_category_dir.exists(),
3557                "Round {}: New category index should NOT be in original dataset location: {:?}",
3558                round,
3559                wrong_category_dir
3560            );
3561
3562            // Validate data integrity and index functionality after optimization
3563            let old_category_results = round_cloned_dataset
3564                .scan()
3565                .filter("category = 'category_0'")
3566                .unwrap()
3567                .try_into_batch()
3568                .await
3569                .unwrap();
3570
3571            let new_category_results = round_cloned_dataset
3572                .scan()
3573                .filter(&format!("category = 'category_{}'", round))
3574                .unwrap()
3575                .try_into_batch()
3576                .await
3577                .unwrap();
3578
3579            assert_eq!(
3580                source_scalar_query_rows,
3581                old_category_results.num_rows(),
3582                "Round {}: Should find old category data with {} rows",
3583                round,
3584                source_scalar_query_rows
3585            );
3586            assert!(
3587                new_category_results.num_rows() > 0,
3588                "Round {}: Should find new category data",
3589                round
3590            );
3591
3592            // Test vector search functionality
3593            let query_vector = generate_random_array(dimensions as usize);
3594            let search_results = round_cloned_dataset
3595                .scan()
3596                .nearest("vector", &query_vector, 10)
3597                .unwrap()
3598                .limit(Some(10), None)
3599                .unwrap()
3600                .try_into_batch()
3601                .await
3602                .unwrap();
3603
3604            assert!(
3605                search_results.num_rows() > 0,
3606                "Round {}: Vector search should return results after optimization",
3607                round
3608            );
3609
3610            // Verify statistic of indexes
3611            let vector_stats: serde_json::Value = serde_json::from_str(
3612                &round_cloned_dataset
3613                    .index_statistics("vector_idx")
3614                    .await
3615                    .unwrap(),
3616            )
3617            .unwrap();
3618            let category_stats: serde_json::Value = serde_json::from_str(
3619                &round_cloned_dataset
3620                    .index_statistics("category_idx")
3621                    .await
3622                    .unwrap(),
3623            )
3624            .unwrap();
3625
3626            assert_eq!(
3627                vector_stats["num_indexed_rows"].as_u64().unwrap(),
3628                expected_rows as u64,
3629                "Round {}: Vector index should have {} indexed rows",
3630                round,
3631                expected_rows
3632            );
3633            assert_eq!(
3634                category_stats["num_indexed_rows"].as_u64().unwrap(),
3635                expected_rows as u64,
3636                "Round {}: Category index should have {} indexed rows",
3637                round,
3638                expected_rows
3639            );
3640
3641            // Prepare for next round: use the cloned dataset as the source for next clone
3642            current_dataset = round_cloned_dataset;
3643        }
3644
3645        // Use the final cloned dataset for any remaining tests
3646        let final_cloned_dataset = current_dataset;
3647
3648        // Verify cloned dataset has indices
3649        let cloned_indices = final_cloned_dataset.load_indices().await.unwrap();
3650        assert_eq!(
3651            cloned_indices.len(),
3652            2,
3653            "Final cloned dataset should have 2 indices"
3654        );
3655        let cloned_index_names: HashSet<String> =
3656            cloned_indices.iter().map(|idx| idx.name.clone()).collect();
3657        assert!(cloned_index_names.contains("vector_idx"));
3658        assert!(cloned_index_names.contains("category_idx"));
3659
3660        // Test vector search on final cloned dataset
3661        let query_vector = generate_random_array(dimensions as usize);
3662        let search_results = final_cloned_dataset
3663            .scan()
3664            .nearest("vector", &query_vector, 5)
3665            .unwrap()
3666            .limit(Some(5), None)
3667            .unwrap()
3668            .try_into_batch()
3669            .await
3670            .unwrap();
3671
3672        assert!(
3673            search_results.num_rows() > 0,
3674            "Vector search should return results on final dataset"
3675        );
3676
3677        // Test scalar query on final cloned dataset
3678        let scalar_results = final_cloned_dataset
3679            .scan()
3680            .filter("category = 'category_0'")
3681            .unwrap()
3682            .try_into_batch()
3683            .await
3684            .unwrap();
3685
3686        assert_eq!(
3687            source_scalar_query_rows,
3688            scalar_results.num_rows(),
3689            "Scalar query should return results on final dataset"
3690        );
3691    }
3692
3693    #[tokio::test]
3694    async fn test_initialize_indices() {
3695        use crate::dataset::Dataset;
3696        use arrow_array::types::Float32Type;
3697        use lance_core::utils::tempfile::TempStrDir;
3698        use lance_datagen::{array, BatchCount, RowCount};
3699        use lance_index::scalar::{InvertedIndexParams, ScalarIndexParams};
3700        use lance_linalg::distance::MetricType;
3701        use std::collections::HashSet;
3702
3703        // Create source dataset with various index types
3704        let test_dir = TempStrDir::default();
3705        let source_uri = format!("{}/{}", test_dir, "source");
3706        let target_uri = format!("{}/{}", test_dir, "target");
3707
3708        // Generate test data using lance_datagen (need at least 256 rows for PQ training)
3709        let source_reader = lance_datagen::gen_batch()
3710            .col("vector", array::rand_vec::<Float32Type>(8.into()))
3711            .col(
3712                "text",
3713                array::cycle_utf8_literals(&["hello world", "foo bar", "test data"]),
3714            )
3715            .col("id", array::step::<Int32Type>())
3716            .into_reader_rows(RowCount::from(300), BatchCount::from(1));
3717
3718        // Create source dataset
3719        let mut source_dataset = Dataset::write(source_reader, &source_uri, None)
3720            .await
3721            .unwrap();
3722
3723        // Create indices on source dataset
3724        // 1. Vector index
3725        let vector_params = VectorIndexParams::ivf_pq(4, 8, 2, MetricType::L2, 10);
3726        source_dataset
3727            .create_index(
3728                &["vector"],
3729                IndexType::Vector,
3730                Some("vec_idx".to_string()),
3731                &vector_params,
3732                false,
3733            )
3734            .await
3735            .unwrap();
3736
3737        // 2. FTS index
3738        let fts_params = InvertedIndexParams::default();
3739        source_dataset
3740            .create_index(
3741                &["text"],
3742                IndexType::Inverted,
3743                Some("text_idx".to_string()),
3744                &fts_params,
3745                false,
3746            )
3747            .await
3748            .unwrap();
3749
3750        // 3. Scalar index
3751        let scalar_params = ScalarIndexParams::default();
3752        source_dataset
3753            .create_index(
3754                &["id"],
3755                IndexType::BTree,
3756                Some("id_idx".to_string()),
3757                &scalar_params,
3758                false,
3759            )
3760            .await
3761            .unwrap();
3762
3763        // Reload source dataset to get updated index metadata
3764        let source_dataset = Dataset::open(&source_uri).await.unwrap();
3765
3766        // Verify source has 3 indices
3767        let source_indices = source_dataset.load_indices().await.unwrap();
3768        assert_eq!(
3769            source_indices.len(),
3770            3,
3771            "Source dataset should have 3 indices"
3772        );
3773
3774        // Create target dataset with same schema but different data (need at least 256 rows for PQ)
3775        let target_reader = lance_datagen::gen_batch()
3776            .col("vector", array::rand_vec::<Float32Type>(8.into()))
3777            .col(
3778                "text",
3779                array::cycle_utf8_literals(&["foo bar", "test data", "hello world"]),
3780            )
3781            .col("id", array::step_custom::<Int32Type>(100, 1))
3782            .into_reader_rows(RowCount::from(300), BatchCount::from(1));
3783        let mut target_dataset = Dataset::write(target_reader, &target_uri, None)
3784            .await
3785            .unwrap();
3786
3787        // Initialize indices from source dataset
3788        target_dataset
3789            .initialize_indices(&source_dataset)
3790            .await
3791            .unwrap();
3792
3793        // Verify target has same indices
3794        let target_indices = target_dataset.load_indices().await.unwrap();
3795        assert_eq!(
3796            target_indices.len(),
3797            3,
3798            "Target dataset should have 3 indices after initialization"
3799        );
3800
3801        // Check index names match
3802        let source_names: HashSet<String> =
3803            source_indices.iter().map(|idx| idx.name.clone()).collect();
3804        let target_names: HashSet<String> =
3805            target_indices.iter().map(|idx| idx.name.clone()).collect();
3806        assert_eq!(
3807            source_names, target_names,
3808            "Index names should match between source and target"
3809        );
3810
3811        // Verify indices are functional by running queries
3812        // 1. Test vector index
3813        let query_vector = generate_random_array(8);
3814        let search_results = target_dataset
3815            .scan()
3816            .nearest("vector", &query_vector, 5)
3817            .unwrap()
3818            .limit(Some(5), None)
3819            .unwrap()
3820            .try_into_batch()
3821            .await
3822            .unwrap();
3823        assert!(
3824            search_results.num_rows() > 0,
3825            "Vector index should be functional"
3826        );
3827
3828        // 2. Test scalar index
3829        let scalar_results = target_dataset
3830            .scan()
3831            .filter("id = 125")
3832            .unwrap()
3833            .try_into_batch()
3834            .await
3835            .unwrap();
3836        assert_eq!(
3837            scalar_results.num_rows(),
3838            1,
3839            "Scalar index should find exact match"
3840        );
3841    }
3842
3843    #[tokio::test]
3844    async fn test_initialize_indices_with_missing_field() {
3845        use crate::dataset::Dataset;
3846        use arrow_array::types::Int32Type;
3847        use lance_core::utils::tempfile::TempStrDir;
3848        use lance_datagen::{array, BatchCount, RowCount};
3849        use lance_index::scalar::ScalarIndexParams;
3850
3851        // Test that initialize_indices handles missing fields gracefully
3852        let test_dir = TempStrDir::default();
3853        let source_uri = format!("{}/{}", test_dir, "source");
3854        let target_uri = format!("{}/{}", test_dir, "target");
3855
3856        // Create source dataset with extra field
3857        let source_reader = lance_datagen::gen_batch()
3858            .col("id", array::step::<Int32Type>())
3859            .col("extra", array::cycle_utf8_literals(&["test"]))
3860            .into_reader_rows(RowCount::from(10), BatchCount::from(1));
3861        let mut source_dataset = Dataset::write(source_reader, &source_uri, None)
3862            .await
3863            .unwrap();
3864
3865        // Create index on extra field in source
3866        source_dataset
3867            .create_index(
3868                &["extra"],
3869                IndexType::BTree,
3870                None,
3871                &ScalarIndexParams::default(),
3872                false,
3873            )
3874            .await
3875            .unwrap();
3876
3877        // Create target dataset without extra field
3878        let target_reader = lance_datagen::gen_batch()
3879            .col("id", array::step_custom::<Int32Type>(10, 1))
3880            .into_reader_rows(RowCount::from(10), BatchCount::from(1));
3881        let mut target_dataset = Dataset::write(target_reader, &target_uri, None)
3882            .await
3883            .unwrap();
3884
3885        // Initialize indices should skip the index on missing field with an error
3886        let result = target_dataset.initialize_indices(&source_dataset).await;
3887
3888        // Should fail when field is missing
3889        assert!(result.is_err(), "Should error when field is missing");
3890        assert!(result
3891            .unwrap_err()
3892            .to_string()
3893            .contains("not found in target dataset"));
3894    }
3895
3896    #[tokio::test]
3897    async fn test_initialize_single_index() {
3898        use crate::dataset::Dataset;
3899        use crate::index::vector::VectorIndexParams;
3900        use arrow_array::types::{Float32Type, Int32Type};
3901        use lance_core::utils::tempfile::TempStrDir;
3902        use lance_datagen::{array, BatchCount, RowCount};
3903        use lance_index::scalar::ScalarIndexParams;
3904        use lance_linalg::distance::MetricType;
3905
3906        let test_dir = TempStrDir::default();
3907        let source_uri = format!("{}/{}", test_dir, "source");
3908        let target_uri = format!("{}/{}", test_dir, "target");
3909
3910        // Create source dataset (need at least 256 rows for PQ training)
3911        let source_reader = lance_datagen::gen_batch()
3912            .col("id", array::step::<Int32Type>())
3913            .col("name", array::rand_utf8(4.into(), false))
3914            .col("vector", array::rand_vec::<Float32Type>(8.into()))
3915            .into_reader_rows(RowCount::from(300), BatchCount::from(1));
3916        let mut source_dataset = Dataset::write(source_reader, &source_uri, None)
3917            .await
3918            .unwrap();
3919
3920        // Create multiple indices on source
3921        let scalar_params = ScalarIndexParams::default();
3922        source_dataset
3923            .create_index(
3924                &["id"],
3925                IndexType::BTree,
3926                Some("id_index".to_string()),
3927                &scalar_params,
3928                false,
3929            )
3930            .await
3931            .unwrap();
3932
3933        let vector_params = VectorIndexParams::ivf_pq(16, 8, 4, MetricType::L2, 50);
3934        source_dataset
3935            .create_index(
3936                &["vector"],
3937                IndexType::Vector,
3938                Some("vector_index".to_string()),
3939                &vector_params,
3940                false,
3941            )
3942            .await
3943            .unwrap();
3944
3945        // Reload source dataset to get updated index metadata
3946        let source_dataset = Dataset::open(&source_uri).await.unwrap();
3947
3948        // Create target dataset with same schema
3949        let target_reader = lance_datagen::gen_batch()
3950            .col("id", array::step::<Int32Type>())
3951            .col("name", array::rand_utf8(4.into(), false))
3952            .col("vector", array::rand_vec::<Float32Type>(8.into()))
3953            .into_reader_rows(RowCount::from(300), BatchCount::from(1));
3954        let mut target_dataset = Dataset::write(target_reader, &target_uri, None)
3955            .await
3956            .unwrap();
3957
3958        // Initialize only the vector index
3959        target_dataset
3960            .initialize_index(&source_dataset, "vector_index")
3961            .await
3962            .unwrap();
3963
3964        // Verify only vector index was created
3965        let target_indices = target_dataset.load_indices().await.unwrap();
3966        assert_eq!(target_indices.len(), 1, "Should have only 1 index");
3967        assert_eq!(
3968            target_indices[0].name, "vector_index",
3969            "Should have the vector index"
3970        );
3971
3972        // Initialize the scalar index
3973        target_dataset
3974            .initialize_index(&source_dataset, "id_index")
3975            .await
3976            .unwrap();
3977
3978        // Verify both indices now exist
3979        let target_indices = target_dataset.load_indices().await.unwrap();
3980        assert_eq!(target_indices.len(), 2, "Should have 2 indices");
3981
3982        let index_names: HashSet<String> =
3983            target_indices.iter().map(|idx| idx.name.clone()).collect();
3984        assert!(
3985            index_names.contains("vector_index"),
3986            "Should have vector index"
3987        );
3988        assert!(index_names.contains("id_index"), "Should have id index");
3989
3990        // Test error case - non-existent index
3991        let result = target_dataset
3992            .initialize_index(&source_dataset, "non_existent")
3993            .await;
3994        assert!(result.is_err(), "Should error for non-existent index");
3995        assert!(result
3996            .unwrap_err()
3997            .to_string()
3998            .contains("not found in source dataset"));
3999    }
4000
4001    #[tokio::test]
4002    async fn test_vector_index_on_nested_field_with_dots() {
4003        let dimensions = 16;
4004        let num_rows = 256;
4005
4006        // Create schema with nested field containing dots in the name
4007        let struct_field = Field::new(
4008            "embedding_data",
4009            DataType::Struct(
4010                vec![
4011                    Field::new(
4012                        "vector.v1", // Field name with dot
4013                        DataType::FixedSizeList(
4014                            Arc::new(Field::new("item", DataType::Float32, true)),
4015                            dimensions,
4016                        ),
4017                        false,
4018                    ),
4019                    Field::new(
4020                        "vector.v2", // Another field name with dot
4021                        DataType::FixedSizeList(
4022                            Arc::new(Field::new("item", DataType::Float32, true)),
4023                            dimensions,
4024                        ),
4025                        false,
4026                    ),
4027                    Field::new("metadata", DataType::Utf8, false),
4028                ]
4029                .into(),
4030            ),
4031            false,
4032        );
4033
4034        let schema = Arc::new(Schema::new(vec![
4035            Field::new("id", DataType::Int32, false),
4036            struct_field,
4037        ]));
4038
4039        // Generate test data
4040        let float_arr_v1 = generate_random_array(num_rows * dimensions as usize);
4041        let vectors_v1 = FixedSizeListArray::try_new_from_values(float_arr_v1, dimensions).unwrap();
4042
4043        let float_arr_v2 = generate_random_array(num_rows * dimensions as usize);
4044        let vectors_v2 = FixedSizeListArray::try_new_from_values(float_arr_v2, dimensions).unwrap();
4045
4046        let ids = Int32Array::from_iter_values(0..num_rows as i32);
4047        let metadata = StringArray::from_iter_values((0..num_rows).map(|i| format!("meta_{}", i)));
4048
4049        let struct_array = arrow_array::StructArray::from(vec![
4050            (
4051                Arc::new(Field::new(
4052                    "vector.v1",
4053                    DataType::FixedSizeList(
4054                        Arc::new(Field::new("item", DataType::Float32, true)),
4055                        dimensions,
4056                    ),
4057                    false,
4058                )),
4059                Arc::new(vectors_v1) as Arc<dyn arrow_array::Array>,
4060            ),
4061            (
4062                Arc::new(Field::new(
4063                    "vector.v2",
4064                    DataType::FixedSizeList(
4065                        Arc::new(Field::new("item", DataType::Float32, true)),
4066                        dimensions,
4067                    ),
4068                    false,
4069                )),
4070                Arc::new(vectors_v2) as Arc<dyn arrow_array::Array>,
4071            ),
4072            (
4073                Arc::new(Field::new("metadata", DataType::Utf8, false)),
4074                Arc::new(metadata) as Arc<dyn arrow_array::Array>,
4075            ),
4076        ]);
4077
4078        let batch =
4079            RecordBatch::try_new(schema.clone(), vec![Arc::new(ids), Arc::new(struct_array)])
4080                .unwrap();
4081
4082        let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone());
4083
4084        let test_dir = TempStrDir::default();
4085        let test_uri = &test_dir;
4086        let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
4087
4088        // Test creating index on nested field with dots using quoted syntax
4089        let nested_column_path_v1 = "embedding_data.`vector.v1`";
4090        let params = VectorIndexParams::ivf_pq(10, 8, 2, MetricType::L2, 10);
4091
4092        dataset
4093            .create_index(
4094                &[nested_column_path_v1],
4095                IndexType::Vector,
4096                Some("vec_v1_idx".to_string()),
4097                &params,
4098                true,
4099            )
4100            .await
4101            .unwrap();
4102
4103        // Verify index was created
4104        let indices = dataset.load_indices().await.unwrap();
4105        assert_eq!(indices.len(), 1);
4106        assert_eq!(indices[0].name, "vec_v1_idx");
4107
4108        // Verify the correct field was indexed
4109        let field_id = indices[0].fields[0];
4110        let field_path = dataset.schema().field_path(field_id).unwrap();
4111        assert_eq!(field_path, "embedding_data.`vector.v1`");
4112
4113        // Test creating index on the second vector field with dots
4114        let nested_column_path_v2 = "embedding_data.`vector.v2`";
4115        dataset
4116            .create_index(
4117                &[nested_column_path_v2],
4118                IndexType::Vector,
4119                Some("vec_v2_idx".to_string()),
4120                &params,
4121                true,
4122            )
4123            .await
4124            .unwrap();
4125
4126        // Verify both indices exist
4127        let indices = dataset.load_indices().await.unwrap();
4128        assert_eq!(indices.len(), 2);
4129
4130        // Verify we can query using the indexed fields and check the plan
4131        let query_vector = generate_random_array(dimensions as usize);
4132
4133        // Check the query plan for the first vector field
4134        let plan_v1 = dataset
4135            .scan()
4136            .nearest(nested_column_path_v1, &query_vector, 5)
4137            .unwrap()
4138            .explain_plan(false)
4139            .await
4140            .unwrap();
4141
4142        // Verify the vector index is being used (should show ANNSubIndex or ANNIvfPartition)
4143        assert!(
4144            plan_v1.contains("ANNSubIndex") || plan_v1.contains("ANNIvfPartition"),
4145            "Query plan should use vector index for nested field with dots. Plan: {}",
4146            plan_v1
4147        );
4148
4149        let search_results_v1 = dataset
4150            .scan()
4151            .nearest(nested_column_path_v1, &query_vector, 5)
4152            .unwrap()
4153            .try_into_batch()
4154            .await
4155            .unwrap();
4156
4157        assert_eq!(search_results_v1.num_rows(), 5);
4158
4159        // Check the query plan for the second vector field
4160        let plan_v2 = dataset
4161            .scan()
4162            .nearest(nested_column_path_v2, &query_vector, 5)
4163            .unwrap()
4164            .explain_plan(false)
4165            .await
4166            .unwrap();
4167
4168        // Verify the vector index is being used
4169        assert!(
4170            plan_v2.contains("ANNSubIndex") || plan_v2.contains("ANNIvfPartition"),
4171            "Query plan should use vector index for second nested field with dots. Plan: {}",
4172            plan_v2
4173        );
4174
4175        let search_results_v2 = dataset
4176            .scan()
4177            .nearest(nested_column_path_v2, &query_vector, 5)
4178            .unwrap()
4179            .try_into_batch()
4180            .await
4181            .unwrap();
4182
4183        assert_eq!(search_results_v2.num_rows(), 5);
4184    }
4185
4186    #[tokio::test]
4187    async fn test_btree_index_on_nested_field_with_dots() {
4188        // Test creating BTree index on nested field with dots in the name
4189        let test_dir = TempStrDir::default();
4190        let test_uri = &test_dir;
4191
4192        // Create schema with nested field containing dots
4193        let schema = Arc::new(Schema::new(vec![
4194            Field::new("id", DataType::Int32, false),
4195            Field::new(
4196                "data",
4197                DataType::Struct(
4198                    vec![
4199                        Field::new("value.v1", DataType::Int32, false),
4200                        Field::new("value.v2", DataType::Float32, false),
4201                        Field::new("text", DataType::Utf8, false),
4202                    ]
4203                    .into(),
4204                ),
4205                false,
4206            ),
4207        ]));
4208
4209        // Generate test data
4210        let num_rows = 1000;
4211        let ids = Int32Array::from_iter_values(0..num_rows);
4212        let values_v1 = Int32Array::from_iter_values((0..num_rows).map(|i| i % 100));
4213        let values_v2 = Float32Array::from_iter_values((0..num_rows).map(|i| (i as f32) * 0.1));
4214        let texts = StringArray::from_iter_values((0..num_rows).map(|i| format!("text_{}", i)));
4215
4216        let struct_array = arrow_array::StructArray::from(vec![
4217            (
4218                Arc::new(Field::new("value.v1", DataType::Int32, false)),
4219                Arc::new(values_v1) as Arc<dyn arrow_array::Array>,
4220            ),
4221            (
4222                Arc::new(Field::new("value.v2", DataType::Float32, false)),
4223                Arc::new(values_v2) as Arc<dyn arrow_array::Array>,
4224            ),
4225            (
4226                Arc::new(Field::new("text", DataType::Utf8, false)),
4227                Arc::new(texts) as Arc<dyn arrow_array::Array>,
4228            ),
4229        ]);
4230
4231        let batch =
4232            RecordBatch::try_new(schema.clone(), vec![Arc::new(ids), Arc::new(struct_array)])
4233                .unwrap();
4234
4235        let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone());
4236        let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
4237
4238        // Create BTree index on nested field with dots
4239        let nested_column_path = "data.`value.v1`";
4240        let params = ScalarIndexParams::default();
4241
4242        dataset
4243            .create_index(
4244                &[nested_column_path],
4245                IndexType::BTree,
4246                Some("btree_v1_idx".to_string()),
4247                &params,
4248                true,
4249            )
4250            .await
4251            .unwrap();
4252
4253        // Reload dataset to ensure index is loaded
4254        dataset = Dataset::open(test_uri).await.unwrap();
4255
4256        // Verify index was created
4257        let indices = dataset.load_indices().await.unwrap();
4258        assert_eq!(indices.len(), 1);
4259        assert_eq!(indices[0].name, "btree_v1_idx");
4260
4261        // Verify the correct field was indexed
4262        let field_id = indices[0].fields[0];
4263        let field_path = dataset.schema().field_path(field_id).unwrap();
4264        assert_eq!(field_path, "data.`value.v1`");
4265
4266        // Test querying with the index and verify it's being used
4267        let plan = dataset
4268            .scan()
4269            .filter("data.`value.v1` = 42")
4270            .unwrap()
4271            .prefilter(true)
4272            .explain_plan(false)
4273            .await
4274            .unwrap();
4275
4276        // Verify the query plan (scalar indices on nested fields may use optimized filters)
4277        // The index may be used internally even if not shown as ScalarIndexQuery
4278        assert!(
4279            plan.contains("ScalarIndexQuery"),
4280            "Query plan should show optimized read. Plan: {}",
4281            plan
4282        );
4283
4284        // Also test that the query returns results
4285        let results = dataset
4286            .scan()
4287            .filter("data.`value.v1` = 42")
4288            .unwrap()
4289            .prefilter(true)
4290            .try_into_batch()
4291            .await
4292            .unwrap();
4293
4294        assert!(results.num_rows() > 0);
4295    }
4296
4297    #[tokio::test]
4298    async fn test_bitmap_index_on_nested_field_with_dots() {
4299        // Test creating Bitmap index on nested field with dots in the name
4300        let test_dir = TempStrDir::default();
4301        let test_uri = &test_dir;
4302
4303        // Create schema with nested field containing dots - using low cardinality for bitmap
4304        let schema = Arc::new(Schema::new(vec![
4305            Field::new("id", DataType::Int32, false),
4306            Field::new(
4307                "metadata",
4308                DataType::Struct(
4309                    vec![
4310                        Field::new("status.code", DataType::Int32, false),
4311                        Field::new("category.name", DataType::Utf8, false),
4312                    ]
4313                    .into(),
4314                ),
4315                false,
4316            ),
4317        ]));
4318
4319        // Generate test data with low cardinality (good for bitmap index)
4320        let num_rows = 1000;
4321        let ids = Int32Array::from_iter_values(0..num_rows);
4322        // Only 10 unique status codes
4323        let status_codes = Int32Array::from_iter_values((0..num_rows).map(|i| i % 10));
4324        // Only 5 unique categories
4325        let categories =
4326            StringArray::from_iter_values((0..num_rows).map(|i| format!("category_{}", i % 5)));
4327
4328        let struct_array = arrow_array::StructArray::from(vec![
4329            (
4330                Arc::new(Field::new("status.code", DataType::Int32, false)),
4331                Arc::new(status_codes) as Arc<dyn arrow_array::Array>,
4332            ),
4333            (
4334                Arc::new(Field::new("category.name", DataType::Utf8, false)),
4335                Arc::new(categories) as Arc<dyn arrow_array::Array>,
4336            ),
4337        ]);
4338
4339        let batch =
4340            RecordBatch::try_new(schema.clone(), vec![Arc::new(ids), Arc::new(struct_array)])
4341                .unwrap();
4342
4343        let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone());
4344        let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
4345
4346        // Create Bitmap index on nested field with dots
4347        let nested_column_path = "metadata.`status.code`";
4348        let params = ScalarIndexParams::default();
4349
4350        dataset
4351            .create_index(
4352                &[nested_column_path],
4353                IndexType::Bitmap,
4354                Some("bitmap_status_idx".to_string()),
4355                &params,
4356                true,
4357            )
4358            .await
4359            .unwrap();
4360
4361        // Reload dataset to ensure index is loaded
4362        dataset = Dataset::open(test_uri).await.unwrap();
4363
4364        // Verify index was created
4365        let indices = dataset.load_indices().await.unwrap();
4366        assert_eq!(indices.len(), 1);
4367        assert_eq!(indices[0].name, "bitmap_status_idx");
4368
4369        // Verify the correct field was indexed
4370        let field_id = indices[0].fields[0];
4371        let field_path = dataset.schema().field_path(field_id).unwrap();
4372        assert_eq!(field_path, "metadata.`status.code`");
4373
4374        // Test querying with the index and verify it's being used
4375        let plan = dataset
4376            .scan()
4377            .filter("metadata.`status.code` = 5")
4378            .unwrap()
4379            .explain_plan(false)
4380            .await
4381            .unwrap();
4382
4383        // Verify the query plan (scalar indices on nested fields may use optimized filters)
4384        // The index may be used internally even if not shown as ScalarIndexQuery
4385        assert!(
4386            plan.contains("ScalarIndexQuery"),
4387            "Query plan should show optimized read. Plan: {}",
4388            plan
4389        );
4390
4391        // Also test that the query returns results
4392        let results = dataset
4393            .scan()
4394            .filter("metadata.`status.code` = 5")
4395            .unwrap()
4396            .try_into_batch()
4397            .await
4398            .unwrap();
4399
4400        // Should have ~100 rows with status code 5
4401        assert!(results.num_rows() > 0);
4402        assert_eq!(results.num_rows(), 100);
4403    }
4404
4405    #[tokio::test]
4406    async fn test_inverted_index_on_nested_field_with_dots() {
4407        use lance_index::scalar::inverted::tokenizer::InvertedIndexParams;
4408
4409        // Test creating Inverted index on nested text field with dots in the name
4410        let test_dir = TempStrDir::default();
4411        let test_uri = &test_dir;
4412
4413        // Create schema with nested text field containing dots
4414        let schema = Arc::new(Schema::new(vec![
4415            Field::new("id", DataType::Int32, false),
4416            Field::new(
4417                "document",
4418                DataType::Struct(
4419                    vec![
4420                        Field::new("content.text", DataType::Utf8, false),
4421                        Field::new("content.summary", DataType::Utf8, false),
4422                    ]
4423                    .into(),
4424                ),
4425                false,
4426            ),
4427        ]));
4428
4429        // Generate test data with text content
4430        let num_rows = 100;
4431        let ids = Int32Array::from_iter_values(0..num_rows as i32);
4432        let content_texts = StringArray::from_iter_values((0..num_rows).map(|i| match i % 3 {
4433            0 => format!("The quick brown fox jumps over the lazy dog {}", i),
4434            1 => format!(
4435                "Machine learning and artificial intelligence document {}",
4436                i
4437            ),
4438            _ => format!("Data science and analytics content piece {}", i),
4439        }));
4440        let summaries = StringArray::from_iter_values(
4441            (0..num_rows).map(|i| format!("Summary of document {}", i)),
4442        );
4443
4444        let struct_array = arrow_array::StructArray::from(vec![
4445            (
4446                Arc::new(Field::new("content.text", DataType::Utf8, false)),
4447                Arc::new(content_texts) as Arc<dyn arrow_array::Array>,
4448            ),
4449            (
4450                Arc::new(Field::new("content.summary", DataType::Utf8, false)),
4451                Arc::new(summaries) as Arc<dyn arrow_array::Array>,
4452            ),
4453        ]);
4454
4455        let batch =
4456            RecordBatch::try_new(schema.clone(), vec![Arc::new(ids), Arc::new(struct_array)])
4457                .unwrap();
4458
4459        let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone());
4460        let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
4461
4462        // Create Inverted index on nested text field with dots
4463        let nested_column_path = "document.`content.text`";
4464        let params = InvertedIndexParams::default();
4465
4466        dataset
4467            .create_index(
4468                &[nested_column_path],
4469                IndexType::Inverted,
4470                Some("inverted_content_idx".to_string()),
4471                &params,
4472                true,
4473            )
4474            .await
4475            .unwrap();
4476
4477        // Reload the dataset to ensure the index is loaded
4478        dataset = Dataset::open(test_uri).await.unwrap();
4479
4480        // Verify index was created
4481        let indices = dataset.load_indices().await.unwrap();
4482        assert_eq!(indices.len(), 1);
4483        assert_eq!(indices[0].name, "inverted_content_idx");
4484
4485        // Verify the correct field was indexed
4486        let field_id = indices[0].fields[0];
4487        let field_path = dataset.schema().field_path(field_id).unwrap();
4488        assert_eq!(field_path, "document.`content.text`");
4489
4490        // Test full-text search on the nested field with dots
4491        // Use the field_path that the index reports
4492        let query = FullTextSearchQuery::new("machine learning".to_string())
4493            .with_column(field_path.clone())
4494            .unwrap();
4495
4496        // Check the query plan uses the inverted index
4497        let plan = dataset
4498            .scan()
4499            .full_text_search(query.clone())
4500            .unwrap()
4501            .explain_plan(false)
4502            .await
4503            .unwrap();
4504
4505        // Verify the inverted index is being used
4506        assert!(
4507            plan.contains("MatchQuery") || plan.contains("PhraseQuery"),
4508            "Query plan should use inverted index for nested field with dots. Plan: {}",
4509            plan
4510        );
4511
4512        let results = dataset
4513            .scan()
4514            .full_text_search(query)
4515            .unwrap()
4516            .try_into_stream()
4517            .await
4518            .unwrap()
4519            .try_collect::<Vec<_>>()
4520            .await
4521            .unwrap();
4522
4523        // Verify we get results from the full-text search
4524        assert!(
4525            !results.is_empty(),
4526            "Full-text search should return results"
4527        );
4528
4529        // Check that we found documents containing "machine learning"
4530        let mut found_count = 0;
4531        for batch in results {
4532            found_count += batch.num_rows();
4533        }
4534        // We expect to find approximately 1/3 of documents (those with i % 3 == 1)
4535        assert!(
4536            found_count > 0,
4537            "Should find at least some documents with 'machine learning'"
4538        );
4539        assert!(found_count < num_rows, "Should not match all documents");
4540    }
4541}