Skip to main content

lance/
index.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Secondary Index
5//!
6
7use std::collections::{HashMap, HashSet};
8use std::sync::{Arc, OnceLock};
9
10use arrow_schema::{DataType, Schema};
11use async_trait::async_trait;
12use datafusion::execution::SendableRecordBatchStream;
13use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
14use futures::{stream, StreamExt, TryStreamExt};
15use itertools::Itertools;
16use lance_core::cache::{CacheKey, UnsizedCacheKey};
17use lance_core::utils::address::RowAddress;
18use lance_core::utils::parse::str_is_truthy;
19use lance_core::utils::tracing::{
20    IO_TYPE_OPEN_FRAG_REUSE, IO_TYPE_OPEN_MEM_WAL, IO_TYPE_OPEN_SCALAR, IO_TYPE_OPEN_VECTOR,
21    TRACE_IO_EVENTS,
22};
23use lance_file::previous::reader::FileReader as PreviousFileReader;
24use lance_file::reader::FileReaderOptions;
25use lance_index::frag_reuse::{FragReuseIndex, FRAG_REUSE_INDEX_NAME};
26use lance_index::mem_wal::{MemWalIndex, MEM_WAL_INDEX_NAME};
27use lance_index::optimize::OptimizeOptions;
28use lance_index::pb::index::Implementation;
29use lance_index::scalar::expression::{
30    IndexInformationProvider, MultiQueryParser, ScalarQueryParser,
31};
32use lance_index::scalar::inverted::InvertedIndexPlugin;
33use lance_index::scalar::lance_format::LanceIndexStore;
34use lance_index::scalar::registry::{TrainingCriteria, TrainingOrdering};
35use lance_index::scalar::{CreatedIndex, ScalarIndex};
36use lance_index::vector::bq::builder::RabitQuantizer;
37use lance_index::vector::flat::index::{FlatBinQuantizer, FlatIndex, FlatQuantizer};
38use lance_index::vector::hnsw::HNSW;
39use lance_index::vector::pq::ProductQuantizer;
40use lance_index::vector::sq::ScalarQuantizer;
41pub use lance_index::IndexParams;
42use lance_index::{
43    is_system_index,
44    metrics::{MetricsCollector, NoOpMetricsCollector},
45    IndexCriteria,
46};
47use lance_index::{pb, vector::VectorIndex, Index, IndexType, INDEX_FILE_NAME};
48use lance_index::{
49    DatasetIndexExt, IndexDescription, INDEX_METADATA_SCHEMA_KEY, VECTOR_INDEX_VERSION,
50};
51use lance_io::scheduler::{ScanScheduler, SchedulerConfig};
52use lance_io::traits::Reader;
53use lance_io::utils::{
54    read_last_block, read_message, read_message_from_buf, read_metadata_offset, read_version,
55    CachedFileSize,
56};
57use lance_table::format::IndexMetadata;
58use lance_table::format::{Fragment, SelfDescribingFileReader};
59use lance_table::io::manifest::read_manifest_indexes;
60use roaring::RoaringBitmap;
61use scalar::index_matches_criteria;
62use serde_json::json;
63use snafu::location;
64use tracing::{info, instrument};
65use uuid::Uuid;
66use vector::ivf::v2::IVFIndex;
67use vector::utils::get_vector_type;
68
69pub(crate) mod append;
70mod create;
71pub mod frag_reuse;
72pub mod mem_wal;
73pub mod prefilter;
74pub mod scalar;
75pub mod vector;
76
77use self::append::merge_indices;
78use self::vector::remap_vector_index;
79use crate::dataset::index::LanceIndexStoreExt;
80use crate::dataset::optimize::remapping::RemapResult;
81use crate::dataset::optimize::RemappedIndex;
82use crate::dataset::transaction::{Operation, Transaction};
83use crate::index::frag_reuse::{load_frag_reuse_index_details, open_frag_reuse_index};
84use crate::index::mem_wal::open_mem_wal_index;
85pub use crate::index::prefilter::{FilterLoader, PreFilter};
86use crate::index::scalar::{fetch_index_details, load_training_data, IndexDetails};
87use crate::session::index_caches::{FragReuseIndexKey, IndexMetadataKey};
88use crate::{dataset::Dataset, Error, Result};
89pub use create::CreateIndexBuilder;
90
91// Cache keys for different index types
92#[derive(Debug, Clone)]
93pub struct ScalarIndexCacheKey<'a> {
94    pub uuid: &'a str,
95    pub fri_uuid: Option<&'a Uuid>,
96}
97
98impl<'a> ScalarIndexCacheKey<'a> {
99    pub fn new(uuid: &'a str, fri_uuid: Option<&'a Uuid>) -> Self {
100        Self { uuid, fri_uuid }
101    }
102}
103
104impl UnsizedCacheKey for ScalarIndexCacheKey<'_> {
105    type ValueType = dyn ScalarIndex;
106
107    fn key(&self) -> std::borrow::Cow<'_, str> {
108        if let Some(fri_uuid) = self.fri_uuid {
109            format!("{}-{}", self.uuid, fri_uuid).into()
110        } else {
111            self.uuid.into()
112        }
113    }
114}
115
116#[derive(Debug, Clone)]
117pub struct VectorIndexCacheKey<'a> {
118    pub uuid: &'a str,
119    pub fri_uuid: Option<&'a Uuid>,
120}
121
122impl<'a> VectorIndexCacheKey<'a> {
123    pub fn new(uuid: &'a str, fri_uuid: Option<&'a Uuid>) -> Self {
124        Self { uuid, fri_uuid }
125    }
126}
127
128impl UnsizedCacheKey for VectorIndexCacheKey<'_> {
129    type ValueType = dyn VectorIndex;
130
131    fn key(&self) -> std::borrow::Cow<'_, str> {
132        if let Some(fri_uuid) = self.fri_uuid {
133            format!("{}-{}", self.uuid, fri_uuid).into()
134        } else {
135            self.uuid.into()
136        }
137    }
138}
139
140#[derive(Debug, Clone)]
141pub struct FragReuseIndexCacheKey<'a> {
142    pub uuid: &'a str,
143    pub fri_uuid: Option<&'a Uuid>,
144}
145
146impl<'a> FragReuseIndexCacheKey<'a> {
147    pub fn new(uuid: &'a str, fri_uuid: Option<&'a Uuid>) -> Self {
148        Self { uuid, fri_uuid }
149    }
150}
151
152impl CacheKey for FragReuseIndexCacheKey<'_> {
153    type ValueType = FragReuseIndex;
154
155    fn key(&self) -> std::borrow::Cow<'_, str> {
156        if let Some(fri_uuid) = self.fri_uuid {
157            format!("{}-{}", self.uuid, fri_uuid).into()
158        } else {
159            self.uuid.into()
160        }
161    }
162}
163
164#[derive(Debug, Clone)]
165pub struct MemWalCacheKey<'a> {
166    pub uuid: &'a Uuid,
167    pub fri_uuid: Option<&'a Uuid>,
168}
169
170impl<'a> MemWalCacheKey<'a> {
171    pub fn new(uuid: &'a Uuid, fri_uuid: Option<&'a Uuid>) -> Self {
172        Self { uuid, fri_uuid }
173    }
174}
175
176impl CacheKey for MemWalCacheKey<'_> {
177    type ValueType = MemWalIndex;
178
179    fn key(&self) -> std::borrow::Cow<'_, str> {
180        if let Some(fri_uuid) = self.fri_uuid {
181            format!("{}-{}", self.uuid, fri_uuid).into()
182        } else {
183            self.uuid.to_string().into()
184        }
185    }
186}
187
188// Whether to auto-migrate a dataset when we encounter corruption.
189fn auto_migrate_corruption() -> bool {
190    static LANCE_AUTO_MIGRATION: OnceLock<bool> = OnceLock::new();
191    *LANCE_AUTO_MIGRATION.get_or_init(|| {
192        std::env::var("LANCE_AUTO_MIGRATION")
193            .ok()
194            .map(|s| str_is_truthy(&s))
195            .unwrap_or(true)
196    })
197}
198
199/// Builds index.
200#[async_trait]
201pub trait IndexBuilder {
202    fn index_type() -> IndexType;
203
204    async fn build(&self) -> Result<()>;
205}
206
207pub(crate) async fn remap_index(
208    dataset: &Dataset,
209    index_id: &Uuid,
210    row_id_map: &HashMap<u64, Option<u64>>,
211) -> Result<RemapResult> {
212    // Load indices from the disk.
213    let indices = dataset.load_indices().await?;
214    let matched = indices
215        .iter()
216        .find(|i| i.uuid == *index_id)
217        .ok_or_else(|| Error::Index {
218            message: format!("Index with id {} does not exist", index_id),
219            location: location!(),
220        })?;
221
222    if matched.fields.len() > 1 {
223        return Err(Error::Index {
224            message: "Remapping indices with multiple fields is not supported".to_string(),
225            location: location!(),
226        });
227    }
228
229    if row_id_map.values().all(|v| v.is_none()) {
230        let deleted_bitmap = RoaringBitmap::from_iter(
231            row_id_map
232                .keys()
233                .map(|row_id| RowAddress::new_from_u64(*row_id))
234                .map(|addr| addr.fragment_id()),
235        );
236        if Some(deleted_bitmap) == matched.fragment_bitmap {
237            // If remap deleted all rows, we can just return the same index ID.
238            // This can happen if there is a bug where the index is covering empty
239            // fragment that haven't been cleaned up. They should be cleaned up
240            // outside of this function.
241            return Ok(RemapResult::Keep(*index_id));
242        }
243    }
244
245    let field_id = matched
246        .fields
247        .first()
248        .expect("An index existed with no fields");
249    let field_path = dataset.schema().field_path(*field_id)?;
250
251    let new_id = Uuid::new_v4();
252
253    let generic = dataset
254        .open_generic_index(&field_path, &index_id.to_string(), &NoOpMetricsCollector)
255        .await?;
256
257    let created_index = match generic.index_type() {
258        it if it.is_scalar() => {
259            let new_store = LanceIndexStore::from_dataset_for_new(dataset, &new_id.to_string())?;
260
261            let scalar_index = dataset
262                .open_scalar_index(&field_path, &index_id.to_string(), &NoOpMetricsCollector)
263                .await?;
264            if !scalar_index.can_remap() {
265                return Ok(RemapResult::Drop);
266            }
267
268            match scalar_index.index_type() {
269                IndexType::Inverted => {
270                    let inverted_index = scalar_index
271                        .as_any()
272                        .downcast_ref::<lance_index::scalar::inverted::InvertedIndex>()
273                        .ok_or(Error::Index {
274                            message: "expected inverted index".to_string(),
275                            location: location!(),
276                        })?;
277                    if inverted_index.is_legacy() {
278                        log::warn!("reindex because of legacy format, index_type: {}, index_id: {}, field: {}",
279                            scalar_index.index_type(),
280                            index_id,
281                            field_path
282                        );
283                        let training_data = load_training_data(
284                            dataset,
285                            &field_path,
286                            &TrainingCriteria::new(TrainingOrdering::None),
287                            None,
288                            true, // Legacy reindexing should always train
289                            None,
290                        )
291                        .await?;
292                        InvertedIndexPlugin::train_inverted_index(
293                            training_data,
294                            &new_store,
295                            inverted_index.params().clone(),
296                            None,
297                        )
298                        .await?
299                    } else {
300                        scalar_index.remap(row_id_map, &new_store).await?
301                    }
302                }
303                _ => scalar_index.remap(row_id_map, &new_store).await?,
304            }
305        }
306        it if it.is_vector() => {
307            remap_vector_index(
308                Arc::new(dataset.clone()),
309                &field_path,
310                index_id,
311                &new_id,
312                matched,
313                row_id_map,
314            )
315            .await?;
316            CreatedIndex {
317                index_details: prost_types::Any::from_msg(
318                    &lance_table::format::pb::VectorIndexDetails::default(),
319                )
320                .unwrap(),
321                index_version: VECTOR_INDEX_VERSION,
322            }
323        }
324        _ => {
325            return Err(Error::Index {
326                message: format!("Index type {} is not supported", generic.index_type()),
327                location: location!(),
328            });
329        }
330    };
331
332    Ok(RemapResult::Remapped(RemappedIndex {
333        old_id: *index_id,
334        new_id,
335        index_details: created_index.index_details,
336        index_version: created_index.index_version,
337    }))
338}
339
340#[derive(Debug)]
341pub struct ScalarIndexInfo {
342    indexed_columns: HashMap<String, (DataType, Box<MultiQueryParser>)>,
343}
344
345impl IndexInformationProvider for ScalarIndexInfo {
346    fn get_index(&self, col: &str) -> Option<(&DataType, &dyn ScalarQueryParser)> {
347        self.indexed_columns
348            .get(col)
349            .map(|(ty, parser)| (ty, parser.as_ref() as &dyn ScalarQueryParser))
350    }
351}
352
353async fn open_index_proto(reader: &dyn Reader) -> Result<pb::Index> {
354    let file_size = reader.size().await?;
355    let tail_bytes = read_last_block(reader).await?;
356    let metadata_pos = read_metadata_offset(&tail_bytes)?;
357    let proto: pb::Index = if metadata_pos < file_size - tail_bytes.len() {
358        // We have not read the metadata bytes yet.
359        read_message(reader, metadata_pos).await?
360    } else {
361        let offset = tail_bytes.len() - (file_size - metadata_pos);
362        read_message_from_buf(&tail_bytes.slice(offset..))?
363    };
364    Ok(proto)
365}
366
367fn vector_index_details() -> prost_types::Any {
368    let details = lance_table::format::pb::VectorIndexDetails::default();
369    prost_types::Any::from_msg(&details).unwrap()
370}
371
372struct IndexDescriptionImpl {
373    name: String,
374    field_ids: Vec<u32>,
375    segments: Vec<IndexMetadata>,
376    index_type: String,
377    details: IndexDetails,
378    rows_indexed: u64,
379}
380
381impl IndexDescriptionImpl {
382    fn try_new(segments: Vec<IndexMetadata>, dataset: &Dataset) -> Result<Self> {
383        if segments.is_empty() {
384            return Err(Error::Index {
385                message: "Index metadata is empty".to_string(),
386                location: location!(),
387            });
388        }
389
390        // We assume the type URL and details are the same for all segments
391        let example_metadata = &segments[0];
392
393        let name = example_metadata.name.clone();
394        if !segments.iter().all(|shard| shard.name == name) {
395            return Err(Error::Index {
396                message: "Index name should be identical across all segments".to_string(),
397                location: location!(),
398            });
399        }
400
401        let field_ids = &example_metadata.fields;
402        if !segments.iter().all(|shard| shard.fields == *field_ids) {
403            return Err(Error::Index {
404                message: "Index fields should be identical across all segments".to_string(),
405                location: location!(),
406            });
407        }
408        let field_ids = field_ids.iter().map(|id| *id as u32).collect();
409
410        // This should not fail as we have already filtered out indexes without index details.
411        let index_details = example_metadata.index_details.as_ref().ok_or(Error::Index {
412            message:
413                "Index details are required for index description.  This index must be retrained to support this method."
414                    .to_string(),
415            location: location!(),
416        })?;
417        let type_url = &index_details.type_url;
418        if !segments.iter().all(|shard| {
419            shard
420                .index_details
421                .as_ref()
422                .map(|d| d.type_url == *type_url)
423                .unwrap_or(false)
424        }) {
425            return Err(Error::Index {
426                message: "Index type URL should be present and identical across all segments"
427                    .to_string(),
428                location: location!(),
429            });
430        }
431
432        let details = IndexDetails(index_details.clone());
433        let mut rows_indexed = 0;
434
435        let index_type = details
436            .get_plugin()
437            .map(|p| p.name().to_string())
438            .unwrap_or_else(|_| "Unknown".to_string());
439
440        for shard in &segments {
441            let fragment_bitmap = shard
442            .fragment_bitmap
443            .as_ref()
444            .ok_or_else(|| Error::Index {
445                message: "Fragment bitmap is required for index description.  This index must be retrained to support this method.".to_string(),
446                location: location!(),
447            })?;
448
449            for fragment in dataset.get_fragments() {
450                if fragment_bitmap.contains(fragment.id() as u32) {
451                    rows_indexed += fragment.fast_logical_rows()? as u64;
452                }
453            }
454        }
455
456        Ok(Self {
457            name,
458            field_ids,
459            index_type,
460            segments,
461            details,
462            rows_indexed,
463        })
464    }
465}
466
467impl IndexDescription for IndexDescriptionImpl {
468    fn name(&self) -> &str {
469        &self.name
470    }
471
472    fn field_ids(&self) -> &[u32] {
473        &self.field_ids
474    }
475
476    fn index_type(&self) -> &str {
477        &self.index_type
478    }
479
480    fn metadata(&self) -> &[IndexMetadata] {
481        &self.segments
482    }
483
484    fn type_url(&self) -> &str {
485        self.details.0.type_url.as_str()
486    }
487
488    fn rows_indexed(&self) -> u64 {
489        self.rows_indexed
490    }
491
492    fn details(&self) -> Result<String> {
493        let plugin = self.details.get_plugin()?;
494        plugin
495            .details_as_json(&self.details.0)
496            .map(|v| v.to_string())
497    }
498}
499
500#[async_trait]
501impl DatasetIndexExt for Dataset {
502    type IndexBuilder<'a> = CreateIndexBuilder<'a>;
503
504    /// Create a builder for creating an index on columns.
505    ///
506    /// This returns a builder that can be configured with additional options
507    /// before awaiting to execute.
508    ///
509    /// # Examples
510    ///
511    /// Create a scalar BTREE index:
512    /// ```
513    /// # use lance::{Dataset, Result};
514    /// # use lance_index::{DatasetIndexExt, IndexType, scalar::ScalarIndexParams};
515    /// # async fn example(dataset: &mut Dataset) -> Result<()> {
516    /// let params = ScalarIndexParams::default();
517    /// dataset
518    ///     .create_index_builder(&["id"], IndexType::BTree, &params)
519    ///     .name("id_index".to_string())
520    ///     .await?;
521    /// # Ok(())
522    /// # }
523    /// ```
524    ///
525    /// Create an empty index that will be populated later:
526    /// ```
527    /// # use lance::{Dataset, Result};
528    /// # use lance_index::{DatasetIndexExt, IndexType, scalar::ScalarIndexParams};
529    /// # async fn example(dataset: &mut Dataset) -> Result<()> {
530    /// let params = ScalarIndexParams::default();
531    /// dataset
532    ///     .create_index_builder(&["category"], IndexType::Bitmap, &params)
533    ///     .train(false)  // Create empty index
534    ///     .replace(true)  // Replace if exists
535    ///     .await?;
536    /// # Ok(())
537    /// # }
538    /// ```
539    fn create_index_builder<'a>(
540        &'a mut self,
541        columns: &[&str],
542        index_type: IndexType,
543        params: &'a dyn IndexParams,
544    ) -> CreateIndexBuilder<'a> {
545        CreateIndexBuilder::new(self, columns, index_type, params)
546    }
547
548    #[instrument(skip_all)]
549    async fn create_index(
550        &mut self,
551        columns: &[&str],
552        index_type: IndexType,
553        name: Option<String>,
554        params: &dyn IndexParams,
555        replace: bool,
556    ) -> Result<()> {
557        // Use the builder pattern with default train=true for backward compatibility
558        let mut builder = self.create_index_builder(columns, index_type, params);
559
560        if let Some(name) = name {
561            builder = builder.name(name);
562        }
563
564        builder.replace(replace).await
565    }
566
567    async fn drop_index(&mut self, name: &str) -> Result<()> {
568        let indices = self.load_indices_by_name(name).await?;
569        if indices.is_empty() {
570            return Err(Error::IndexNotFound {
571                identity: format!("name={}", name),
572                location: location!(),
573            });
574        }
575
576        let transaction = Transaction::new(
577            self.manifest.version,
578            Operation::CreateIndex {
579                new_indices: vec![],
580                removed_indices: indices.clone(),
581            },
582            None,
583        );
584
585        self.apply_commit(transaction, &Default::default(), &Default::default())
586            .await?;
587
588        Ok(())
589    }
590
591    async fn prewarm_index(&self, name: &str) -> Result<()> {
592        let indices = self.load_indices_by_name(name).await?;
593        if indices.is_empty() {
594            return Err(Error::IndexNotFound {
595                identity: format!("name={}", name),
596                location: location!(),
597            });
598        }
599
600        let index = self
601            .open_generic_index(name, &indices[0].uuid.to_string(), &NoOpMetricsCollector)
602            .await?;
603        index.prewarm().await?;
604
605        Ok(())
606    }
607
608    async fn describe_indices<'a, 'b>(
609        &'a self,
610        criteria: Option<IndexCriteria<'b>>,
611    ) -> Result<Vec<Arc<dyn IndexDescription>>> {
612        let indices = self.load_indices().await?;
613        let mut indices = if let Some(criteria) = criteria {
614            indices.iter().filter(|idx| {
615                if idx.index_details.is_none() {
616                    log::warn!("The method describe_indices does not support indexes without index details.  Please retrain the index {}", idx.name);
617                    return false;
618                }
619                let fields = idx
620                    .fields
621                    .iter()
622                    .filter_map(|id| self.schema().field_by_id(*id))
623                    .collect::<Vec<_>>();
624                match index_matches_criteria(idx, &criteria, &fields, false, self.schema()) {
625                    Ok(matched) => matched,
626                    Err(err) => {
627                        log::warn!("Could not describe index {}: {}", idx.name, err);
628                        false
629                    }
630                }
631            }).collect::<Vec<_>>()
632        } else {
633            indices.iter().collect::<Vec<_>>()
634        };
635        indices.sort_by_key(|idx| &idx.name);
636
637        indices
638            .into_iter()
639            .chunk_by(|idx| &idx.name)
640            .into_iter()
641            .map(|(_, segments)| {
642                let segments = segments.cloned().collect::<Vec<_>>();
643                let desc = IndexDescriptionImpl::try_new(segments, self)?;
644                Ok(Arc::new(desc) as Arc<dyn IndexDescription>)
645            })
646            .collect::<Result<Vec<_>>>()
647    }
648
649    async fn load_indices(&self) -> Result<Arc<Vec<IndexMetadata>>> {
650        let metadata_key = IndexMetadataKey {
651            version: self.version().version,
652        };
653        let indices = match self.index_cache.get_with_key(&metadata_key).await {
654            Some(indices) => indices,
655            None => {
656                let mut loaded_indices = read_manifest_indexes(
657                    &self.object_store,
658                    &self.manifest_location,
659                    &self.manifest,
660                )
661                .await?;
662                retain_supported_indices(&mut loaded_indices);
663                let loaded_indices = Arc::new(loaded_indices);
664                self.index_cache
665                    .insert_with_key(&metadata_key, loaded_indices.clone())
666                    .await;
667                loaded_indices
668            }
669        };
670
671        if let Some(frag_reuse_index_meta) =
672            indices.iter().find(|idx| idx.name == FRAG_REUSE_INDEX_NAME)
673        {
674            let uuid = frag_reuse_index_meta.uuid.to_string();
675            let fri_key = FragReuseIndexKey { uuid: &uuid };
676            let frag_reuse_index = self
677                .index_cache
678                .get_or_insert_with_key(fri_key, || async move {
679                    let index_details =
680                        load_frag_reuse_index_details(self, frag_reuse_index_meta).await?;
681                    open_frag_reuse_index(frag_reuse_index_meta.uuid, index_details.as_ref()).await
682                })
683                .await?;
684            let mut indices = indices.as_ref().clone();
685            indices.iter_mut().for_each(|idx| {
686                if let Some(bitmap) = idx.fragment_bitmap.as_mut() {
687                    frag_reuse_index.remap_fragment_bitmap(bitmap).unwrap();
688                }
689            });
690            Ok(Arc::new(indices))
691        } else {
692            Ok(indices)
693        }
694    }
695
696    async fn commit_existing_index(
697        &mut self,
698        index_name: &str,
699        column: &str,
700        index_id: Uuid,
701    ) -> Result<()> {
702        let Some(field) = self.schema().field(column) else {
703            return Err(Error::Index {
704                message: format!("CreateIndex: column '{column}' does not exist"),
705                location: location!(),
706            });
707        };
708
709        // TODO: We will need some way to determine the index details here.  Perhaps
710        // we can load the index itself and get the details that way.
711
712        let new_idx = IndexMetadata {
713            uuid: index_id,
714            name: index_name.to_string(),
715            fields: vec![field.id],
716            dataset_version: self.manifest.version,
717            fragment_bitmap: Some(self.get_fragments().iter().map(|f| f.id() as u32).collect()),
718            index_details: None,
719            index_version: 0,
720            created_at: Some(chrono::Utc::now()),
721            base_id: None, // New indices don't have base_id (they're not from shallow clone)
722        };
723
724        let transaction = Transaction::new(
725            self.manifest.version,
726            Operation::CreateIndex {
727                new_indices: vec![new_idx],
728                removed_indices: vec![],
729            },
730            None,
731        );
732
733        self.apply_commit(transaction, &Default::default(), &Default::default())
734            .await?;
735
736        Ok(())
737    }
738
739    async fn load_scalar_index<'a, 'b>(
740        &'a self,
741        criteria: IndexCriteria<'b>,
742    ) -> Result<Option<IndexMetadata>> {
743        let indices = self.load_indices().await?;
744
745        let mut indices = indices
746            .iter()
747            .filter(|idx| {
748                // We shouldn't have any indices with empty fields, but just in case, log an error
749                // but don't fail the operation (we might not be using that index)
750                if idx.fields.is_empty() {
751                    if idx.name != FRAG_REUSE_INDEX_NAME {
752                        log::error!("Index {} has no fields", idx.name);
753                    }
754                    false
755                } else {
756                    true
757                }
758            })
759            .collect::<Vec<_>>();
760        // This sorting & chunking is only needed to calculate if there are multiple indexes on the same
761        // field.  This fact is only needed for backwards compatibility behavior for indexes that don't have
762        // index details.  At some point we should deprecate indexes without index details.
763        //
764        // TODO: At some point we should just fail if the index details are missing and ask the user to
765        // retrain the index.
766        indices.sort_by_key(|idx| idx.fields[0]);
767        let indice_by_field = indices.into_iter().chunk_by(|idx| idx.fields[0]);
768        for (field_id, indices) in &indice_by_field {
769            let indices = indices.collect::<Vec<_>>();
770            let has_multiple = indices.len() > 1;
771            for idx in indices {
772                let field = self.schema().field_by_id(field_id);
773                if let Some(field) = field {
774                    if index_matches_criteria(
775                        idx,
776                        &criteria,
777                        &[field],
778                        has_multiple,
779                        self.schema(),
780                    )? {
781                        let non_empty = idx.fragment_bitmap.as_ref().is_some_and(|bitmap| {
782                            bitmap.intersection_len(self.fragment_bitmap.as_ref()) > 0
783                        });
784                        let is_fts_index = if let Some(details) = &idx.index_details {
785                            IndexDetails(details.clone()).supports_fts()
786                        } else {
787                            false
788                        };
789                        // FTS indices must always be returned even if empty, because FTS queries
790                        // require an index to exist. The query execution will handle the empty
791                        // bitmap appropriately and fall back to scanning unindexed data.
792                        // Other index types can be skipped if empty since they're optional optimizations.
793                        if non_empty || is_fts_index {
794                            return Ok(Some(idx.clone()));
795                        }
796                    }
797                }
798            }
799        }
800        return Ok(None);
801    }
802
803    #[instrument(skip_all)]
804    async fn optimize_indices(&mut self, options: &OptimizeOptions) -> Result<()> {
805        let dataset = Arc::new(self.clone());
806        let indices = self.load_indices().await?;
807
808        let indices_to_optimize = options
809            .index_names
810            .as_ref()
811            .map(|names| names.iter().collect::<HashSet<_>>());
812        let name_to_indices = indices
813            .iter()
814            .filter(|idx| {
815                indices_to_optimize
816                    .as_ref()
817                    .is_none_or(|names| names.contains(&idx.name))
818                    && !is_system_index(idx)
819            })
820            .map(|idx| (idx.name.clone(), idx))
821            .into_group_map();
822
823        let mut new_indices = vec![];
824        let mut removed_indices = vec![];
825        for deltas in name_to_indices.values() {
826            let Some(res) = merge_indices(dataset.clone(), deltas.as_slice(), options).await?
827            else {
828                continue;
829            };
830
831            let last_idx = deltas.last().expect("Delta indices should not be empty");
832            let new_idx = IndexMetadata {
833                uuid: res.new_uuid,
834                name: last_idx.name.clone(), // Keep the same name
835                fields: last_idx.fields.clone(),
836                dataset_version: self.manifest.version,
837                fragment_bitmap: Some(res.new_fragment_bitmap),
838                index_details: Some(Arc::new(res.new_index_details)),
839                index_version: res.new_index_version,
840                created_at: Some(chrono::Utc::now()),
841                base_id: None, // Mew merged index file locates in the cloned dataset.
842            };
843            removed_indices.extend(res.removed_indices.iter().map(|&idx| idx.clone()));
844            if deltas.len() > res.removed_indices.len() {
845                new_indices.extend(
846                    deltas[0..(deltas.len() - res.removed_indices.len())]
847                        .iter()
848                        .map(|&idx| idx.clone()),
849                );
850            }
851            new_indices.push(new_idx);
852        }
853
854        if new_indices.is_empty() {
855            return Ok(());
856        }
857
858        let transaction = Transaction::new(
859            self.manifest.version,
860            Operation::CreateIndex {
861                new_indices,
862                removed_indices,
863            },
864            None,
865        );
866
867        self.apply_commit(transaction, &Default::default(), &Default::default())
868            .await?;
869
870        Ok(())
871    }
872
873    async fn index_statistics(&self, index_name: &str) -> Result<String> {
874        let metadatas = self.load_indices_by_name(index_name).await?;
875        if metadatas.is_empty() {
876            return Err(Error::IndexNotFound {
877                identity: format!("name={}", index_name),
878                location: location!(),
879            });
880        }
881
882        if index_name == FRAG_REUSE_INDEX_NAME {
883            let index = self
884                .open_frag_reuse_index(&NoOpMetricsCollector)
885                .await?
886                .expect("FragmentReuse index does not exist");
887            return serde_json::to_string(&index.statistics()?).map_err(|e| Error::Index {
888                message: format!("Failed to serialize index statistics: {}", e),
889                location: location!(),
890            });
891        }
892
893        if index_name == MEM_WAL_INDEX_NAME {
894            let index = self
895                .open_mem_wal_index(&NoOpMetricsCollector)
896                .await?
897                .expect("MemWal index does not exist");
898            return serde_json::to_string(&index.statistics()?).map_err(|e| Error::Index {
899                message: format!("Failed to serialize index statistics: {}", e),
900                location: location!(),
901            });
902        }
903
904        let field_id = metadatas[0].fields[0];
905        let field_path = self.schema().field_path(field_id)?;
906
907        // Open all delta indices
908        let indices = stream::iter(metadatas.iter())
909            .then(|m| {
910                let field_path = field_path.clone();
911                async move {
912                    self.open_generic_index(&field_path, &m.uuid.to_string(), &NoOpMetricsCollector)
913                        .await
914                }
915            })
916            .try_collect::<Vec<_>>()
917            .await?;
918
919        // Stastistics for each delta index.
920        let indices_stats = indices
921            .iter()
922            .map(|idx| idx.statistics())
923            .collect::<Result<Vec<_>>>()?;
924
925        let index_type = indices[0].index_type().to_string();
926
927        let indexed_fragments_per_delta = self.indexed_fragments(index_name).await?;
928
929        let res = indexed_fragments_per_delta
930            .iter()
931            .map(|frags| {
932                let mut sum = 0;
933                for frag in frags.iter() {
934                    sum += frag.num_rows().ok_or_else(|| Error::Internal {
935                        message: "Fragment should have row counts, please upgrade lance and \
936                                      trigger a single write to fix this"
937                            .to_string(),
938                        location: location!(),
939                    })?;
940                }
941                Ok(sum)
942            })
943            .collect::<Result<Vec<_>>>();
944
945        async fn migrate_and_recompute(ds: &Dataset, index_name: &str) -> Result<String> {
946            let mut ds = ds.clone();
947            log::warn!(
948                "Detecting out-dated fragment metadata, migrating dataset. \
949                        To disable migration, set LANCE_AUTO_MIGRATION=false"
950            );
951            ds.delete("false").await.map_err(|err| {
952                Error::Execution {
953                    message: format!("Failed to migrate dataset while calculating index statistics. \
954                            To disable migration, set LANCE_AUTO_MIGRATION=false. Original error: {}", err),
955                    location: location!(),
956                }
957            })?;
958            ds.index_statistics(index_name).await
959        }
960
961        let num_indexed_rows_per_delta = match res {
962            Ok(rows) => rows,
963            Err(Error::Internal { message, .. })
964                if auto_migrate_corruption() && message.contains("trigger a single write") =>
965            {
966                return migrate_and_recompute(self, index_name).await;
967            }
968            Err(e) => return Err(e),
969        };
970
971        let mut fragment_ids = HashSet::new();
972        for frags in indexed_fragments_per_delta.iter() {
973            for frag in frags.iter() {
974                if !fragment_ids.insert(frag.id) {
975                    if auto_migrate_corruption() {
976                        return migrate_and_recompute(self, index_name).await;
977                    } else {
978                        return Err(Error::Internal {
979                            message:
980                                "Overlap in indexed fragments. Please upgrade to lance >= 0.23.0 \
981                                  and trigger a single write to fix this"
982                                    .to_string(),
983                            location: location!(),
984                        });
985                    }
986                }
987            }
988        }
989        let num_indexed_fragments = fragment_ids.len();
990
991        let num_unindexed_fragments = self.fragments().len() - num_indexed_fragments;
992        let num_indexed_rows: usize = num_indexed_rows_per_delta.iter().cloned().sum();
993        let num_unindexed_rows = self.count_rows(None).await? - num_indexed_rows;
994
995        // Calculate updated_at as max(created_at) from all index metadata
996        let updated_at = metadatas
997            .iter()
998            .filter_map(|m| m.created_at)
999            .max()
1000            .map(|dt| dt.timestamp_millis() as u64);
1001
1002        let stats = json!({
1003            "index_type": index_type,
1004            "name": index_name,
1005            "num_indices": metadatas.len(),
1006            "indices": indices_stats,
1007            "num_indexed_fragments": num_indexed_fragments,
1008            "num_indexed_rows": num_indexed_rows,
1009            "num_unindexed_fragments": num_unindexed_fragments,
1010            "num_unindexed_rows": num_unindexed_rows,
1011            "num_indexed_rows_per_delta": num_indexed_rows_per_delta,
1012            "updated_at_timestamp_ms": updated_at,
1013        });
1014
1015        serde_json::to_string(&stats).map_err(|e| Error::Index {
1016            message: format!("Failed to serialize index statistics: {}", e),
1017            location: location!(),
1018        })
1019    }
1020
1021    async fn read_index_partition(
1022        &self,
1023        index_name: &str,
1024        partition_id: usize,
1025        with_vector: bool,
1026    ) -> Result<SendableRecordBatchStream> {
1027        let indices = self.load_indices_by_name(index_name).await?;
1028        if indices.is_empty() {
1029            return Err(Error::IndexNotFound {
1030                identity: format!("name={}", index_name),
1031                location: location!(),
1032            });
1033        }
1034        let column = self.schema().field_by_id(indices[0].fields[0]).unwrap();
1035
1036        let mut schema: Option<Arc<Schema>> = None;
1037        let mut partition_streams = Vec::with_capacity(indices.len());
1038        for index in indices {
1039            let index = self
1040                .open_vector_index(&column.name, &index.uuid.to_string(), &NoOpMetricsCollector)
1041                .await?;
1042
1043            let stream = index
1044                .partition_reader(partition_id, with_vector, &NoOpMetricsCollector)
1045                .await?;
1046            if schema.is_none() {
1047                schema = Some(stream.schema());
1048            }
1049            partition_streams.push(stream);
1050        }
1051
1052        match schema {
1053            Some(schema) => {
1054                let merged = stream::select_all(partition_streams);
1055                let stream = RecordBatchStreamAdapter::new(schema, merged);
1056                Ok(Box::pin(stream))
1057            }
1058            None => Ok(Box::pin(RecordBatchStreamAdapter::new(
1059                Arc::new(Schema::empty()),
1060                stream::empty(),
1061            ))),
1062        }
1063    }
1064}
1065
1066pub(crate) fn retain_supported_indices(indices: &mut Vec<IndexMetadata>) {
1067    indices.retain(|idx| {
1068        let max_supported_version = idx
1069            .index_details
1070            .as_ref()
1071            .map(|details| {
1072                IndexDetails(details.clone())
1073                    .index_version()
1074                    // If we don't know how to read the index, it isn't supported
1075                    .unwrap_or(i32::MAX as u32)
1076            })
1077            .unwrap_or_default();
1078        let is_valid = idx.index_version <= max_supported_version as i32;
1079        if !is_valid {
1080            log::warn!(
1081                "Index {} has version {}, which is not supported (<={}), ignoring it",
1082                idx.name,
1083                idx.index_version,
1084                max_supported_version,
1085            );
1086        }
1087        is_valid
1088    })
1089}
1090
1091/// A trait for internal dataset utilities
1092///
1093/// Internal use only. No API stability guarantees.
1094#[async_trait]
1095pub trait DatasetIndexInternalExt: DatasetIndexExt {
1096    /// Opens an index (scalar or vector) as a generic index
1097    async fn open_generic_index(
1098        &self,
1099        column: &str,
1100        uuid: &str,
1101        metrics: &dyn MetricsCollector,
1102    ) -> Result<Arc<dyn Index>>;
1103    /// Opens the requested scalar index
1104    async fn open_scalar_index(
1105        &self,
1106        column: &str,
1107        uuid: &str,
1108        metrics: &dyn MetricsCollector,
1109    ) -> Result<Arc<dyn ScalarIndex>>;
1110    /// Opens the requested vector index
1111    async fn open_vector_index(
1112        &self,
1113        column: &str,
1114        uuid: &str,
1115        metrics: &dyn MetricsCollector,
1116    ) -> Result<Arc<dyn VectorIndex>>;
1117
1118    /// Opens the fragment reuse index
1119    async fn open_frag_reuse_index(
1120        &self,
1121        metrics: &dyn MetricsCollector,
1122    ) -> Result<Option<Arc<FragReuseIndex>>>;
1123
1124    /// Opens the MemWAL index
1125    async fn open_mem_wal_index(
1126        &self,
1127        metrics: &dyn MetricsCollector,
1128    ) -> Result<Option<Arc<MemWalIndex>>>;
1129
1130    /// Gets the fragment reuse index UUID from the current manifest, if it exists
1131    async fn frag_reuse_index_uuid(&self) -> Option<Uuid>;
1132
1133    /// Loads information about all the available scalar indices on the dataset
1134    async fn scalar_index_info(&self) -> Result<ScalarIndexInfo>;
1135
1136    /// Return the fragments that are not covered by any of the deltas of the index.
1137    async fn unindexed_fragments(&self, idx_name: &str) -> Result<Vec<Fragment>>;
1138
1139    /// Return the fragments that are covered by each of the deltas of the index.
1140    async fn indexed_fragments(&self, idx_name: &str) -> Result<Vec<Vec<Fragment>>>;
1141
1142    /// Initialize a specific index on this dataset based on an index from a source dataset.
1143    async fn initialize_index(&mut self, source_dataset: &Dataset, index_name: &str) -> Result<()>;
1144
1145    /// Initialize all indices on this dataset based on indices from a source dataset.
1146    /// This will call `initialize_index` for each non-system index in the source dataset.
1147    async fn initialize_indices(&mut self, source_dataset: &Dataset) -> Result<()>;
1148}
1149
1150#[async_trait]
1151impl DatasetIndexInternalExt for Dataset {
1152    async fn open_generic_index(
1153        &self,
1154        column: &str,
1155        uuid: &str,
1156        metrics: &dyn MetricsCollector,
1157    ) -> Result<Arc<dyn Index>> {
1158        // Checking for cache existence is cheap so we just check both scalar and vector caches
1159        let frag_reuse_uuid = self.frag_reuse_index_uuid().await;
1160        let cache_key = ScalarIndexCacheKey::new(uuid, frag_reuse_uuid.as_ref());
1161        if let Some(index) = self.index_cache.get_unsized_with_key(&cache_key).await {
1162            return Ok(index.as_index());
1163        }
1164
1165        let vector_cache_key = VectorIndexCacheKey::new(uuid, frag_reuse_uuid.as_ref());
1166        if let Some(index) = self
1167            .index_cache
1168            .get_unsized_with_key(&vector_cache_key)
1169            .await
1170        {
1171            return Ok(index.as_index());
1172        }
1173
1174        let frag_reuse_cache_key = FragReuseIndexCacheKey::new(uuid, frag_reuse_uuid.as_ref());
1175        if let Some(index) = self.index_cache.get_with_key(&frag_reuse_cache_key).await {
1176            return Ok(index.as_index());
1177        }
1178
1179        // Sometimes we want to open an index and we don't care if it is a scalar or vector index.
1180        // For example, we might want to get statistics for an index, regardless of type.
1181        //
1182        // Currently, we solve this problem by checking for the existence of INDEX_FILE_NAME since
1183        // only vector indices have this file.  In the future, once we support multiple kinds of
1184        // scalar indices, we may start having this file with scalar indices too.  Once that happens
1185        // we can just read this file and look at the `implementation` or `index_type` fields to
1186        // determine what kind of index it is.
1187        let index_meta = self.load_index(uuid).await?.ok_or_else(|| Error::Index {
1188            message: format!("Index with id {} does not exist", uuid),
1189            location: location!(),
1190        })?;
1191        let index_dir = self.indice_files_dir(&index_meta)?;
1192        let index_file = index_dir.child(uuid).child(INDEX_FILE_NAME);
1193        if self.object_store.exists(&index_file).await? {
1194            let index = self.open_vector_index(column, uuid, metrics).await?;
1195            Ok(index.as_index())
1196        } else {
1197            let index = self.open_scalar_index(column, uuid, metrics).await?;
1198            Ok(index.as_index())
1199        }
1200    }
1201
1202    async fn open_scalar_index(
1203        &self,
1204        column: &str,
1205        uuid: &str,
1206        metrics: &dyn MetricsCollector,
1207    ) -> Result<Arc<dyn ScalarIndex>> {
1208        let frag_reuse_uuid = self.frag_reuse_index_uuid().await;
1209        let cache_key = ScalarIndexCacheKey::new(uuid, frag_reuse_uuid.as_ref());
1210        if let Some(index) = self.index_cache.get_unsized_with_key(&cache_key).await {
1211            return Ok(index);
1212        }
1213
1214        let index_meta = self.load_index(uuid).await?.ok_or_else(|| Error::Index {
1215            message: format!("Index with id {} does not exist", uuid),
1216            location: location!(),
1217        })?;
1218
1219        let index = scalar::open_scalar_index(self, column, &index_meta, metrics).await?;
1220
1221        info!(target: TRACE_IO_EVENTS, index_uuid=uuid, r#type=IO_TYPE_OPEN_SCALAR, index_type=index.index_type().to_string());
1222        metrics.record_index_load();
1223
1224        self.index_cache
1225            .insert_unsized_with_key(&cache_key, index.clone())
1226            .await;
1227        Ok(index)
1228    }
1229
1230    async fn open_vector_index(
1231        &self,
1232        column: &str,
1233        uuid: &str,
1234        metrics: &dyn MetricsCollector,
1235    ) -> Result<Arc<dyn VectorIndex>> {
1236        let frag_reuse_uuid = self.frag_reuse_index_uuid().await;
1237        let cache_key = VectorIndexCacheKey::new(uuid, frag_reuse_uuid.as_ref());
1238
1239        if let Some(index) = self.index_cache.get_unsized_with_key(&cache_key).await {
1240            log::debug!("Found vector index in cache uuid: {}", uuid);
1241            return Ok(index);
1242        }
1243
1244        let frag_reuse_index = self.open_frag_reuse_index(metrics).await?;
1245        let index_meta = self.load_index(uuid).await?.ok_or_else(|| Error::Index {
1246            message: format!("Index with id {} does not exist", uuid),
1247            location: location!(),
1248        })?;
1249        let index_dir = self.indice_files_dir(&index_meta)?;
1250        let index_file = index_dir.child(uuid).child(INDEX_FILE_NAME);
1251        let reader: Arc<dyn Reader> = self.object_store.open(&index_file).await?.into();
1252
1253        let tailing_bytes = read_last_block(reader.as_ref()).await?;
1254        let (major_version, minor_version) = read_version(&tailing_bytes)?;
1255
1256        // Namespace the index cache by the UUID of the index.
1257        let index_cache = self.index_cache.with_key_prefix(&cache_key.key());
1258
1259        // the index file is in lance format since version (0,2)
1260        // TODO: we need to change the legacy IVF_PQ to be in lance format
1261        let index = match (major_version, minor_version) {
1262            (0, 1) | (0, 0) => {
1263                info!(target: TRACE_IO_EVENTS, index_uuid=uuid, r#type=IO_TYPE_OPEN_VECTOR, version="0.1", index_type="IVF_PQ");
1264                let proto = open_index_proto(reader.as_ref()).await?;
1265                match &proto.implementation {
1266                    Some(Implementation::VectorIndex(vector_index)) => {
1267                        let dataset = Arc::new(self.clone());
1268                        vector::open_vector_index(
1269                            dataset,
1270                            uuid,
1271                            vector_index,
1272                            reader,
1273                            frag_reuse_index,
1274                        )
1275                        .await
1276                    }
1277                    None => Err(Error::Internal {
1278                        message: "Index proto was missing implementation field".into(),
1279                        location: location!(),
1280                    }),
1281                }
1282            }
1283
1284            (0, 2) => {
1285                info!(target: TRACE_IO_EVENTS, index_uuid=uuid, r#type=IO_TYPE_OPEN_VECTOR, version="0.2", index_type="IVF_PQ");
1286                let reader = PreviousFileReader::try_new_self_described_from_reader(
1287                    reader.clone(),
1288                    Some(&self.metadata_cache.file_metadata_cache(&index_file)),
1289                )
1290                .await?;
1291                vector::open_vector_index_v2(
1292                    Arc::new(self.clone()),
1293                    column,
1294                    uuid,
1295                    reader,
1296                    frag_reuse_index,
1297                )
1298                .await
1299            }
1300
1301            (0, 3) | (2, _) => {
1302                let scheduler = ScanScheduler::new(
1303                    self.object_store.clone(),
1304                    SchedulerConfig::max_bandwidth(&self.object_store),
1305                );
1306                let file = scheduler
1307                    .open_file(&index_file, &CachedFileSize::unknown())
1308                    .await?;
1309                let reader = lance_file::reader::FileReader::try_open(
1310                    file,
1311                    None,
1312                    Default::default(),
1313                    &self.metadata_cache.file_metadata_cache(&index_file),
1314                    FileReaderOptions::default(),
1315                )
1316                .await?;
1317                let index_metadata = reader
1318                    .schema()
1319                    .metadata
1320                    .get(INDEX_METADATA_SCHEMA_KEY)
1321                    .ok_or(Error::Index {
1322                        message: "Index Metadata not found".to_owned(),
1323                        location: location!(),
1324                    })?;
1325                let index_metadata: lance_index::IndexMetadata =
1326                    serde_json::from_str(index_metadata)?;
1327                let field = self.schema().field(column).ok_or_else(|| Error::Index {
1328                    message: format!("Column {} does not exist in the schema", column),
1329                    location: location!(),
1330                })?;
1331
1332                let (_, element_type) = get_vector_type(self.schema(), column)?;
1333
1334                info!(target: TRACE_IO_EVENTS, index_uuid=uuid, r#type=IO_TYPE_OPEN_VECTOR, version="0.3", index_type=index_metadata.index_type);
1335
1336                match index_metadata.index_type.as_str() {
1337                    "IVF_FLAT" => match element_type {
1338                        DataType::Float16 | DataType::Float32 | DataType::Float64 => {
1339                            let ivf = IVFIndex::<FlatIndex, FlatQuantizer>::try_new(
1340                                self.object_store.clone(),
1341                                index_dir,
1342                                uuid.to_owned(),
1343                                frag_reuse_index,
1344                                self.metadata_cache.as_ref(),
1345                                index_cache,
1346                            )
1347                            .await?;
1348                            Ok(Arc::new(ivf) as Arc<dyn VectorIndex>)
1349                        }
1350                        DataType::UInt8 => {
1351                            let ivf = IVFIndex::<FlatIndex, FlatBinQuantizer>::try_new(
1352                                self.object_store.clone(),
1353                                index_dir,
1354                                uuid.to_owned(),
1355                                frag_reuse_index,
1356                                self.metadata_cache.as_ref(),
1357                                index_cache,
1358                            )
1359                            .await?;
1360                            Ok(Arc::new(ivf) as Arc<dyn VectorIndex>)
1361                        }
1362                        _ => Err(Error::Index {
1363                            message: format!(
1364                                "the field type {} is not supported for FLAT index",
1365                                field.data_type()
1366                            ),
1367                            location: location!(),
1368                        }),
1369                    },
1370
1371                    "IVF_PQ" => {
1372                        let ivf = IVFIndex::<FlatIndex, ProductQuantizer>::try_new(
1373                            self.object_store.clone(),
1374                            index_dir,
1375                            uuid.to_owned(),
1376                            frag_reuse_index,
1377                            self.metadata_cache.as_ref(),
1378                            index_cache,
1379                        )
1380                        .await?;
1381                        Ok(Arc::new(ivf) as Arc<dyn VectorIndex>)
1382                    }
1383
1384                    "IVF_SQ" => {
1385                        let ivf = IVFIndex::<FlatIndex, ScalarQuantizer>::try_new(
1386                            self.object_store.clone(),
1387                            index_dir,
1388                            uuid.to_owned(),
1389                            frag_reuse_index,
1390                            self.metadata_cache.as_ref(),
1391                            index_cache,
1392                        )
1393                        .await?;
1394                        Ok(Arc::new(ivf) as Arc<dyn VectorIndex>)
1395                    }
1396
1397                    "IVF_RQ" => {
1398                        let ivf = IVFIndex::<FlatIndex, RabitQuantizer>::try_new(
1399                            self.object_store.clone(),
1400                            self.indices_dir(),
1401                            uuid.to_owned(),
1402                            frag_reuse_index,
1403                            self.metadata_cache.as_ref(),
1404                            index_cache,
1405                        )
1406                        .await?;
1407                        Ok(Arc::new(ivf) as Arc<dyn VectorIndex>)
1408                    }
1409
1410                    "IVF_HNSW_FLAT" => {
1411                        let uri = index_dir.child(uuid).child("index.pb");
1412                        let file_metadata_cache =
1413                            self.session.metadata_cache.file_metadata_cache(&uri);
1414                        let ivf = IVFIndex::<HNSW, FlatQuantizer>::try_new(
1415                            self.object_store.clone(),
1416                            index_dir,
1417                            uuid.to_owned(),
1418                            frag_reuse_index,
1419                            &file_metadata_cache,
1420                            index_cache,
1421                        )
1422                        .await?;
1423                        Ok(Arc::new(ivf) as Arc<dyn VectorIndex>)
1424                    }
1425
1426                    "IVF_HNSW_SQ" => {
1427                        let ivf = IVFIndex::<HNSW, ScalarQuantizer>::try_new(
1428                            self.object_store.clone(),
1429                            index_dir,
1430                            uuid.to_owned(),
1431                            frag_reuse_index,
1432                            self.metadata_cache.as_ref(),
1433                            index_cache,
1434                        )
1435                        .await?;
1436                        Ok(Arc::new(ivf) as Arc<dyn VectorIndex>)
1437                    }
1438
1439                    "IVF_HNSW_PQ" => {
1440                        let ivf = IVFIndex::<HNSW, ProductQuantizer>::try_new(
1441                            self.object_store.clone(),
1442                            index_dir,
1443                            uuid.to_owned(),
1444                            frag_reuse_index,
1445                            self.metadata_cache.as_ref(),
1446                            index_cache,
1447                        )
1448                        .await?;
1449                        Ok(Arc::new(ivf) as Arc<dyn VectorIndex>)
1450                    }
1451
1452                    _ => Err(Error::Index {
1453                        message: format!("Unsupported index type: {}", index_metadata.index_type),
1454                        location: location!(),
1455                    }),
1456                }
1457            }
1458
1459            _ => Err(Error::Index {
1460                message: "unsupported index version (maybe need to upgrade your lance version)"
1461                    .to_owned(),
1462                location: location!(),
1463            }),
1464        };
1465        let index = index?;
1466        metrics.record_index_load();
1467        self.index_cache
1468            .insert_unsized_with_key(&cache_key, index.clone())
1469            .await;
1470        Ok(index)
1471    }
1472
1473    async fn open_frag_reuse_index(
1474        &self,
1475        metrics: &dyn MetricsCollector,
1476    ) -> Result<Option<Arc<FragReuseIndex>>> {
1477        if let Some(frag_reuse_index_meta) = self.load_index_by_name(FRAG_REUSE_INDEX_NAME).await? {
1478            let uuid = frag_reuse_index_meta.uuid.to_string();
1479            let frag_reuse_key = FragReuseIndexKey { uuid: &uuid };
1480            let uuid_clone = uuid.clone();
1481
1482            let index = self
1483                .index_cache
1484                .get_or_insert_with_key(frag_reuse_key, || async move {
1485                    let index_meta = self.load_index(&uuid_clone).await?.ok_or_else(|| Error::Index {
1486                        message: format!("Index with id {} does not exist", uuid_clone),
1487                        location: location!(),
1488                    })?;
1489                    let index_details = load_frag_reuse_index_details(self, &index_meta).await?;
1490                    let index =
1491                        open_frag_reuse_index(frag_reuse_index_meta.uuid, index_details.as_ref()).await?;
1492
1493                    info!(target: TRACE_IO_EVENTS, index_uuid=uuid_clone, r#type=IO_TYPE_OPEN_FRAG_REUSE);
1494                    metrics.record_index_load();
1495
1496                    Ok(index)
1497                })
1498                .await?;
1499
1500            Ok(Some(index))
1501        } else {
1502            Ok(None)
1503        }
1504    }
1505
1506    async fn open_mem_wal_index(
1507        &self,
1508        metrics: &dyn MetricsCollector,
1509    ) -> Result<Option<Arc<MemWalIndex>>> {
1510        let Some(mem_wal_meta) = self.load_index_by_name(MEM_WAL_INDEX_NAME).await? else {
1511            return Ok(None);
1512        };
1513
1514        let frag_reuse_uuid = self.frag_reuse_index_uuid().await;
1515        let cache_key = MemWalCacheKey::new(&mem_wal_meta.uuid, frag_reuse_uuid.as_ref());
1516        if let Some(index) = self.index_cache.get_with_key(&cache_key).await {
1517            log::debug!("Found MemWAL index in cache uuid: {}", mem_wal_meta.uuid);
1518            return Ok(Some(index));
1519        }
1520
1521        let uuid = mem_wal_meta.uuid.to_string();
1522
1523        let index_meta = self.load_index(&uuid).await?.ok_or_else(|| Error::Index {
1524            message: format!("Index with id {} does not exist", uuid),
1525            location: location!(),
1526        })?;
1527        let index = open_mem_wal_index(index_meta)?;
1528
1529        info!(target: TRACE_IO_EVENTS, index_uuid=uuid, r#type=IO_TYPE_OPEN_MEM_WAL);
1530        metrics.record_index_load();
1531
1532        self.index_cache
1533            .insert_with_key(&cache_key, index.clone())
1534            .await;
1535        Ok(Some(index))
1536    }
1537
1538    async fn frag_reuse_index_uuid(&self) -> Option<Uuid> {
1539        if let Ok(indices) = self.load_indices().await {
1540            indices
1541                .iter()
1542                .find(|idx| idx.name == FRAG_REUSE_INDEX_NAME)
1543                .map(|idx| idx.uuid)
1544        } else {
1545            None
1546        }
1547    }
1548
1549    #[instrument(level = "trace", skip_all)]
1550    async fn scalar_index_info(&self) -> Result<ScalarIndexInfo> {
1551        let indices = self.load_indices().await?;
1552        let schema = self.schema();
1553        let mut indexed_fields = Vec::new();
1554        for index in indices.iter().filter(|idx| {
1555            let idx_schema = schema.project_by_ids(idx.fields.as_slice(), true);
1556            let is_vector_index = idx_schema
1557                .fields
1558                .iter()
1559                .any(|f| is_vector_field(f.data_type()));
1560
1561            // Check if this is an FTS index by looking at index details
1562            let is_fts_index = if let Some(details) = &idx.index_details {
1563                IndexDetails(details.clone()).supports_fts()
1564            } else {
1565                false
1566            };
1567
1568            // Only include indices with non-empty fragment bitmaps, except for FTS indices
1569            // which need to be discoverable even when empty
1570            let has_non_empty_bitmap = idx.fragment_bitmap.as_ref().is_some_and(|bitmap| {
1571                !bitmap.is_empty() && !(bitmap & self.fragment_bitmap.as_ref()).is_empty()
1572            });
1573
1574            idx.fields.len() == 1 && !is_vector_index && (has_non_empty_bitmap || is_fts_index)
1575        }) {
1576            let field = index.fields[0];
1577            let field = schema.field_by_id(field).ok_or_else(|| Error::Internal {
1578                message: format!(
1579                    "Index referenced a field with id {field} which did not exist in the schema"
1580                ),
1581                location: location!(),
1582            })?;
1583
1584            // Build the full field path for nested fields
1585            let field_path = if let Some(ancestors) = schema.field_ancestry_by_id(field.id) {
1586                let field_refs: Vec<&str> = ancestors.iter().map(|f| f.name.as_str()).collect();
1587                lance_core::datatypes::format_field_path(&field_refs)
1588            } else {
1589                field.name.clone()
1590            };
1591
1592            let index_details = IndexDetails(fetch_index_details(self, &field_path, index).await?);
1593            if index_details.is_vector() {
1594                continue;
1595            }
1596
1597            let plugin = index_details.get_plugin()?;
1598            let query_parser = plugin.new_query_parser(index.name.clone(), &index_details.0);
1599
1600            if let Some(query_parser) = query_parser {
1601                indexed_fields.push((field_path, (field.data_type(), query_parser)));
1602            }
1603        }
1604        let mut index_info_map = HashMap::with_capacity(indexed_fields.len());
1605        for indexed_field in indexed_fields {
1606            // Need to wrap in an option here because we know that only one of and_modify and or_insert will be called
1607            // but the rust compiler does not.
1608            let mut parser = Some(indexed_field.1 .1);
1609            let parser = &mut parser;
1610            index_info_map
1611                .entry(indexed_field.0)
1612                .and_modify(|existing: &mut (DataType, Box<MultiQueryParser>)| {
1613                    // If there are two indices on the same column, they must have the same type
1614                    debug_assert_eq!(existing.0, indexed_field.1 .0);
1615
1616                    existing.1.add(parser.take().unwrap());
1617                })
1618                .or_insert_with(|| {
1619                    (
1620                        indexed_field.1 .0,
1621                        Box::new(MultiQueryParser::single(parser.take().unwrap())),
1622                    )
1623                });
1624        }
1625        Ok(ScalarIndexInfo {
1626            indexed_columns: index_info_map,
1627        })
1628    }
1629
1630    async fn unindexed_fragments(&self, name: &str) -> Result<Vec<Fragment>> {
1631        let indices = self.load_indices_by_name(name).await?;
1632        let mut total_fragment_bitmap = RoaringBitmap::new();
1633        for idx in indices.iter() {
1634            total_fragment_bitmap |= idx.fragment_bitmap.as_ref().ok_or(Error::Index {
1635                message: "Please upgrade lance to 0.8+ to use this function".to_string(),
1636                location: location!(),
1637            })?;
1638        }
1639        Ok(self
1640            .fragments()
1641            .iter()
1642            .filter(|f| !total_fragment_bitmap.contains(f.id as u32))
1643            .cloned()
1644            .collect())
1645    }
1646
1647    async fn indexed_fragments(&self, name: &str) -> Result<Vec<Vec<Fragment>>> {
1648        let indices = self.load_indices_by_name(name).await?;
1649        indices
1650            .iter()
1651            .map(|index| {
1652                let fragment_bitmap = index.fragment_bitmap.as_ref().ok_or(Error::Index {
1653                    message: "Please upgrade lance to 0.8+ to use this function".to_string(),
1654                    location: location!(),
1655                })?;
1656                let mut indexed_frags = Vec::with_capacity(fragment_bitmap.len() as usize);
1657                for frag in self.fragments().iter() {
1658                    if fragment_bitmap.contains(frag.id as u32) {
1659                        indexed_frags.push(frag.clone());
1660                    }
1661                }
1662                Ok(indexed_frags)
1663            })
1664            .collect()
1665    }
1666
1667    async fn initialize_index(&mut self, source_dataset: &Dataset, index_name: &str) -> Result<()> {
1668        let source_indices = source_dataset.load_indices_by_name(index_name).await?;
1669
1670        if source_indices.is_empty() {
1671            return Err(Error::Index {
1672                message: format!("Index '{}' not found in source dataset", index_name),
1673                location: location!(),
1674            });
1675        }
1676
1677        let source_index = source_indices
1678            .iter()
1679            .min_by_key(|idx| idx.created_at)
1680            .ok_or_else(|| Error::Index {
1681                message: format!("Could not determine oldest index for '{}'", index_name),
1682                location: location!(),
1683            })?;
1684
1685        let mut field_names = Vec::new();
1686        for field_id in source_index.fields.iter() {
1687            let source_field = source_dataset
1688                .schema()
1689                .field_by_id(*field_id)
1690                .ok_or_else(|| Error::Index {
1691                    message: format!("Field with id {} not found in source dataset", field_id),
1692                    location: location!(),
1693                })?;
1694
1695            let target_field =
1696                self.schema()
1697                    .field(&source_field.name)
1698                    .ok_or_else(|| Error::Index {
1699                        message: format!(
1700                            "Field '{}' required by index '{}' not found in target dataset",
1701                            source_field.name, index_name
1702                        ),
1703                        location: location!(),
1704                    })?;
1705
1706            if source_field.data_type() != target_field.data_type() {
1707                return Err(Error::Index {
1708                    message: format!(
1709                        "Field '{}' has different types in source ({:?}) and target ({:?}) datasets",
1710                        source_field.name,
1711                        source_field.data_type(),
1712                        target_field.data_type()
1713                    ),
1714                    location: location!(),
1715                });
1716            }
1717
1718            field_names.push(source_field.name.as_str());
1719        }
1720
1721        if field_names.is_empty() {
1722            return Err(Error::Index {
1723                message: format!("Index '{}' has no fields", index_name),
1724                location: location!(),
1725            });
1726        }
1727
1728        if let Some(index_details) = &source_index.index_details {
1729            let index_details_wrapper = IndexDetails(index_details.clone());
1730
1731            if index_details_wrapper.is_vector() {
1732                vector::initialize_vector_index(self, source_dataset, source_index, &field_names)
1733                    .await?;
1734            } else {
1735                scalar::initialize_scalar_index(self, source_dataset, source_index, &field_names)
1736                    .await?;
1737            }
1738        } else {
1739            log::warn!(
1740                "Index '{}' has no index_details, skipping",
1741                source_index.name
1742            );
1743        }
1744
1745        Ok(())
1746    }
1747
1748    async fn initialize_indices(&mut self, source_dataset: &Dataset) -> Result<()> {
1749        let source_indices = source_dataset.load_indices().await?;
1750        let non_system_indices: Vec<_> = source_indices
1751            .iter()
1752            .filter(|idx| !lance_index::is_system_index(idx))
1753            .collect();
1754
1755        if non_system_indices.is_empty() {
1756            return Ok(());
1757        }
1758
1759        let mut unique_index_names = HashSet::new();
1760        for index in non_system_indices.iter() {
1761            unique_index_names.insert(index.name.clone());
1762        }
1763
1764        for index_name in unique_index_names {
1765            self.initialize_index(source_dataset, &index_name).await?;
1766        }
1767
1768        Ok(())
1769    }
1770}
1771
1772fn is_vector_field(data_type: DataType) -> bool {
1773    match data_type {
1774        DataType::FixedSizeList(_, _) => true,
1775        DataType::List(inner) => {
1776            // If the inner type is a fixed size list, then it is a multivector field
1777            matches!(inner.data_type(), DataType::FixedSizeList(_, _))
1778        }
1779        _ => false,
1780    }
1781}
1782
1783#[cfg(test)]
1784mod tests {
1785    use crate::dataset::builder::DatasetBuilder;
1786    use crate::dataset::optimize::{compact_files, CompactionOptions};
1787    use crate::dataset::{WriteMode, WriteParams};
1788    use crate::index::vector::VectorIndexParams;
1789    use crate::session::Session;
1790    use crate::utils::test::{copy_test_data_to_tmp, DatagenExt, FragmentCount, FragmentRowCount};
1791    use arrow_array::Int32Array;
1792
1793    use lance_io::{assert_io_eq, assert_io_lt};
1794
1795    use super::*;
1796
1797    use arrow::array::AsArray;
1798    use arrow::datatypes::{Float32Type, Int32Type};
1799    use arrow_array::{
1800        FixedSizeListArray, Float32Array, RecordBatch, RecordBatchIterator, StringArray,
1801    };
1802    use arrow_schema::{Field, Schema};
1803    use lance_arrow::*;
1804    use lance_core::utils::tempfile::TempStrDir;
1805    use lance_datagen::gen_batch;
1806    use lance_datagen::{array, BatchCount, Dimension, RowCount};
1807    use lance_index::scalar::{FullTextSearchQuery, InvertedIndexParams, ScalarIndexParams};
1808    use lance_index::vector::{
1809        hnsw::builder::HnswBuildParams, ivf::IvfBuildParams, sq::builder::SQBuildParams,
1810    };
1811
1812    use lance_linalg::distance::{DistanceType, MetricType};
1813    use lance_testing::datagen::generate_random_array;
1814    use rstest::rstest;
1815    use std::collections::HashSet;
1816
1817    #[tokio::test]
1818    async fn test_recreate_index() {
1819        const DIM: i32 = 8;
1820        let schema = Arc::new(Schema::new(vec![
1821            Field::new(
1822                "v",
1823                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), DIM),
1824                true,
1825            ),
1826            Field::new(
1827                "o",
1828                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), DIM),
1829                true,
1830            ),
1831        ]));
1832        let data = generate_random_array(2048 * DIM as usize);
1833        let batches: Vec<RecordBatch> = vec![RecordBatch::try_new(
1834            schema.clone(),
1835            vec![
1836                Arc::new(FixedSizeListArray::try_new_from_values(data.clone(), DIM).unwrap()),
1837                Arc::new(FixedSizeListArray::try_new_from_values(data, DIM).unwrap()),
1838            ],
1839        )
1840        .unwrap()];
1841
1842        let test_dir = TempStrDir::default();
1843        let test_uri = &test_dir;
1844        let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
1845        let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
1846
1847        let params = VectorIndexParams::ivf_pq(2, 8, 2, MetricType::L2, 2);
1848        dataset
1849            .create_index(&["v"], IndexType::Vector, None, &params, true)
1850            .await
1851            .unwrap();
1852        dataset
1853            .create_index(&["o"], IndexType::Vector, None, &params, true)
1854            .await
1855            .unwrap();
1856
1857        // Create index again
1858        dataset
1859            .create_index(&["v"], IndexType::Vector, None, &params, true)
1860            .await
1861            .unwrap();
1862
1863        // Can not overwrite an index on different columns.
1864        assert!(dataset
1865            .create_index(
1866                &["v"],
1867                IndexType::Vector,
1868                Some("o_idx".to_string()),
1869                &params,
1870                true,
1871            )
1872            .await
1873            .is_err());
1874    }
1875
1876    fn sample_vector_field() -> Field {
1877        let dimensions = 16;
1878        let column_name = "vec";
1879        Field::new(
1880            column_name,
1881            DataType::FixedSizeList(
1882                Arc::new(Field::new("item", DataType::Float32, true)),
1883                dimensions,
1884            ),
1885            false,
1886        )
1887    }
1888
1889    #[tokio::test]
1890    async fn test_drop_index() {
1891        let test_dir = TempStrDir::default();
1892        let schema = Schema::new(vec![
1893            sample_vector_field(),
1894            Field::new("ints", DataType::Int32, false),
1895        ]);
1896        let mut dataset = lance_datagen::rand(&schema)
1897            .into_dataset(
1898                &test_dir,
1899                FragmentCount::from(1),
1900                FragmentRowCount::from(256),
1901            )
1902            .await
1903            .unwrap();
1904
1905        let idx_name = "name".to_string();
1906        dataset
1907            .create_index(
1908                &["vec"],
1909                IndexType::Vector,
1910                Some(idx_name.clone()),
1911                &VectorIndexParams::ivf_pq(2, 8, 2, MetricType::L2, 10),
1912                true,
1913            )
1914            .await
1915            .unwrap();
1916        dataset
1917            .create_index(
1918                &["ints"],
1919                IndexType::BTree,
1920                None,
1921                &ScalarIndexParams::default(),
1922                true,
1923            )
1924            .await
1925            .unwrap();
1926
1927        assert_eq!(dataset.load_indices().await.unwrap().len(), 2);
1928
1929        dataset.drop_index(&idx_name).await.unwrap();
1930
1931        assert_eq!(dataset.load_indices().await.unwrap().len(), 1);
1932
1933        // Even though we didn't give the scalar index a name it still has an auto-generated one we can use
1934        let scalar_idx_name = &dataset.load_indices().await.unwrap()[0].name;
1935        dataset.drop_index(scalar_idx_name).await.unwrap();
1936
1937        assert_eq!(dataset.load_indices().await.unwrap().len(), 0);
1938
1939        // Make sure it returns an error if the index doesn't exist
1940        assert!(dataset.drop_index(scalar_idx_name).await.is_err());
1941    }
1942
1943    #[tokio::test]
1944    async fn test_count_index_rows() {
1945        let test_dir = TempStrDir::default();
1946        let dimensions = 16;
1947        let column_name = "vec";
1948        let field = sample_vector_field();
1949        let schema = Arc::new(Schema::new(vec![field]));
1950
1951        let float_arr = generate_random_array(512 * dimensions as usize);
1952
1953        let vectors =
1954            arrow_array::FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap();
1955
1956        let record_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(vectors)]).unwrap();
1957
1958        let reader =
1959            RecordBatchIterator::new(vec![record_batch].into_iter().map(Ok), schema.clone());
1960        let test_uri = &test_dir;
1961        let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
1962        dataset.validate().await.unwrap();
1963
1964        // Make sure it returns None if there's no index with the passed identifier
1965        assert!(dataset.index_statistics("bad_id").await.is_err());
1966        // Create an index
1967        let params = VectorIndexParams::ivf_pq(10, 8, 2, MetricType::L2, 10);
1968        dataset
1969            .create_index(
1970                &[column_name],
1971                IndexType::Vector,
1972                Some("vec_idx".into()),
1973                &params,
1974                true,
1975            )
1976            .await
1977            .unwrap();
1978
1979        let stats: serde_json::Value =
1980            serde_json::from_str(&dataset.index_statistics("vec_idx").await.unwrap()).unwrap();
1981        assert_eq!(stats["num_unindexed_rows"], 0);
1982        assert_eq!(stats["num_indexed_rows"], 512);
1983
1984        // Now we'll append some rows which shouldn't be indexed and see the
1985        // count change
1986        let float_arr = generate_random_array(512 * dimensions as usize);
1987        let vectors =
1988            arrow_array::FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap();
1989
1990        let record_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(vectors)]).unwrap();
1991
1992        let reader = RecordBatchIterator::new(vec![record_batch].into_iter().map(Ok), schema);
1993        dataset.append(reader, None).await.unwrap();
1994
1995        let stats: serde_json::Value =
1996            serde_json::from_str(&dataset.index_statistics("vec_idx").await.unwrap()).unwrap();
1997        assert_eq!(stats["num_unindexed_rows"], 512);
1998        assert_eq!(stats["num_indexed_rows"], 512);
1999    }
2000
2001    #[tokio::test]
2002    async fn test_optimize_delta_indices() {
2003        let dimensions = 16;
2004        let column_name = "vec";
2005        let vec_field = Field::new(
2006            column_name,
2007            DataType::FixedSizeList(
2008                Arc::new(Field::new("item", DataType::Float32, true)),
2009                dimensions,
2010            ),
2011            false,
2012        );
2013        let other_column_name = "other_vec";
2014        let other_vec_field = Field::new(
2015            other_column_name,
2016            DataType::FixedSizeList(
2017                Arc::new(Field::new("item", DataType::Float32, true)),
2018                dimensions,
2019            ),
2020            false,
2021        );
2022        let schema = Arc::new(Schema::new(vec![vec_field, other_vec_field]));
2023
2024        let float_arr = generate_random_array(512 * dimensions as usize);
2025
2026        let vectors = Arc::new(
2027            arrow_array::FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap(),
2028        );
2029
2030        let record_batch =
2031            RecordBatch::try_new(schema.clone(), vec![vectors.clone(), vectors.clone()]).unwrap();
2032
2033        let reader = RecordBatchIterator::new(
2034            vec![record_batch.clone()].into_iter().map(Ok),
2035            schema.clone(),
2036        );
2037
2038        let mut dataset = Dataset::write(reader, "memory://", None).await.unwrap();
2039        let params = VectorIndexParams::ivf_pq(10, 8, 2, MetricType::L2, 10);
2040        dataset
2041            .create_index(
2042                &[column_name],
2043                IndexType::Vector,
2044                Some("vec_idx".into()),
2045                &params,
2046                true,
2047            )
2048            .await
2049            .unwrap();
2050        dataset
2051            .create_index(
2052                &[other_column_name],
2053                IndexType::Vector,
2054                Some("other_vec_idx".into()),
2055                &params,
2056                true,
2057            )
2058            .await
2059            .unwrap();
2060
2061        async fn get_stats(dataset: &Dataset, name: &str) -> serde_json::Value {
2062            serde_json::from_str(&dataset.index_statistics(name).await.unwrap()).unwrap()
2063        }
2064        async fn get_meta(dataset: &Dataset, name: &str) -> Vec<IndexMetadata> {
2065            dataset
2066                .load_indices()
2067                .await
2068                .unwrap()
2069                .iter()
2070                .filter(|m| m.name == name)
2071                .cloned()
2072                .collect()
2073        }
2074        fn get_bitmap(meta: &IndexMetadata) -> Vec<u32> {
2075            meta.fragment_bitmap.as_ref().unwrap().iter().collect()
2076        }
2077
2078        let stats = get_stats(&dataset, "vec_idx").await;
2079        assert_eq!(stats["num_unindexed_rows"], 0);
2080        assert_eq!(stats["num_indexed_rows"], 512);
2081        assert_eq!(stats["num_indexed_fragments"], 1);
2082        assert_eq!(stats["num_indices"], 1);
2083        let meta = get_meta(&dataset, "vec_idx").await;
2084        assert_eq!(meta.len(), 1);
2085        assert_eq!(get_bitmap(&meta[0]), vec![0]);
2086
2087        let reader =
2088            RecordBatchIterator::new(vec![record_batch].into_iter().map(Ok), schema.clone());
2089        dataset.append(reader, None).await.unwrap();
2090        let stats = get_stats(&dataset, "vec_idx").await;
2091        assert_eq!(stats["num_unindexed_rows"], 512);
2092        assert_eq!(stats["num_indexed_rows"], 512);
2093        assert_eq!(stats["num_indexed_fragments"], 1);
2094        assert_eq!(stats["num_unindexed_fragments"], 1);
2095        assert_eq!(stats["num_indices"], 1);
2096        let meta = get_meta(&dataset, "vec_idx").await;
2097        assert_eq!(meta.len(), 1);
2098        assert_eq!(get_bitmap(&meta[0]), vec![0]);
2099
2100        dataset
2101            .optimize_indices(&OptimizeOptions::append().index_names(vec![])) // Does nothing because no index name is passed
2102            .await
2103            .unwrap();
2104        let stats = get_stats(&dataset, "vec_idx").await;
2105        assert_eq!(stats["num_unindexed_rows"], 512);
2106        assert_eq!(stats["num_indexed_rows"], 512);
2107        assert_eq!(stats["num_indexed_fragments"], 1);
2108        assert_eq!(stats["num_unindexed_fragments"], 1);
2109        assert_eq!(stats["num_indices"], 1);
2110        let meta = get_meta(&dataset, "vec_idx").await;
2111        assert_eq!(meta.len(), 1);
2112        assert_eq!(get_bitmap(&meta[0]), vec![0]);
2113
2114        // optimize the other index
2115        dataset
2116            .optimize_indices(
2117                &OptimizeOptions::append().index_names(vec!["other_vec_idx".to_owned()]),
2118            )
2119            .await
2120            .unwrap();
2121        let stats = get_stats(&dataset, "vec_idx").await;
2122        assert_eq!(stats["num_unindexed_rows"], 512);
2123        assert_eq!(stats["num_indexed_rows"], 512);
2124        assert_eq!(stats["num_indexed_fragments"], 1);
2125        assert_eq!(stats["num_unindexed_fragments"], 1);
2126        assert_eq!(stats["num_indices"], 1);
2127        let meta = get_meta(&dataset, "vec_idx").await;
2128        assert_eq!(meta.len(), 1);
2129        assert_eq!(get_bitmap(&meta[0]), vec![0]);
2130
2131        let stats = get_stats(&dataset, "other_vec_idx").await;
2132        assert_eq!(stats["num_unindexed_rows"], 0);
2133        assert_eq!(stats["num_indexed_rows"], 1024);
2134        assert_eq!(stats["num_indexed_fragments"], 2);
2135        assert_eq!(stats["num_unindexed_fragments"], 0);
2136        assert_eq!(stats["num_indices"], 2);
2137        let meta = get_meta(&dataset, "other_vec_idx").await;
2138        assert_eq!(meta.len(), 2);
2139        assert_eq!(get_bitmap(&meta[0]), vec![0]);
2140        assert_eq!(get_bitmap(&meta[1]), vec![1]);
2141
2142        dataset
2143            .optimize_indices(&OptimizeOptions::retrain())
2144            .await
2145            .unwrap();
2146
2147        let stats = get_stats(&dataset, "vec_idx").await;
2148        assert_eq!(stats["num_unindexed_rows"], 0);
2149        assert_eq!(stats["num_indexed_rows"], 1024);
2150        assert_eq!(stats["num_indexed_fragments"], 2);
2151        assert_eq!(stats["num_unindexed_fragments"], 0);
2152        assert_eq!(stats["num_indices"], 1);
2153        let meta = get_meta(&dataset, "vec_idx").await;
2154        assert_eq!(meta.len(), 1);
2155        assert_eq!(get_bitmap(&meta[0]), vec![0, 1]);
2156
2157        dataset
2158            .optimize_indices(&OptimizeOptions::retrain())
2159            .await
2160            .unwrap();
2161        let stats = get_stats(&dataset, "other_vec_idx").await;
2162        assert_eq!(stats["num_unindexed_rows"], 0);
2163        assert_eq!(stats["num_indexed_rows"], 1024);
2164        assert_eq!(stats["num_indexed_fragments"], 2);
2165        assert_eq!(stats["num_unindexed_fragments"], 0);
2166        assert_eq!(stats["num_indices"], 1);
2167        let meta = get_meta(&dataset, "other_vec_idx").await;
2168        assert_eq!(meta.len(), 1);
2169        assert_eq!(get_bitmap(&meta[0]), vec![0, 1]);
2170    }
2171
2172    #[tokio::test]
2173    async fn test_optimize_ivf_hnsw_sq_delta_indices() {
2174        let test_dir = TempStrDir::default();
2175        let dimensions = 16;
2176        let column_name = "vec";
2177        let field = Field::new(
2178            column_name,
2179            DataType::FixedSizeList(
2180                Arc::new(Field::new("item", DataType::Float32, true)),
2181                dimensions,
2182            ),
2183            false,
2184        );
2185        let schema = Arc::new(Schema::new(vec![field]));
2186
2187        let float_arr = generate_random_array(512 * dimensions as usize);
2188
2189        let vectors =
2190            arrow_array::FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap();
2191
2192        let record_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(vectors)]).unwrap();
2193
2194        let reader = RecordBatchIterator::new(
2195            vec![record_batch.clone()].into_iter().map(Ok),
2196            schema.clone(),
2197        );
2198
2199        let test_uri = &test_dir;
2200        let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
2201
2202        let ivf_params = IvfBuildParams::default();
2203        let hnsw_params = HnswBuildParams::default();
2204        let sq_params = SQBuildParams::default();
2205        let params = VectorIndexParams::with_ivf_hnsw_sq_params(
2206            MetricType::L2,
2207            ivf_params,
2208            hnsw_params,
2209            sq_params,
2210        );
2211        dataset
2212            .create_index(
2213                &[column_name],
2214                IndexType::Vector,
2215                Some("vec_idx".into()),
2216                &params,
2217                true,
2218            )
2219            .await
2220            .unwrap();
2221
2222        let stats: serde_json::Value =
2223            serde_json::from_str(&dataset.index_statistics("vec_idx").await.unwrap()).unwrap();
2224        assert_eq!(stats["num_unindexed_rows"], 0);
2225        assert_eq!(stats["num_indexed_rows"], 512);
2226        assert_eq!(stats["num_indexed_fragments"], 1);
2227        assert_eq!(stats["num_indices"], 1);
2228
2229        let reader =
2230            RecordBatchIterator::new(vec![record_batch].into_iter().map(Ok), schema.clone());
2231        dataset.append(reader, None).await.unwrap();
2232        let mut dataset = DatasetBuilder::from_uri(test_uri).load().await.unwrap();
2233        let stats: serde_json::Value =
2234            serde_json::from_str(&dataset.index_statistics("vec_idx").await.unwrap()).unwrap();
2235        assert_eq!(stats["num_unindexed_rows"], 512);
2236        assert_eq!(stats["num_indexed_rows"], 512);
2237        assert_eq!(stats["num_indexed_fragments"], 1);
2238        assert_eq!(stats["num_unindexed_fragments"], 1);
2239        assert_eq!(stats["num_indices"], 1);
2240
2241        dataset
2242            .optimize_indices(&OptimizeOptions::append())
2243            .await
2244            .unwrap();
2245
2246        let stats: serde_json::Value =
2247            serde_json::from_str(&dataset.index_statistics("vec_idx").await.unwrap()).unwrap();
2248        assert_eq!(stats["num_unindexed_rows"], 0);
2249        assert_eq!(stats["num_indexed_rows"], 1024);
2250        assert_eq!(stats["num_indexed_fragments"], 2);
2251        assert_eq!(stats["num_unindexed_fragments"], 0);
2252        assert_eq!(stats["num_indices"], 2);
2253
2254        dataset
2255            .optimize_indices(&OptimizeOptions::retrain())
2256            .await
2257            .unwrap();
2258        let stats: serde_json::Value =
2259            serde_json::from_str(&dataset.index_statistics("vec_idx").await.unwrap()).unwrap();
2260        assert_eq!(stats["num_unindexed_rows"], 0);
2261        assert_eq!(stats["num_indexed_rows"], 1024);
2262        assert_eq!(stats["num_indexed_fragments"], 2);
2263        assert_eq!(stats["num_unindexed_fragments"], 0);
2264        assert_eq!(stats["num_indices"], 1);
2265    }
2266
2267    #[rstest]
2268    #[tokio::test]
2269    async fn test_optimize_fts(#[values(false, true)] with_position: bool) {
2270        let words = ["apple", "banana", "cherry", "date"];
2271
2272        let dir = TempStrDir::default();
2273        let schema = Arc::new(Schema::new(vec![Field::new("text", DataType::Utf8, false)]));
2274        let data = StringArray::from_iter_values(words.iter().map(|s| s.to_string()));
2275        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(data)]).unwrap();
2276        let batch_iterator = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
2277
2278        let mut dataset = Dataset::write(batch_iterator, &dir, None).await.unwrap();
2279
2280        let params = InvertedIndexParams::default()
2281            .lower_case(false)
2282            .with_position(with_position);
2283        dataset
2284            .create_index(&["text"], IndexType::Inverted, None, &params, true)
2285            .await
2286            .unwrap();
2287
2288        async fn assert_indexed_rows(dataset: &Dataset, expected_indexed_rows: usize) {
2289            let stats = dataset.index_statistics("text_idx").await.unwrap();
2290            let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
2291            let indexed_rows = stats["num_indexed_rows"].as_u64().unwrap() as usize;
2292            let unindexed_rows = stats["num_unindexed_rows"].as_u64().unwrap() as usize;
2293            let num_rows = dataset.count_all_rows().await.unwrap();
2294            assert_eq!(indexed_rows, expected_indexed_rows);
2295            assert_eq!(unindexed_rows, num_rows - expected_indexed_rows);
2296        }
2297
2298        let num_rows = dataset.count_all_rows().await.unwrap();
2299        assert_indexed_rows(&dataset, num_rows).await;
2300
2301        let new_words = ["elephant", "fig", "grape", "honeydew"];
2302        let new_data = StringArray::from_iter_values(new_words.iter().map(|s| s.to_string()));
2303        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(new_data)]).unwrap();
2304        let batch_iter = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
2305        dataset.append(batch_iter, None).await.unwrap();
2306        assert_indexed_rows(&dataset, num_rows).await;
2307
2308        dataset
2309            .optimize_indices(&OptimizeOptions::append())
2310            .await
2311            .unwrap();
2312        let num_rows = dataset.count_all_rows().await.unwrap();
2313        assert_indexed_rows(&dataset, num_rows).await;
2314
2315        for &word in words.iter().chain(new_words.iter()) {
2316            let query_result = dataset
2317                .scan()
2318                .project(&["text"])
2319                .unwrap()
2320                .full_text_search(FullTextSearchQuery::new(word.to_string()))
2321                .unwrap()
2322                .limit(Some(10), None)
2323                .unwrap()
2324                .try_into_batch()
2325                .await
2326                .unwrap();
2327
2328            let texts = query_result["text"]
2329                .as_string::<i32>()
2330                .iter()
2331                .map(|v| match v {
2332                    None => "".to_string(),
2333                    Some(v) => v.to_string(),
2334                })
2335                .collect::<Vec<String>>();
2336
2337            assert_eq!(texts.len(), 1);
2338            assert_eq!(texts[0], word);
2339        }
2340
2341        let uppercase_words = ["Apple", "Banana", "Cherry", "Date"];
2342        for &word in uppercase_words.iter() {
2343            let query_result = dataset
2344                .scan()
2345                .project(&["text"])
2346                .unwrap()
2347                .full_text_search(FullTextSearchQuery::new(word.to_string()))
2348                .unwrap()
2349                .limit(Some(10), None)
2350                .unwrap()
2351                .try_into_batch()
2352                .await
2353                .unwrap();
2354
2355            let texts = query_result["text"]
2356                .as_string::<i32>()
2357                .iter()
2358                .map(|v| match v {
2359                    None => "".to_string(),
2360                    Some(v) => v.to_string(),
2361                })
2362                .collect::<Vec<String>>();
2363
2364            assert_eq!(texts.len(), 0);
2365        }
2366        let new_data = StringArray::from_iter_values(uppercase_words.iter().map(|s| s.to_string()));
2367        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(new_data)]).unwrap();
2368        let batch_iter = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
2369        dataset.append(batch_iter, None).await.unwrap();
2370        assert_indexed_rows(&dataset, num_rows).await;
2371
2372        // we should be able to query the new words
2373        for &word in uppercase_words.iter() {
2374            let query_result = dataset
2375                .scan()
2376                .project(&["text"])
2377                .unwrap()
2378                .full_text_search(FullTextSearchQuery::new(word.to_string()))
2379                .unwrap()
2380                .limit(Some(10), None)
2381                .unwrap()
2382                .try_into_batch()
2383                .await
2384                .unwrap();
2385
2386            let texts = query_result["text"]
2387                .as_string::<i32>()
2388                .iter()
2389                .map(|v| match v {
2390                    None => "".to_string(),
2391                    Some(v) => v.to_string(),
2392                })
2393                .collect::<Vec<String>>();
2394
2395            assert_eq!(texts.len(), 1, "query: {}, texts: {:?}", word, texts);
2396            assert_eq!(texts[0], word, "query: {}, texts: {:?}", word, texts);
2397        }
2398
2399        dataset
2400            .optimize_indices(&OptimizeOptions::append())
2401            .await
2402            .unwrap();
2403        let num_rows = dataset.count_all_rows().await.unwrap();
2404        assert_indexed_rows(&dataset, num_rows).await;
2405
2406        // we should be able to query the new words after optimization
2407        for &word in uppercase_words.iter() {
2408            let query_result = dataset
2409                .scan()
2410                .project(&["text"])
2411                .unwrap()
2412                .full_text_search(FullTextSearchQuery::new(word.to_string()))
2413                .unwrap()
2414                .limit(Some(10), None)
2415                .unwrap()
2416                .try_into_batch()
2417                .await
2418                .unwrap();
2419
2420            let texts = query_result["text"]
2421                .as_string::<i32>()
2422                .iter()
2423                .map(|v| match v {
2424                    None => "".to_string(),
2425                    Some(v) => v.to_string(),
2426                })
2427                .collect::<Vec<String>>();
2428
2429            assert_eq!(texts.len(), 1, "query: {}, texts: {:?}", word, texts);
2430            assert_eq!(texts[0], word, "query: {}, texts: {:?}", word, texts);
2431
2432            // we should be able to query the new words after compaction
2433            compact_files(&mut dataset, CompactionOptions::default(), None)
2434                .await
2435                .unwrap();
2436            for &word in uppercase_words.iter() {
2437                let query_result = dataset
2438                    .scan()
2439                    .project(&["text"])
2440                    .unwrap()
2441                    .full_text_search(FullTextSearchQuery::new(word.to_string()))
2442                    .unwrap()
2443                    .try_into_batch()
2444                    .await
2445                    .unwrap();
2446                let texts = query_result["text"]
2447                    .as_string::<i32>()
2448                    .iter()
2449                    .map(|v| match v {
2450                        None => "".to_string(),
2451                        Some(v) => v.to_string(),
2452                    })
2453                    .collect::<Vec<String>>();
2454                assert_eq!(texts.len(), 1, "query: {}, texts: {:?}", word, texts);
2455                assert_eq!(texts[0], word, "query: {}, texts: {:?}", word, texts);
2456            }
2457            assert_indexed_rows(&dataset, num_rows).await;
2458        }
2459    }
2460
2461    #[tokio::test]
2462    async fn test_create_index_too_small_for_pq() {
2463        let test_dir = TempStrDir::default();
2464        let dimensions = 1536;
2465
2466        let field = Field::new(
2467            "vector",
2468            DataType::FixedSizeList(
2469                Arc::new(Field::new("item", DataType::Float32, true)),
2470                dimensions,
2471            ),
2472            false,
2473        );
2474
2475        let schema = Arc::new(Schema::new(vec![field]));
2476        let float_arr = generate_random_array(100 * dimensions as usize);
2477
2478        let vectors =
2479            arrow_array::FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap();
2480        let record_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(vectors)]).unwrap();
2481        let reader = RecordBatchIterator::new(
2482            vec![record_batch.clone()].into_iter().map(Ok),
2483            schema.clone(),
2484        );
2485
2486        let test_uri = &test_dir;
2487        let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
2488
2489        let params = VectorIndexParams::ivf_pq(1, 8, 96, DistanceType::L2, 1);
2490        let result = dataset
2491            .create_index(&["vector"], IndexType::Vector, None, &params, false)
2492            .await;
2493
2494        assert!(matches!(result, Err(Error::Unprocessable { .. })));
2495        if let Error::Unprocessable { message, .. } = result.unwrap_err() {
2496            assert_eq!(
2497                message,
2498                "Not enough rows to train PQ. Requires 256 rows but only 100 available",
2499            )
2500        }
2501    }
2502
2503    #[tokio::test]
2504    async fn test_create_bitmap_index() {
2505        let test_dir = TempStrDir::default();
2506        let field = Field::new("tag", DataType::Utf8, false);
2507        let schema = Arc::new(Schema::new(vec![field]));
2508        let array = StringArray::from_iter_values((0..128).map(|i| ["a", "b", "c"][i % 3]));
2509        let record_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
2510        let reader = RecordBatchIterator::new(
2511            vec![record_batch.clone()].into_iter().map(Ok),
2512            schema.clone(),
2513        );
2514
2515        let test_uri = &test_dir;
2516        let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
2517        dataset
2518            .create_index(
2519                &["tag"],
2520                IndexType::Bitmap,
2521                None,
2522                &ScalarIndexParams::default(),
2523                false,
2524            )
2525            .await
2526            .unwrap();
2527        let indices = dataset.load_indices().await.unwrap();
2528        let index = dataset
2529            .open_generic_index("tag", &indices[0].uuid.to_string(), &NoOpMetricsCollector)
2530            .await
2531            .unwrap();
2532        assert_eq!(index.index_type(), IndexType::Bitmap);
2533    }
2534
2535    // #[tokio::test]
2536    #[lance_test_macros::test(tokio::test)]
2537    async fn test_load_indices() {
2538        let session = Arc::new(Session::default());
2539        let write_params = WriteParams {
2540            session: Some(session.clone()),
2541            ..Default::default()
2542        };
2543
2544        let test_dir = TempStrDir::default();
2545        let field = Field::new("tag", DataType::Utf8, false);
2546        let schema = Arc::new(Schema::new(vec![field]));
2547        let array = StringArray::from_iter_values((0..128).map(|i| ["a", "b", "c"][i % 3]));
2548        let record_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
2549        let reader = RecordBatchIterator::new(
2550            vec![record_batch.clone()].into_iter().map(Ok),
2551            schema.clone(),
2552        );
2553
2554        let test_uri = &test_dir;
2555        let mut dataset = Dataset::write(reader, test_uri, Some(write_params))
2556            .await
2557            .unwrap();
2558        dataset
2559            .create_index(
2560                &["tag"],
2561                IndexType::Bitmap,
2562                None,
2563                &ScalarIndexParams::default(),
2564                false,
2565            )
2566            .await
2567            .unwrap();
2568        dataset.object_store().io_stats_incremental(); // Reset
2569
2570        let indices = dataset.load_indices().await.unwrap();
2571        let stats = dataset.object_store().io_stats_incremental();
2572        // We should already have this cached since we just wrote it.
2573        assert_io_eq!(stats, read_iops, 0);
2574        assert_io_eq!(stats, read_bytes, 0);
2575        assert_eq!(indices.len(), 1);
2576
2577        session.index_cache.clear().await; // Clear the cache
2578
2579        let dataset2 = DatasetBuilder::from_uri(test_uri)
2580            .with_session(session.clone())
2581            .load()
2582            .await
2583            .unwrap();
2584        let stats = dataset2.object_store().io_stats_incremental(); // Reset
2585        assert_io_lt!(stats, read_bytes, 64 * 1024);
2586
2587        // Because the manifest is so small, we should have opportunistically
2588        // cached the indices in memory already.
2589        let indices2 = dataset2.load_indices().await.unwrap();
2590        let stats = dataset2.object_store().io_stats_incremental();
2591        assert_io_eq!(stats, read_iops, 0);
2592        assert_io_eq!(stats, read_bytes, 0);
2593        assert_eq!(indices2.len(), 1);
2594    }
2595
2596    #[tokio::test]
2597    async fn test_remap_empty() {
2598        let data = gen_batch()
2599            .col("int", array::step::<Int32Type>())
2600            .col(
2601                "vector",
2602                array::rand_vec::<Float32Type>(Dimension::from(16)),
2603            )
2604            .into_reader_rows(RowCount::from(256), BatchCount::from(1));
2605        let mut dataset = Dataset::write(data, "memory://", None).await.unwrap();
2606
2607        let params = VectorIndexParams::ivf_pq(1, 8, 1, DistanceType::L2, 1);
2608        dataset
2609            .create_index(&["vector"], IndexType::Vector, None, &params, false)
2610            .await
2611            .unwrap();
2612
2613        let index_uuid = dataset.load_indices().await.unwrap()[0].uuid;
2614        let remap_to_empty = (0..dataset.count_all_rows().await.unwrap())
2615            .map(|i| (i as u64, None))
2616            .collect::<HashMap<_, _>>();
2617        let new_uuid = remap_index(&dataset, &index_uuid, &remap_to_empty)
2618            .await
2619            .unwrap();
2620        assert_eq!(new_uuid, RemapResult::Keep(index_uuid));
2621    }
2622
2623    #[tokio::test]
2624    async fn test_optimize_ivf_pq_up_to_date() {
2625        // https://github.com/lance-format/lance/issues/4016
2626        let nrows = 256;
2627        let dimensions = 16;
2628        let column_name = "vector";
2629        let schema = Arc::new(Schema::new(vec![
2630            Field::new("id", DataType::Int32, false),
2631            Field::new(
2632                column_name,
2633                DataType::FixedSizeList(
2634                    Arc::new(Field::new("item", DataType::Float32, true)),
2635                    dimensions,
2636                ),
2637                false,
2638            ),
2639        ]));
2640
2641        let float_arr = generate_random_array(nrows * dimensions as usize);
2642        let vectors =
2643            arrow_array::FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap();
2644        let record_batch = RecordBatch::try_new(
2645            schema.clone(),
2646            vec![
2647                Arc::new(arrow_array::Int32Array::from_iter_values(0..nrows as i32)),
2648                Arc::new(vectors),
2649            ],
2650        )
2651        .unwrap();
2652
2653        let reader = RecordBatchIterator::new(
2654            vec![record_batch.clone()].into_iter().map(Ok),
2655            schema.clone(),
2656        );
2657        let mut dataset = Dataset::write(reader, "memory://", None).await.unwrap();
2658
2659        let params = VectorIndexParams::ivf_pq(1, 8, 2, MetricType::L2, 2);
2660        dataset
2661            .create_index(&[column_name], IndexType::Vector, None, &params, true)
2662            .await
2663            .unwrap();
2664
2665        let query_vector = generate_random_array(dimensions as usize);
2666
2667        let nearest = dataset
2668            .scan()
2669            .nearest(column_name, &query_vector, 5)
2670            .unwrap()
2671            .try_into_batch()
2672            .await
2673            .unwrap();
2674
2675        let ids = nearest["id"].as_primitive::<Int32Type>();
2676        let mut seen = HashSet::new();
2677        for id in ids.values() {
2678            assert!(seen.insert(*id), "Duplicate id found: {}", id);
2679        }
2680
2681        dataset
2682            .optimize_indices(&OptimizeOptions::default())
2683            .await
2684            .unwrap();
2685
2686        dataset.validate().await.unwrap();
2687
2688        let nearest_after = dataset
2689            .scan()
2690            .nearest(column_name, &query_vector, 5)
2691            .unwrap()
2692            .try_into_batch()
2693            .await
2694            .unwrap();
2695
2696        let ids = nearest_after["id"].as_primitive::<Int32Type>();
2697        let mut seen = HashSet::new();
2698        for id in ids.values() {
2699            assert!(seen.insert(*id), "Duplicate id found: {}", id);
2700        }
2701    }
2702
2703    #[tokio::test]
2704    async fn test_index_created_at_timestamp() {
2705        // Test that created_at is set when creating an index
2706        let schema = Arc::new(Schema::new(vec![
2707            Field::new("id", DataType::Int32, false),
2708            Field::new("values", DataType::Utf8, false),
2709        ]));
2710
2711        let values = StringArray::from_iter_values(["hello", "world", "foo", "bar"]);
2712        let record_batch = RecordBatch::try_new(
2713            schema.clone(),
2714            vec![
2715                Arc::new(Int32Array::from_iter_values(0..4)),
2716                Arc::new(values),
2717            ],
2718        )
2719        .unwrap();
2720
2721        let reader =
2722            RecordBatchIterator::new(vec![record_batch].into_iter().map(Ok), schema.clone());
2723
2724        let mut dataset = Dataset::write(reader, "memory://", None).await.unwrap();
2725
2726        // Record time before creating index
2727        let before_index = chrono::Utc::now();
2728
2729        // Create a scalar index
2730        dataset
2731            .create_index(
2732                &["values"],
2733                IndexType::Scalar,
2734                Some("test_idx".to_string()),
2735                &ScalarIndexParams::default(),
2736                false,
2737            )
2738            .await
2739            .unwrap();
2740
2741        // Record time after creating index
2742        let after_index = chrono::Utc::now();
2743
2744        // Get index metadata
2745        let indices = dataset.load_indices().await.unwrap();
2746        let test_index = indices.iter().find(|idx| idx.name == "test_idx").unwrap();
2747
2748        // Verify created_at is set and within reasonable bounds
2749        assert!(test_index.created_at.is_some());
2750        let created_at = test_index.created_at.unwrap();
2751        assert!(created_at >= before_index);
2752        assert!(created_at <= after_index);
2753    }
2754
2755    #[tokio::test]
2756    async fn test_index_statistics_updated_at() {
2757        // Test that updated_at appears in index statistics
2758        let schema = Arc::new(Schema::new(vec![
2759            Field::new("id", DataType::Int32, false),
2760            Field::new("values", DataType::Utf8, false),
2761        ]));
2762
2763        let values = StringArray::from_iter_values(["hello", "world", "foo", "bar"]);
2764        let record_batch = RecordBatch::try_new(
2765            schema.clone(),
2766            vec![
2767                Arc::new(Int32Array::from_iter_values(0..4)),
2768                Arc::new(values),
2769            ],
2770        )
2771        .unwrap();
2772
2773        let reader =
2774            RecordBatchIterator::new(vec![record_batch].into_iter().map(Ok), schema.clone());
2775
2776        let mut dataset = Dataset::write(reader, "memory://", None).await.unwrap();
2777
2778        // Create a scalar index
2779        dataset
2780            .create_index(
2781                &["values"],
2782                IndexType::Scalar,
2783                Some("test_idx".to_string()),
2784                &ScalarIndexParams::default(),
2785                false,
2786            )
2787            .await
2788            .unwrap();
2789
2790        // Get index statistics
2791        let stats_str = dataset.index_statistics("test_idx").await.unwrap();
2792        let stats: serde_json::Value = serde_json::from_str(&stats_str).unwrap();
2793
2794        // Verify updated_at_timestamp_ms field exists in statistics
2795        assert!(stats["updated_at_timestamp_ms"].is_number());
2796        let updated_at = stats["updated_at_timestamp_ms"].as_u64().unwrap();
2797
2798        // Get the index metadata to compare with created_at
2799        let indices = dataset.load_indices().await.unwrap();
2800        let test_index = indices.iter().find(|idx| idx.name == "test_idx").unwrap();
2801        let created_at = test_index.created_at.unwrap().timestamp_millis() as u64;
2802
2803        // For a single index, updated_at should equal created_at
2804        assert_eq!(updated_at, created_at);
2805    }
2806
2807    #[tokio::test]
2808    async fn test_index_statistics_updated_at_multiple_deltas() {
2809        // Test that updated_at reflects max(created_at) across multiple delta indices
2810        let schema = Arc::new(Schema::new(vec![
2811            Field::new("id", DataType::Int32, false),
2812            Field::new(
2813                "vector",
2814                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), 4),
2815                false,
2816            ),
2817        ]));
2818
2819        // Create initial dataset (need more rows for PQ training)
2820        let num_rows = 300;
2821        let float_arr = generate_random_array(4 * num_rows);
2822        let vectors = FixedSizeListArray::try_new_from_values(float_arr, 4).unwrap();
2823        let record_batch = RecordBatch::try_new(
2824            schema.clone(),
2825            vec![
2826                Arc::new(Int32Array::from_iter_values(0..num_rows as i32)),
2827                Arc::new(vectors),
2828            ],
2829        )
2830        .unwrap();
2831
2832        let reader =
2833            RecordBatchIterator::new(vec![record_batch].into_iter().map(Ok), schema.clone());
2834
2835        let mut dataset = Dataset::write(reader, "memory://", None).await.unwrap();
2836
2837        // Create vector index
2838        let params = VectorIndexParams::ivf_pq(1, 8, 2, MetricType::L2, 2);
2839        dataset
2840            .create_index(
2841                &["vector"],
2842                IndexType::Vector,
2843                Some("test_vec_idx".to_string()),
2844                &params,
2845                false,
2846            )
2847            .await
2848            .unwrap();
2849
2850        // Get initial statistics
2851        let stats_str_1 = dataset.index_statistics("test_vec_idx").await.unwrap();
2852        let stats_1: serde_json::Value = serde_json::from_str(&stats_str_1).unwrap();
2853        let initial_updated_at = stats_1["updated_at_timestamp_ms"].as_u64().unwrap();
2854
2855        // Add more data to create additional delta indices
2856        std::thread::sleep(std::time::Duration::from_millis(10)); // Ensure different timestamp
2857
2858        let num_rows_2 = 50;
2859        let float_arr_2 = generate_random_array(4 * num_rows_2);
2860        let vectors_2 = FixedSizeListArray::try_new_from_values(float_arr_2, 4).unwrap();
2861        let record_batch_2 = RecordBatch::try_new(
2862            schema.clone(),
2863            vec![
2864                Arc::new(Int32Array::from_iter_values(
2865                    num_rows as i32..(num_rows + num_rows_2) as i32,
2866                )),
2867                Arc::new(vectors_2),
2868            ],
2869        )
2870        .unwrap();
2871
2872        let reader_2 =
2873            RecordBatchIterator::new(vec![record_batch_2].into_iter().map(Ok), schema.clone());
2874
2875        dataset.append(reader_2, None).await.unwrap();
2876
2877        // Update the index
2878        dataset
2879            .create_index(
2880                &["vector"],
2881                IndexType::Vector,
2882                Some("test_vec_idx".to_string()),
2883                &params,
2884                true,
2885            )
2886            .await
2887            .unwrap();
2888
2889        // Get updated statistics
2890        let stats_str_2 = dataset.index_statistics("test_vec_idx").await.unwrap();
2891        let stats_2: serde_json::Value = serde_json::from_str(&stats_str_2).unwrap();
2892        let final_updated_at = stats_2["updated_at_timestamp_ms"].as_u64().unwrap();
2893
2894        // The updated_at should be newer than the initial one
2895        assert!(final_updated_at >= initial_updated_at);
2896    }
2897
2898    #[tokio::test]
2899    async fn test_index_statistics_updated_at_none_when_no_created_at() {
2900        // Test backward compatibility: when indices were created before the created_at field,
2901        // updated_at_timestamp_ms should be null
2902
2903        // Use test data created with Lance 0.29.0 (before created_at field was added)
2904        let test_dir =
2905            copy_test_data_to_tmp("v0.30.0_pre_created_at/index_without_created_at").unwrap();
2906        let test_uri = test_dir.path_str();
2907        let test_uri = &test_uri;
2908
2909        let dataset = Dataset::open(test_uri).await.unwrap();
2910
2911        // Get the index metadata to verify created_at is None
2912        let indices = dataset.load_indices().await.unwrap();
2913        assert!(!indices.is_empty(), "Test dataset should have indices");
2914
2915        // Verify that the index created with old version has no created_at
2916        for index in indices.iter() {
2917            assert!(
2918                index.created_at.is_none(),
2919                "Index from old version should have created_at = None"
2920            );
2921        }
2922
2923        // Get index statistics - should work even with old indices
2924        let index_name = &indices[0].name;
2925        let stats_str = dataset.index_statistics(index_name).await.unwrap();
2926        let stats: serde_json::Value = serde_json::from_str(&stats_str).unwrap();
2927
2928        // Verify updated_at_timestamp_ms field is null when no indices have created_at
2929        assert!(
2930            stats["updated_at_timestamp_ms"].is_null(),
2931            "updated_at_timestamp_ms should be null when no indices have created_at timestamps"
2932        );
2933    }
2934    #[rstest]
2935    #[case::btree("i", IndexType::BTree, Box::new(ScalarIndexParams::default()))]
2936    #[case::bitmap("i", IndexType::Bitmap, Box::new(ScalarIndexParams::default()))]
2937    #[case::inverted("text", IndexType::Inverted, Box::new(InvertedIndexParams::default()))]
2938    #[tokio::test]
2939    async fn test_create_empty_scalar_index(
2940        #[case] column_name: &str,
2941        #[case] index_type: IndexType,
2942        #[case] params: Box<dyn IndexParams>,
2943    ) {
2944        use lance_datagen::{array, BatchCount, ByteCount, RowCount};
2945
2946        // Create dataset with scalar and text columns (no vector column needed)
2947        let reader = lance_datagen::gen_batch()
2948            .col("i", array::step::<Int32Type>())
2949            .col("text", array::rand_utf8(ByteCount::from(10), false))
2950            .into_reader_rows(RowCount::from(100), BatchCount::from(1));
2951        let mut dataset = Dataset::write(reader, "memory://test", None).await.unwrap();
2952
2953        // Create an empty index with train=false
2954        // Test using IntoFuture - can await directly without calling .execute()
2955        dataset
2956            .create_index_builder(&[column_name], index_type, params.as_ref())
2957            .name("index".to_string())
2958            .train(false)
2959            .await
2960            .unwrap();
2961
2962        // Verify we can get index statistics
2963        let stats = dataset.index_statistics("index").await.unwrap();
2964        let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
2965        assert_eq!(
2966            stats["num_indexed_rows"], 0,
2967            "Empty index should have zero indexed rows"
2968        );
2969
2970        // Append new data using lance_datagen
2971        let append_reader = lance_datagen::gen_batch()
2972            .col("i", array::step::<Int32Type>())
2973            .col("text", array::rand_utf8(ByteCount::from(10), false))
2974            .into_reader_rows(RowCount::from(50), BatchCount::from(1));
2975
2976        dataset.append(append_reader, None).await.unwrap();
2977
2978        // Critical test: Verify the empty index is still present after append
2979        let indices_after_append = dataset.load_indices().await.unwrap();
2980        assert_eq!(
2981            indices_after_append.len(),
2982            1,
2983            "Index should be retained after append for index type {:?}",
2984            index_type
2985        );
2986
2987        let stats = dataset.index_statistics("index").await.unwrap();
2988        let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
2989        assert_eq!(
2990            stats["num_indexed_rows"], 0,
2991            "Empty index should still have zero indexed rows after append"
2992        );
2993
2994        // Test optimize_indices with empty index
2995        dataset.optimize_indices(&Default::default()).await.unwrap();
2996
2997        // Verify the index still exists after optimization
2998        let indices_after_optimize = dataset.load_indices().await.unwrap();
2999        assert_eq!(
3000            indices_after_optimize.len(),
3001            1,
3002            "Index should still exist after optimization"
3003        );
3004
3005        // Check index statistics after optimization
3006        let stats = dataset.index_statistics("index").await.unwrap();
3007        let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
3008        assert_eq!(
3009            stats["num_unindexed_rows"], 0,
3010            "Empty index should indexed all rows"
3011        );
3012    }
3013
3014    /// Helper function to check if an index is being used in a query plan
3015    fn assert_index_usage(plan: &str, column_name: &str, should_use_index: bool, context: &str) {
3016        let index_used = if column_name == "text" {
3017            // For inverted index, look for MatchQuery which indicates FTS index usage
3018            plan.contains("MatchQuery")
3019        } else {
3020            // For btree/bitmap, look for MaterializeIndex which indicates scalar index usage
3021            plan.contains("ScalarIndexQuery")
3022        };
3023
3024        if should_use_index {
3025            assert!(
3026                index_used,
3027                "Query plan should use index {}: {}",
3028                context, plan
3029            );
3030        } else {
3031            assert!(
3032                !index_used,
3033                "Query plan should NOT use index {}: {}",
3034                context, plan
3035            );
3036        }
3037    }
3038
3039    /// Test that scalar indices are retained after deleting all data from a table.
3040    ///
3041    /// This test verifies that when we:
3042    /// 1. Create a table with data
3043    /// 2. Add a scalar index with train=true
3044    /// 3. Delete all data in the table
3045    /// The index remains available on the table.
3046    #[rstest]
3047    #[case::btree("i", IndexType::BTree, Box::new(ScalarIndexParams::default()))]
3048    #[case::bitmap("i", IndexType::Bitmap, Box::new(ScalarIndexParams::default()))]
3049    #[case::inverted("text", IndexType::Inverted, Box::new(InvertedIndexParams::default()))]
3050    #[tokio::test]
3051    async fn test_scalar_index_retained_after_delete_all(
3052        #[case] column_name: &str,
3053        #[case] index_type: IndexType,
3054        #[case] params: Box<dyn IndexParams>,
3055    ) {
3056        use lance_datagen::{array, BatchCount, ByteCount, RowCount};
3057
3058        // Create dataset with initial data
3059        let reader = lance_datagen::gen_batch()
3060            .col("i", array::step::<Int32Type>())
3061            .col("text", array::rand_utf8(ByteCount::from(10), false))
3062            .into_reader_rows(RowCount::from(100), BatchCount::from(1));
3063        let mut dataset = Dataset::write(reader, "memory://test", None).await.unwrap();
3064
3065        // Create index with train=true (normal index with data)
3066        dataset
3067            .create_index_builder(&[column_name], index_type, params.as_ref())
3068            .name("index".to_string())
3069            .train(true)
3070            .await
3071            .unwrap();
3072
3073        // Verify index was created and has indexed rows
3074        let stats = dataset.index_statistics("index").await.unwrap();
3075        let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
3076        assert_eq!(
3077            stats["num_indexed_rows"], 100,
3078            "Index should have indexed all 100 rows"
3079        );
3080
3081        // Verify index is being used in queries before delete
3082        let plan = if column_name == "text" {
3083            // Use full-text search for inverted index
3084            dataset
3085                .scan()
3086                .full_text_search(FullTextSearchQuery::new("test".to_string()))
3087                .unwrap()
3088                .explain_plan(false)
3089                .await
3090                .unwrap()
3091        } else {
3092            // Use equality filter for btree/bitmap indices
3093            dataset
3094                .scan()
3095                .filter(format!("{} = 50", column_name).as_str())
3096                .unwrap()
3097                .explain_plan(false)
3098                .await
3099                .unwrap()
3100        };
3101        // Verify index is being used before delete
3102        assert_index_usage(&plan, column_name, true, "before delete");
3103
3104        let indexes = dataset.load_indices().await.unwrap();
3105        let original_index = indexes[0].clone();
3106
3107        // Delete all rows from the table
3108        dataset.delete("true").await.unwrap();
3109
3110        // Verify table is empty
3111        let row_count = dataset.count_rows(None).await.unwrap();
3112        assert_eq!(row_count, 0, "Table should be empty after delete all");
3113
3114        // Critical test: Verify the index still exists after deleting all data
3115        let indices_after_delete = dataset.load_indices().await.unwrap();
3116        assert_eq!(
3117            indices_after_delete.len(),
3118            1,
3119            "Index should be retained after deleting all data"
3120        );
3121        assert_eq!(
3122            indices_after_delete[0].name, "index",
3123            "Index name should remain the same after delete"
3124        );
3125
3126        // Critical test: Verify the fragment bitmap is empty after delete
3127        let index_after_delete = &indices_after_delete[0];
3128        let effective_bitmap = index_after_delete
3129            .effective_fragment_bitmap(&dataset.fragment_bitmap)
3130            .unwrap();
3131        assert!(
3132            effective_bitmap.is_empty(),
3133            "Effective bitmap should be empty after deleting all data"
3134        );
3135        assert_eq!(
3136            index_after_delete.fragment_bitmap, original_index.fragment_bitmap,
3137            "Fragment bitmap should remain the same after delete"
3138        );
3139
3140        // Verify we can still get index statistics
3141        let stats = dataset.index_statistics("index").await.unwrap();
3142        let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
3143        assert_eq!(
3144            stats["num_indexed_rows"], 0,
3145            "Index should now report zero indexed rows after delete all"
3146        );
3147        assert_eq!(
3148            stats["num_unindexed_rows"], 0,
3149            "Index should report zero unindexed rows after delete all"
3150        );
3151        assert_eq!(
3152            stats["num_indexed_fragments"], 0,
3153            "Index should report zero indexed fragments after delete all"
3154        );
3155        assert_eq!(
3156            stats["num_unindexed_fragments"], 0,
3157            "Index should report zero unindexed fragments after delete all"
3158        );
3159
3160        // Verify index is NOT being used in queries after delete (empty bitmap)
3161        if column_name == "text" {
3162            // Inverted indexes will still appear to be used in FTS queries.
3163            // TODO: once metrics are working on FTS queries, we can check the
3164            // analyze plan output instead for index usage.
3165            let _plan_after_delete = dataset
3166                .scan()
3167                .project(&[column_name])
3168                .unwrap()
3169                .full_text_search(FullTextSearchQuery::new("test".to_string()))
3170                .unwrap()
3171                .explain_plan(false)
3172                .await
3173                .unwrap();
3174            assert_index_usage(
3175                &_plan_after_delete,
3176                column_name,
3177                true,
3178                "after delete (empty bitmap)",
3179            );
3180        } else {
3181            // Use equality filter for btree/bitmap indices
3182            let _plan_after_delete = dataset
3183                .scan()
3184                .filter(format!("{} = 50", column_name).as_str())
3185                .unwrap()
3186                .explain_plan(false)
3187                .await
3188                .unwrap();
3189            // Verify index is NOT being used after delete (empty bitmap)
3190            assert_index_usage(
3191                &_plan_after_delete,
3192                column_name,
3193                false,
3194                "after delete (empty bitmap)",
3195            );
3196        }
3197
3198        // Test that we can append new data and the index is still there
3199        let append_reader = lance_datagen::gen_batch()
3200            .col("i", array::step::<Int32Type>())
3201            .col("text", array::rand_utf8(ByteCount::from(10), false))
3202            .into_reader_rows(RowCount::from(50), BatchCount::from(1));
3203
3204        dataset.append(append_reader, None).await.unwrap();
3205
3206        // Verify index still exists after append
3207        let indices_after_append = dataset.load_indices().await.unwrap();
3208        assert_eq!(
3209            indices_after_append.len(),
3210            1,
3211            "Index should still exist after appending to empty table"
3212        );
3213        let stats = dataset.index_statistics("index").await.unwrap();
3214        let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
3215        assert_eq!(
3216            stats["num_indexed_rows"], 0,
3217            "Index should now report zero indexed rows after data is added"
3218        );
3219
3220        // Test optimize_indices after delete all
3221        dataset.optimize_indices(&Default::default()).await.unwrap();
3222
3223        // Verify index still exists after optimization
3224        let indices_after_optimize = dataset.load_indices().await.unwrap();
3225        assert_eq!(
3226            indices_after_optimize.len(),
3227            1,
3228            "Index should still exist after optimization following delete all"
3229        );
3230
3231        // Verify we can still get index statistics after optimization
3232        let stats = dataset.index_statistics("index").await.unwrap();
3233        let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
3234        assert_eq!(
3235            stats["num_indexed_rows"],
3236            dataset.count_rows(None).await.unwrap(),
3237            "Index should now cover all newly added rows after optimization"
3238        );
3239    }
3240
3241    /// Test that scalar indices are retained after updating rows in a table.
3242    ///
3243    /// This test verifies that when we:
3244    /// 1. Create a table with data
3245    /// 2. Add a scalar index with train=true
3246    /// 3. Update rows in the table
3247    /// The index remains available on the table.
3248    #[rstest]
3249    #[case::btree("i", IndexType::BTree, Box::new(ScalarIndexParams::default()))]
3250    #[case::bitmap("i", IndexType::Bitmap, Box::new(ScalarIndexParams::default()))]
3251    #[case::inverted("text", IndexType::Inverted, Box::new(InvertedIndexParams::default()))]
3252    #[tokio::test]
3253    async fn test_scalar_index_retained_after_update(
3254        #[case] column_name: &str,
3255        #[case] index_type: IndexType,
3256        #[case] params: Box<dyn IndexParams>,
3257    ) {
3258        use crate::dataset::UpdateBuilder;
3259        use lance_datagen::{array, BatchCount, ByteCount, RowCount};
3260
3261        // Create dataset with initial data
3262        let reader = lance_datagen::gen_batch()
3263            .col("i", array::step::<Int32Type>())
3264            .col("text", array::rand_utf8(ByteCount::from(10), false))
3265            .into_reader_rows(RowCount::from(100), BatchCount::from(1));
3266        let mut dataset = Dataset::write(reader, "memory://test", None).await.unwrap();
3267
3268        // Create index with train=true (normal index with data)
3269        dataset
3270            .create_index_builder(&[column_name], index_type, params.as_ref())
3271            .name("index".to_string())
3272            .train(true)
3273            .await
3274            .unwrap();
3275
3276        // Verify index was created and has indexed rows
3277        let stats = dataset.index_statistics("index").await.unwrap();
3278        let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
3279        assert_eq!(
3280            stats["num_indexed_rows"], 100,
3281            "Index should have indexed all 100 rows"
3282        );
3283
3284        // Verify index is being used in queries before update
3285        let plan = if column_name == "text" {
3286            // Use full-text search for inverted index
3287            dataset
3288                .scan()
3289                .project(&[column_name])
3290                .unwrap()
3291                .full_text_search(FullTextSearchQuery::new("test".to_string()))
3292                .unwrap()
3293                .explain_plan(false)
3294                .await
3295                .unwrap()
3296        } else {
3297            // Use equality filter for btree/bitmap indices
3298            dataset
3299                .scan()
3300                .filter(format!("{} = 50", column_name).as_str())
3301                .unwrap()
3302                .explain_plan(false)
3303                .await
3304                .unwrap()
3305        };
3306        // Verify index is being used before update
3307        assert_index_usage(&plan, column_name, true, "before update");
3308
3309        // Update some rows - update first 50 rows
3310        let update_result = UpdateBuilder::new(Arc::new(dataset))
3311            .set("i", "i + 1000")
3312            .unwrap()
3313            .set("text", "'updated_' || text")
3314            .unwrap()
3315            .build()
3316            .unwrap()
3317            .execute()
3318            .await
3319            .unwrap();
3320
3321        let mut dataset = update_result.new_dataset.as_ref().clone();
3322
3323        // Verify row count remains the same
3324        let row_count = dataset.count_rows(None).await.unwrap();
3325        assert_eq!(row_count, 100, "Row count should remain 100 after update");
3326
3327        // Critical test: Verify the index still exists after updating data
3328        let indices_after_update = dataset.load_indices().await.unwrap();
3329        assert_eq!(
3330            indices_after_update.len(),
3331            1,
3332            "Index should be retained after updating rows"
3333        );
3334
3335        // Critical test: Verify the effective fragment bitmap is empty after update
3336        let indices = dataset.load_indices().await.unwrap();
3337        let index = &indices[0];
3338        let effective_bitmap = index
3339            .effective_fragment_bitmap(&dataset.fragment_bitmap)
3340            .unwrap();
3341        assert!(
3342            effective_bitmap.is_empty(),
3343            "Effective fragment bitmap should be empty after updating all data"
3344        );
3345
3346        // Verify we can still get index statistics
3347        let stats_after_update = dataset.index_statistics("index").await.unwrap();
3348        let stats_after_update: serde_json::Value =
3349            serde_json::from_str(&stats_after_update).unwrap();
3350
3351        // The index should still be available
3352        assert_eq!(
3353            stats_after_update["num_indexed_rows"], 0,
3354            "Index statistics should be zero after update, as it is not re-trained"
3355        );
3356
3357        // Verify index behavior in queries after update (empty bitmap)
3358        if column_name == "text" {
3359            // Inverted indexes will still appear to be used in FTS queries even with empty bitmaps.
3360            // This is because FTS queries require an index to exist, and the query execution
3361            // will handle the empty bitmap appropriately by falling back to scanning unindexed data.
3362            // TODO: once metrics are working on FTS queries, we can check the
3363            // analyze plan output instead for actual index usage statistics.
3364            let _plan_after_update = dataset
3365                .scan()
3366                .project(&[column_name])
3367                .unwrap()
3368                .full_text_search(FullTextSearchQuery::new("test".to_string()))
3369                .unwrap()
3370                .explain_plan(false)
3371                .await
3372                .unwrap();
3373            assert_index_usage(
3374                &_plan_after_update,
3375                column_name,
3376                true, // FTS indices always appear in the plan, even with empty bitmaps
3377                "after update (empty bitmap)",
3378            );
3379        } else {
3380            // Use equality filter for btree/bitmap indices
3381            let _plan_after_update = dataset
3382                .scan()
3383                .filter(format!("{} = 50", column_name).as_str())
3384                .unwrap()
3385                .explain_plan(false)
3386                .await
3387                .unwrap();
3388            // With immutable bitmaps, index is still used even with empty effective bitmap
3389            // The prefilter will handle non-existent fragments
3390            assert_index_usage(
3391                &_plan_after_update,
3392                column_name,
3393                false,
3394                "after update (empty effective bitmap)",
3395            );
3396        }
3397
3398        // Test that we can optimize indices after update
3399        dataset.optimize_indices(&Default::default()).await.unwrap();
3400
3401        // Verify index still exists after optimization
3402        let indices_after_optimize = dataset.load_indices().await.unwrap();
3403        assert_eq!(
3404            indices_after_optimize.len(),
3405            1,
3406            "Index should still exist after optimization following update"
3407        );
3408
3409        let stats_after_optimization = dataset.index_statistics("index").await.unwrap();
3410        let stats_after_optimization: serde_json::Value =
3411            serde_json::from_str(&stats_after_optimization).unwrap();
3412
3413        // The index should still be available
3414        assert_eq!(
3415            stats_after_optimization["num_unindexed_rows"], 0,
3416            "Index should have zero unindexed rows after optimization"
3417        );
3418    }
3419
3420    // Helper function to validate indices after each clone iteration
3421    async fn validate_indices_after_clone(
3422        dataset: &Dataset,
3423        round: usize,
3424        expected_scalar_rows: usize,
3425        dimensions: u32,
3426    ) {
3427        // Verify cloned dataset has indices
3428        let indices = dataset.load_indices().await.unwrap();
3429        assert_eq!(
3430            indices.len(),
3431            2,
3432            "Round {}: Cloned dataset should have 2 indices",
3433            round
3434        );
3435        let index_names: HashSet<String> = indices.iter().map(|idx| idx.name.clone()).collect();
3436        assert!(
3437            index_names.contains("vector_idx"),
3438            "Round {}: Should contain vector_idx",
3439            round
3440        );
3441        assert!(
3442            index_names.contains("category_idx"),
3443            "Round {}: Should contain category_idx",
3444            round
3445        );
3446
3447        // Test basic data access without using indices for now
3448        // This ensures the dataset is accessible and data is intact
3449        // In chain cloning, each round adds 50 rows to the previous dataset
3450        // Round 1: 300 (original), Round 2: 350 (300 + 50 from round 1), Round 3: 400 (350 + 50 from round 2)
3451        let expected_total_rows = 300 + (round - 1) * 50;
3452        let total_rows = dataset.count_rows(None).await.unwrap();
3453        assert_eq!(
3454            total_rows, expected_total_rows,
3455            "Round {}: Should have {} rows after clone (chain cloning accumulates data)",
3456            round, expected_total_rows
3457        );
3458
3459        // Verify vector search
3460        let query_vector = generate_random_array(dimensions as usize);
3461        let search_results = dataset
3462            .scan()
3463            .nearest("vector", &query_vector, 5)
3464            .unwrap()
3465            .limit(Some(5), None)
3466            .unwrap()
3467            .try_into_batch()
3468            .await
3469            .unwrap();
3470        assert!(
3471            search_results.num_rows() > 0,
3472            "Round {}: Vector search should return results immediately after clone",
3473            round
3474        );
3475
3476        // Test basic scalar query to verify data integrity
3477        let scalar_results = dataset
3478            .scan()
3479            .filter("category = 'category_0'")
3480            .unwrap()
3481            .try_into_batch()
3482            .await
3483            .unwrap();
3484
3485        assert_eq!(
3486            expected_scalar_rows,
3487            scalar_results.num_rows(),
3488            "Round {}: Scalar query should return {} results",
3489            round,
3490            expected_scalar_rows
3491        );
3492    }
3493
3494    #[tokio::test]
3495    async fn test_shallow_clone_with_index() {
3496        let test_dir = TempStrDir::default();
3497        let test_uri = &test_dir;
3498
3499        // Create a schema with both vector and scalar columns
3500        let dimensions = 16u32;
3501        // Generate test data using lance_datagen (300 rows to satisfy PQ training requirements)
3502        let data = gen_batch()
3503            .col("id", array::step::<Int32Type>())
3504            .col("category", array::fill_utf8("category_0".to_string()))
3505            .col(
3506                "vector",
3507                array::rand_vec::<Float32Type>(Dimension::from(dimensions)),
3508            )
3509            .into_reader_rows(RowCount::from(300), BatchCount::from(1));
3510
3511        // Create initial dataset
3512        let mut dataset = Dataset::write(data, test_uri, None).await.unwrap();
3513        // Create vector index (IVF_PQ)
3514        let vector_params = VectorIndexParams::ivf_pq(2, 8, 2, MetricType::L2, 10);
3515        dataset
3516            .create_index(
3517                &["vector"],
3518                IndexType::Vector,
3519                Some("vector_idx".to_string()),
3520                &vector_params,
3521                true,
3522            )
3523            .await
3524            .unwrap();
3525
3526        // Create scalar index (BTree)
3527        dataset
3528            .create_index(
3529                &["category"],
3530                IndexType::BTree,
3531                Some("category_idx".to_string()),
3532                &ScalarIndexParams::default(),
3533                true,
3534            )
3535            .await
3536            .unwrap();
3537
3538        // Verify indices were created
3539        let indices = dataset.load_indices().await.unwrap();
3540        assert_eq!(indices.len(), 2, "Should have 2 indices");
3541        let index_names: HashSet<String> = indices.iter().map(|idx| idx.name.clone()).collect();
3542        assert!(index_names.contains("vector_idx"));
3543        assert!(index_names.contains("category_idx"));
3544
3545        // Test scalar query on source dataset
3546        let scalar_results = dataset
3547            .scan()
3548            .filter("category = 'category_0'")
3549            .unwrap()
3550            .try_into_batch()
3551            .await
3552            .unwrap();
3553
3554        let source_scalar_query_rows = scalar_results.num_rows();
3555        assert!(
3556            scalar_results.num_rows() > 0,
3557            "Scalar query should return results"
3558        );
3559
3560        // Multiple shallow clone iterations test with chain cloning
3561        let clone_rounds = 3;
3562        let mut current_dataset = dataset;
3563
3564        for round in 1..=clone_rounds {
3565            let round_clone_dir = format!("{}/clone_round_{}", test_dir, round);
3566            let round_cloned_uri = &round_clone_dir;
3567            let tag_name = format!("shallow_clone_test_{}", round);
3568
3569            // Create tag for this round (use current dataset for chain cloning)
3570            let current_version = current_dataset.version().version;
3571            current_dataset
3572                .tags()
3573                .create(&tag_name, current_version)
3574                .await
3575                .unwrap();
3576
3577            // Perform shallow clone for this round (chain cloning from current dataset)
3578            let mut round_cloned_dataset = current_dataset
3579                .shallow_clone(round_cloned_uri, tag_name.as_str(), None)
3580                .await
3581                .unwrap();
3582
3583            // Immediately validate indices after each clone
3584            validate_indices_after_clone(
3585                &round_cloned_dataset,
3586                round,
3587                source_scalar_query_rows,
3588                dimensions,
3589            )
3590            .await;
3591
3592            // Complete validation cycle after each clone: append data, optimize index, validate
3593            // Append new data to the cloned dataset
3594            let new_data = gen_batch()
3595                .col(
3596                    "id",
3597                    array::step_custom::<Int32Type>(300 + (round * 50) as i32, 1),
3598                )
3599                .col("category", array::fill_utf8(format!("category_{}", round)))
3600                .col(
3601                    "vector",
3602                    array::rand_vec::<Float32Type>(Dimension::from(dimensions)),
3603                )
3604                .into_reader_rows(RowCount::from(50), BatchCount::from(1));
3605
3606            round_cloned_dataset = Dataset::write(
3607                new_data,
3608                round_cloned_uri,
3609                Some(WriteParams {
3610                    mode: WriteMode::Append,
3611                    ..Default::default()
3612                }),
3613            )
3614            .await
3615            .unwrap();
3616
3617            // Verify row count increased
3618            let expected_rows = 300 + round * 50;
3619            let total_rows = round_cloned_dataset.count_rows(None).await.unwrap();
3620            assert_eq!(
3621                total_rows, expected_rows,
3622                "Round {}: Should have {} rows after append",
3623                round, expected_rows
3624            );
3625
3626            let indices_before_optimize = round_cloned_dataset.load_indices().await.unwrap();
3627            let vector_idx_before = indices_before_optimize
3628                .iter()
3629                .find(|idx| idx.name == "vector_idx")
3630                .unwrap();
3631            let category_idx_before = indices_before_optimize
3632                .iter()
3633                .find(|idx| idx.name == "category_idx")
3634                .unwrap();
3635
3636            // Optimize indices
3637            round_cloned_dataset
3638                .optimize_indices(&OptimizeOptions::merge(indices_before_optimize.len()))
3639                .await
3640                .unwrap();
3641
3642            // Verify index UUID has changed
3643            let optimized_indices = round_cloned_dataset.load_indices().await.unwrap();
3644            let new_vector_idx = optimized_indices
3645                .iter()
3646                .find(|idx| idx.name == "vector_idx")
3647                .unwrap();
3648            let new_category_idx = optimized_indices
3649                .iter()
3650                .find(|idx| idx.name == "category_idx")
3651                .unwrap();
3652
3653            assert_ne!(
3654                new_vector_idx.uuid, vector_idx_before.uuid,
3655                "Round {}: Vector index should have a new UUID after optimization",
3656                round
3657            );
3658            assert_ne!(
3659                new_category_idx.uuid, category_idx_before.uuid,
3660                "Round {}: Category index should have a new UUID after optimization",
3661                round
3662            );
3663
3664            // Verify the location of index files
3665            use std::path::PathBuf;
3666            let clone_indices_dir = PathBuf::from(round_cloned_uri).join("_indices");
3667            let vector_index_dir = clone_indices_dir.join(new_vector_idx.uuid.to_string());
3668            let category_index_dir = clone_indices_dir.join(new_category_idx.uuid.to_string());
3669
3670            assert!(
3671                vector_index_dir.exists(),
3672                "Round {}: New vector index directory should exist in cloned dataset location: {:?}",
3673                round, vector_index_dir
3674            );
3675            assert!(
3676                category_index_dir.exists(),
3677                "Round {}: New category index directory should exist in cloned dataset location: {:?}",
3678                round, category_index_dir
3679            );
3680
3681            // Verify base id
3682            assert!(
3683                new_vector_idx.base_id.is_none(),
3684                "Round {}: New vector index should not have base_id after optimization in cloned dataset",
3685                round
3686            );
3687            assert!(
3688                new_category_idx.base_id.is_none(),
3689                "Round {}: New category index should not have base_id after optimization in cloned dataset",
3690                round
3691            );
3692
3693            // Verify the source location does NOT contain new data
3694            let original_indices_dir = PathBuf::from(current_dataset.uri()).join("_indices");
3695            let wrong_vector_dir = original_indices_dir.join(new_vector_idx.uuid.to_string());
3696            let wrong_category_dir = original_indices_dir.join(new_category_idx.uuid.to_string());
3697
3698            assert!(
3699                !wrong_vector_dir.exists(),
3700                "Round {}: New vector index should NOT be in original dataset location: {:?}",
3701                round,
3702                wrong_vector_dir
3703            );
3704            assert!(
3705                !wrong_category_dir.exists(),
3706                "Round {}: New category index should NOT be in original dataset location: {:?}",
3707                round,
3708                wrong_category_dir
3709            );
3710
3711            // Validate data integrity and index functionality after optimization
3712            let old_category_results = round_cloned_dataset
3713                .scan()
3714                .filter("category = 'category_0'")
3715                .unwrap()
3716                .try_into_batch()
3717                .await
3718                .unwrap();
3719
3720            let new_category_results = round_cloned_dataset
3721                .scan()
3722                .filter(&format!("category = 'category_{}'", round))
3723                .unwrap()
3724                .try_into_batch()
3725                .await
3726                .unwrap();
3727
3728            assert_eq!(
3729                source_scalar_query_rows,
3730                old_category_results.num_rows(),
3731                "Round {}: Should find old category data with {} rows",
3732                round,
3733                source_scalar_query_rows
3734            );
3735            assert!(
3736                new_category_results.num_rows() > 0,
3737                "Round {}: Should find new category data",
3738                round
3739            );
3740
3741            // Test vector search functionality
3742            let query_vector = generate_random_array(dimensions as usize);
3743            let search_results = round_cloned_dataset
3744                .scan()
3745                .nearest("vector", &query_vector, 10)
3746                .unwrap()
3747                .limit(Some(10), None)
3748                .unwrap()
3749                .try_into_batch()
3750                .await
3751                .unwrap();
3752
3753            assert!(
3754                search_results.num_rows() > 0,
3755                "Round {}: Vector search should return results after optimization",
3756                round
3757            );
3758
3759            // Verify statistic of indexes
3760            let vector_stats: serde_json::Value = serde_json::from_str(
3761                &round_cloned_dataset
3762                    .index_statistics("vector_idx")
3763                    .await
3764                    .unwrap(),
3765            )
3766            .unwrap();
3767            let category_stats: serde_json::Value = serde_json::from_str(
3768                &round_cloned_dataset
3769                    .index_statistics("category_idx")
3770                    .await
3771                    .unwrap(),
3772            )
3773            .unwrap();
3774
3775            assert_eq!(
3776                vector_stats["num_indexed_rows"].as_u64().unwrap(),
3777                expected_rows as u64,
3778                "Round {}: Vector index should have {} indexed rows",
3779                round,
3780                expected_rows
3781            );
3782            assert_eq!(
3783                category_stats["num_indexed_rows"].as_u64().unwrap(),
3784                expected_rows as u64,
3785                "Round {}: Category index should have {} indexed rows",
3786                round,
3787                expected_rows
3788            );
3789
3790            // Prepare for next round: use the cloned dataset as the source for next clone
3791            current_dataset = round_cloned_dataset;
3792        }
3793
3794        // Use the final cloned dataset for any remaining tests
3795        let final_cloned_dataset = current_dataset;
3796
3797        // Verify cloned dataset has indices
3798        let cloned_indices = final_cloned_dataset.load_indices().await.unwrap();
3799        assert_eq!(
3800            cloned_indices.len(),
3801            2,
3802            "Final cloned dataset should have 2 indices"
3803        );
3804        let cloned_index_names: HashSet<String> =
3805            cloned_indices.iter().map(|idx| idx.name.clone()).collect();
3806        assert!(cloned_index_names.contains("vector_idx"));
3807        assert!(cloned_index_names.contains("category_idx"));
3808
3809        // Test vector search on final cloned dataset
3810        let query_vector = generate_random_array(dimensions as usize);
3811        let search_results = final_cloned_dataset
3812            .scan()
3813            .nearest("vector", &query_vector, 5)
3814            .unwrap()
3815            .limit(Some(5), None)
3816            .unwrap()
3817            .try_into_batch()
3818            .await
3819            .unwrap();
3820
3821        assert!(
3822            search_results.num_rows() > 0,
3823            "Vector search should return results on final dataset"
3824        );
3825
3826        // Test scalar query on final cloned dataset
3827        let scalar_results = final_cloned_dataset
3828            .scan()
3829            .filter("category = 'category_0'")
3830            .unwrap()
3831            .try_into_batch()
3832            .await
3833            .unwrap();
3834
3835        assert_eq!(
3836            source_scalar_query_rows,
3837            scalar_results.num_rows(),
3838            "Scalar query should return results on final dataset"
3839        );
3840    }
3841
3842    #[tokio::test]
3843    async fn test_initialize_indices() {
3844        use crate::dataset::Dataset;
3845        use arrow_array::types::Float32Type;
3846        use lance_core::utils::tempfile::TempStrDir;
3847        use lance_datagen::{array, BatchCount, RowCount};
3848        use lance_index::scalar::{InvertedIndexParams, ScalarIndexParams};
3849        use lance_linalg::distance::MetricType;
3850        use std::collections::HashSet;
3851
3852        // Create source dataset with various index types
3853        let test_dir = TempStrDir::default();
3854        let source_uri = format!("{}/{}", test_dir, "source");
3855        let target_uri = format!("{}/{}", test_dir, "target");
3856
3857        // Generate test data using lance_datagen (need at least 256 rows for PQ training)
3858        let source_reader = lance_datagen::gen_batch()
3859            .col("vector", array::rand_vec::<Float32Type>(8.into()))
3860            .col(
3861                "text",
3862                array::cycle_utf8_literals(&["hello world", "foo bar", "test data"]),
3863            )
3864            .col("id", array::step::<Int32Type>())
3865            .into_reader_rows(RowCount::from(300), BatchCount::from(1));
3866
3867        // Create source dataset
3868        let mut source_dataset = Dataset::write(source_reader, &source_uri, None)
3869            .await
3870            .unwrap();
3871
3872        // Create indices on source dataset
3873        // 1. Vector index
3874        let vector_params = VectorIndexParams::ivf_pq(4, 8, 2, MetricType::L2, 10);
3875        source_dataset
3876            .create_index(
3877                &["vector"],
3878                IndexType::Vector,
3879                Some("vec_idx".to_string()),
3880                &vector_params,
3881                false,
3882            )
3883            .await
3884            .unwrap();
3885
3886        // 2. FTS index
3887        let fts_params = InvertedIndexParams::default();
3888        source_dataset
3889            .create_index(
3890                &["text"],
3891                IndexType::Inverted,
3892                Some("text_idx".to_string()),
3893                &fts_params,
3894                false,
3895            )
3896            .await
3897            .unwrap();
3898
3899        // 3. Scalar index
3900        let scalar_params = ScalarIndexParams::default();
3901        source_dataset
3902            .create_index(
3903                &["id"],
3904                IndexType::BTree,
3905                Some("id_idx".to_string()),
3906                &scalar_params,
3907                false,
3908            )
3909            .await
3910            .unwrap();
3911
3912        // Reload source dataset to get updated index metadata
3913        let source_dataset = Dataset::open(&source_uri).await.unwrap();
3914
3915        // Verify source has 3 indices
3916        let source_indices = source_dataset.load_indices().await.unwrap();
3917        assert_eq!(
3918            source_indices.len(),
3919            3,
3920            "Source dataset should have 3 indices"
3921        );
3922
3923        // Create target dataset with same schema but different data (need at least 256 rows for PQ)
3924        let target_reader = lance_datagen::gen_batch()
3925            .col("vector", array::rand_vec::<Float32Type>(8.into()))
3926            .col(
3927                "text",
3928                array::cycle_utf8_literals(&["foo bar", "test data", "hello world"]),
3929            )
3930            .col("id", array::step_custom::<Int32Type>(100, 1))
3931            .into_reader_rows(RowCount::from(300), BatchCount::from(1));
3932        let mut target_dataset = Dataset::write(target_reader, &target_uri, None)
3933            .await
3934            .unwrap();
3935
3936        // Initialize indices from source dataset
3937        target_dataset
3938            .initialize_indices(&source_dataset)
3939            .await
3940            .unwrap();
3941
3942        // Verify target has same indices
3943        let target_indices = target_dataset.load_indices().await.unwrap();
3944        assert_eq!(
3945            target_indices.len(),
3946            3,
3947            "Target dataset should have 3 indices after initialization"
3948        );
3949
3950        // Check index names match
3951        let source_names: HashSet<String> =
3952            source_indices.iter().map(|idx| idx.name.clone()).collect();
3953        let target_names: HashSet<String> =
3954            target_indices.iter().map(|idx| idx.name.clone()).collect();
3955        assert_eq!(
3956            source_names, target_names,
3957            "Index names should match between source and target"
3958        );
3959
3960        // Verify indices are functional by running queries
3961        // 1. Test vector index
3962        let query_vector = generate_random_array(8);
3963        let search_results = target_dataset
3964            .scan()
3965            .nearest("vector", &query_vector, 5)
3966            .unwrap()
3967            .limit(Some(5), None)
3968            .unwrap()
3969            .try_into_batch()
3970            .await
3971            .unwrap();
3972        assert!(
3973            search_results.num_rows() > 0,
3974            "Vector index should be functional"
3975        );
3976
3977        // 2. Test scalar index
3978        let scalar_results = target_dataset
3979            .scan()
3980            .filter("id = 125")
3981            .unwrap()
3982            .try_into_batch()
3983            .await
3984            .unwrap();
3985        assert_eq!(
3986            scalar_results.num_rows(),
3987            1,
3988            "Scalar index should find exact match"
3989        );
3990    }
3991
3992    #[tokio::test]
3993    async fn test_initialize_indices_with_missing_field() {
3994        use crate::dataset::Dataset;
3995        use arrow_array::types::Int32Type;
3996        use lance_core::utils::tempfile::TempStrDir;
3997        use lance_datagen::{array, BatchCount, RowCount};
3998        use lance_index::scalar::ScalarIndexParams;
3999
4000        // Test that initialize_indices handles missing fields gracefully
4001        let test_dir = TempStrDir::default();
4002        let source_uri = format!("{}/{}", test_dir, "source");
4003        let target_uri = format!("{}/{}", test_dir, "target");
4004
4005        // Create source dataset with extra field
4006        let source_reader = lance_datagen::gen_batch()
4007            .col("id", array::step::<Int32Type>())
4008            .col("extra", array::cycle_utf8_literals(&["test"]))
4009            .into_reader_rows(RowCount::from(10), BatchCount::from(1));
4010        let mut source_dataset = Dataset::write(source_reader, &source_uri, None)
4011            .await
4012            .unwrap();
4013
4014        // Create index on extra field in source
4015        source_dataset
4016            .create_index(
4017                &["extra"],
4018                IndexType::BTree,
4019                None,
4020                &ScalarIndexParams::default(),
4021                false,
4022            )
4023            .await
4024            .unwrap();
4025
4026        // Create target dataset without extra field
4027        let target_reader = lance_datagen::gen_batch()
4028            .col("id", array::step_custom::<Int32Type>(10, 1))
4029            .into_reader_rows(RowCount::from(10), BatchCount::from(1));
4030        let mut target_dataset = Dataset::write(target_reader, &target_uri, None)
4031            .await
4032            .unwrap();
4033
4034        // Initialize indices should skip the index on missing field with an error
4035        let result = target_dataset.initialize_indices(&source_dataset).await;
4036
4037        // Should fail when field is missing
4038        assert!(result.is_err(), "Should error when field is missing");
4039        assert!(result
4040            .unwrap_err()
4041            .to_string()
4042            .contains("not found in target dataset"));
4043    }
4044
4045    #[tokio::test]
4046    async fn test_initialize_single_index() {
4047        use crate::dataset::Dataset;
4048        use crate::index::vector::VectorIndexParams;
4049        use arrow_array::types::{Float32Type, Int32Type};
4050        use lance_core::utils::tempfile::TempStrDir;
4051        use lance_datagen::{array, BatchCount, RowCount};
4052        use lance_index::scalar::ScalarIndexParams;
4053        use lance_linalg::distance::MetricType;
4054
4055        let test_dir = TempStrDir::default();
4056        let source_uri = format!("{}/{}", test_dir, "source");
4057        let target_uri = format!("{}/{}", test_dir, "target");
4058
4059        // Create source dataset (need at least 256 rows for PQ training)
4060        let source_reader = lance_datagen::gen_batch()
4061            .col("id", array::step::<Int32Type>())
4062            .col("name", array::rand_utf8(4.into(), false))
4063            .col("vector", array::rand_vec::<Float32Type>(8.into()))
4064            .into_reader_rows(RowCount::from(300), BatchCount::from(1));
4065        let mut source_dataset = Dataset::write(source_reader, &source_uri, None)
4066            .await
4067            .unwrap();
4068
4069        // Create multiple indices on source
4070        let scalar_params = ScalarIndexParams::default();
4071        source_dataset
4072            .create_index(
4073                &["id"],
4074                IndexType::BTree,
4075                Some("id_index".to_string()),
4076                &scalar_params,
4077                false,
4078            )
4079            .await
4080            .unwrap();
4081
4082        let vector_params = VectorIndexParams::ivf_pq(16, 8, 4, MetricType::L2, 50);
4083        source_dataset
4084            .create_index(
4085                &["vector"],
4086                IndexType::Vector,
4087                Some("vector_index".to_string()),
4088                &vector_params,
4089                false,
4090            )
4091            .await
4092            .unwrap();
4093
4094        // Reload source dataset to get updated index metadata
4095        let source_dataset = Dataset::open(&source_uri).await.unwrap();
4096
4097        // Create target dataset with same schema
4098        let target_reader = lance_datagen::gen_batch()
4099            .col("id", array::step::<Int32Type>())
4100            .col("name", array::rand_utf8(4.into(), false))
4101            .col("vector", array::rand_vec::<Float32Type>(8.into()))
4102            .into_reader_rows(RowCount::from(300), BatchCount::from(1));
4103        let mut target_dataset = Dataset::write(target_reader, &target_uri, None)
4104            .await
4105            .unwrap();
4106
4107        // Initialize only the vector index
4108        target_dataset
4109            .initialize_index(&source_dataset, "vector_index")
4110            .await
4111            .unwrap();
4112
4113        // Verify only vector index was created
4114        let target_indices = target_dataset.load_indices().await.unwrap();
4115        assert_eq!(target_indices.len(), 1, "Should have only 1 index");
4116        assert_eq!(
4117            target_indices[0].name, "vector_index",
4118            "Should have the vector index"
4119        );
4120
4121        // Initialize the scalar index
4122        target_dataset
4123            .initialize_index(&source_dataset, "id_index")
4124            .await
4125            .unwrap();
4126
4127        // Verify both indices now exist
4128        let target_indices = target_dataset.load_indices().await.unwrap();
4129        assert_eq!(target_indices.len(), 2, "Should have 2 indices");
4130
4131        let index_names: HashSet<String> =
4132            target_indices.iter().map(|idx| idx.name.clone()).collect();
4133        assert!(
4134            index_names.contains("vector_index"),
4135            "Should have vector index"
4136        );
4137        assert!(index_names.contains("id_index"), "Should have id index");
4138
4139        // Test error case - non-existent index
4140        let result = target_dataset
4141            .initialize_index(&source_dataset, "non_existent")
4142            .await;
4143        assert!(result.is_err(), "Should error for non-existent index");
4144        assert!(result
4145            .unwrap_err()
4146            .to_string()
4147            .contains("not found in source dataset"));
4148    }
4149
4150    #[tokio::test]
4151    async fn test_vector_index_on_nested_field_with_dots() {
4152        let dimensions = 16;
4153        let num_rows = 256;
4154
4155        // Create schema with nested field containing dots in the name
4156        let struct_field = Field::new(
4157            "embedding_data",
4158            DataType::Struct(
4159                vec![
4160                    Field::new(
4161                        "vector.v1", // Field name with dot
4162                        DataType::FixedSizeList(
4163                            Arc::new(Field::new("item", DataType::Float32, true)),
4164                            dimensions,
4165                        ),
4166                        false,
4167                    ),
4168                    Field::new(
4169                        "vector.v2", // Another field name with dot
4170                        DataType::FixedSizeList(
4171                            Arc::new(Field::new("item", DataType::Float32, true)),
4172                            dimensions,
4173                        ),
4174                        false,
4175                    ),
4176                    Field::new("metadata", DataType::Utf8, false),
4177                ]
4178                .into(),
4179            ),
4180            false,
4181        );
4182
4183        let schema = Arc::new(Schema::new(vec![
4184            Field::new("id", DataType::Int32, false),
4185            struct_field,
4186        ]));
4187
4188        // Generate test data
4189        let float_arr_v1 = generate_random_array(num_rows * dimensions as usize);
4190        let vectors_v1 = FixedSizeListArray::try_new_from_values(float_arr_v1, dimensions).unwrap();
4191
4192        let float_arr_v2 = generate_random_array(num_rows * dimensions as usize);
4193        let vectors_v2 = FixedSizeListArray::try_new_from_values(float_arr_v2, dimensions).unwrap();
4194
4195        let ids = Int32Array::from_iter_values(0..num_rows as i32);
4196        let metadata = StringArray::from_iter_values((0..num_rows).map(|i| format!("meta_{}", i)));
4197
4198        let struct_array = arrow_array::StructArray::from(vec![
4199            (
4200                Arc::new(Field::new(
4201                    "vector.v1",
4202                    DataType::FixedSizeList(
4203                        Arc::new(Field::new("item", DataType::Float32, true)),
4204                        dimensions,
4205                    ),
4206                    false,
4207                )),
4208                Arc::new(vectors_v1) as Arc<dyn arrow_array::Array>,
4209            ),
4210            (
4211                Arc::new(Field::new(
4212                    "vector.v2",
4213                    DataType::FixedSizeList(
4214                        Arc::new(Field::new("item", DataType::Float32, true)),
4215                        dimensions,
4216                    ),
4217                    false,
4218                )),
4219                Arc::new(vectors_v2) as Arc<dyn arrow_array::Array>,
4220            ),
4221            (
4222                Arc::new(Field::new("metadata", DataType::Utf8, false)),
4223                Arc::new(metadata) as Arc<dyn arrow_array::Array>,
4224            ),
4225        ]);
4226
4227        let batch =
4228            RecordBatch::try_new(schema.clone(), vec![Arc::new(ids), Arc::new(struct_array)])
4229                .unwrap();
4230
4231        let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone());
4232
4233        let test_dir = TempStrDir::default();
4234        let test_uri = &test_dir;
4235        let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
4236
4237        // Test creating index on nested field with dots using quoted syntax
4238        let nested_column_path_v1 = "embedding_data.`vector.v1`";
4239        let params = VectorIndexParams::ivf_pq(10, 8, 2, MetricType::L2, 10);
4240
4241        dataset
4242            .create_index(
4243                &[nested_column_path_v1],
4244                IndexType::Vector,
4245                Some("vec_v1_idx".to_string()),
4246                &params,
4247                true,
4248            )
4249            .await
4250            .unwrap();
4251
4252        // Verify index was created
4253        let indices = dataset.load_indices().await.unwrap();
4254        assert_eq!(indices.len(), 1);
4255        assert_eq!(indices[0].name, "vec_v1_idx");
4256
4257        // Verify the correct field was indexed
4258        let field_id = indices[0].fields[0];
4259        let field_path = dataset.schema().field_path(field_id).unwrap();
4260        assert_eq!(field_path, "embedding_data.`vector.v1`");
4261
4262        // Test creating index on the second vector field with dots
4263        let nested_column_path_v2 = "embedding_data.`vector.v2`";
4264        dataset
4265            .create_index(
4266                &[nested_column_path_v2],
4267                IndexType::Vector,
4268                Some("vec_v2_idx".to_string()),
4269                &params,
4270                true,
4271            )
4272            .await
4273            .unwrap();
4274
4275        // Verify both indices exist
4276        let indices = dataset.load_indices().await.unwrap();
4277        assert_eq!(indices.len(), 2);
4278
4279        // Verify we can query using the indexed fields and check the plan
4280        let query_vector = generate_random_array(dimensions as usize);
4281
4282        // Check the query plan for the first vector field
4283        let plan_v1 = dataset
4284            .scan()
4285            .nearest(nested_column_path_v1, &query_vector, 5)
4286            .unwrap()
4287            .explain_plan(false)
4288            .await
4289            .unwrap();
4290
4291        // Verify the vector index is being used (should show ANNSubIndex or ANNIvfPartition)
4292        assert!(
4293            plan_v1.contains("ANNSubIndex") || plan_v1.contains("ANNIvfPartition"),
4294            "Query plan should use vector index for nested field with dots. Plan: {}",
4295            plan_v1
4296        );
4297
4298        let search_results_v1 = dataset
4299            .scan()
4300            .nearest(nested_column_path_v1, &query_vector, 5)
4301            .unwrap()
4302            .try_into_batch()
4303            .await
4304            .unwrap();
4305
4306        assert_eq!(search_results_v1.num_rows(), 5);
4307
4308        // Check the query plan for the second vector field
4309        let plan_v2 = dataset
4310            .scan()
4311            .nearest(nested_column_path_v2, &query_vector, 5)
4312            .unwrap()
4313            .explain_plan(false)
4314            .await
4315            .unwrap();
4316
4317        // Verify the vector index is being used
4318        assert!(
4319            plan_v2.contains("ANNSubIndex") || plan_v2.contains("ANNIvfPartition"),
4320            "Query plan should use vector index for second nested field with dots. Plan: {}",
4321            plan_v2
4322        );
4323
4324        let search_results_v2 = dataset
4325            .scan()
4326            .nearest(nested_column_path_v2, &query_vector, 5)
4327            .unwrap()
4328            .try_into_batch()
4329            .await
4330            .unwrap();
4331
4332        assert_eq!(search_results_v2.num_rows(), 5);
4333    }
4334
4335    #[tokio::test]
4336    async fn test_vector_index_on_simple_nested_field() {
4337        // This test reproduces the Python test scenario from test_nested_field_vector_index
4338        // where the nested field path is simple (data.embedding) without dots in field names
4339        let dimensions = 16;
4340        let num_rows = 256;
4341
4342        // Create schema with simple nested field (no dots in field names)
4343        let struct_field = Field::new(
4344            "data",
4345            DataType::Struct(
4346                vec![
4347                    Field::new(
4348                        "embedding",
4349                        DataType::FixedSizeList(
4350                            Arc::new(Field::new("item", DataType::Float32, true)),
4351                            dimensions,
4352                        ),
4353                        false,
4354                    ),
4355                    Field::new("label", DataType::Utf8, false),
4356                ]
4357                .into(),
4358            ),
4359            false,
4360        );
4361
4362        let schema = Arc::new(Schema::new(vec![
4363            Field::new("id", DataType::Int32, false),
4364            struct_field,
4365        ]));
4366
4367        // Generate test data
4368        let float_arr = generate_random_array(num_rows * dimensions as usize);
4369        let vectors = FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap();
4370
4371        let ids = Int32Array::from_iter_values(0..num_rows as i32);
4372        let labels = StringArray::from_iter_values((0..num_rows).map(|i| format!("label_{}", i)));
4373
4374        let struct_array = arrow_array::StructArray::from(vec![
4375            (
4376                Arc::new(Field::new(
4377                    "embedding",
4378                    DataType::FixedSizeList(
4379                        Arc::new(Field::new("item", DataType::Float32, true)),
4380                        dimensions,
4381                    ),
4382                    false,
4383                )),
4384                Arc::new(vectors) as Arc<dyn arrow_array::Array>,
4385            ),
4386            (
4387                Arc::new(Field::new("label", DataType::Utf8, false)),
4388                Arc::new(labels) as Arc<dyn arrow_array::Array>,
4389            ),
4390        ]);
4391
4392        let batch =
4393            RecordBatch::try_new(schema.clone(), vec![Arc::new(ids), Arc::new(struct_array)])
4394                .unwrap();
4395
4396        let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone());
4397
4398        let test_dir = TempStrDir::default();
4399        let test_uri = &test_dir;
4400        let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
4401
4402        // Test creating index on nested field
4403        let nested_column_path = "data.embedding";
4404        let params = VectorIndexParams::ivf_pq(2, 8, 2, MetricType::L2, 10);
4405
4406        dataset
4407            .create_index(
4408                &[nested_column_path],
4409                IndexType::Vector,
4410                Some("vec_idx".to_string()),
4411                &params,
4412                true,
4413            )
4414            .await
4415            .unwrap();
4416
4417        // Verify index was created
4418        let indices = dataset.load_indices().await.unwrap();
4419        assert_eq!(indices.len(), 1);
4420        assert_eq!(indices[0].name, "vec_idx");
4421
4422        // Verify the correct field was indexed
4423        let field_id = indices[0].fields[0];
4424        let field_path = dataset.schema().field_path(field_id).unwrap();
4425        assert_eq!(field_path, "data.embedding");
4426
4427        // Test querying with the index
4428        let query_vector = generate_random_array(dimensions as usize);
4429
4430        let plan = dataset
4431            .scan()
4432            .nearest(nested_column_path, &query_vector, 5)
4433            .unwrap()
4434            .explain_plan(false)
4435            .await
4436            .unwrap();
4437
4438        // Verify the vector index is being used
4439        assert!(
4440            plan.contains("ANNSubIndex") || plan.contains("ANNIvfPartition"),
4441            "Query plan should use vector index for nested field. Plan: {}",
4442            plan
4443        );
4444
4445        let search_results = dataset
4446            .scan()
4447            .nearest(nested_column_path, &query_vector, 5)
4448            .unwrap()
4449            .try_into_batch()
4450            .await
4451            .unwrap();
4452
4453        assert_eq!(search_results.num_rows(), 5);
4454    }
4455
4456    #[tokio::test]
4457    async fn test_btree_index_on_nested_field_with_dots() {
4458        // Test creating BTree index on nested field with dots in the name
4459        let test_dir = TempStrDir::default();
4460        let test_uri = &test_dir;
4461
4462        // Create schema with nested field containing dots
4463        let schema = Arc::new(Schema::new(vec![
4464            Field::new("id", DataType::Int32, false),
4465            Field::new(
4466                "data",
4467                DataType::Struct(
4468                    vec![
4469                        Field::new("value.v1", DataType::Int32, false),
4470                        Field::new("value.v2", DataType::Float32, false),
4471                        Field::new("text", DataType::Utf8, false),
4472                    ]
4473                    .into(),
4474                ),
4475                false,
4476            ),
4477        ]));
4478
4479        // Generate test data
4480        let num_rows = 1000;
4481        let ids = Int32Array::from_iter_values(0..num_rows);
4482        let values_v1 = Int32Array::from_iter_values((0..num_rows).map(|i| i % 100));
4483        let values_v2 = Float32Array::from_iter_values((0..num_rows).map(|i| (i as f32) * 0.1));
4484        let texts = StringArray::from_iter_values((0..num_rows).map(|i| format!("text_{}", i)));
4485
4486        let struct_array = arrow_array::StructArray::from(vec![
4487            (
4488                Arc::new(Field::new("value.v1", DataType::Int32, false)),
4489                Arc::new(values_v1) as Arc<dyn arrow_array::Array>,
4490            ),
4491            (
4492                Arc::new(Field::new("value.v2", DataType::Float32, false)),
4493                Arc::new(values_v2) as Arc<dyn arrow_array::Array>,
4494            ),
4495            (
4496                Arc::new(Field::new("text", DataType::Utf8, false)),
4497                Arc::new(texts) as Arc<dyn arrow_array::Array>,
4498            ),
4499        ]);
4500
4501        let batch =
4502            RecordBatch::try_new(schema.clone(), vec![Arc::new(ids), Arc::new(struct_array)])
4503                .unwrap();
4504
4505        let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone());
4506        let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
4507
4508        // Create BTree index on nested field with dots
4509        let nested_column_path = "data.`value.v1`";
4510        let params = ScalarIndexParams::default();
4511
4512        dataset
4513            .create_index(
4514                &[nested_column_path],
4515                IndexType::BTree,
4516                Some("btree_v1_idx".to_string()),
4517                &params,
4518                true,
4519            )
4520            .await
4521            .unwrap();
4522
4523        // Reload dataset to ensure index is loaded
4524        dataset = Dataset::open(test_uri).await.unwrap();
4525
4526        // Verify index was created
4527        let indices = dataset.load_indices().await.unwrap();
4528        assert_eq!(indices.len(), 1);
4529        assert_eq!(indices[0].name, "btree_v1_idx");
4530
4531        // Verify the correct field was indexed
4532        let field_id = indices[0].fields[0];
4533        let field_path = dataset.schema().field_path(field_id).unwrap();
4534        assert_eq!(field_path, "data.`value.v1`");
4535
4536        // Test querying with the index and verify it's being used
4537        let plan = dataset
4538            .scan()
4539            .filter("data.`value.v1` = 42")
4540            .unwrap()
4541            .prefilter(true)
4542            .explain_plan(false)
4543            .await
4544            .unwrap();
4545
4546        // Verify the query plan (scalar indices on nested fields may use optimized filters)
4547        // The index may be used internally even if not shown as ScalarIndexQuery
4548        assert!(
4549            plan.contains("ScalarIndexQuery"),
4550            "Query plan should show optimized read. Plan: {}",
4551            plan
4552        );
4553
4554        // Also test that the query returns results
4555        let results = dataset
4556            .scan()
4557            .filter("data.`value.v1` = 42")
4558            .unwrap()
4559            .prefilter(true)
4560            .try_into_batch()
4561            .await
4562            .unwrap();
4563
4564        assert!(results.num_rows() > 0);
4565    }
4566
4567    #[tokio::test]
4568    async fn test_bitmap_index_on_nested_field_with_dots() {
4569        // Test creating Bitmap index on nested field with dots in the name
4570        let test_dir = TempStrDir::default();
4571        let test_uri = &test_dir;
4572
4573        // Create schema with nested field containing dots - using low cardinality for bitmap
4574        let schema = Arc::new(Schema::new(vec![
4575            Field::new("id", DataType::Int32, false),
4576            Field::new(
4577                "metadata",
4578                DataType::Struct(
4579                    vec![
4580                        Field::new("status.code", DataType::Int32, false),
4581                        Field::new("category.name", DataType::Utf8, false),
4582                    ]
4583                    .into(),
4584                ),
4585                false,
4586            ),
4587        ]));
4588
4589        // Generate test data with low cardinality (good for bitmap index)
4590        let num_rows = 1000;
4591        let ids = Int32Array::from_iter_values(0..num_rows);
4592        // Only 10 unique status codes
4593        let status_codes = Int32Array::from_iter_values((0..num_rows).map(|i| i % 10));
4594        // Only 5 unique categories
4595        let categories =
4596            StringArray::from_iter_values((0..num_rows).map(|i| format!("category_{}", i % 5)));
4597
4598        let struct_array = arrow_array::StructArray::from(vec![
4599            (
4600                Arc::new(Field::new("status.code", DataType::Int32, false)),
4601                Arc::new(status_codes) as Arc<dyn arrow_array::Array>,
4602            ),
4603            (
4604                Arc::new(Field::new("category.name", DataType::Utf8, false)),
4605                Arc::new(categories) as Arc<dyn arrow_array::Array>,
4606            ),
4607        ]);
4608
4609        let batch =
4610            RecordBatch::try_new(schema.clone(), vec![Arc::new(ids), Arc::new(struct_array)])
4611                .unwrap();
4612
4613        let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone());
4614        let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
4615
4616        // Create Bitmap index on nested field with dots
4617        let nested_column_path = "metadata.`status.code`";
4618        let params = ScalarIndexParams::default();
4619
4620        dataset
4621            .create_index(
4622                &[nested_column_path],
4623                IndexType::Bitmap,
4624                Some("bitmap_status_idx".to_string()),
4625                &params,
4626                true,
4627            )
4628            .await
4629            .unwrap();
4630
4631        // Reload dataset to ensure index is loaded
4632        dataset = Dataset::open(test_uri).await.unwrap();
4633
4634        // Verify index was created
4635        let indices = dataset.load_indices().await.unwrap();
4636        assert_eq!(indices.len(), 1);
4637        assert_eq!(indices[0].name, "bitmap_status_idx");
4638
4639        // Verify the correct field was indexed
4640        let field_id = indices[0].fields[0];
4641        let field_path = dataset.schema().field_path(field_id).unwrap();
4642        assert_eq!(field_path, "metadata.`status.code`");
4643
4644        // Test querying with the index and verify it's being used
4645        let plan = dataset
4646            .scan()
4647            .filter("metadata.`status.code` = 5")
4648            .unwrap()
4649            .explain_plan(false)
4650            .await
4651            .unwrap();
4652
4653        // Verify the query plan (scalar indices on nested fields may use optimized filters)
4654        // The index may be used internally even if not shown as ScalarIndexQuery
4655        assert!(
4656            plan.contains("ScalarIndexQuery"),
4657            "Query plan should show optimized read. Plan: {}",
4658            plan
4659        );
4660
4661        // Also test that the query returns results
4662        let results = dataset
4663            .scan()
4664            .filter("metadata.`status.code` = 5")
4665            .unwrap()
4666            .try_into_batch()
4667            .await
4668            .unwrap();
4669
4670        // Should have ~100 rows with status code 5
4671        assert!(results.num_rows() > 0);
4672        assert_eq!(results.num_rows(), 100);
4673    }
4674
4675    #[tokio::test]
4676    async fn test_inverted_index_on_nested_field_with_dots() {
4677        use lance_index::scalar::inverted::tokenizer::InvertedIndexParams;
4678
4679        // Test creating Inverted index on nested text field with dots in the name
4680        let test_dir = TempStrDir::default();
4681        let test_uri = &test_dir;
4682
4683        // Create schema with nested text field containing dots
4684        let schema = Arc::new(Schema::new(vec![
4685            Field::new("id", DataType::Int32, false),
4686            Field::new(
4687                "document",
4688                DataType::Struct(
4689                    vec![
4690                        Field::new("content.text", DataType::Utf8, false),
4691                        Field::new("content.summary", DataType::Utf8, false),
4692                    ]
4693                    .into(),
4694                ),
4695                false,
4696            ),
4697        ]));
4698
4699        // Generate test data with text content
4700        let num_rows = 100;
4701        let ids = Int32Array::from_iter_values(0..num_rows as i32);
4702        let content_texts = StringArray::from_iter_values((0..num_rows).map(|i| match i % 3 {
4703            0 => format!("The quick brown fox jumps over the lazy dog {}", i),
4704            1 => format!(
4705                "Machine learning and artificial intelligence document {}",
4706                i
4707            ),
4708            _ => format!("Data science and analytics content piece {}", i),
4709        }));
4710        let summaries = StringArray::from_iter_values(
4711            (0..num_rows).map(|i| format!("Summary of document {}", i)),
4712        );
4713
4714        let struct_array = arrow_array::StructArray::from(vec![
4715            (
4716                Arc::new(Field::new("content.text", DataType::Utf8, false)),
4717                Arc::new(content_texts) as Arc<dyn arrow_array::Array>,
4718            ),
4719            (
4720                Arc::new(Field::new("content.summary", DataType::Utf8, false)),
4721                Arc::new(summaries) as Arc<dyn arrow_array::Array>,
4722            ),
4723        ]);
4724
4725        let batch =
4726            RecordBatch::try_new(schema.clone(), vec![Arc::new(ids), Arc::new(struct_array)])
4727                .unwrap();
4728
4729        let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone());
4730        let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
4731
4732        // Create Inverted index on nested text field with dots
4733        let nested_column_path = "document.`content.text`";
4734        let params = InvertedIndexParams::default();
4735
4736        dataset
4737            .create_index(
4738                &[nested_column_path],
4739                IndexType::Inverted,
4740                Some("inverted_content_idx".to_string()),
4741                &params,
4742                true,
4743            )
4744            .await
4745            .unwrap();
4746
4747        // Reload the dataset to ensure the index is loaded
4748        dataset = Dataset::open(test_uri).await.unwrap();
4749
4750        // Verify index was created
4751        let indices = dataset.load_indices().await.unwrap();
4752        assert_eq!(indices.len(), 1);
4753        assert_eq!(indices[0].name, "inverted_content_idx");
4754
4755        // Verify the correct field was indexed
4756        let field_id = indices[0].fields[0];
4757        let field_path = dataset.schema().field_path(field_id).unwrap();
4758        assert_eq!(field_path, "document.`content.text`");
4759
4760        // Test full-text search on the nested field with dots
4761        // Use the field_path that the index reports
4762        let query = FullTextSearchQuery::new("machine learning".to_string())
4763            .with_column(field_path.clone())
4764            .unwrap();
4765
4766        // Check the query plan uses the inverted index
4767        let plan = dataset
4768            .scan()
4769            .full_text_search(query.clone())
4770            .unwrap()
4771            .explain_plan(false)
4772            .await
4773            .unwrap();
4774
4775        // Verify the inverted index is being used
4776        assert!(
4777            plan.contains("MatchQuery") || plan.contains("PhraseQuery"),
4778            "Query plan should use inverted index for nested field with dots. Plan: {}",
4779            plan
4780        );
4781
4782        let results = dataset
4783            .scan()
4784            .full_text_search(query)
4785            .unwrap()
4786            .try_into_stream()
4787            .await
4788            .unwrap()
4789            .try_collect::<Vec<_>>()
4790            .await
4791            .unwrap();
4792
4793        // Verify we get results from the full-text search
4794        assert!(
4795            !results.is_empty(),
4796            "Full-text search should return results"
4797        );
4798
4799        // Check that we found documents containing "machine learning"
4800        let mut found_count = 0;
4801        for batch in results {
4802            found_count += batch.num_rows();
4803        }
4804        // We expect to find approximately 1/3 of documents (those with i % 3 == 1)
4805        assert!(
4806            found_count > 0,
4807            "Should find at least some documents with 'machine learning'"
4808        );
4809        assert!(found_count < num_rows, "Should not match all documents");
4810    }
4811}