Skip to main content

lance/
index.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Secondary Index
5//!
6
7use std::collections::{HashMap, HashSet};
8use std::sync::{Arc, OnceLock};
9
10use arrow_schema::{DataType, Schema};
11use async_trait::async_trait;
12use datafusion::execution::SendableRecordBatchStream;
13use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
14use futures::{stream, FutureExt};
15use itertools::Itertools;
16use lance_core::cache::{CacheKey, UnsizedCacheKey};
17use lance_core::datatypes::Field;
18use lance_core::datatypes::Schema as LanceSchema;
19use lance_core::utils::address::RowAddress;
20use lance_core::utils::parse::str_is_truthy;
21use lance_core::utils::tracing::{
22    IO_TYPE_OPEN_FRAG_REUSE, IO_TYPE_OPEN_MEM_WAL, IO_TYPE_OPEN_SCALAR, IO_TYPE_OPEN_VECTOR,
23    TRACE_IO_EVENTS,
24};
25use lance_file::previous::reader::FileReader as PreviousFileReader;
26use lance_file::reader::FileReaderOptions;
27use lance_index::frag_reuse::{FragReuseIndex, FRAG_REUSE_INDEX_NAME};
28use lance_index::mem_wal::{MemWalIndex, MEM_WAL_INDEX_NAME};
29use lance_index::optimize::OptimizeOptions;
30use lance_index::pb::index::Implementation;
31use lance_index::scalar::expression::{
32    IndexInformationProvider, MultiQueryParser, ScalarQueryParser,
33};
34use lance_index::scalar::inverted::InvertedIndexPlugin;
35use lance_index::scalar::lance_format::LanceIndexStore;
36use lance_index::scalar::registry::{TrainingCriteria, TrainingOrdering};
37use lance_index::scalar::{CreatedIndex, ScalarIndex};
38use lance_index::vector::bq::builder::RabitQuantizer;
39use lance_index::vector::flat::index::{FlatBinQuantizer, FlatIndex, FlatQuantizer};
40use lance_index::vector::hnsw::HNSW;
41use lance_index::vector::pq::ProductQuantizer;
42use lance_index::vector::sq::ScalarQuantizer;
43pub use lance_index::IndexParams;
44use lance_index::{
45    is_system_index,
46    metrics::{MetricsCollector, NoOpMetricsCollector},
47    IndexCriteria,
48};
49use lance_index::{pb, vector::VectorIndex, Index, IndexType, INDEX_FILE_NAME};
50use lance_index::{
51    DatasetIndexExt, IndexDescription, INDEX_METADATA_SCHEMA_KEY, VECTOR_INDEX_VERSION,
52};
53use lance_io::scheduler::{ScanScheduler, SchedulerConfig};
54use lance_io::traits::Reader;
55use lance_io::utils::{
56    read_last_block, read_message, read_message_from_buf, read_metadata_offset, read_version,
57    CachedFileSize,
58};
59use lance_table::format::IndexMetadata;
60use lance_table::format::{Fragment, SelfDescribingFileReader};
61use lance_table::io::manifest::read_manifest_indexes;
62use roaring::RoaringBitmap;
63use scalar::index_matches_criteria;
64use serde_json::json;
65use snafu::location;
66use tracing::{info, instrument};
67use uuid::Uuid;
68use vector::ivf::v2::IVFIndex;
69use vector::utils::get_vector_type;
70
71pub(crate) mod append;
72mod create;
73pub mod frag_reuse;
74pub mod mem_wal;
75pub mod prefilter;
76pub mod scalar;
77pub mod vector;
78
79use self::append::merge_indices;
80use self::vector::remap_vector_index;
81use crate::dataset::index::LanceIndexStoreExt;
82use crate::dataset::optimize::remapping::RemapResult;
83use crate::dataset::optimize::RemappedIndex;
84use crate::dataset::transaction::{Operation, Transaction};
85use crate::index::frag_reuse::{load_frag_reuse_index_details, open_frag_reuse_index};
86use crate::index::mem_wal::open_mem_wal_index;
87pub use crate::index::prefilter::{FilterLoader, PreFilter};
88use crate::index::scalar::{fetch_index_details, load_training_data, IndexDetails};
89use crate::session::index_caches::{FragReuseIndexKey, IndexMetadataKey};
90use crate::{dataset::Dataset, Error, Result};
91pub use create::CreateIndexBuilder;
92
93// Cache keys for different index types
94#[derive(Debug, Clone)]
95pub struct ScalarIndexCacheKey<'a> {
96    pub uuid: &'a str,
97    pub fri_uuid: Option<&'a Uuid>,
98}
99
100impl<'a> ScalarIndexCacheKey<'a> {
101    pub fn new(uuid: &'a str, fri_uuid: Option<&'a Uuid>) -> Self {
102        Self { uuid, fri_uuid }
103    }
104}
105
106impl UnsizedCacheKey for ScalarIndexCacheKey<'_> {
107    type ValueType = dyn ScalarIndex;
108
109    fn key(&self) -> std::borrow::Cow<'_, str> {
110        if let Some(fri_uuid) = self.fri_uuid {
111            format!("{}-{}", self.uuid, fri_uuid).into()
112        } else {
113            self.uuid.into()
114        }
115    }
116}
117
118#[derive(Debug, Clone)]
119pub struct VectorIndexCacheKey<'a> {
120    pub uuid: &'a str,
121    pub fri_uuid: Option<&'a Uuid>,
122}
123
124impl<'a> VectorIndexCacheKey<'a> {
125    pub fn new(uuid: &'a str, fri_uuid: Option<&'a Uuid>) -> Self {
126        Self { uuid, fri_uuid }
127    }
128}
129
130impl UnsizedCacheKey for VectorIndexCacheKey<'_> {
131    type ValueType = dyn VectorIndex;
132
133    fn key(&self) -> std::borrow::Cow<'_, str> {
134        if let Some(fri_uuid) = self.fri_uuid {
135            format!("{}-{}", self.uuid, fri_uuid).into()
136        } else {
137            self.uuid.into()
138        }
139    }
140}
141
142#[derive(Debug, Clone)]
143pub struct FragReuseIndexCacheKey<'a> {
144    pub uuid: &'a str,
145    pub fri_uuid: Option<&'a Uuid>,
146}
147
148impl<'a> FragReuseIndexCacheKey<'a> {
149    pub fn new(uuid: &'a str, fri_uuid: Option<&'a Uuid>) -> Self {
150        Self { uuid, fri_uuid }
151    }
152}
153
154impl CacheKey for FragReuseIndexCacheKey<'_> {
155    type ValueType = FragReuseIndex;
156
157    fn key(&self) -> std::borrow::Cow<'_, str> {
158        if let Some(fri_uuid) = self.fri_uuid {
159            format!("{}-{}", self.uuid, fri_uuid).into()
160        } else {
161            self.uuid.into()
162        }
163    }
164}
165
166#[derive(Debug, Clone)]
167pub struct MemWalCacheKey<'a> {
168    pub uuid: &'a Uuid,
169    pub fri_uuid: Option<&'a Uuid>,
170}
171
172impl<'a> MemWalCacheKey<'a> {
173    pub fn new(uuid: &'a Uuid, fri_uuid: Option<&'a Uuid>) -> Self {
174        Self { uuid, fri_uuid }
175    }
176}
177
178impl CacheKey for MemWalCacheKey<'_> {
179    type ValueType = MemWalIndex;
180
181    fn key(&self) -> std::borrow::Cow<'_, str> {
182        if let Some(fri_uuid) = self.fri_uuid {
183            format!("{}-{}", self.uuid, fri_uuid).into()
184        } else {
185            self.uuid.to_string().into()
186        }
187    }
188}
189
190// Whether to auto-migrate a dataset when we encounter corruption.
191fn auto_migrate_corruption() -> bool {
192    static LANCE_AUTO_MIGRATION: OnceLock<bool> = OnceLock::new();
193    *LANCE_AUTO_MIGRATION.get_or_init(|| {
194        std::env::var("LANCE_AUTO_MIGRATION")
195            .ok()
196            .map(|s| str_is_truthy(&s))
197            .unwrap_or(true)
198    })
199}
200
201/// Derive a friendly (but not necessarily unique) type name from a type URL.
202/// Extract a human-friendly type name from a type URL.
203///
204/// Strips prefixes like `type.googleapis.com/` and package names, then removes
205/// trailing `IndexDetails` / `Index` so callers get a concise display name.
206fn type_name_from_uri(index_uri: &str) -> String {
207    let type_name = index_uri.rsplit('/').next().unwrap_or(index_uri);
208    let type_name = type_name.rsplit('.').next().unwrap_or(type_name);
209    type_name.trim_end_matches("IndexDetails").to_string()
210}
211
212/// Legacy mapping from type URL to the old IndexType string for backwards compatibility.
213/// Legacy mapping from type URL to the old IndexType string for backwards compatibility.
214///
215/// If `index_type_hint` is provided (e.g. parsed from the index statistics of a concrete
216/// index instance), it takes precedence so callers can surface the exact index type even
217/// when the type URL alone is too generic (such as VectorIndexDetails).
218fn legacy_type_name(index_uri: &str, index_type_hint: Option<&str>) -> String {
219    if let Some(hint) = index_type_hint {
220        return hint.to_string();
221    }
222
223    let base = type_name_from_uri(index_uri);
224
225    match base.as_str() {
226        "BTree" => IndexType::BTree.to_string(),
227        "Bitmap" => IndexType::Bitmap.to_string(),
228        "LabelList" => IndexType::LabelList.to_string(),
229        "NGram" => IndexType::NGram.to_string(),
230        "ZoneMap" => IndexType::ZoneMap.to_string(),
231        "BloomFilter" => IndexType::BloomFilter.to_string(),
232        "Inverted" => IndexType::Inverted.to_string(),
233        "Json" => IndexType::Scalar.to_string(),
234        "Flat" | "Vector" => IndexType::Vector.to_string(),
235        other if other.contains("Vector") => IndexType::Vector.to_string(),
236        _ => "N/A".to_string(),
237    }
238}
239
240/// Builds index.
241#[async_trait]
242pub trait IndexBuilder {
243    fn index_type() -> IndexType;
244
245    async fn build(&self) -> Result<()>;
246}
247
248pub(crate) async fn remap_index(
249    dataset: &Dataset,
250    index_id: &Uuid,
251    row_id_map: &HashMap<u64, Option<u64>>,
252) -> Result<RemapResult> {
253    // Load indices from the disk.
254    let indices = dataset.load_indices().await?;
255    let matched = indices
256        .iter()
257        .find(|i| i.uuid == *index_id)
258        .ok_or_else(|| Error::Index {
259            message: format!("Index with id {} does not exist", index_id),
260            location: location!(),
261        })?;
262
263    if matched.fields.len() > 1 {
264        return Err(Error::Index {
265            message: "Remapping indices with multiple fields is not supported".to_string(),
266            location: location!(),
267        });
268    }
269
270    if row_id_map.values().all(|v| v.is_none()) {
271        let deleted_bitmap = RoaringBitmap::from_iter(
272            row_id_map
273                .keys()
274                .map(|row_id| RowAddress::new_from_u64(*row_id))
275                .map(|addr| addr.fragment_id()),
276        );
277        if Some(deleted_bitmap) == matched.fragment_bitmap {
278            // If remap deleted all rows, we can just return the same index ID.
279            // This can happen if there is a bug where the index is covering empty
280            // fragment that haven't been cleaned up. They should be cleaned up
281            // outside of this function.
282            return Ok(RemapResult::Keep(*index_id));
283        }
284    }
285
286    let field_id = matched
287        .fields
288        .first()
289        .expect("An index existed with no fields");
290    let field_path = dataset.schema().field_path(*field_id)?;
291
292    let new_id = Uuid::new_v4();
293
294    let generic = dataset
295        .open_generic_index(&field_path, &index_id.to_string(), &NoOpMetricsCollector)
296        .await?;
297
298    let created_index = match generic.index_type() {
299        it if it.is_scalar() => {
300            let new_store = LanceIndexStore::from_dataset_for_new(dataset, &new_id.to_string())?;
301
302            let scalar_index = dataset
303                .open_scalar_index(&field_path, &index_id.to_string(), &NoOpMetricsCollector)
304                .await?;
305            if !scalar_index.can_remap() {
306                return Ok(RemapResult::Drop);
307            }
308
309            match scalar_index.index_type() {
310                IndexType::Inverted => {
311                    let inverted_index = scalar_index
312                        .as_any()
313                        .downcast_ref::<lance_index::scalar::inverted::InvertedIndex>()
314                        .ok_or(Error::Index {
315                            message: "expected inverted index".to_string(),
316                            location: location!(),
317                        })?;
318                    if inverted_index.is_legacy() {
319                        log::warn!("reindex because of legacy format, index_type: {}, index_id: {}, field: {}",
320                            scalar_index.index_type(),
321                            index_id,
322                            field_path
323                        );
324                        let training_data = load_training_data(
325                            dataset,
326                            &field_path,
327                            &TrainingCriteria::new(TrainingOrdering::None),
328                            None,
329                            true, // Legacy reindexing should always train
330                            None,
331                        )
332                        .await?;
333                        InvertedIndexPlugin::train_inverted_index(
334                            training_data,
335                            &new_store,
336                            inverted_index.params().clone(),
337                            None,
338                        )
339                        .await?
340                    } else {
341                        scalar_index.remap(row_id_map, &new_store).await?
342                    }
343                }
344                _ => scalar_index.remap(row_id_map, &new_store).await?,
345            }
346        }
347        it if it.is_vector() => {
348            remap_vector_index(
349                Arc::new(dataset.clone()),
350                &field_path,
351                index_id,
352                &new_id,
353                matched,
354                row_id_map,
355            )
356            .await?;
357            CreatedIndex {
358                index_details: prost_types::Any::from_msg(
359                    &lance_table::format::pb::VectorIndexDetails::default(),
360                )
361                .unwrap(),
362                index_version: VECTOR_INDEX_VERSION,
363            }
364        }
365        _ => {
366            return Err(Error::Index {
367                message: format!("Index type {} is not supported", generic.index_type()),
368                location: location!(),
369            });
370        }
371    };
372
373    Ok(RemapResult::Remapped(RemappedIndex {
374        old_id: *index_id,
375        new_id,
376        index_details: created_index.index_details,
377        index_version: created_index.index_version,
378    }))
379}
380
381#[derive(Debug)]
382pub struct ScalarIndexInfo {
383    indexed_columns: HashMap<String, (DataType, Box<MultiQueryParser>)>,
384}
385
386impl IndexInformationProvider for ScalarIndexInfo {
387    fn get_index(&self, col: &str) -> Option<(&DataType, &dyn ScalarQueryParser)> {
388        self.indexed_columns
389            .get(col)
390            .map(|(ty, parser)| (ty, parser.as_ref() as &dyn ScalarQueryParser))
391    }
392}
393
394async fn open_index_proto(reader: &dyn Reader) -> Result<pb::Index> {
395    let file_size = reader.size().await?;
396    let tail_bytes = read_last_block(reader).await?;
397    let metadata_pos = read_metadata_offset(&tail_bytes)?;
398    let proto: pb::Index = if metadata_pos < file_size - tail_bytes.len() {
399        // We have not read the metadata bytes yet.
400        read_message(reader, metadata_pos).await?
401    } else {
402        let offset = tail_bytes.len() - (file_size - metadata_pos);
403        read_message_from_buf(&tail_bytes.slice(offset..))?
404    };
405    Ok(proto)
406}
407
408fn vector_index_details() -> prost_types::Any {
409    let details = lance_table::format::pb::VectorIndexDetails::default();
410    prost_types::Any::from_msg(&details).unwrap()
411}
412
413struct IndexDescriptionImpl {
414    name: String,
415    field_ids: Vec<u32>,
416    segments: Vec<IndexMetadata>,
417    index_type: String,
418    details: IndexDetails,
419    rows_indexed: u64,
420}
421
422impl IndexDescriptionImpl {
423    fn try_new(segments: Vec<IndexMetadata>, dataset: &Dataset) -> Result<Self> {
424        if segments.is_empty() {
425            return Err(Error::Index {
426                message: "Index metadata is empty".to_string(),
427                location: location!(),
428            });
429        }
430
431        // We assume the type URL and details are the same for all segments
432        let example_metadata = &segments[0];
433
434        let name = example_metadata.name.clone();
435        if !segments.iter().all(|shard| shard.name == name) {
436            return Err(Error::Index {
437                message: "Index name should be identical across all segments".to_string(),
438                location: location!(),
439            });
440        }
441
442        let field_ids = &example_metadata.fields;
443        if !segments.iter().all(|shard| shard.fields == *field_ids) {
444            return Err(Error::Index {
445                message: "Index fields should be identical across all segments".to_string(),
446                location: location!(),
447            });
448        }
449        let field_ids = field_ids.iter().map(|id| *id as u32).collect();
450
451        // This should not fail as we have already filtered out indexes without index details.
452        let index_details = example_metadata.index_details.as_ref().ok_or(Error::Index {
453            message:
454                "Index details are required for index description.  This index must be retrained to support this method."
455                    .to_string(),
456            location: location!(),
457        })?;
458        let type_url = &index_details.type_url;
459        if !segments.iter().all(|shard| {
460            shard
461                .index_details
462                .as_ref()
463                .map(|d| d.type_url == *type_url)
464                .unwrap_or(false)
465        }) {
466            return Err(Error::Index {
467                message: "Index type URL should be present and identical across all segments"
468                    .to_string(),
469                location: location!(),
470            });
471        }
472
473        let details = IndexDetails(index_details.clone());
474        let mut rows_indexed = 0;
475
476        let index_type = details
477            .get_plugin()
478            .map(|p| p.name().to_string())
479            .unwrap_or_else(|_| "Unknown".to_string());
480
481        for shard in &segments {
482            let fragment_bitmap = shard
483            .fragment_bitmap
484            .as_ref()
485            .ok_or_else(|| Error::Index {
486                message: "Fragment bitmap is required for index description.  This index must be retrained to support this method.".to_string(),
487                location: location!(),
488            })?;
489
490            for fragment in dataset.get_fragments() {
491                if fragment_bitmap.contains(fragment.id() as u32) {
492                    rows_indexed += fragment.fast_logical_rows()? as u64;
493                }
494            }
495        }
496
497        Ok(Self {
498            name,
499            field_ids,
500            index_type,
501            segments,
502            details,
503            rows_indexed,
504        })
505    }
506}
507
508impl IndexDescription for IndexDescriptionImpl {
509    fn name(&self) -> &str {
510        &self.name
511    }
512
513    fn field_ids(&self) -> &[u32] {
514        &self.field_ids
515    }
516
517    fn index_type(&self) -> &str {
518        &self.index_type
519    }
520
521    fn metadata(&self) -> &[IndexMetadata] {
522        &self.segments
523    }
524
525    fn type_url(&self) -> &str {
526        self.details.0.type_url.as_str()
527    }
528
529    fn rows_indexed(&self) -> u64 {
530        self.rows_indexed
531    }
532
533    fn details(&self) -> Result<String> {
534        let plugin = self.details.get_plugin()?;
535        plugin
536            .details_as_json(&self.details.0)
537            .map(|v| v.to_string())
538    }
539}
540
541#[async_trait]
542impl DatasetIndexExt for Dataset {
543    type IndexBuilder<'a> = CreateIndexBuilder<'a>;
544
545    /// Create a builder for creating an index on columns.
546    ///
547    /// This returns a builder that can be configured with additional options
548    /// before awaiting to execute.
549    ///
550    /// # Examples
551    ///
552    /// Create a scalar BTREE index:
553    /// ```
554    /// # use lance::{Dataset, Result};
555    /// # use lance_index::{DatasetIndexExt, IndexType, scalar::ScalarIndexParams};
556    /// # async fn example(dataset: &mut Dataset) -> Result<()> {
557    /// let params = ScalarIndexParams::default();
558    /// dataset
559    ///     .create_index_builder(&["id"], IndexType::BTree, &params)
560    ///     .name("id_index".to_string())
561    ///     .await?;
562    /// # Ok(())
563    /// # }
564    /// ```
565    ///
566    /// Create an empty index that will be populated later:
567    /// ```
568    /// # use lance::{Dataset, Result};
569    /// # use lance_index::{DatasetIndexExt, IndexType, scalar::ScalarIndexParams};
570    /// # async fn example(dataset: &mut Dataset) -> Result<()> {
571    /// let params = ScalarIndexParams::default();
572    /// dataset
573    ///     .create_index_builder(&["category"], IndexType::Bitmap, &params)
574    ///     .train(false)  // Create empty index
575    ///     .replace(true)  // Replace if exists
576    ///     .await?;
577    /// # Ok(())
578    /// # }
579    /// ```
580    fn create_index_builder<'a>(
581        &'a mut self,
582        columns: &[&str],
583        index_type: IndexType,
584        params: &'a dyn IndexParams,
585    ) -> CreateIndexBuilder<'a> {
586        CreateIndexBuilder::new(self, columns, index_type, params)
587    }
588
589    #[instrument(skip_all)]
590    async fn create_index(
591        &mut self,
592        columns: &[&str],
593        index_type: IndexType,
594        name: Option<String>,
595        params: &dyn IndexParams,
596        replace: bool,
597    ) -> Result<IndexMetadata> {
598        // Use the builder pattern with default train=true for backward compatibility
599        let mut builder = self.create_index_builder(columns, index_type, params);
600
601        if let Some(name) = name {
602            builder = builder.name(name);
603        }
604
605        builder.replace(replace).await
606    }
607
608    async fn drop_index(&mut self, name: &str) -> Result<()> {
609        let indices = self.load_indices_by_name(name).await?;
610        if indices.is_empty() {
611            return Err(Error::IndexNotFound {
612                identity: format!("name={}", name),
613                location: location!(),
614            });
615        }
616
617        let transaction = Transaction::new(
618            self.manifest.version,
619            Operation::CreateIndex {
620                new_indices: vec![],
621                removed_indices: indices.clone(),
622            },
623            None,
624        );
625
626        self.apply_commit(transaction, &Default::default(), &Default::default())
627            .await?;
628
629        Ok(())
630    }
631
632    async fn prewarm_index(&self, name: &str) -> Result<()> {
633        let indices = self.load_indices_by_name(name).await?;
634        if indices.is_empty() {
635            return Err(Error::IndexNotFound {
636                identity: format!("name={}", name),
637                location: location!(),
638            });
639        }
640
641        let index = self
642            .open_generic_index(name, &indices[0].uuid.to_string(), &NoOpMetricsCollector)
643            .await?;
644        index.prewarm().await?;
645
646        Ok(())
647    }
648
649    async fn describe_indices<'a, 'b>(
650        &'a self,
651        criteria: Option<IndexCriteria<'b>>,
652    ) -> Result<Vec<Arc<dyn IndexDescription>>> {
653        let indices = self.load_indices().await?;
654        let mut indices = if let Some(criteria) = criteria {
655            indices.iter().filter(|idx| {
656                if idx.index_details.is_none() {
657                    log::warn!("The method describe_indices does not support indexes without index details.  Please retrain the index {}", idx.name);
658                    return false;
659                }
660                let fields = idx
661                    .fields
662                    .iter()
663                    .filter_map(|id| self.schema().field_by_id(*id))
664                    .collect::<Vec<_>>();
665                match index_matches_criteria(idx, &criteria, &fields, false, self.schema()) {
666                    Ok(matched) => matched,
667                    Err(err) => {
668                        log::warn!("Could not describe index {}: {}", idx.name, err);
669                        false
670                    }
671                }
672            }).collect::<Vec<_>>()
673        } else {
674            indices.iter().collect::<Vec<_>>()
675        };
676        indices.sort_by_key(|idx| &idx.name);
677
678        indices
679            .into_iter()
680            .chunk_by(|idx| &idx.name)
681            .into_iter()
682            .map(|(_, segments)| {
683                let segments = segments.cloned().collect::<Vec<_>>();
684                let desc = IndexDescriptionImpl::try_new(segments, self)?;
685                Ok(Arc::new(desc) as Arc<dyn IndexDescription>)
686            })
687            .collect::<Result<Vec<_>>>()
688    }
689
690    async fn load_indices(&self) -> Result<Arc<Vec<IndexMetadata>>> {
691        let metadata_key = IndexMetadataKey {
692            version: self.version().version,
693        };
694        let indices = match self.index_cache.get_with_key(&metadata_key).await {
695            Some(indices) => indices,
696            None => {
697                let mut loaded_indices = read_manifest_indexes(
698                    &self.object_store,
699                    &self.manifest_location,
700                    &self.manifest,
701                )
702                .await?;
703                retain_supported_indices(&mut loaded_indices);
704                let loaded_indices = Arc::new(loaded_indices);
705                self.index_cache
706                    .insert_with_key(&metadata_key, loaded_indices.clone())
707                    .await;
708                loaded_indices
709            }
710        };
711
712        if let Some(frag_reuse_index_meta) =
713            indices.iter().find(|idx| idx.name == FRAG_REUSE_INDEX_NAME)
714        {
715            let uuid = frag_reuse_index_meta.uuid.to_string();
716            let fri_key = FragReuseIndexKey { uuid: &uuid };
717            let frag_reuse_index = self
718                .index_cache
719                .get_or_insert_with_key(fri_key, || async move {
720                    let index_details =
721                        load_frag_reuse_index_details(self, frag_reuse_index_meta).await?;
722                    open_frag_reuse_index(frag_reuse_index_meta.uuid, index_details.as_ref()).await
723                })
724                .await?;
725            let mut indices = indices.as_ref().clone();
726            indices.iter_mut().for_each(|idx| {
727                if let Some(bitmap) = idx.fragment_bitmap.as_mut() {
728                    frag_reuse_index.remap_fragment_bitmap(bitmap).unwrap();
729                }
730            });
731            Ok(Arc::new(indices))
732        } else {
733            Ok(indices)
734        }
735    }
736
737    async fn commit_existing_index(
738        &mut self,
739        index_name: &str,
740        column: &str,
741        index_id: Uuid,
742    ) -> Result<()> {
743        let Some(field) = self.schema().field(column) else {
744            return Err(Error::Index {
745                message: format!("CreateIndex: column '{column}' does not exist"),
746                location: location!(),
747            });
748        };
749
750        // TODO: We will need some way to determine the index details here.  Perhaps
751        // we can load the index itself and get the details that way.
752
753        let new_idx = IndexMetadata {
754            uuid: index_id,
755            name: index_name.to_string(),
756            fields: vec![field.id],
757            dataset_version: self.manifest.version,
758            fragment_bitmap: Some(self.get_fragments().iter().map(|f| f.id() as u32).collect()),
759            index_details: None,
760            index_version: 0,
761            created_at: Some(chrono::Utc::now()),
762            base_id: None, // New indices don't have base_id (they're not from shallow clone)
763        };
764
765        let transaction = Transaction::new(
766            self.manifest.version,
767            Operation::CreateIndex {
768                new_indices: vec![new_idx],
769                removed_indices: vec![],
770            },
771            None,
772        );
773
774        self.apply_commit(transaction, &Default::default(), &Default::default())
775            .await?;
776
777        Ok(())
778    }
779
780    async fn load_scalar_index<'a, 'b>(
781        &'a self,
782        criteria: IndexCriteria<'b>,
783    ) -> Result<Option<IndexMetadata>> {
784        let indices = self.load_indices().await?;
785
786        let mut indices = indices
787            .iter()
788            .filter(|idx| {
789                // We shouldn't have any indices with empty fields, but just in case, log an error
790                // but don't fail the operation (we might not be using that index)
791                if idx.fields.is_empty() {
792                    if idx.name != FRAG_REUSE_INDEX_NAME {
793                        log::error!("Index {} has no fields", idx.name);
794                    }
795                    false
796                } else {
797                    true
798                }
799            })
800            .collect::<Vec<_>>();
801        // This sorting & chunking is only needed to calculate if there are multiple indexes on the same
802        // field.  This fact is only needed for backwards compatibility behavior for indexes that don't have
803        // index details.  At some point we should deprecate indexes without index details.
804        //
805        // TODO: At some point we should just fail if the index details are missing and ask the user to
806        // retrain the index.
807        indices.sort_by_key(|idx| idx.fields[0]);
808        let indice_by_field = indices.into_iter().chunk_by(|idx| idx.fields[0]);
809        for (field_id, indices) in &indice_by_field {
810            let indices = indices.collect::<Vec<_>>();
811            let has_multiple = indices.len() > 1;
812            for idx in indices {
813                let field = self.schema().field_by_id(field_id);
814                if let Some(field) = field {
815                    if index_matches_criteria(
816                        idx,
817                        &criteria,
818                        &[field],
819                        has_multiple,
820                        self.schema(),
821                    )? {
822                        let non_empty = idx.fragment_bitmap.as_ref().is_some_and(|bitmap| {
823                            bitmap.intersection_len(self.fragment_bitmap.as_ref()) > 0
824                        });
825                        let is_fts_index = if let Some(details) = &idx.index_details {
826                            IndexDetails(details.clone()).supports_fts()
827                        } else {
828                            false
829                        };
830                        // FTS indices must always be returned even if empty, because FTS queries
831                        // require an index to exist. The query execution will handle the empty
832                        // bitmap appropriately and fall back to scanning unindexed data.
833                        // Other index types can be skipped if empty since they're optional optimizations.
834                        if non_empty || is_fts_index {
835                            return Ok(Some(idx.clone()));
836                        }
837                    }
838                }
839            }
840        }
841        return Ok(None);
842    }
843
844    #[instrument(skip_all)]
845    async fn optimize_indices(&mut self, options: &OptimizeOptions) -> Result<()> {
846        let dataset = Arc::new(self.clone());
847        let indices = self.load_indices().await?;
848
849        let indices_to_optimize = options
850            .index_names
851            .as_ref()
852            .map(|names| names.iter().collect::<HashSet<_>>());
853        let name_to_indices = indices
854            .iter()
855            .filter(|idx| {
856                indices_to_optimize
857                    .as_ref()
858                    .is_none_or(|names| names.contains(&idx.name))
859                    && !is_system_index(idx)
860            })
861            .map(|idx| (idx.name.clone(), idx))
862            .into_group_map();
863
864        let mut new_indices = vec![];
865        let mut removed_indices = vec![];
866        for deltas in name_to_indices.values() {
867            let Some(res) = merge_indices(dataset.clone(), deltas.as_slice(), options).await?
868            else {
869                continue;
870            };
871
872            let last_idx = deltas.last().expect("Delta indices should not be empty");
873            let new_idx = IndexMetadata {
874                uuid: res.new_uuid,
875                name: last_idx.name.clone(), // Keep the same name
876                fields: last_idx.fields.clone(),
877                dataset_version: self.manifest.version,
878                fragment_bitmap: Some(res.new_fragment_bitmap),
879                index_details: Some(Arc::new(res.new_index_details)),
880                index_version: res.new_index_version,
881                created_at: Some(chrono::Utc::now()),
882                base_id: None, // Mew merged index file locates in the cloned dataset.
883            };
884            removed_indices.extend(res.removed_indices.iter().map(|&idx| idx.clone()));
885            if deltas.len() > res.removed_indices.len() {
886                new_indices.extend(
887                    deltas[0..(deltas.len() - res.removed_indices.len())]
888                        .iter()
889                        .map(|&idx| idx.clone()),
890                );
891            }
892            new_indices.push(new_idx);
893        }
894
895        if new_indices.is_empty() {
896            return Ok(());
897        }
898
899        let transaction = Transaction::new(
900            self.manifest.version,
901            Operation::CreateIndex {
902                new_indices,
903                removed_indices,
904            },
905            None,
906        );
907
908        self.apply_commit(transaction, &Default::default(), &Default::default())
909            .await?;
910
911        Ok(())
912    }
913
914    async fn index_statistics(&self, index_name: &str) -> Result<String> {
915        let metadatas = self.load_indices_by_name(index_name).await?;
916        if metadatas.is_empty() {
917            return Err(Error::IndexNotFound {
918                identity: format!("name={}", index_name),
919                location: location!(),
920            });
921        }
922
923        if index_name == FRAG_REUSE_INDEX_NAME {
924            return index_statistics_frag_reuse(self).boxed().await;
925        }
926
927        if index_name == MEM_WAL_INDEX_NAME {
928            return index_statistics_mem_wal(self).boxed().await;
929        }
930
931        index_statistics_scalar(self, index_name, metadatas)
932            .boxed()
933            .await
934    }
935
936    async fn read_index_partition(
937        &self,
938        index_name: &str,
939        partition_id: usize,
940        with_vector: bool,
941    ) -> Result<SendableRecordBatchStream> {
942        let indices = self.load_indices_by_name(index_name).await?;
943        if indices.is_empty() {
944            return Err(Error::IndexNotFound {
945                identity: format!("name={}", index_name),
946                location: location!(),
947            });
948        }
949        let column = self.schema().field_by_id(indices[0].fields[0]).unwrap();
950
951        let mut schema: Option<Arc<Schema>> = None;
952        let mut partition_streams = Vec::with_capacity(indices.len());
953        for index in indices {
954            let index = self
955                .open_vector_index(&column.name, &index.uuid.to_string(), &NoOpMetricsCollector)
956                .await?;
957
958            let stream = index
959                .partition_reader(partition_id, with_vector, &NoOpMetricsCollector)
960                .await?;
961            if schema.is_none() {
962                schema = Some(stream.schema());
963            }
964            partition_streams.push(stream);
965        }
966
967        match schema {
968            Some(schema) => {
969                let merged = stream::select_all(partition_streams);
970                let stream = RecordBatchStreamAdapter::new(schema, merged);
971                Ok(Box::pin(stream))
972            }
973            None => Ok(Box::pin(RecordBatchStreamAdapter::new(
974                Arc::new(Schema::empty()),
975                stream::empty(),
976            ))),
977        }
978    }
979}
980
981fn sum_indexed_rows_per_delta(indexed_fragments_per_delta: &[Vec<Fragment>]) -> Result<Vec<usize>> {
982    let mut rows_per_delta = Vec::with_capacity(indexed_fragments_per_delta.len());
983    for frags in indexed_fragments_per_delta {
984        let mut sum = 0usize;
985        for frag in frags {
986            sum += frag.num_rows().ok_or_else(|| Error::Internal {
987                message: "Fragment should have row counts, please upgrade lance and \
988                                      trigger a single write to fix this"
989                    .to_string(),
990                location: location!(),
991            })?;
992        }
993        rows_per_delta.push(sum);
994    }
995    Ok(rows_per_delta)
996}
997
998fn unique_indexed_fragment_count(indexed_fragments_per_delta: &[Vec<Fragment>]) -> Option<usize> {
999    let mut fragment_ids = HashSet::new();
1000    for frags in indexed_fragments_per_delta {
1001        for frag in frags {
1002            if !fragment_ids.insert(frag.id) {
1003                return None;
1004            }
1005        }
1006    }
1007    Some(fragment_ids.len())
1008}
1009
1010fn serialize_index_statistics(stats: &serde_json::Value) -> Result<String> {
1011    serde_json::to_string(stats).map_err(|e| Error::Index {
1012        message: format!("Failed to serialize index statistics: {}", e),
1013        location: location!(),
1014    })
1015}
1016
1017async fn migrate_and_recompute_index_statistics(ds: &Dataset, index_name: &str) -> Result<String> {
1018    let mut ds = ds.clone();
1019    log::warn!(
1020        "Detecting out-dated fragment metadata, migrating dataset. \
1021                        To disable migration, set LANCE_AUTO_MIGRATION=false"
1022    );
1023    ds.delete("false").await.map_err(|err| Error::Execution {
1024        message: format!(
1025            "Failed to migrate dataset while calculating index statistics. \
1026                            To disable migration, set LANCE_AUTO_MIGRATION=false. Original error: {}",
1027            err
1028        ),
1029        location: location!(),
1030    })?;
1031    ds.index_statistics(index_name).await
1032}
1033
1034async fn index_statistics_frag_reuse(ds: &Dataset) -> Result<String> {
1035    let index = ds
1036        .open_frag_reuse_index(&NoOpMetricsCollector)
1037        .await?
1038        .expect("FragmentReuse index does not exist");
1039    serialize_index_statistics(&index.statistics()?)
1040}
1041
1042async fn index_statistics_mem_wal(ds: &Dataset) -> Result<String> {
1043    let index = ds
1044        .open_mem_wal_index(&NoOpMetricsCollector)
1045        .await?
1046        .expect("MemWal index does not exist");
1047    serialize_index_statistics(&index.statistics()?)
1048}
1049
1050async fn index_statistics_scalar(
1051    ds: &Dataset,
1052    index_name: &str,
1053    metadatas: Vec<IndexMetadata>,
1054) -> Result<String> {
1055    let field_id = metadatas[0].fields[0];
1056    let field_path = ds.schema().field_path(field_id)?;
1057
1058    let (indices_stats, index_uri, num_indices, updated_at) =
1059        collect_regular_indices_statistics(ds, metadatas, &field_path).await?;
1060
1061    let index_type_hint = indices_stats
1062        .first()
1063        .and_then(|stats| stats.get("index_type"))
1064        .and_then(|v| v.as_str());
1065    let index_type = legacy_type_name(&index_uri, index_type_hint);
1066
1067    let Some((
1068        num_indexed_rows_per_delta,
1069        num_indexed_fragments,
1070        num_unindexed_fragments,
1071        num_indexed_rows,
1072        num_unindexed_rows,
1073    )) = gather_fragment_statistics(ds, index_name).await?
1074    else {
1075        return migrate_and_recompute_index_statistics(ds, index_name).await;
1076    };
1077
1078    let stats = json!({
1079        "index_type": index_type,
1080        "name": index_name,
1081        "num_indices": num_indices,
1082        "indices": indices_stats,
1083        "num_indexed_fragments": num_indexed_fragments,
1084        "num_indexed_rows": num_indexed_rows,
1085        "num_unindexed_fragments": num_unindexed_fragments,
1086        "num_unindexed_rows": num_unindexed_rows,
1087        "num_indexed_rows_per_delta": num_indexed_rows_per_delta,
1088        "updated_at_timestamp_ms": updated_at,
1089    });
1090
1091    serialize_index_statistics(&stats)
1092}
1093
1094async fn collect_regular_indices_statistics(
1095    ds: &Dataset,
1096    metadatas: Vec<IndexMetadata>,
1097    field_path: &str,
1098) -> Result<(Vec<serde_json::Value>, String, usize, Option<u64>)> {
1099    let num_indices = metadatas.len();
1100    let updated_at = metadatas
1101        .iter()
1102        .filter_map(|m| m.created_at)
1103        .max()
1104        .map(|dt| dt.timestamp_millis() as u64);
1105
1106    let mut indices_stats = Vec::with_capacity(num_indices);
1107    let mut index_uri: Option<String> = None;
1108
1109    for meta in metadatas.iter() {
1110        let index_store = Arc::new(LanceIndexStore::from_dataset_for_existing(ds, meta)?);
1111        let index_details = scalar::fetch_index_details(ds, field_path, meta).await?;
1112        if index_uri.is_none() {
1113            index_uri = Some(index_details.type_url.clone());
1114        }
1115
1116        let index_details_wrapper = scalar::IndexDetails(index_details.clone());
1117        if let Ok(plugin) = index_details_wrapper.get_plugin() {
1118            if let Some(stats) = plugin
1119                .load_statistics(index_store.clone(), index_details.as_ref())
1120                .await?
1121            {
1122                indices_stats.push(stats);
1123                continue;
1124            }
1125        }
1126
1127        let index = ds
1128            .open_generic_index(field_path, &meta.uuid.to_string(), &NoOpMetricsCollector)
1129            .await?;
1130
1131        indices_stats.push(index.statistics()?);
1132    }
1133
1134    Ok((
1135        indices_stats,
1136        index_uri.unwrap_or_else(|| "unknown".to_string()),
1137        num_indices,
1138        updated_at,
1139    ))
1140}
1141
1142async fn gather_fragment_statistics(
1143    ds: &Dataset,
1144    index_name: &str,
1145) -> Result<Option<(Vec<usize>, usize, usize, usize, usize)>> {
1146    let indexed_fragments_per_delta = ds.indexed_fragments(index_name).await?;
1147
1148    let num_indexed_rows_per_delta = match sum_indexed_rows_per_delta(&indexed_fragments_per_delta)
1149    {
1150        Ok(rows) => rows,
1151        Err(Error::Internal { message, .. })
1152            if auto_migrate_corruption() && message.contains("trigger a single write") =>
1153        {
1154            return Ok(None);
1155        }
1156        Err(e) => return Err(e),
1157    };
1158
1159    let Some(num_indexed_fragments) = unique_indexed_fragment_count(&indexed_fragments_per_delta)
1160    else {
1161        if auto_migrate_corruption() {
1162            return Ok(None);
1163        }
1164        return Err(Error::Internal {
1165            message: "Overlap in indexed fragments. Please upgrade to lance >= 0.23.0 \
1166                                  and trigger a single write to fix this"
1167                .to_string(),
1168            location: location!(),
1169        });
1170    };
1171
1172    let num_unindexed_fragments = ds.fragments().len() - num_indexed_fragments;
1173    let num_indexed_rows: usize = num_indexed_rows_per_delta.iter().sum();
1174
1175    drop(indexed_fragments_per_delta);
1176    let total_rows = ds.count_rows(None).await?;
1177    let num_unindexed_rows = total_rows - num_indexed_rows;
1178
1179    Ok(Some((
1180        num_indexed_rows_per_delta,
1181        num_indexed_fragments,
1182        num_unindexed_fragments,
1183        num_indexed_rows,
1184        num_unindexed_rows,
1185    )))
1186}
1187
1188pub(crate) fn retain_supported_indices(indices: &mut Vec<IndexMetadata>) {
1189    indices.retain(|idx| {
1190        let max_supported_version = idx
1191            .index_details
1192            .as_ref()
1193            .map(|details| {
1194                IndexDetails(details.clone())
1195                    .index_version()
1196                    // If we don't know how to read the index, it isn't supported
1197                    .unwrap_or(i32::MAX as u32)
1198            })
1199            .unwrap_or_default();
1200        let is_valid = idx.index_version <= max_supported_version as i32;
1201        if !is_valid {
1202            log::warn!(
1203                "Index {} has version {}, which is not supported (<={}), ignoring it",
1204                idx.name,
1205                idx.index_version,
1206                max_supported_version,
1207            );
1208        }
1209        is_valid
1210    })
1211}
1212
1213/// A trait for internal dataset utilities
1214///
1215/// Internal use only. No API stability guarantees.
1216#[async_trait]
1217pub trait DatasetIndexInternalExt: DatasetIndexExt {
1218    /// Opens an index (scalar or vector) as a generic index
1219    async fn open_generic_index(
1220        &self,
1221        column: &str,
1222        uuid: &str,
1223        metrics: &dyn MetricsCollector,
1224    ) -> Result<Arc<dyn Index>>;
1225    /// Opens the requested scalar index
1226    async fn open_scalar_index(
1227        &self,
1228        column: &str,
1229        uuid: &str,
1230        metrics: &dyn MetricsCollector,
1231    ) -> Result<Arc<dyn ScalarIndex>>;
1232    /// Opens the requested vector index
1233    async fn open_vector_index(
1234        &self,
1235        column: &str,
1236        uuid: &str,
1237        metrics: &dyn MetricsCollector,
1238    ) -> Result<Arc<dyn VectorIndex>>;
1239
1240    /// Opens the fragment reuse index
1241    async fn open_frag_reuse_index(
1242        &self,
1243        metrics: &dyn MetricsCollector,
1244    ) -> Result<Option<Arc<FragReuseIndex>>>;
1245
1246    /// Opens the MemWAL index
1247    async fn open_mem_wal_index(
1248        &self,
1249        metrics: &dyn MetricsCollector,
1250    ) -> Result<Option<Arc<MemWalIndex>>>;
1251
1252    /// Gets the fragment reuse index UUID from the current manifest, if it exists
1253    async fn frag_reuse_index_uuid(&self) -> Option<Uuid>;
1254
1255    /// Loads information about all the available scalar indices on the dataset
1256    async fn scalar_index_info(&self) -> Result<ScalarIndexInfo>;
1257
1258    /// Return the fragments that are not covered by any of the deltas of the index.
1259    async fn unindexed_fragments(&self, idx_name: &str) -> Result<Vec<Fragment>>;
1260
1261    /// Return the fragments that are covered by each of the deltas of the index.
1262    async fn indexed_fragments(&self, idx_name: &str) -> Result<Vec<Vec<Fragment>>>;
1263
1264    /// Initialize a specific index on this dataset based on an index from a source dataset.
1265    async fn initialize_index(&mut self, source_dataset: &Dataset, index_name: &str) -> Result<()>;
1266
1267    /// Initialize all indices on this dataset based on indices from a source dataset.
1268    /// This will call `initialize_index` for each non-system index in the source dataset.
1269    async fn initialize_indices(&mut self, source_dataset: &Dataset) -> Result<()>;
1270}
1271
1272#[async_trait]
1273impl DatasetIndexInternalExt for Dataset {
1274    async fn open_generic_index(
1275        &self,
1276        column: &str,
1277        uuid: &str,
1278        metrics: &dyn MetricsCollector,
1279    ) -> Result<Arc<dyn Index>> {
1280        // Checking for cache existence is cheap so we just check both scalar and vector caches
1281        let frag_reuse_uuid = self.frag_reuse_index_uuid().await;
1282        let cache_key = ScalarIndexCacheKey::new(uuid, frag_reuse_uuid.as_ref());
1283        if let Some(index) = self.index_cache.get_unsized_with_key(&cache_key).await {
1284            return Ok(index.as_index());
1285        }
1286
1287        let vector_cache_key = VectorIndexCacheKey::new(uuid, frag_reuse_uuid.as_ref());
1288        if let Some(index) = self
1289            .index_cache
1290            .get_unsized_with_key(&vector_cache_key)
1291            .await
1292        {
1293            return Ok(index.as_index());
1294        }
1295
1296        let frag_reuse_cache_key = FragReuseIndexCacheKey::new(uuid, frag_reuse_uuid.as_ref());
1297        if let Some(index) = self.index_cache.get_with_key(&frag_reuse_cache_key).await {
1298            return Ok(index.as_index());
1299        }
1300
1301        // Sometimes we want to open an index and we don't care if it is a scalar or vector index.
1302        // For example, we might want to get statistics for an index, regardless of type.
1303        //
1304        // Currently, we solve this problem by checking for the existence of INDEX_FILE_NAME since
1305        // only vector indices have this file.  In the future, once we support multiple kinds of
1306        // scalar indices, we may start having this file with scalar indices too.  Once that happens
1307        // we can just read this file and look at the `implementation` or `index_type` fields to
1308        // determine what kind of index it is.
1309        let index_meta = self.load_index(uuid).await?.ok_or_else(|| Error::Index {
1310            message: format!("Index with id {} does not exist", uuid),
1311            location: location!(),
1312        })?;
1313        let index_dir = self.indice_files_dir(&index_meta)?;
1314        let index_file = index_dir.child(uuid).child(INDEX_FILE_NAME);
1315        if self.object_store.exists(&index_file).await? {
1316            let index = self.open_vector_index(column, uuid, metrics).await?;
1317            Ok(index.as_index())
1318        } else {
1319            let index = self.open_scalar_index(column, uuid, metrics).await?;
1320            Ok(index.as_index())
1321        }
1322    }
1323
1324    #[instrument(level = "debug", skip_all)]
1325    async fn open_scalar_index(
1326        &self,
1327        column: &str,
1328        uuid: &str,
1329        metrics: &dyn MetricsCollector,
1330    ) -> Result<Arc<dyn ScalarIndex>> {
1331        let frag_reuse_uuid = self.frag_reuse_index_uuid().await;
1332        let cache_key = ScalarIndexCacheKey::new(uuid, frag_reuse_uuid.as_ref());
1333        if let Some(index) = self.index_cache.get_unsized_with_key(&cache_key).await {
1334            return Ok(index);
1335        }
1336
1337        let index_meta = self.load_index(uuid).await?.ok_or_else(|| Error::Index {
1338            message: format!("Index with id {} does not exist", uuid),
1339            location: location!(),
1340        })?;
1341
1342        let index = scalar::open_scalar_index(self, column, &index_meta, metrics).await?;
1343
1344        info!(target: TRACE_IO_EVENTS, index_uuid=uuid, r#type=IO_TYPE_OPEN_SCALAR, index_type=index.index_type().to_string());
1345        metrics.record_index_load();
1346
1347        self.index_cache
1348            .insert_unsized_with_key(&cache_key, index.clone())
1349            .await;
1350        Ok(index)
1351    }
1352
1353    async fn open_vector_index(
1354        &self,
1355        column: &str,
1356        uuid: &str,
1357        metrics: &dyn MetricsCollector,
1358    ) -> Result<Arc<dyn VectorIndex>> {
1359        let frag_reuse_uuid = self.frag_reuse_index_uuid().await;
1360        let cache_key = VectorIndexCacheKey::new(uuid, frag_reuse_uuid.as_ref());
1361
1362        if let Some(index) = self.index_cache.get_unsized_with_key(&cache_key).await {
1363            log::debug!("Found vector index in cache uuid: {}", uuid);
1364            return Ok(index);
1365        }
1366
1367        let frag_reuse_index = self.open_frag_reuse_index(metrics).await?;
1368        let index_meta = self.load_index(uuid).await?.ok_or_else(|| Error::Index {
1369            message: format!("Index with id {} does not exist", uuid),
1370            location: location!(),
1371        })?;
1372        let index_dir = self.indice_files_dir(&index_meta)?;
1373        let index_file = index_dir.child(uuid).child(INDEX_FILE_NAME);
1374        let reader: Arc<dyn Reader> = self.object_store.open(&index_file).await?.into();
1375
1376        let tailing_bytes = read_last_block(reader.as_ref()).await?;
1377        let (major_version, minor_version) = read_version(&tailing_bytes)?;
1378
1379        // Namespace the index cache by the UUID of the index.
1380        let index_cache = self.index_cache.with_key_prefix(&cache_key.key());
1381
1382        // the index file is in lance format since version (0,2)
1383        // TODO: we need to change the legacy IVF_PQ to be in lance format
1384        let index = match (major_version, minor_version) {
1385            (0, 1) | (0, 0) => {
1386                info!(target: TRACE_IO_EVENTS, index_uuid=uuid, r#type=IO_TYPE_OPEN_VECTOR, version="0.1", index_type="IVF_PQ");
1387                let proto = open_index_proto(reader.as_ref()).await?;
1388                match &proto.implementation {
1389                    Some(Implementation::VectorIndex(vector_index)) => {
1390                        let dataset = Arc::new(self.clone());
1391                        vector::open_vector_index(
1392                            dataset,
1393                            uuid,
1394                            vector_index,
1395                            reader,
1396                            frag_reuse_index,
1397                        )
1398                        .await
1399                    }
1400                    None => Err(Error::Internal {
1401                        message: "Index proto was missing implementation field".into(),
1402                        location: location!(),
1403                    }),
1404                }
1405            }
1406
1407            (0, 2) => {
1408                info!(target: TRACE_IO_EVENTS, index_uuid=uuid, r#type=IO_TYPE_OPEN_VECTOR, version="0.2", index_type="IVF_PQ");
1409                let reader = PreviousFileReader::try_new_self_described_from_reader(
1410                    reader.clone(),
1411                    Some(&self.metadata_cache.file_metadata_cache(&index_file)),
1412                )
1413                .await?;
1414                vector::open_vector_index_v2(
1415                    Arc::new(self.clone()),
1416                    column,
1417                    uuid,
1418                    reader,
1419                    frag_reuse_index,
1420                )
1421                .await
1422            }
1423
1424            (0, 3) | (2, _) => {
1425                let scheduler = ScanScheduler::new(
1426                    self.object_store.clone(),
1427                    SchedulerConfig::max_bandwidth(&self.object_store),
1428                );
1429                let file = scheduler
1430                    .open_file(&index_file, &CachedFileSize::unknown())
1431                    .await?;
1432                let reader = lance_file::reader::FileReader::try_open(
1433                    file,
1434                    None,
1435                    Default::default(),
1436                    &self.metadata_cache.file_metadata_cache(&index_file),
1437                    FileReaderOptions::default(),
1438                )
1439                .await?;
1440                let index_metadata = reader
1441                    .schema()
1442                    .metadata
1443                    .get(INDEX_METADATA_SCHEMA_KEY)
1444                    .ok_or(Error::Index {
1445                        message: "Index Metadata not found".to_owned(),
1446                        location: location!(),
1447                    })?;
1448                let index_metadata: lance_index::IndexMetadata =
1449                    serde_json::from_str(index_metadata)?;
1450
1451                // Resolve the column name and field
1452                let (field_path, field) = resolve_index_column(self.schema(), &index_meta, column)?;
1453
1454                let (_, element_type) = get_vector_type(self.schema(), &field_path)?;
1455
1456                info!(target: TRACE_IO_EVENTS, index_uuid=uuid, r#type=IO_TYPE_OPEN_VECTOR, version="0.3", index_type=index_metadata.index_type);
1457
1458                match index_metadata.index_type.as_str() {
1459                    "IVF_FLAT" => match element_type {
1460                        DataType::Float16 | DataType::Float32 | DataType::Float64 => {
1461                            let ivf = IVFIndex::<FlatIndex, FlatQuantizer>::try_new(
1462                                self.object_store.clone(),
1463                                index_dir,
1464                                uuid.to_owned(),
1465                                frag_reuse_index,
1466                                self.metadata_cache.as_ref(),
1467                                index_cache,
1468                            )
1469                            .await?;
1470                            Ok(Arc::new(ivf) as Arc<dyn VectorIndex>)
1471                        }
1472                        DataType::UInt8 => {
1473                            let ivf = IVFIndex::<FlatIndex, FlatBinQuantizer>::try_new(
1474                                self.object_store.clone(),
1475                                index_dir,
1476                                uuid.to_owned(),
1477                                frag_reuse_index,
1478                                self.metadata_cache.as_ref(),
1479                                index_cache,
1480                            )
1481                            .await?;
1482                            Ok(Arc::new(ivf) as Arc<dyn VectorIndex>)
1483                        }
1484                        _ => Err(Error::Index {
1485                            message: format!(
1486                                "the field type {} is not supported for FLAT index",
1487                                field.data_type()
1488                            ),
1489                            location: location!(),
1490                        }),
1491                    },
1492
1493                    "IVF_PQ" => {
1494                        let ivf = IVFIndex::<FlatIndex, ProductQuantizer>::try_new(
1495                            self.object_store.clone(),
1496                            index_dir,
1497                            uuid.to_owned(),
1498                            frag_reuse_index,
1499                            self.metadata_cache.as_ref(),
1500                            index_cache,
1501                        )
1502                        .await?;
1503                        Ok(Arc::new(ivf) as Arc<dyn VectorIndex>)
1504                    }
1505
1506                    "IVF_SQ" => {
1507                        let ivf = IVFIndex::<FlatIndex, ScalarQuantizer>::try_new(
1508                            self.object_store.clone(),
1509                            index_dir,
1510                            uuid.to_owned(),
1511                            frag_reuse_index,
1512                            self.metadata_cache.as_ref(),
1513                            index_cache,
1514                        )
1515                        .await?;
1516                        Ok(Arc::new(ivf) as Arc<dyn VectorIndex>)
1517                    }
1518
1519                    "IVF_RQ" => {
1520                        let ivf = IVFIndex::<FlatIndex, RabitQuantizer>::try_new(
1521                            self.object_store.clone(),
1522                            self.indices_dir(),
1523                            uuid.to_owned(),
1524                            frag_reuse_index,
1525                            self.metadata_cache.as_ref(),
1526                            index_cache,
1527                        )
1528                        .await?;
1529                        Ok(Arc::new(ivf) as Arc<dyn VectorIndex>)
1530                    }
1531
1532                    "IVF_HNSW_FLAT" => {
1533                        let uri = index_dir.child(uuid).child("index.pb");
1534                        let file_metadata_cache =
1535                            self.session.metadata_cache.file_metadata_cache(&uri);
1536                        let ivf = IVFIndex::<HNSW, FlatQuantizer>::try_new(
1537                            self.object_store.clone(),
1538                            index_dir,
1539                            uuid.to_owned(),
1540                            frag_reuse_index,
1541                            &file_metadata_cache,
1542                            index_cache,
1543                        )
1544                        .await?;
1545                        Ok(Arc::new(ivf) as Arc<dyn VectorIndex>)
1546                    }
1547
1548                    "IVF_HNSW_SQ" => {
1549                        let ivf = IVFIndex::<HNSW, ScalarQuantizer>::try_new(
1550                            self.object_store.clone(),
1551                            index_dir,
1552                            uuid.to_owned(),
1553                            frag_reuse_index,
1554                            self.metadata_cache.as_ref(),
1555                            index_cache,
1556                        )
1557                        .await?;
1558                        Ok(Arc::new(ivf) as Arc<dyn VectorIndex>)
1559                    }
1560
1561                    "IVF_HNSW_PQ" => {
1562                        let ivf = IVFIndex::<HNSW, ProductQuantizer>::try_new(
1563                            self.object_store.clone(),
1564                            index_dir,
1565                            uuid.to_owned(),
1566                            frag_reuse_index,
1567                            self.metadata_cache.as_ref(),
1568                            index_cache,
1569                        )
1570                        .await?;
1571                        Ok(Arc::new(ivf) as Arc<dyn VectorIndex>)
1572                    }
1573
1574                    _ => Err(Error::Index {
1575                        message: format!("Unsupported index type: {}", index_metadata.index_type),
1576                        location: location!(),
1577                    }),
1578                }
1579            }
1580
1581            _ => Err(Error::Index {
1582                message: "unsupported index version (maybe need to upgrade your lance version)"
1583                    .to_owned(),
1584                location: location!(),
1585            }),
1586        };
1587        let index = index?;
1588        metrics.record_index_load();
1589        self.index_cache
1590            .insert_unsized_with_key(&cache_key, index.clone())
1591            .await;
1592        Ok(index)
1593    }
1594
1595    async fn open_frag_reuse_index(
1596        &self,
1597        metrics: &dyn MetricsCollector,
1598    ) -> Result<Option<Arc<FragReuseIndex>>> {
1599        if let Some(frag_reuse_index_meta) = self.load_index_by_name(FRAG_REUSE_INDEX_NAME).await? {
1600            let uuid = frag_reuse_index_meta.uuid.to_string();
1601            let frag_reuse_key = FragReuseIndexKey { uuid: &uuid };
1602            let uuid_clone = uuid.clone();
1603
1604            let index = self
1605                .index_cache
1606                .get_or_insert_with_key(frag_reuse_key, || async move {
1607                    let index_meta = self.load_index(&uuid_clone).await?.ok_or_else(|| Error::Index {
1608                        message: format!("Index with id {} does not exist", uuid_clone),
1609                        location: location!(),
1610                    })?;
1611                    let index_details = load_frag_reuse_index_details(self, &index_meta).await?;
1612                    let index =
1613                        open_frag_reuse_index(frag_reuse_index_meta.uuid, index_details.as_ref()).await?;
1614
1615                    info!(target: TRACE_IO_EVENTS, index_uuid=uuid_clone, r#type=IO_TYPE_OPEN_FRAG_REUSE);
1616                    metrics.record_index_load();
1617
1618                    Ok(index)
1619                })
1620                .await?;
1621
1622            Ok(Some(index))
1623        } else {
1624            Ok(None)
1625        }
1626    }
1627
1628    async fn open_mem_wal_index(
1629        &self,
1630        metrics: &dyn MetricsCollector,
1631    ) -> Result<Option<Arc<MemWalIndex>>> {
1632        let Some(mem_wal_meta) = self.load_index_by_name(MEM_WAL_INDEX_NAME).await? else {
1633            return Ok(None);
1634        };
1635
1636        let frag_reuse_uuid = self.frag_reuse_index_uuid().await;
1637        let cache_key = MemWalCacheKey::new(&mem_wal_meta.uuid, frag_reuse_uuid.as_ref());
1638        if let Some(index) = self.index_cache.get_with_key(&cache_key).await {
1639            log::debug!("Found MemWAL index in cache uuid: {}", mem_wal_meta.uuid);
1640            return Ok(Some(index));
1641        }
1642
1643        let uuid = mem_wal_meta.uuid.to_string();
1644
1645        let index_meta = self.load_index(&uuid).await?.ok_or_else(|| Error::Index {
1646            message: format!("Index with id {} does not exist", uuid),
1647            location: location!(),
1648        })?;
1649        let index = open_mem_wal_index(index_meta)?;
1650
1651        info!(target: TRACE_IO_EVENTS, index_uuid=uuid, r#type=IO_TYPE_OPEN_MEM_WAL);
1652        metrics.record_index_load();
1653
1654        self.index_cache
1655            .insert_with_key(&cache_key, index.clone())
1656            .await;
1657        Ok(Some(index))
1658    }
1659
1660    async fn frag_reuse_index_uuid(&self) -> Option<Uuid> {
1661        if let Ok(indices) = self.load_indices().await {
1662            indices
1663                .iter()
1664                .find(|idx| idx.name == FRAG_REUSE_INDEX_NAME)
1665                .map(|idx| idx.uuid)
1666        } else {
1667            None
1668        }
1669    }
1670
1671    #[instrument(level = "trace", skip_all)]
1672    async fn scalar_index_info(&self) -> Result<ScalarIndexInfo> {
1673        let indices = self.load_indices().await?;
1674        let schema = self.schema();
1675        let mut indexed_fields = Vec::new();
1676        for index in indices.iter().filter(|idx| {
1677            let idx_schema = schema.project_by_ids(idx.fields.as_slice(), true);
1678            let is_vector_index = idx_schema
1679                .fields
1680                .iter()
1681                .any(|f| is_vector_field(f.data_type()));
1682
1683            // Check if this is an FTS index by looking at index details
1684            let is_fts_index = if let Some(details) = &idx.index_details {
1685                IndexDetails(details.clone()).supports_fts()
1686            } else {
1687                false
1688            };
1689
1690            // Only include indices with non-empty fragment bitmaps, except for FTS indices
1691            // which need to be discoverable even when empty
1692            let has_non_empty_bitmap = idx.fragment_bitmap.as_ref().is_some_and(|bitmap| {
1693                !bitmap.is_empty() && !(bitmap & self.fragment_bitmap.as_ref()).is_empty()
1694            });
1695
1696            idx.fields.len() == 1 && !is_vector_index && (has_non_empty_bitmap || is_fts_index)
1697        }) {
1698            let field = index.fields[0];
1699            let field = schema.field_by_id(field).ok_or_else(|| Error::Internal {
1700                message: format!(
1701                    "Index referenced a field with id {field} which did not exist in the schema"
1702                ),
1703                location: location!(),
1704            })?;
1705
1706            // Build the full field path for nested fields
1707            let field_path = if let Some(ancestors) = schema.field_ancestry_by_id(field.id) {
1708                let field_refs: Vec<&str> = ancestors.iter().map(|f| f.name.as_str()).collect();
1709                lance_core::datatypes::format_field_path(&field_refs)
1710            } else {
1711                field.name.clone()
1712            };
1713
1714            let index_details = IndexDetails(fetch_index_details(self, &field_path, index).await?);
1715            if index_details.is_vector() {
1716                continue;
1717            }
1718
1719            let plugin = index_details.get_plugin()?;
1720            let query_parser = plugin.new_query_parser(index.name.clone(), &index_details.0);
1721
1722            if let Some(query_parser) = query_parser {
1723                indexed_fields.push((field_path, (field.data_type(), query_parser)));
1724            }
1725        }
1726        let mut index_info_map = HashMap::with_capacity(indexed_fields.len());
1727        for indexed_field in indexed_fields {
1728            // Need to wrap in an option here because we know that only one of and_modify and or_insert will be called
1729            // but the rust compiler does not.
1730            let mut parser = Some(indexed_field.1 .1);
1731            let parser = &mut parser;
1732            index_info_map
1733                .entry(indexed_field.0)
1734                .and_modify(|existing: &mut (DataType, Box<MultiQueryParser>)| {
1735                    // If there are two indices on the same column, they must have the same type
1736                    debug_assert_eq!(existing.0, indexed_field.1 .0);
1737
1738                    existing.1.add(parser.take().unwrap());
1739                })
1740                .or_insert_with(|| {
1741                    (
1742                        indexed_field.1 .0,
1743                        Box::new(MultiQueryParser::single(parser.take().unwrap())),
1744                    )
1745                });
1746        }
1747        Ok(ScalarIndexInfo {
1748            indexed_columns: index_info_map,
1749        })
1750    }
1751
1752    async fn unindexed_fragments(&self, name: &str) -> Result<Vec<Fragment>> {
1753        let indices = self.load_indices_by_name(name).await?;
1754        let mut total_fragment_bitmap = RoaringBitmap::new();
1755        for idx in indices.iter() {
1756            total_fragment_bitmap |= idx.fragment_bitmap.as_ref().ok_or(Error::Index {
1757                message: "Please upgrade lance to 0.8+ to use this function".to_string(),
1758                location: location!(),
1759            })?;
1760        }
1761        Ok(self
1762            .fragments()
1763            .iter()
1764            .filter(|f| !total_fragment_bitmap.contains(f.id as u32))
1765            .cloned()
1766            .collect())
1767    }
1768
1769    async fn indexed_fragments(&self, name: &str) -> Result<Vec<Vec<Fragment>>> {
1770        let indices = self.load_indices_by_name(name).await?;
1771        indices
1772            .iter()
1773            .map(|index| {
1774                let fragment_bitmap = index.fragment_bitmap.as_ref().ok_or(Error::Index {
1775                    message: "Please upgrade lance to 0.8+ to use this function".to_string(),
1776                    location: location!(),
1777                })?;
1778                let mut indexed_frags = Vec::with_capacity(fragment_bitmap.len() as usize);
1779                for frag in self.fragments().iter() {
1780                    if fragment_bitmap.contains(frag.id as u32) {
1781                        indexed_frags.push(frag.clone());
1782                    }
1783                }
1784                Ok(indexed_frags)
1785            })
1786            .collect()
1787    }
1788
1789    async fn initialize_index(&mut self, source_dataset: &Dataset, index_name: &str) -> Result<()> {
1790        let source_indices = source_dataset.load_indices_by_name(index_name).await?;
1791
1792        if source_indices.is_empty() {
1793            return Err(Error::Index {
1794                message: format!("Index '{}' not found in source dataset", index_name),
1795                location: location!(),
1796            });
1797        }
1798
1799        let source_index = source_indices
1800            .iter()
1801            .min_by_key(|idx| idx.created_at)
1802            .ok_or_else(|| Error::Index {
1803                message: format!("Could not determine oldest index for '{}'", index_name),
1804                location: location!(),
1805            })?;
1806
1807        let mut field_names = Vec::new();
1808        for field_id in source_index.fields.iter() {
1809            let source_field = source_dataset
1810                .schema()
1811                .field_by_id(*field_id)
1812                .ok_or_else(|| Error::Index {
1813                    message: format!("Field with id {} not found in source dataset", field_id),
1814                    location: location!(),
1815                })?;
1816
1817            let target_field =
1818                self.schema()
1819                    .field(&source_field.name)
1820                    .ok_or_else(|| Error::Index {
1821                        message: format!(
1822                            "Field '{}' required by index '{}' not found in target dataset",
1823                            source_field.name, index_name
1824                        ),
1825                        location: location!(),
1826                    })?;
1827
1828            if source_field.data_type() != target_field.data_type() {
1829                return Err(Error::Index {
1830                    message: format!(
1831                        "Field '{}' has different types in source ({:?}) and target ({:?}) datasets",
1832                        source_field.name,
1833                        source_field.data_type(),
1834                        target_field.data_type()
1835                    ),
1836                    location: location!(),
1837                });
1838            }
1839
1840            field_names.push(source_field.name.as_str());
1841        }
1842
1843        if field_names.is_empty() {
1844            return Err(Error::Index {
1845                message: format!("Index '{}' has no fields", index_name),
1846                location: location!(),
1847            });
1848        }
1849
1850        if let Some(index_details) = &source_index.index_details {
1851            let index_details_wrapper = IndexDetails(index_details.clone());
1852
1853            if index_details_wrapper.is_vector() {
1854                vector::initialize_vector_index(self, source_dataset, source_index, &field_names)
1855                    .await?;
1856            } else {
1857                scalar::initialize_scalar_index(self, source_dataset, source_index, &field_names)
1858                    .await?;
1859            }
1860        } else {
1861            log::warn!(
1862                "Index '{}' has no index_details, skipping",
1863                source_index.name
1864            );
1865        }
1866
1867        Ok(())
1868    }
1869
1870    async fn initialize_indices(&mut self, source_dataset: &Dataset) -> Result<()> {
1871        let source_indices = source_dataset.load_indices().await?;
1872        let non_system_indices: Vec<_> = source_indices
1873            .iter()
1874            .filter(|idx| !lance_index::is_system_index(idx))
1875            .collect();
1876
1877        if non_system_indices.is_empty() {
1878            return Ok(());
1879        }
1880
1881        let mut unique_index_names = HashSet::new();
1882        for index in non_system_indices.iter() {
1883            unique_index_names.insert(index.name.clone());
1884        }
1885
1886        for index_name in unique_index_names {
1887            self.initialize_index(source_dataset, &index_name).await?;
1888        }
1889
1890        Ok(())
1891    }
1892}
1893
1894/// Resolves the column name and field for an index operation.
1895///
1896/// This function handles the case where the caller passes an index name instead of a column name.
1897/// It returns the full field path and the field reference.
1898fn resolve_index_column(
1899    schema: &LanceSchema,
1900    index_meta: &IndexMetadata,
1901    column_arg: &str,
1902) -> Result<(String, Arc<Field>)> {
1903    // First, try to find the column directly in the schema
1904    if let Some(field) = schema.field(column_arg) {
1905        // Column exists in schema, use it
1906        return Ok((column_arg.to_string(), Arc::new(field.clone())));
1907    }
1908
1909    // Column doesn't exist in schema, check if it's the index name
1910    if column_arg == index_meta.name {
1911        // Get the actual column from index metadata
1912        if let Some(field_id) = index_meta.fields.first() {
1913            let field = schema.field_by_id(*field_id).ok_or_else(|| Error::Index {
1914                message: format!(
1915                    "Index '{}' references field with id {} which does not exist in schema",
1916                    index_meta.name, field_id
1917                ),
1918                location: location!(),
1919            })?;
1920            let field_path = schema.field_path(*field_id)?;
1921            return Ok((field_path, Arc::new(field.clone())));
1922        } else {
1923            return Err(Error::Index {
1924                message: format!("Index '{}' has no fields", index_meta.name),
1925                location: location!(),
1926            });
1927        }
1928    }
1929
1930    // Column doesn't exist and is not the index name
1931    Err(Error::Index {
1932        message: format!("Column '{}' does not exist in the schema", column_arg),
1933        location: location!(),
1934    })
1935}
1936
1937fn is_vector_field(data_type: DataType) -> bool {
1938    match data_type {
1939        DataType::FixedSizeList(_, _) => true,
1940        DataType::List(inner) => {
1941            // If the inner type is a fixed size list, then it is a multivector field
1942            matches!(inner.data_type(), DataType::FixedSizeList(_, _))
1943        }
1944        _ => false,
1945    }
1946}
1947
1948#[cfg(test)]
1949mod tests {
1950    use super::*;
1951    use crate::dataset::builder::DatasetBuilder;
1952    use crate::dataset::optimize::{compact_files, CompactionOptions};
1953    use crate::dataset::{WriteMode, WriteParams};
1954    use crate::index::vector::VectorIndexParams;
1955    use crate::session::Session;
1956    use crate::utils::test::{copy_test_data_to_tmp, DatagenExt, FragmentCount, FragmentRowCount};
1957    use arrow::array::AsArray;
1958    use arrow::datatypes::{Float32Type, Int32Type};
1959    use arrow_array::Int32Array;
1960    use arrow_array::{
1961        FixedSizeListArray, Float32Array, RecordBatch, RecordBatchIterator, StringArray,
1962    };
1963    use arrow_schema::{DataType, Field, Schema};
1964    use futures::stream::TryStreamExt;
1965    use lance_arrow::*;
1966    use lance_core::utils::tempfile::TempStrDir;
1967    use lance_datagen::gen_batch;
1968    use lance_datagen::{array, BatchCount, Dimension, RowCount};
1969    use lance_index::scalar::bitmap::BITMAP_LOOKUP_NAME;
1970    use lance_index::scalar::{
1971        BuiltinIndexType, FullTextSearchQuery, InvertedIndexParams, ScalarIndexParams,
1972    };
1973    use lance_index::vector::{
1974        hnsw::builder::HnswBuildParams, ivf::IvfBuildParams, sq::builder::SQBuildParams,
1975    };
1976    use lance_io::{assert_io_eq, assert_io_lt};
1977    use lance_linalg::distance::{DistanceType, MetricType};
1978    use lance_testing::datagen::generate_random_array;
1979    use rstest::rstest;
1980    use std::collections::HashSet;
1981
1982    #[tokio::test]
1983    async fn test_recreate_index() {
1984        const DIM: i32 = 8;
1985        let schema = Arc::new(Schema::new(vec![
1986            Field::new(
1987                "v",
1988                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), DIM),
1989                true,
1990            ),
1991            Field::new(
1992                "o",
1993                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), DIM),
1994                true,
1995            ),
1996        ]));
1997        let data = generate_random_array(2048 * DIM as usize);
1998        let batches: Vec<RecordBatch> = vec![RecordBatch::try_new(
1999            schema.clone(),
2000            vec![
2001                Arc::new(FixedSizeListArray::try_new_from_values(data.clone(), DIM).unwrap()),
2002                Arc::new(FixedSizeListArray::try_new_from_values(data, DIM).unwrap()),
2003            ],
2004        )
2005        .unwrap()];
2006
2007        let test_dir = TempStrDir::default();
2008        let test_uri = &test_dir;
2009        let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
2010        let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
2011
2012        let params = VectorIndexParams::ivf_pq(2, 8, 2, MetricType::L2, 2);
2013        dataset
2014            .create_index(&["v"], IndexType::Vector, None, &params, true)
2015            .await
2016            .unwrap();
2017        dataset
2018            .create_index(&["o"], IndexType::Vector, None, &params, true)
2019            .await
2020            .unwrap();
2021
2022        // Create index again
2023        dataset
2024            .create_index(&["v"], IndexType::Vector, None, &params, true)
2025            .await
2026            .unwrap();
2027
2028        // Can not overwrite an index on different columns.
2029        assert!(dataset
2030            .create_index(
2031                &["v"],
2032                IndexType::Vector,
2033                Some("o_idx".to_string()),
2034                &params,
2035                true,
2036            )
2037            .await
2038            .is_err());
2039    }
2040
2041    #[tokio::test]
2042    async fn test_bitmap_index_statistics_minimal_io_via_dataset() {
2043        const NUM_ROWS: usize = 500_000;
2044        let test_dir = TempStrDir::default();
2045        let schema = Arc::new(Schema::new(vec![Field::new(
2046            "status",
2047            DataType::Int32,
2048            false,
2049        )]));
2050        let values: Vec<i32> = (0..NUM_ROWS as i32).collect();
2051        let batch =
2052            RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(values))]).unwrap();
2053        let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
2054
2055        let mut dataset = Dataset::write(reader, &test_dir, None).await.unwrap();
2056        let io_tracker = dataset.object_store().io_tracker().clone();
2057
2058        let params = ScalarIndexParams::for_builtin(BuiltinIndexType::Bitmap);
2059        dataset
2060            .create_index(
2061                &["status"],
2062                IndexType::Bitmap,
2063                Some("status_idx".to_string()),
2064                &params,
2065                true,
2066            )
2067            .await
2068            .unwrap();
2069
2070        let indices = dataset.load_indices().await.unwrap();
2071        let index_meta = indices
2072            .iter()
2073            .find(|idx| idx.name == "status_idx")
2074            .expect("status_idx should exist");
2075        let lookup_path = dataset
2076            .indice_files_dir(index_meta)
2077            .unwrap()
2078            .child(index_meta.uuid.to_string())
2079            .child(BITMAP_LOOKUP_NAME);
2080        let meta = dataset.object_store.inner.head(&lookup_path).await.unwrap();
2081        assert!(
2082            meta.size >= 1_000_000,
2083            "bitmap index should be large enough to fail without metadata path, size={} bytes",
2084            meta.size
2085        );
2086
2087        // Reset stats collected during index creation
2088        io_tracker.incremental_stats();
2089
2090        dataset.index_statistics("status_idx").await.unwrap();
2091
2092        let stats = io_tracker.incremental_stats();
2093        assert_io_eq!(
2094            stats,
2095            read_bytes,
2096            4096,
2097            "index_statistics should only read the index footer; got {} bytes",
2098            stats.read_bytes
2099        );
2100        assert_io_lt!(
2101            stats,
2102            read_iops,
2103            3,
2104            "index_statistics should only require a head plus one range read; got {} ops",
2105            stats.read_iops
2106        );
2107        assert_io_eq!(
2108            stats,
2109            written_bytes,
2110            0,
2111            "index_statistics should not perform writes"
2112        );
2113    }
2114
2115    fn sample_vector_field() -> Field {
2116        let dimensions = 16;
2117        let column_name = "vec";
2118        Field::new(
2119            column_name,
2120            DataType::FixedSizeList(
2121                Arc::new(Field::new("item", DataType::Float32, true)),
2122                dimensions,
2123            ),
2124            false,
2125        )
2126    }
2127
2128    #[tokio::test]
2129    async fn test_drop_index() {
2130        let test_dir = TempStrDir::default();
2131        let schema = Schema::new(vec![
2132            sample_vector_field(),
2133            Field::new("ints", DataType::Int32, false),
2134        ]);
2135        let mut dataset = lance_datagen::rand(&schema)
2136            .into_dataset(
2137                &test_dir,
2138                FragmentCount::from(1),
2139                FragmentRowCount::from(256),
2140            )
2141            .await
2142            .unwrap();
2143
2144        let idx_name = "name".to_string();
2145        dataset
2146            .create_index(
2147                &["vec"],
2148                IndexType::Vector,
2149                Some(idx_name.clone()),
2150                &VectorIndexParams::ivf_pq(2, 8, 2, MetricType::L2, 10),
2151                true,
2152            )
2153            .await
2154            .unwrap();
2155        dataset
2156            .create_index(
2157                &["ints"],
2158                IndexType::BTree,
2159                None,
2160                &ScalarIndexParams::default(),
2161                true,
2162            )
2163            .await
2164            .unwrap();
2165
2166        assert_eq!(dataset.load_indices().await.unwrap().len(), 2);
2167
2168        dataset.drop_index(&idx_name).await.unwrap();
2169
2170        assert_eq!(dataset.load_indices().await.unwrap().len(), 1);
2171
2172        // Even though we didn't give the scalar index a name it still has an auto-generated one we can use
2173        let scalar_idx_name = &dataset.load_indices().await.unwrap()[0].name;
2174        dataset.drop_index(scalar_idx_name).await.unwrap();
2175
2176        assert_eq!(dataset.load_indices().await.unwrap().len(), 0);
2177
2178        // Make sure it returns an error if the index doesn't exist
2179        assert!(dataset.drop_index(scalar_idx_name).await.is_err());
2180    }
2181
2182    #[tokio::test]
2183    async fn test_count_index_rows() {
2184        let test_dir = TempStrDir::default();
2185        let dimensions = 16;
2186        let column_name = "vec";
2187        let field = sample_vector_field();
2188        let schema = Arc::new(Schema::new(vec![field]));
2189
2190        let float_arr = generate_random_array(512 * dimensions as usize);
2191
2192        let vectors =
2193            arrow_array::FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap();
2194
2195        let record_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(vectors)]).unwrap();
2196
2197        let reader =
2198            RecordBatchIterator::new(vec![record_batch].into_iter().map(Ok), schema.clone());
2199        let test_uri = &test_dir;
2200        let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
2201        dataset.validate().await.unwrap();
2202
2203        // Make sure it returns None if there's no index with the passed identifier
2204        assert!(dataset.index_statistics("bad_id").await.is_err());
2205        // Create an index
2206        let params = VectorIndexParams::ivf_pq(10, 8, 2, MetricType::L2, 10);
2207        dataset
2208            .create_index(
2209                &[column_name],
2210                IndexType::Vector,
2211                Some("vec_idx".into()),
2212                &params,
2213                true,
2214            )
2215            .await
2216            .unwrap();
2217
2218        let stats: serde_json::Value =
2219            serde_json::from_str(&dataset.index_statistics("vec_idx").await.unwrap()).unwrap();
2220        assert_eq!(stats["num_unindexed_rows"], 0);
2221        assert_eq!(stats["num_indexed_rows"], 512);
2222
2223        // Now we'll append some rows which shouldn't be indexed and see the
2224        // count change
2225        let float_arr = generate_random_array(512 * dimensions as usize);
2226        let vectors =
2227            arrow_array::FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap();
2228
2229        let record_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(vectors)]).unwrap();
2230
2231        let reader = RecordBatchIterator::new(vec![record_batch].into_iter().map(Ok), schema);
2232        dataset.append(reader, None).await.unwrap();
2233
2234        let stats: serde_json::Value =
2235            serde_json::from_str(&dataset.index_statistics("vec_idx").await.unwrap()).unwrap();
2236        assert_eq!(stats["num_unindexed_rows"], 512);
2237        assert_eq!(stats["num_indexed_rows"], 512);
2238    }
2239
2240    #[tokio::test]
2241    async fn test_optimize_delta_indices() {
2242        let dimensions = 16;
2243        let column_name = "vec";
2244        let vec_field = Field::new(
2245            column_name,
2246            DataType::FixedSizeList(
2247                Arc::new(Field::new("item", DataType::Float32, true)),
2248                dimensions,
2249            ),
2250            false,
2251        );
2252        let other_column_name = "other_vec";
2253        let other_vec_field = Field::new(
2254            other_column_name,
2255            DataType::FixedSizeList(
2256                Arc::new(Field::new("item", DataType::Float32, true)),
2257                dimensions,
2258            ),
2259            false,
2260        );
2261        let schema = Arc::new(Schema::new(vec![vec_field, other_vec_field]));
2262
2263        let float_arr = generate_random_array(512 * dimensions as usize);
2264
2265        let vectors = Arc::new(
2266            arrow_array::FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap(),
2267        );
2268
2269        let record_batch =
2270            RecordBatch::try_new(schema.clone(), vec![vectors.clone(), vectors.clone()]).unwrap();
2271
2272        let reader = RecordBatchIterator::new(
2273            vec![record_batch.clone()].into_iter().map(Ok),
2274            schema.clone(),
2275        );
2276
2277        let mut dataset = Dataset::write(reader, "memory://", None).await.unwrap();
2278        let params = VectorIndexParams::ivf_pq(10, 8, 2, MetricType::L2, 10);
2279        dataset
2280            .create_index(
2281                &[column_name],
2282                IndexType::Vector,
2283                Some("vec_idx".into()),
2284                &params,
2285                true,
2286            )
2287            .await
2288            .unwrap();
2289        dataset
2290            .create_index(
2291                &[other_column_name],
2292                IndexType::Vector,
2293                Some("other_vec_idx".into()),
2294                &params,
2295                true,
2296            )
2297            .await
2298            .unwrap();
2299
2300        async fn get_stats(dataset: &Dataset, name: &str) -> serde_json::Value {
2301            serde_json::from_str(&dataset.index_statistics(name).await.unwrap()).unwrap()
2302        }
2303        async fn get_meta(dataset: &Dataset, name: &str) -> Vec<IndexMetadata> {
2304            dataset
2305                .load_indices()
2306                .await
2307                .unwrap()
2308                .iter()
2309                .filter(|m| m.name == name)
2310                .cloned()
2311                .collect()
2312        }
2313        fn get_bitmap(meta: &IndexMetadata) -> Vec<u32> {
2314            meta.fragment_bitmap.as_ref().unwrap().iter().collect()
2315        }
2316
2317        let stats = get_stats(&dataset, "vec_idx").await;
2318        assert_eq!(stats["num_unindexed_rows"], 0);
2319        assert_eq!(stats["num_indexed_rows"], 512);
2320        assert_eq!(stats["num_indexed_fragments"], 1);
2321        assert_eq!(stats["num_indices"], 1);
2322        let meta = get_meta(&dataset, "vec_idx").await;
2323        assert_eq!(meta.len(), 1);
2324        assert_eq!(get_bitmap(&meta[0]), vec![0]);
2325
2326        let reader =
2327            RecordBatchIterator::new(vec![record_batch].into_iter().map(Ok), schema.clone());
2328        dataset.append(reader, None).await.unwrap();
2329        let stats = get_stats(&dataset, "vec_idx").await;
2330        assert_eq!(stats["num_unindexed_rows"], 512);
2331        assert_eq!(stats["num_indexed_rows"], 512);
2332        assert_eq!(stats["num_indexed_fragments"], 1);
2333        assert_eq!(stats["num_unindexed_fragments"], 1);
2334        assert_eq!(stats["num_indices"], 1);
2335        let meta = get_meta(&dataset, "vec_idx").await;
2336        assert_eq!(meta.len(), 1);
2337        assert_eq!(get_bitmap(&meta[0]), vec![0]);
2338
2339        dataset
2340            .optimize_indices(&OptimizeOptions::append().index_names(vec![])) // Does nothing because no index name is passed
2341            .await
2342            .unwrap();
2343        let stats = get_stats(&dataset, "vec_idx").await;
2344        assert_eq!(stats["num_unindexed_rows"], 512);
2345        assert_eq!(stats["num_indexed_rows"], 512);
2346        assert_eq!(stats["num_indexed_fragments"], 1);
2347        assert_eq!(stats["num_unindexed_fragments"], 1);
2348        assert_eq!(stats["num_indices"], 1);
2349        let meta = get_meta(&dataset, "vec_idx").await;
2350        assert_eq!(meta.len(), 1);
2351        assert_eq!(get_bitmap(&meta[0]), vec![0]);
2352
2353        // optimize the other index
2354        dataset
2355            .optimize_indices(
2356                &OptimizeOptions::append().index_names(vec!["other_vec_idx".to_owned()]),
2357            )
2358            .await
2359            .unwrap();
2360        let stats = get_stats(&dataset, "vec_idx").await;
2361        assert_eq!(stats["num_unindexed_rows"], 512);
2362        assert_eq!(stats["num_indexed_rows"], 512);
2363        assert_eq!(stats["num_indexed_fragments"], 1);
2364        assert_eq!(stats["num_unindexed_fragments"], 1);
2365        assert_eq!(stats["num_indices"], 1);
2366        let meta = get_meta(&dataset, "vec_idx").await;
2367        assert_eq!(meta.len(), 1);
2368        assert_eq!(get_bitmap(&meta[0]), vec![0]);
2369
2370        let stats = get_stats(&dataset, "other_vec_idx").await;
2371        assert_eq!(stats["num_unindexed_rows"], 0);
2372        assert_eq!(stats["num_indexed_rows"], 1024);
2373        assert_eq!(stats["num_indexed_fragments"], 2);
2374        assert_eq!(stats["num_unindexed_fragments"], 0);
2375        assert_eq!(stats["num_indices"], 2);
2376        let meta = get_meta(&dataset, "other_vec_idx").await;
2377        assert_eq!(meta.len(), 2);
2378        assert_eq!(get_bitmap(&meta[0]), vec![0]);
2379        assert_eq!(get_bitmap(&meta[1]), vec![1]);
2380
2381        dataset
2382            .optimize_indices(&OptimizeOptions::retrain())
2383            .await
2384            .unwrap();
2385
2386        let stats = get_stats(&dataset, "vec_idx").await;
2387        assert_eq!(stats["num_unindexed_rows"], 0);
2388        assert_eq!(stats["num_indexed_rows"], 1024);
2389        assert_eq!(stats["num_indexed_fragments"], 2);
2390        assert_eq!(stats["num_unindexed_fragments"], 0);
2391        assert_eq!(stats["num_indices"], 1);
2392        let meta = get_meta(&dataset, "vec_idx").await;
2393        assert_eq!(meta.len(), 1);
2394        assert_eq!(get_bitmap(&meta[0]), vec![0, 1]);
2395
2396        dataset
2397            .optimize_indices(&OptimizeOptions::retrain())
2398            .await
2399            .unwrap();
2400        let stats = get_stats(&dataset, "other_vec_idx").await;
2401        assert_eq!(stats["num_unindexed_rows"], 0);
2402        assert_eq!(stats["num_indexed_rows"], 1024);
2403        assert_eq!(stats["num_indexed_fragments"], 2);
2404        assert_eq!(stats["num_unindexed_fragments"], 0);
2405        assert_eq!(stats["num_indices"], 1);
2406        let meta = get_meta(&dataset, "other_vec_idx").await;
2407        assert_eq!(meta.len(), 1);
2408        assert_eq!(get_bitmap(&meta[0]), vec![0, 1]);
2409    }
2410
2411    #[tokio::test]
2412    async fn test_optimize_ivf_hnsw_sq_delta_indices() {
2413        let test_dir = TempStrDir::default();
2414        let dimensions = 16;
2415        let column_name = "vec";
2416        let field = Field::new(
2417            column_name,
2418            DataType::FixedSizeList(
2419                Arc::new(Field::new("item", DataType::Float32, true)),
2420                dimensions,
2421            ),
2422            false,
2423        );
2424        let schema = Arc::new(Schema::new(vec![field]));
2425
2426        let float_arr = generate_random_array(512 * dimensions as usize);
2427
2428        let vectors =
2429            arrow_array::FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap();
2430
2431        let record_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(vectors)]).unwrap();
2432
2433        let reader = RecordBatchIterator::new(
2434            vec![record_batch.clone()].into_iter().map(Ok),
2435            schema.clone(),
2436        );
2437
2438        let test_uri = &test_dir;
2439        let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
2440
2441        let ivf_params = IvfBuildParams::default();
2442        let hnsw_params = HnswBuildParams::default();
2443        let sq_params = SQBuildParams::default();
2444        let params = VectorIndexParams::with_ivf_hnsw_sq_params(
2445            MetricType::L2,
2446            ivf_params,
2447            hnsw_params,
2448            sq_params,
2449        );
2450        dataset
2451            .create_index(
2452                &[column_name],
2453                IndexType::Vector,
2454                Some("vec_idx".into()),
2455                &params,
2456                true,
2457            )
2458            .await
2459            .unwrap();
2460
2461        let stats: serde_json::Value =
2462            serde_json::from_str(&dataset.index_statistics("vec_idx").await.unwrap()).unwrap();
2463        assert_eq!(stats["num_unindexed_rows"], 0);
2464        assert_eq!(stats["num_indexed_rows"], 512);
2465        assert_eq!(stats["num_indexed_fragments"], 1);
2466        assert_eq!(stats["num_indices"], 1);
2467
2468        let reader =
2469            RecordBatchIterator::new(vec![record_batch].into_iter().map(Ok), schema.clone());
2470        dataset.append(reader, None).await.unwrap();
2471        let mut dataset = DatasetBuilder::from_uri(test_uri).load().await.unwrap();
2472        let stats: serde_json::Value =
2473            serde_json::from_str(&dataset.index_statistics("vec_idx").await.unwrap()).unwrap();
2474        assert_eq!(stats["num_unindexed_rows"], 512);
2475        assert_eq!(stats["num_indexed_rows"], 512);
2476        assert_eq!(stats["num_indexed_fragments"], 1);
2477        assert_eq!(stats["num_unindexed_fragments"], 1);
2478        assert_eq!(stats["num_indices"], 1);
2479
2480        dataset
2481            .optimize_indices(&OptimizeOptions::append())
2482            .await
2483            .unwrap();
2484
2485        let stats: serde_json::Value =
2486            serde_json::from_str(&dataset.index_statistics("vec_idx").await.unwrap()).unwrap();
2487        assert_eq!(stats["num_unindexed_rows"], 0);
2488        assert_eq!(stats["num_indexed_rows"], 1024);
2489        assert_eq!(stats["num_indexed_fragments"], 2);
2490        assert_eq!(stats["num_unindexed_fragments"], 0);
2491        assert_eq!(stats["num_indices"], 2);
2492
2493        dataset
2494            .optimize_indices(&OptimizeOptions::retrain())
2495            .await
2496            .unwrap();
2497        let stats: serde_json::Value =
2498            serde_json::from_str(&dataset.index_statistics("vec_idx").await.unwrap()).unwrap();
2499        assert_eq!(stats["num_unindexed_rows"], 0);
2500        assert_eq!(stats["num_indexed_rows"], 1024);
2501        assert_eq!(stats["num_indexed_fragments"], 2);
2502        assert_eq!(stats["num_unindexed_fragments"], 0);
2503        assert_eq!(stats["num_indices"], 1);
2504    }
2505
2506    #[rstest]
2507    #[tokio::test]
2508    async fn test_optimize_fts(#[values(false, true)] with_position: bool) {
2509        let words = ["apple", "banana", "cherry", "date"];
2510
2511        let dir = TempStrDir::default();
2512        let schema = Arc::new(Schema::new(vec![Field::new("text", DataType::Utf8, false)]));
2513        let data = StringArray::from_iter_values(words.iter().map(|s| s.to_string()));
2514        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(data)]).unwrap();
2515        let batch_iterator = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
2516
2517        let mut dataset = Dataset::write(batch_iterator, &dir, None).await.unwrap();
2518
2519        let params = InvertedIndexParams::default()
2520            .lower_case(false)
2521            .with_position(with_position);
2522        dataset
2523            .create_index(&["text"], IndexType::Inverted, None, &params, true)
2524            .await
2525            .unwrap();
2526
2527        async fn assert_indexed_rows(dataset: &Dataset, expected_indexed_rows: usize) {
2528            let stats = dataset.index_statistics("text_idx").await.unwrap();
2529            let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
2530            let indexed_rows = stats["num_indexed_rows"].as_u64().unwrap() as usize;
2531            let unindexed_rows = stats["num_unindexed_rows"].as_u64().unwrap() as usize;
2532            let num_rows = dataset.count_all_rows().await.unwrap();
2533            assert_eq!(indexed_rows, expected_indexed_rows);
2534            assert_eq!(unindexed_rows, num_rows - expected_indexed_rows);
2535        }
2536
2537        let num_rows = dataset.count_all_rows().await.unwrap();
2538        assert_indexed_rows(&dataset, num_rows).await;
2539
2540        let new_words = ["elephant", "fig", "grape", "honeydew"];
2541        let new_data = StringArray::from_iter_values(new_words.iter().map(|s| s.to_string()));
2542        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(new_data)]).unwrap();
2543        let batch_iter = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
2544        dataset.append(batch_iter, None).await.unwrap();
2545        assert_indexed_rows(&dataset, num_rows).await;
2546
2547        dataset
2548            .optimize_indices(&OptimizeOptions::append())
2549            .await
2550            .unwrap();
2551        let num_rows = dataset.count_all_rows().await.unwrap();
2552        assert_indexed_rows(&dataset, num_rows).await;
2553
2554        for &word in words.iter().chain(new_words.iter()) {
2555            let query_result = dataset
2556                .scan()
2557                .project(&["text"])
2558                .unwrap()
2559                .full_text_search(FullTextSearchQuery::new(word.to_string()))
2560                .unwrap()
2561                .limit(Some(10), None)
2562                .unwrap()
2563                .try_into_batch()
2564                .await
2565                .unwrap();
2566
2567            let texts = query_result["text"]
2568                .as_string::<i32>()
2569                .iter()
2570                .map(|v| match v {
2571                    None => "".to_string(),
2572                    Some(v) => v.to_string(),
2573                })
2574                .collect::<Vec<String>>();
2575
2576            assert_eq!(texts.len(), 1);
2577            assert_eq!(texts[0], word);
2578        }
2579
2580        let uppercase_words = ["Apple", "Banana", "Cherry", "Date"];
2581        for &word in uppercase_words.iter() {
2582            let query_result = dataset
2583                .scan()
2584                .project(&["text"])
2585                .unwrap()
2586                .full_text_search(FullTextSearchQuery::new(word.to_string()))
2587                .unwrap()
2588                .limit(Some(10), None)
2589                .unwrap()
2590                .try_into_batch()
2591                .await
2592                .unwrap();
2593
2594            let texts = query_result["text"]
2595                .as_string::<i32>()
2596                .iter()
2597                .map(|v| match v {
2598                    None => "".to_string(),
2599                    Some(v) => v.to_string(),
2600                })
2601                .collect::<Vec<String>>();
2602
2603            assert_eq!(texts.len(), 0);
2604        }
2605        let new_data = StringArray::from_iter_values(uppercase_words.iter().map(|s| s.to_string()));
2606        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(new_data)]).unwrap();
2607        let batch_iter = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
2608        dataset.append(batch_iter, None).await.unwrap();
2609        assert_indexed_rows(&dataset, num_rows).await;
2610
2611        // we should be able to query the new words
2612        for &word in uppercase_words.iter() {
2613            let query_result = dataset
2614                .scan()
2615                .project(&["text"])
2616                .unwrap()
2617                .full_text_search(FullTextSearchQuery::new(word.to_string()))
2618                .unwrap()
2619                .limit(Some(10), None)
2620                .unwrap()
2621                .try_into_batch()
2622                .await
2623                .unwrap();
2624
2625            let texts = query_result["text"]
2626                .as_string::<i32>()
2627                .iter()
2628                .map(|v| match v {
2629                    None => "".to_string(),
2630                    Some(v) => v.to_string(),
2631                })
2632                .collect::<Vec<String>>();
2633
2634            assert_eq!(texts.len(), 1, "query: {}, texts: {:?}", word, texts);
2635            assert_eq!(texts[0], word, "query: {}, texts: {:?}", word, texts);
2636        }
2637
2638        dataset
2639            .optimize_indices(&OptimizeOptions::append())
2640            .await
2641            .unwrap();
2642        let num_rows = dataset.count_all_rows().await.unwrap();
2643        assert_indexed_rows(&dataset, num_rows).await;
2644
2645        // we should be able to query the new words after optimization
2646        for &word in uppercase_words.iter() {
2647            let query_result = dataset
2648                .scan()
2649                .project(&["text"])
2650                .unwrap()
2651                .full_text_search(FullTextSearchQuery::new(word.to_string()))
2652                .unwrap()
2653                .limit(Some(10), None)
2654                .unwrap()
2655                .try_into_batch()
2656                .await
2657                .unwrap();
2658
2659            let texts = query_result["text"]
2660                .as_string::<i32>()
2661                .iter()
2662                .map(|v| match v {
2663                    None => "".to_string(),
2664                    Some(v) => v.to_string(),
2665                })
2666                .collect::<Vec<String>>();
2667
2668            assert_eq!(texts.len(), 1, "query: {}, texts: {:?}", word, texts);
2669            assert_eq!(texts[0], word, "query: {}, texts: {:?}", word, texts);
2670
2671            // we should be able to query the new words after compaction
2672            compact_files(&mut dataset, CompactionOptions::default(), None)
2673                .await
2674                .unwrap();
2675            for &word in uppercase_words.iter() {
2676                let query_result = dataset
2677                    .scan()
2678                    .project(&["text"])
2679                    .unwrap()
2680                    .full_text_search(FullTextSearchQuery::new(word.to_string()))
2681                    .unwrap()
2682                    .try_into_batch()
2683                    .await
2684                    .unwrap();
2685                let texts = query_result["text"]
2686                    .as_string::<i32>()
2687                    .iter()
2688                    .map(|v| match v {
2689                        None => "".to_string(),
2690                        Some(v) => v.to_string(),
2691                    })
2692                    .collect::<Vec<String>>();
2693                assert_eq!(texts.len(), 1, "query: {}, texts: {:?}", word, texts);
2694                assert_eq!(texts[0], word, "query: {}, texts: {:?}", word, texts);
2695            }
2696            assert_indexed_rows(&dataset, num_rows).await;
2697        }
2698    }
2699
2700    #[tokio::test]
2701    async fn test_create_index_too_small_for_pq() {
2702        let test_dir = TempStrDir::default();
2703        let dimensions = 1536;
2704
2705        let field = Field::new(
2706            "vector",
2707            DataType::FixedSizeList(
2708                Arc::new(Field::new("item", DataType::Float32, true)),
2709                dimensions,
2710            ),
2711            false,
2712        );
2713
2714        let schema = Arc::new(Schema::new(vec![field]));
2715        let float_arr = generate_random_array(100 * dimensions as usize);
2716
2717        let vectors =
2718            arrow_array::FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap();
2719        let record_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(vectors)]).unwrap();
2720        let reader = RecordBatchIterator::new(
2721            vec![record_batch.clone()].into_iter().map(Ok),
2722            schema.clone(),
2723        );
2724
2725        let test_uri = &test_dir;
2726        let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
2727
2728        let params = VectorIndexParams::ivf_pq(1, 8, 96, DistanceType::L2, 1);
2729        let result = dataset
2730            .create_index(&["vector"], IndexType::Vector, None, &params, false)
2731            .await;
2732
2733        assert!(matches!(result, Err(Error::Unprocessable { .. })));
2734        if let Error::Unprocessable { message, .. } = result.unwrap_err() {
2735            assert_eq!(
2736                message,
2737                "Not enough rows to train PQ. Requires 256 rows but only 100 available",
2738            )
2739        }
2740    }
2741
2742    #[tokio::test]
2743    async fn test_create_bitmap_index() {
2744        let test_dir = TempStrDir::default();
2745        let field = Field::new("tag", DataType::Utf8, false);
2746        let schema = Arc::new(Schema::new(vec![field]));
2747        let array = StringArray::from_iter_values((0..128).map(|i| ["a", "b", "c"][i % 3]));
2748        let record_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
2749        let reader = RecordBatchIterator::new(
2750            vec![record_batch.clone()].into_iter().map(Ok),
2751            schema.clone(),
2752        );
2753
2754        let test_uri = &test_dir;
2755        let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
2756        dataset
2757            .create_index(
2758                &["tag"],
2759                IndexType::Bitmap,
2760                None,
2761                &ScalarIndexParams::default(),
2762                false,
2763            )
2764            .await
2765            .unwrap();
2766        let indices = dataset.load_indices().await.unwrap();
2767        let index = dataset
2768            .open_generic_index("tag", &indices[0].uuid.to_string(), &NoOpMetricsCollector)
2769            .await
2770            .unwrap();
2771        assert_eq!(index.index_type(), IndexType::Bitmap);
2772    }
2773
2774    // #[tokio::test]
2775    #[lance_test_macros::test(tokio::test)]
2776    async fn test_load_indices() {
2777        let session = Arc::new(Session::default());
2778        let write_params = WriteParams {
2779            session: Some(session.clone()),
2780            ..Default::default()
2781        };
2782
2783        let test_dir = TempStrDir::default();
2784        let field = Field::new("tag", DataType::Utf8, false);
2785        let schema = Arc::new(Schema::new(vec![field]));
2786        let array = StringArray::from_iter_values((0..128).map(|i| ["a", "b", "c"][i % 3]));
2787        let record_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
2788        let reader = RecordBatchIterator::new(
2789            vec![record_batch.clone()].into_iter().map(Ok),
2790            schema.clone(),
2791        );
2792
2793        let test_uri = &test_dir;
2794        let mut dataset = Dataset::write(reader, test_uri, Some(write_params))
2795            .await
2796            .unwrap();
2797        dataset
2798            .create_index(
2799                &["tag"],
2800                IndexType::Bitmap,
2801                None,
2802                &ScalarIndexParams::default(),
2803                false,
2804            )
2805            .await
2806            .unwrap();
2807        dataset.object_store().io_stats_incremental(); // Reset
2808
2809        let indices = dataset.load_indices().await.unwrap();
2810        let stats = dataset.object_store().io_stats_incremental();
2811        // We should already have this cached since we just wrote it.
2812        assert_io_eq!(stats, read_iops, 0);
2813        assert_io_eq!(stats, read_bytes, 0);
2814        assert_eq!(indices.len(), 1);
2815
2816        session.index_cache.clear().await; // Clear the cache
2817
2818        let dataset2 = DatasetBuilder::from_uri(test_uri)
2819            .with_session(session.clone())
2820            .load()
2821            .await
2822            .unwrap();
2823        let stats = dataset2.object_store().io_stats_incremental(); // Reset
2824        assert_io_lt!(stats, read_bytes, 64 * 1024);
2825
2826        // Because the manifest is so small, we should have opportunistically
2827        // cached the indices in memory already.
2828        let indices2 = dataset2.load_indices().await.unwrap();
2829        let stats = dataset2.object_store().io_stats_incremental();
2830        assert_io_eq!(stats, read_iops, 0);
2831        assert_io_eq!(stats, read_bytes, 0);
2832        assert_eq!(indices2.len(), 1);
2833    }
2834
2835    #[tokio::test]
2836    async fn test_remap_empty() {
2837        let data = gen_batch()
2838            .col("int", array::step::<Int32Type>())
2839            .col(
2840                "vector",
2841                array::rand_vec::<Float32Type>(Dimension::from(16)),
2842            )
2843            .into_reader_rows(RowCount::from(256), BatchCount::from(1));
2844        let mut dataset = Dataset::write(data, "memory://", None).await.unwrap();
2845
2846        let params = VectorIndexParams::ivf_pq(1, 8, 1, DistanceType::L2, 1);
2847        dataset
2848            .create_index(&["vector"], IndexType::Vector, None, &params, false)
2849            .await
2850            .unwrap();
2851
2852        let index_uuid = dataset.load_indices().await.unwrap()[0].uuid;
2853        let remap_to_empty = (0..dataset.count_all_rows().await.unwrap())
2854            .map(|i| (i as u64, None))
2855            .collect::<HashMap<_, _>>();
2856        let new_uuid = remap_index(&dataset, &index_uuid, &remap_to_empty)
2857            .await
2858            .unwrap();
2859        assert_eq!(new_uuid, RemapResult::Keep(index_uuid));
2860    }
2861
2862    #[tokio::test]
2863    async fn test_optimize_ivf_pq_up_to_date() {
2864        // https://github.com/lance-format/lance/issues/4016
2865        let nrows = 256;
2866        let dimensions = 16;
2867        let column_name = "vector";
2868        let schema = Arc::new(Schema::new(vec![
2869            Field::new("id", DataType::Int32, false),
2870            Field::new(
2871                column_name,
2872                DataType::FixedSizeList(
2873                    Arc::new(Field::new("item", DataType::Float32, true)),
2874                    dimensions,
2875                ),
2876                false,
2877            ),
2878        ]));
2879
2880        let float_arr = generate_random_array(nrows * dimensions as usize);
2881        let vectors =
2882            arrow_array::FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap();
2883        let record_batch = RecordBatch::try_new(
2884            schema.clone(),
2885            vec![
2886                Arc::new(arrow_array::Int32Array::from_iter_values(0..nrows as i32)),
2887                Arc::new(vectors),
2888            ],
2889        )
2890        .unwrap();
2891
2892        let reader = RecordBatchIterator::new(
2893            vec![record_batch.clone()].into_iter().map(Ok),
2894            schema.clone(),
2895        );
2896        let mut dataset = Dataset::write(reader, "memory://", None).await.unwrap();
2897
2898        let params = VectorIndexParams::ivf_pq(1, 8, 2, MetricType::L2, 2);
2899        dataset
2900            .create_index(&[column_name], IndexType::Vector, None, &params, true)
2901            .await
2902            .unwrap();
2903
2904        let query_vector = generate_random_array(dimensions as usize);
2905
2906        let nearest = dataset
2907            .scan()
2908            .nearest(column_name, &query_vector, 5)
2909            .unwrap()
2910            .try_into_batch()
2911            .await
2912            .unwrap();
2913
2914        let ids = nearest["id"].as_primitive::<Int32Type>();
2915        let mut seen = HashSet::new();
2916        for id in ids.values() {
2917            assert!(seen.insert(*id), "Duplicate id found: {}", id);
2918        }
2919
2920        dataset
2921            .optimize_indices(&OptimizeOptions::default())
2922            .await
2923            .unwrap();
2924
2925        dataset.validate().await.unwrap();
2926
2927        let nearest_after = dataset
2928            .scan()
2929            .nearest(column_name, &query_vector, 5)
2930            .unwrap()
2931            .try_into_batch()
2932            .await
2933            .unwrap();
2934
2935        let ids = nearest_after["id"].as_primitive::<Int32Type>();
2936        let mut seen = HashSet::new();
2937        for id in ids.values() {
2938            assert!(seen.insert(*id), "Duplicate id found: {}", id);
2939        }
2940    }
2941
2942    #[tokio::test]
2943    async fn test_index_created_at_timestamp() {
2944        // Test that created_at is set when creating an index
2945        let schema = Arc::new(Schema::new(vec![
2946            Field::new("id", DataType::Int32, false),
2947            Field::new("values", DataType::Utf8, false),
2948        ]));
2949
2950        let values = StringArray::from_iter_values(["hello", "world", "foo", "bar"]);
2951        let record_batch = RecordBatch::try_new(
2952            schema.clone(),
2953            vec![
2954                Arc::new(Int32Array::from_iter_values(0..4)),
2955                Arc::new(values),
2956            ],
2957        )
2958        .unwrap();
2959
2960        let reader =
2961            RecordBatchIterator::new(vec![record_batch].into_iter().map(Ok), schema.clone());
2962
2963        let mut dataset = Dataset::write(reader, "memory://", None).await.unwrap();
2964
2965        // Record time before creating index
2966        let before_index = chrono::Utc::now();
2967
2968        // Create a scalar index
2969        dataset
2970            .create_index(
2971                &["values"],
2972                IndexType::Scalar,
2973                Some("test_idx".to_string()),
2974                &ScalarIndexParams::default(),
2975                false,
2976            )
2977            .await
2978            .unwrap();
2979
2980        // Record time after creating index
2981        let after_index = chrono::Utc::now();
2982
2983        // Get index metadata
2984        let indices = dataset.load_indices().await.unwrap();
2985        let test_index = indices.iter().find(|idx| idx.name == "test_idx").unwrap();
2986
2987        // Verify created_at is set and within reasonable bounds
2988        assert!(test_index.created_at.is_some());
2989        let created_at = test_index.created_at.unwrap();
2990        assert!(created_at >= before_index);
2991        assert!(created_at <= after_index);
2992    }
2993
2994    #[tokio::test]
2995    async fn test_index_statistics_updated_at() {
2996        // Test that updated_at appears in index statistics
2997        let schema = Arc::new(Schema::new(vec![
2998            Field::new("id", DataType::Int32, false),
2999            Field::new("values", DataType::Utf8, false),
3000        ]));
3001
3002        let values = StringArray::from_iter_values(["hello", "world", "foo", "bar"]);
3003        let record_batch = RecordBatch::try_new(
3004            schema.clone(),
3005            vec![
3006                Arc::new(Int32Array::from_iter_values(0..4)),
3007                Arc::new(values),
3008            ],
3009        )
3010        .unwrap();
3011
3012        let reader =
3013            RecordBatchIterator::new(vec![record_batch].into_iter().map(Ok), schema.clone());
3014
3015        let mut dataset = Dataset::write(reader, "memory://", None).await.unwrap();
3016
3017        // Create a scalar index
3018        dataset
3019            .create_index(
3020                &["values"],
3021                IndexType::Scalar,
3022                Some("test_idx".to_string()),
3023                &ScalarIndexParams::default(),
3024                false,
3025            )
3026            .await
3027            .unwrap();
3028
3029        // Get index statistics
3030        let stats_str = dataset.index_statistics("test_idx").await.unwrap();
3031        let stats: serde_json::Value = serde_json::from_str(&stats_str).unwrap();
3032
3033        // Verify updated_at_timestamp_ms field exists in statistics
3034        assert!(stats["updated_at_timestamp_ms"].is_number());
3035        let updated_at = stats["updated_at_timestamp_ms"].as_u64().unwrap();
3036
3037        // Get the index metadata to compare with created_at
3038        let indices = dataset.load_indices().await.unwrap();
3039        let test_index = indices.iter().find(|idx| idx.name == "test_idx").unwrap();
3040        let created_at = test_index.created_at.unwrap().timestamp_millis() as u64;
3041
3042        // For a single index, updated_at should equal created_at
3043        assert_eq!(updated_at, created_at);
3044    }
3045
3046    #[tokio::test]
3047    async fn test_index_statistics_updated_at_multiple_deltas() {
3048        // Test that updated_at reflects max(created_at) across multiple delta indices
3049        let schema = Arc::new(Schema::new(vec![
3050            Field::new("id", DataType::Int32, false),
3051            Field::new(
3052                "vector",
3053                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), 4),
3054                false,
3055            ),
3056        ]));
3057
3058        // Create initial dataset (need more rows for PQ training)
3059        let num_rows = 300;
3060        let float_arr = generate_random_array(4 * num_rows);
3061        let vectors = FixedSizeListArray::try_new_from_values(float_arr, 4).unwrap();
3062        let record_batch = RecordBatch::try_new(
3063            schema.clone(),
3064            vec![
3065                Arc::new(Int32Array::from_iter_values(0..num_rows as i32)),
3066                Arc::new(vectors),
3067            ],
3068        )
3069        .unwrap();
3070
3071        let reader =
3072            RecordBatchIterator::new(vec![record_batch].into_iter().map(Ok), schema.clone());
3073
3074        let mut dataset = Dataset::write(reader, "memory://", None).await.unwrap();
3075
3076        // Create vector index
3077        let params = VectorIndexParams::ivf_pq(1, 8, 2, MetricType::L2, 2);
3078        dataset
3079            .create_index(
3080                &["vector"],
3081                IndexType::Vector,
3082                Some("test_vec_idx".to_string()),
3083                &params,
3084                false,
3085            )
3086            .await
3087            .unwrap();
3088
3089        // Get initial statistics
3090        let stats_str_1 = dataset.index_statistics("test_vec_idx").await.unwrap();
3091        let stats_1: serde_json::Value = serde_json::from_str(&stats_str_1).unwrap();
3092        let initial_updated_at = stats_1["updated_at_timestamp_ms"].as_u64().unwrap();
3093
3094        // Add more data to create additional delta indices
3095        std::thread::sleep(std::time::Duration::from_millis(10)); // Ensure different timestamp
3096
3097        let num_rows_2 = 50;
3098        let float_arr_2 = generate_random_array(4 * num_rows_2);
3099        let vectors_2 = FixedSizeListArray::try_new_from_values(float_arr_2, 4).unwrap();
3100        let record_batch_2 = RecordBatch::try_new(
3101            schema.clone(),
3102            vec![
3103                Arc::new(Int32Array::from_iter_values(
3104                    num_rows as i32..(num_rows + num_rows_2) as i32,
3105                )),
3106                Arc::new(vectors_2),
3107            ],
3108        )
3109        .unwrap();
3110
3111        let reader_2 =
3112            RecordBatchIterator::new(vec![record_batch_2].into_iter().map(Ok), schema.clone());
3113
3114        dataset.append(reader_2, None).await.unwrap();
3115
3116        // Update the index
3117        dataset
3118            .create_index(
3119                &["vector"],
3120                IndexType::Vector,
3121                Some("test_vec_idx".to_string()),
3122                &params,
3123                true,
3124            )
3125            .await
3126            .unwrap();
3127
3128        // Get updated statistics
3129        let stats_str_2 = dataset.index_statistics("test_vec_idx").await.unwrap();
3130        let stats_2: serde_json::Value = serde_json::from_str(&stats_str_2).unwrap();
3131        let final_updated_at = stats_2["updated_at_timestamp_ms"].as_u64().unwrap();
3132
3133        // The updated_at should be newer than the initial one
3134        assert!(final_updated_at >= initial_updated_at);
3135    }
3136
3137    #[tokio::test]
3138    async fn test_index_statistics_updated_at_none_when_no_created_at() {
3139        // Test backward compatibility: when indices were created before the created_at field,
3140        // updated_at_timestamp_ms should be null
3141
3142        // Use test data created with Lance 0.29.0 (before created_at field was added)
3143        let test_dir =
3144            copy_test_data_to_tmp("v0.30.0_pre_created_at/index_without_created_at").unwrap();
3145        let test_uri = test_dir.path_str();
3146        let test_uri = &test_uri;
3147
3148        let dataset = Dataset::open(test_uri).await.unwrap();
3149
3150        // Get the index metadata to verify created_at is None
3151        let indices = dataset.load_indices().await.unwrap();
3152        assert!(!indices.is_empty(), "Test dataset should have indices");
3153
3154        // Verify that the index created with old version has no created_at
3155        for index in indices.iter() {
3156            assert!(
3157                index.created_at.is_none(),
3158                "Index from old version should have created_at = None"
3159            );
3160        }
3161
3162        // Get index statistics - should work even with old indices
3163        let index_name = &indices[0].name;
3164        let stats_str = dataset.index_statistics(index_name).await.unwrap();
3165        let stats: serde_json::Value = serde_json::from_str(&stats_str).unwrap();
3166
3167        // Verify updated_at_timestamp_ms field is null when no indices have created_at
3168        assert!(
3169            stats["updated_at_timestamp_ms"].is_null(),
3170            "updated_at_timestamp_ms should be null when no indices have created_at timestamps"
3171        );
3172    }
3173    #[rstest]
3174    #[case::btree("i", IndexType::BTree, Box::new(ScalarIndexParams::default()))]
3175    #[case::bitmap("i", IndexType::Bitmap, Box::new(ScalarIndexParams::default()))]
3176    #[case::inverted("text", IndexType::Inverted, Box::new(InvertedIndexParams::default()))]
3177    #[tokio::test]
3178    async fn test_create_empty_scalar_index(
3179        #[case] column_name: &str,
3180        #[case] index_type: IndexType,
3181        #[case] params: Box<dyn IndexParams>,
3182    ) {
3183        use lance_datagen::{array, BatchCount, ByteCount, RowCount};
3184
3185        // Create dataset with scalar and text columns (no vector column needed)
3186        let reader = lance_datagen::gen_batch()
3187            .col("i", array::step::<Int32Type>())
3188            .col("text", array::rand_utf8(ByteCount::from(10), false))
3189            .into_reader_rows(RowCount::from(100), BatchCount::from(1));
3190        let mut dataset = Dataset::write(reader, "memory://test", None).await.unwrap();
3191
3192        // Create an empty index with train=false
3193        // Test using IntoFuture - can await directly without calling .execute()
3194        dataset
3195            .create_index_builder(&[column_name], index_type, params.as_ref())
3196            .name("index".to_string())
3197            .train(false)
3198            .await
3199            .unwrap();
3200
3201        // Verify we can get index statistics
3202        let stats = dataset.index_statistics("index").await.unwrap();
3203        let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
3204        assert_eq!(
3205            stats["num_indexed_rows"], 0,
3206            "Empty index should have zero indexed rows"
3207        );
3208
3209        // Append new data using lance_datagen
3210        let append_reader = lance_datagen::gen_batch()
3211            .col("i", array::step::<Int32Type>())
3212            .col("text", array::rand_utf8(ByteCount::from(10), false))
3213            .into_reader_rows(RowCount::from(50), BatchCount::from(1));
3214
3215        dataset.append(append_reader, None).await.unwrap();
3216
3217        // Critical test: Verify the empty index is still present after append
3218        let indices_after_append = dataset.load_indices().await.unwrap();
3219        assert_eq!(
3220            indices_after_append.len(),
3221            1,
3222            "Index should be retained after append for index type {:?}",
3223            index_type
3224        );
3225
3226        let stats = dataset.index_statistics("index").await.unwrap();
3227        let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
3228        assert_eq!(
3229            stats["num_indexed_rows"], 0,
3230            "Empty index should still have zero indexed rows after append"
3231        );
3232
3233        // Test optimize_indices with empty index
3234        dataset.optimize_indices(&Default::default()).await.unwrap();
3235
3236        // Verify the index still exists after optimization
3237        let indices_after_optimize = dataset.load_indices().await.unwrap();
3238        assert_eq!(
3239            indices_after_optimize.len(),
3240            1,
3241            "Index should still exist after optimization"
3242        );
3243
3244        // Check index statistics after optimization
3245        let stats = dataset.index_statistics("index").await.unwrap();
3246        let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
3247        assert_eq!(
3248            stats["num_unindexed_rows"], 0,
3249            "Empty index should indexed all rows"
3250        );
3251    }
3252
3253    /// Helper function to check if an index is being used in a query plan
3254    fn assert_index_usage(plan: &str, column_name: &str, should_use_index: bool, context: &str) {
3255        let index_used = if column_name == "text" {
3256            // For inverted index, look for MatchQuery which indicates FTS index usage
3257            plan.contains("MatchQuery")
3258        } else {
3259            // For btree/bitmap, look for MaterializeIndex which indicates scalar index usage
3260            plan.contains("ScalarIndexQuery")
3261        };
3262
3263        if should_use_index {
3264            assert!(
3265                index_used,
3266                "Query plan should use index {}: {}",
3267                context, plan
3268            );
3269        } else {
3270            assert!(
3271                !index_used,
3272                "Query plan should NOT use index {}: {}",
3273                context, plan
3274            );
3275        }
3276    }
3277
3278    /// Test that scalar indices are retained after deleting all data from a table.
3279    ///
3280    /// This test verifies that when we:
3281    /// 1. Create a table with data
3282    /// 2. Add a scalar index with train=true
3283    /// 3. Delete all data in the table
3284    /// The index remains available on the table.
3285    #[rstest]
3286    #[case::btree("i", IndexType::BTree, Box::new(ScalarIndexParams::default()))]
3287    #[case::bitmap("i", IndexType::Bitmap, Box::new(ScalarIndexParams::default()))]
3288    #[case::inverted("text", IndexType::Inverted, Box::new(InvertedIndexParams::default()))]
3289    #[tokio::test]
3290    async fn test_scalar_index_retained_after_delete_all(
3291        #[case] column_name: &str,
3292        #[case] index_type: IndexType,
3293        #[case] params: Box<dyn IndexParams>,
3294    ) {
3295        use lance_datagen::{array, BatchCount, ByteCount, RowCount};
3296
3297        // Create dataset with initial data
3298        let reader = lance_datagen::gen_batch()
3299            .col("i", array::step::<Int32Type>())
3300            .col("text", array::rand_utf8(ByteCount::from(10), false))
3301            .into_reader_rows(RowCount::from(100), BatchCount::from(1));
3302        let mut dataset = Dataset::write(reader, "memory://test", None).await.unwrap();
3303
3304        // Create index with train=true (normal index with data)
3305        dataset
3306            .create_index_builder(&[column_name], index_type, params.as_ref())
3307            .name("index".to_string())
3308            .train(true)
3309            .await
3310            .unwrap();
3311
3312        // Verify index was created and has indexed rows
3313        let stats = dataset.index_statistics("index").await.unwrap();
3314        let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
3315        assert_eq!(
3316            stats["num_indexed_rows"], 100,
3317            "Index should have indexed all 100 rows"
3318        );
3319
3320        // Verify index is being used in queries before delete
3321        let plan = if column_name == "text" {
3322            // Use full-text search for inverted index
3323            dataset
3324                .scan()
3325                .full_text_search(FullTextSearchQuery::new("test".to_string()))
3326                .unwrap()
3327                .explain_plan(false)
3328                .await
3329                .unwrap()
3330        } else {
3331            // Use equality filter for btree/bitmap indices
3332            dataset
3333                .scan()
3334                .filter(format!("{} = 50", column_name).as_str())
3335                .unwrap()
3336                .explain_plan(false)
3337                .await
3338                .unwrap()
3339        };
3340        // Verify index is being used before delete
3341        assert_index_usage(&plan, column_name, true, "before delete");
3342
3343        let indexes = dataset.load_indices().await.unwrap();
3344        let original_index = indexes[0].clone();
3345
3346        // Delete all rows from the table
3347        dataset.delete("true").await.unwrap();
3348
3349        // Verify table is empty
3350        let row_count = dataset.count_rows(None).await.unwrap();
3351        assert_eq!(row_count, 0, "Table should be empty after delete all");
3352
3353        // Critical test: Verify the index still exists after deleting all data
3354        let indices_after_delete = dataset.load_indices().await.unwrap();
3355        assert_eq!(
3356            indices_after_delete.len(),
3357            1,
3358            "Index should be retained after deleting all data"
3359        );
3360        assert_eq!(
3361            indices_after_delete[0].name, "index",
3362            "Index name should remain the same after delete"
3363        );
3364
3365        // Critical test: Verify the fragment bitmap is empty after delete
3366        let index_after_delete = &indices_after_delete[0];
3367        let effective_bitmap = index_after_delete
3368            .effective_fragment_bitmap(&dataset.fragment_bitmap)
3369            .unwrap();
3370        assert!(
3371            effective_bitmap.is_empty(),
3372            "Effective bitmap should be empty after deleting all data"
3373        );
3374        assert_eq!(
3375            index_after_delete.fragment_bitmap, original_index.fragment_bitmap,
3376            "Fragment bitmap should remain the same after delete"
3377        );
3378
3379        // Verify we can still get index statistics
3380        let stats = dataset.index_statistics("index").await.unwrap();
3381        let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
3382        assert_eq!(
3383            stats["num_indexed_rows"], 0,
3384            "Index should now report zero indexed rows after delete all"
3385        );
3386        assert_eq!(
3387            stats["num_unindexed_rows"], 0,
3388            "Index should report zero unindexed rows after delete all"
3389        );
3390        assert_eq!(
3391            stats["num_indexed_fragments"], 0,
3392            "Index should report zero indexed fragments after delete all"
3393        );
3394        assert_eq!(
3395            stats["num_unindexed_fragments"], 0,
3396            "Index should report zero unindexed fragments after delete all"
3397        );
3398
3399        // Verify index is NOT being used in queries after delete (empty bitmap)
3400        if column_name == "text" {
3401            // Inverted indexes will still appear to be used in FTS queries.
3402            // TODO: once metrics are working on FTS queries, we can check the
3403            // analyze plan output instead for index usage.
3404            let _plan_after_delete = dataset
3405                .scan()
3406                .project(&[column_name])
3407                .unwrap()
3408                .full_text_search(FullTextSearchQuery::new("test".to_string()))
3409                .unwrap()
3410                .explain_plan(false)
3411                .await
3412                .unwrap();
3413            assert_index_usage(
3414                &_plan_after_delete,
3415                column_name,
3416                true,
3417                "after delete (empty bitmap)",
3418            );
3419        } else {
3420            // Use equality filter for btree/bitmap indices
3421            let _plan_after_delete = dataset
3422                .scan()
3423                .filter(format!("{} = 50", column_name).as_str())
3424                .unwrap()
3425                .explain_plan(false)
3426                .await
3427                .unwrap();
3428            // Verify index is NOT being used after delete (empty bitmap)
3429            assert_index_usage(
3430                &_plan_after_delete,
3431                column_name,
3432                false,
3433                "after delete (empty bitmap)",
3434            );
3435        }
3436
3437        // Test that we can append new data and the index is still there
3438        let append_reader = lance_datagen::gen_batch()
3439            .col("i", array::step::<Int32Type>())
3440            .col("text", array::rand_utf8(ByteCount::from(10), false))
3441            .into_reader_rows(RowCount::from(50), BatchCount::from(1));
3442
3443        dataset.append(append_reader, None).await.unwrap();
3444
3445        // Verify index still exists after append
3446        let indices_after_append = dataset.load_indices().await.unwrap();
3447        assert_eq!(
3448            indices_after_append.len(),
3449            1,
3450            "Index should still exist after appending to empty table"
3451        );
3452        let stats = dataset.index_statistics("index").await.unwrap();
3453        let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
3454        assert_eq!(
3455            stats["num_indexed_rows"], 0,
3456            "Index should now report zero indexed rows after data is added"
3457        );
3458
3459        // Test optimize_indices after delete all
3460        dataset.optimize_indices(&Default::default()).await.unwrap();
3461
3462        // Verify index still exists after optimization
3463        let indices_after_optimize = dataset.load_indices().await.unwrap();
3464        assert_eq!(
3465            indices_after_optimize.len(),
3466            1,
3467            "Index should still exist after optimization following delete all"
3468        );
3469
3470        // Verify we can still get index statistics after optimization
3471        let stats = dataset.index_statistics("index").await.unwrap();
3472        let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
3473        assert_eq!(
3474            stats["num_indexed_rows"],
3475            dataset.count_rows(None).await.unwrap(),
3476            "Index should now cover all newly added rows after optimization"
3477        );
3478    }
3479
3480    /// Test that scalar indices are retained after updating rows in a table.
3481    ///
3482    /// This test verifies that when we:
3483    /// 1. Create a table with data
3484    /// 2. Add a scalar index with train=true
3485    /// 3. Update rows in the table
3486    /// The index remains available on the table.
3487    #[rstest]
3488    #[case::btree("i", IndexType::BTree, Box::new(ScalarIndexParams::default()))]
3489    #[case::bitmap("i", IndexType::Bitmap, Box::new(ScalarIndexParams::default()))]
3490    #[case::inverted("text", IndexType::Inverted, Box::new(InvertedIndexParams::default()))]
3491    #[tokio::test]
3492    async fn test_scalar_index_retained_after_update(
3493        #[case] column_name: &str,
3494        #[case] index_type: IndexType,
3495        #[case] params: Box<dyn IndexParams>,
3496    ) {
3497        use crate::dataset::UpdateBuilder;
3498        use lance_datagen::{array, BatchCount, ByteCount, RowCount};
3499
3500        // Create dataset with initial data
3501        let reader = lance_datagen::gen_batch()
3502            .col("i", array::step::<Int32Type>())
3503            .col("text", array::rand_utf8(ByteCount::from(10), false))
3504            .into_reader_rows(RowCount::from(100), BatchCount::from(1));
3505        let mut dataset = Dataset::write(reader, "memory://test", None).await.unwrap();
3506
3507        // Create index with train=true (normal index with data)
3508        dataset
3509            .create_index_builder(&[column_name], index_type, params.as_ref())
3510            .name("index".to_string())
3511            .train(true)
3512            .await
3513            .unwrap();
3514
3515        // Verify index was created and has indexed rows
3516        let stats = dataset.index_statistics("index").await.unwrap();
3517        let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
3518        assert_eq!(
3519            stats["num_indexed_rows"], 100,
3520            "Index should have indexed all 100 rows"
3521        );
3522
3523        // Verify index is being used in queries before update
3524        let plan = if column_name == "text" {
3525            // Use full-text search for inverted index
3526            dataset
3527                .scan()
3528                .project(&[column_name])
3529                .unwrap()
3530                .full_text_search(FullTextSearchQuery::new("test".to_string()))
3531                .unwrap()
3532                .explain_plan(false)
3533                .await
3534                .unwrap()
3535        } else {
3536            // Use equality filter for btree/bitmap indices
3537            dataset
3538                .scan()
3539                .filter(format!("{} = 50", column_name).as_str())
3540                .unwrap()
3541                .explain_plan(false)
3542                .await
3543                .unwrap()
3544        };
3545        // Verify index is being used before update
3546        assert_index_usage(&plan, column_name, true, "before update");
3547
3548        // Update some rows - update first 50 rows
3549        let update_result = UpdateBuilder::new(Arc::new(dataset))
3550            .set("i", "i + 1000")
3551            .unwrap()
3552            .set("text", "'updated_' || text")
3553            .unwrap()
3554            .build()
3555            .unwrap()
3556            .execute()
3557            .await
3558            .unwrap();
3559
3560        let mut dataset = update_result.new_dataset.as_ref().clone();
3561
3562        // Verify row count remains the same
3563        let row_count = dataset.count_rows(None).await.unwrap();
3564        assert_eq!(row_count, 100, "Row count should remain 100 after update");
3565
3566        // Critical test: Verify the index still exists after updating data
3567        let indices_after_update = dataset.load_indices().await.unwrap();
3568        assert_eq!(
3569            indices_after_update.len(),
3570            1,
3571            "Index should be retained after updating rows"
3572        );
3573
3574        // Critical test: Verify the effective fragment bitmap is empty after update
3575        let indices = dataset.load_indices().await.unwrap();
3576        let index = &indices[0];
3577        let effective_bitmap = index
3578            .effective_fragment_bitmap(&dataset.fragment_bitmap)
3579            .unwrap();
3580        assert!(
3581            effective_bitmap.is_empty(),
3582            "Effective fragment bitmap should be empty after updating all data"
3583        );
3584
3585        // Verify we can still get index statistics
3586        let stats_after_update = dataset.index_statistics("index").await.unwrap();
3587        let stats_after_update: serde_json::Value =
3588            serde_json::from_str(&stats_after_update).unwrap();
3589
3590        // The index should still be available
3591        assert_eq!(
3592            stats_after_update["num_indexed_rows"], 0,
3593            "Index statistics should be zero after update, as it is not re-trained"
3594        );
3595
3596        // Verify index behavior in queries after update (empty bitmap)
3597        if column_name == "text" {
3598            // Inverted indexes will still appear to be used in FTS queries even with empty bitmaps.
3599            // This is because FTS queries require an index to exist, and the query execution
3600            // will handle the empty bitmap appropriately by falling back to scanning unindexed data.
3601            // TODO: once metrics are working on FTS queries, we can check the
3602            // analyze plan output instead for actual index usage statistics.
3603            let _plan_after_update = dataset
3604                .scan()
3605                .project(&[column_name])
3606                .unwrap()
3607                .full_text_search(FullTextSearchQuery::new("test".to_string()))
3608                .unwrap()
3609                .explain_plan(false)
3610                .await
3611                .unwrap();
3612            assert_index_usage(
3613                &_plan_after_update,
3614                column_name,
3615                true, // FTS indices always appear in the plan, even with empty bitmaps
3616                "after update (empty bitmap)",
3617            );
3618        } else {
3619            // Use equality filter for btree/bitmap indices
3620            let _plan_after_update = dataset
3621                .scan()
3622                .filter(format!("{} = 50", column_name).as_str())
3623                .unwrap()
3624                .explain_plan(false)
3625                .await
3626                .unwrap();
3627            // With immutable bitmaps, index is still used even with empty effective bitmap
3628            // The prefilter will handle non-existent fragments
3629            assert_index_usage(
3630                &_plan_after_update,
3631                column_name,
3632                false,
3633                "after update (empty effective bitmap)",
3634            );
3635        }
3636
3637        // Test that we can optimize indices after update
3638        dataset.optimize_indices(&Default::default()).await.unwrap();
3639
3640        // Verify index still exists after optimization
3641        let indices_after_optimize = dataset.load_indices().await.unwrap();
3642        assert_eq!(
3643            indices_after_optimize.len(),
3644            1,
3645            "Index should still exist after optimization following update"
3646        );
3647
3648        let stats_after_optimization = dataset.index_statistics("index").await.unwrap();
3649        let stats_after_optimization: serde_json::Value =
3650            serde_json::from_str(&stats_after_optimization).unwrap();
3651
3652        // The index should still be available
3653        assert_eq!(
3654            stats_after_optimization["num_unindexed_rows"], 0,
3655            "Index should have zero unindexed rows after optimization"
3656        );
3657    }
3658
3659    // Helper function to validate indices after each clone iteration
3660    async fn validate_indices_after_clone(
3661        dataset: &Dataset,
3662        round: usize,
3663        expected_scalar_rows: usize,
3664        dimensions: u32,
3665    ) {
3666        // Verify cloned dataset has indices
3667        let indices = dataset.load_indices().await.unwrap();
3668        assert_eq!(
3669            indices.len(),
3670            2,
3671            "Round {}: Cloned dataset should have 2 indices",
3672            round
3673        );
3674        let index_names: HashSet<String> = indices.iter().map(|idx| idx.name.clone()).collect();
3675        assert!(
3676            index_names.contains("vector_idx"),
3677            "Round {}: Should contain vector_idx",
3678            round
3679        );
3680        assert!(
3681            index_names.contains("category_idx"),
3682            "Round {}: Should contain category_idx",
3683            round
3684        );
3685
3686        // Test basic data access without using indices for now
3687        // This ensures the dataset is accessible and data is intact
3688        // In chain cloning, each round adds 50 rows to the previous dataset
3689        // Round 1: 300 (original), Round 2: 350 (300 + 50 from round 1), Round 3: 400 (350 + 50 from round 2)
3690        let expected_total_rows = 300 + (round - 1) * 50;
3691        let total_rows = dataset.count_rows(None).await.unwrap();
3692        assert_eq!(
3693            total_rows, expected_total_rows,
3694            "Round {}: Should have {} rows after clone (chain cloning accumulates data)",
3695            round, expected_total_rows
3696        );
3697
3698        // Verify vector search
3699        let query_vector = generate_random_array(dimensions as usize);
3700        let search_results = dataset
3701            .scan()
3702            .nearest("vector", &query_vector, 5)
3703            .unwrap()
3704            .limit(Some(5), None)
3705            .unwrap()
3706            .try_into_batch()
3707            .await
3708            .unwrap();
3709        assert!(
3710            search_results.num_rows() > 0,
3711            "Round {}: Vector search should return results immediately after clone",
3712            round
3713        );
3714
3715        // Test basic scalar query to verify data integrity
3716        let scalar_results = dataset
3717            .scan()
3718            .filter("category = 'category_0'")
3719            .unwrap()
3720            .try_into_batch()
3721            .await
3722            .unwrap();
3723
3724        assert_eq!(
3725            expected_scalar_rows,
3726            scalar_results.num_rows(),
3727            "Round {}: Scalar query should return {} results",
3728            round,
3729            expected_scalar_rows
3730        );
3731    }
3732
3733    #[tokio::test]
3734    async fn test_shallow_clone_with_index() {
3735        let test_dir = TempStrDir::default();
3736        let test_uri = &test_dir;
3737
3738        // Create a schema with both vector and scalar columns
3739        let dimensions = 16u32;
3740        // Generate test data using lance_datagen (300 rows to satisfy PQ training requirements)
3741        let data = gen_batch()
3742            .col("id", array::step::<Int32Type>())
3743            .col("category", array::fill_utf8("category_0".to_string()))
3744            .col(
3745                "vector",
3746                array::rand_vec::<Float32Type>(Dimension::from(dimensions)),
3747            )
3748            .into_reader_rows(RowCount::from(300), BatchCount::from(1));
3749
3750        // Create initial dataset
3751        let mut dataset = Dataset::write(data, test_uri, None).await.unwrap();
3752        // Create vector index (IVF_PQ)
3753        let vector_params = VectorIndexParams::ivf_pq(2, 8, 2, MetricType::L2, 10);
3754        dataset
3755            .create_index(
3756                &["vector"],
3757                IndexType::Vector,
3758                Some("vector_idx".to_string()),
3759                &vector_params,
3760                true,
3761            )
3762            .await
3763            .unwrap();
3764
3765        // Create scalar index (BTree)
3766        dataset
3767            .create_index(
3768                &["category"],
3769                IndexType::BTree,
3770                Some("category_idx".to_string()),
3771                &ScalarIndexParams::default(),
3772                true,
3773            )
3774            .await
3775            .unwrap();
3776
3777        // Verify indices were created
3778        let indices = dataset.load_indices().await.unwrap();
3779        assert_eq!(indices.len(), 2, "Should have 2 indices");
3780        let index_names: HashSet<String> = indices.iter().map(|idx| idx.name.clone()).collect();
3781        assert!(index_names.contains("vector_idx"));
3782        assert!(index_names.contains("category_idx"));
3783
3784        // Test scalar query on source dataset
3785        let scalar_results = dataset
3786            .scan()
3787            .filter("category = 'category_0'")
3788            .unwrap()
3789            .try_into_batch()
3790            .await
3791            .unwrap();
3792
3793        let source_scalar_query_rows = scalar_results.num_rows();
3794        assert!(
3795            scalar_results.num_rows() > 0,
3796            "Scalar query should return results"
3797        );
3798
3799        // Multiple shallow clone iterations test with chain cloning
3800        let clone_rounds = 3;
3801        let mut current_dataset = dataset;
3802
3803        for round in 1..=clone_rounds {
3804            let round_clone_dir = format!("{}/clone_round_{}", test_dir, round);
3805            let round_cloned_uri = &round_clone_dir;
3806            let tag_name = format!("shallow_clone_test_{}", round);
3807
3808            // Create tag for this round (use current dataset for chain cloning)
3809            let current_version = current_dataset.version().version;
3810            current_dataset
3811                .tags()
3812                .create(&tag_name, current_version)
3813                .await
3814                .unwrap();
3815
3816            // Perform shallow clone for this round (chain cloning from current dataset)
3817            let mut round_cloned_dataset = current_dataset
3818                .shallow_clone(round_cloned_uri, tag_name.as_str(), None)
3819                .await
3820                .unwrap();
3821
3822            // Immediately validate indices after each clone
3823            validate_indices_after_clone(
3824                &round_cloned_dataset,
3825                round,
3826                source_scalar_query_rows,
3827                dimensions,
3828            )
3829            .await;
3830
3831            // Complete validation cycle after each clone: append data, optimize index, validate
3832            // Append new data to the cloned dataset
3833            let new_data = gen_batch()
3834                .col(
3835                    "id",
3836                    array::step_custom::<Int32Type>(300 + (round * 50) as i32, 1),
3837                )
3838                .col("category", array::fill_utf8(format!("category_{}", round)))
3839                .col(
3840                    "vector",
3841                    array::rand_vec::<Float32Type>(Dimension::from(dimensions)),
3842                )
3843                .into_reader_rows(RowCount::from(50), BatchCount::from(1));
3844
3845            round_cloned_dataset = Dataset::write(
3846                new_data,
3847                round_cloned_uri,
3848                Some(WriteParams {
3849                    mode: WriteMode::Append,
3850                    ..Default::default()
3851                }),
3852            )
3853            .await
3854            .unwrap();
3855
3856            // Verify row count increased
3857            let expected_rows = 300 + round * 50;
3858            let total_rows = round_cloned_dataset.count_rows(None).await.unwrap();
3859            assert_eq!(
3860                total_rows, expected_rows,
3861                "Round {}: Should have {} rows after append",
3862                round, expected_rows
3863            );
3864
3865            let indices_before_optimize = round_cloned_dataset.load_indices().await.unwrap();
3866            let vector_idx_before = indices_before_optimize
3867                .iter()
3868                .find(|idx| idx.name == "vector_idx")
3869                .unwrap();
3870            let category_idx_before = indices_before_optimize
3871                .iter()
3872                .find(|idx| idx.name == "category_idx")
3873                .unwrap();
3874
3875            // Optimize indices
3876            round_cloned_dataset
3877                .optimize_indices(&OptimizeOptions::merge(indices_before_optimize.len()))
3878                .await
3879                .unwrap();
3880
3881            // Verify index UUID has changed
3882            let optimized_indices = round_cloned_dataset.load_indices().await.unwrap();
3883            let new_vector_idx = optimized_indices
3884                .iter()
3885                .find(|idx| idx.name == "vector_idx")
3886                .unwrap();
3887            let new_category_idx = optimized_indices
3888                .iter()
3889                .find(|idx| idx.name == "category_idx")
3890                .unwrap();
3891
3892            assert_ne!(
3893                new_vector_idx.uuid, vector_idx_before.uuid,
3894                "Round {}: Vector index should have a new UUID after optimization",
3895                round
3896            );
3897            assert_ne!(
3898                new_category_idx.uuid, category_idx_before.uuid,
3899                "Round {}: Category index should have a new UUID after optimization",
3900                round
3901            );
3902
3903            // Verify the location of index files
3904            use std::path::PathBuf;
3905            let clone_indices_dir = PathBuf::from(round_cloned_uri).join("_indices");
3906            let vector_index_dir = clone_indices_dir.join(new_vector_idx.uuid.to_string());
3907            let category_index_dir = clone_indices_dir.join(new_category_idx.uuid.to_string());
3908
3909            assert!(
3910                vector_index_dir.exists(),
3911                "Round {}: New vector index directory should exist in cloned dataset location: {:?}",
3912                round, vector_index_dir
3913            );
3914            assert!(
3915                category_index_dir.exists(),
3916                "Round {}: New category index directory should exist in cloned dataset location: {:?}",
3917                round, category_index_dir
3918            );
3919
3920            // Verify base id
3921            assert!(
3922                new_vector_idx.base_id.is_none(),
3923                "Round {}: New vector index should not have base_id after optimization in cloned dataset",
3924                round
3925            );
3926            assert!(
3927                new_category_idx.base_id.is_none(),
3928                "Round {}: New category index should not have base_id after optimization in cloned dataset",
3929                round
3930            );
3931
3932            // Verify the source location does NOT contain new data
3933            let original_indices_dir = PathBuf::from(current_dataset.uri()).join("_indices");
3934            let wrong_vector_dir = original_indices_dir.join(new_vector_idx.uuid.to_string());
3935            let wrong_category_dir = original_indices_dir.join(new_category_idx.uuid.to_string());
3936
3937            assert!(
3938                !wrong_vector_dir.exists(),
3939                "Round {}: New vector index should NOT be in original dataset location: {:?}",
3940                round,
3941                wrong_vector_dir
3942            );
3943            assert!(
3944                !wrong_category_dir.exists(),
3945                "Round {}: New category index should NOT be in original dataset location: {:?}",
3946                round,
3947                wrong_category_dir
3948            );
3949
3950            // Validate data integrity and index functionality after optimization
3951            let old_category_results = round_cloned_dataset
3952                .scan()
3953                .filter("category = 'category_0'")
3954                .unwrap()
3955                .try_into_batch()
3956                .await
3957                .unwrap();
3958
3959            let new_category_results = round_cloned_dataset
3960                .scan()
3961                .filter(&format!("category = 'category_{}'", round))
3962                .unwrap()
3963                .try_into_batch()
3964                .await
3965                .unwrap();
3966
3967            assert_eq!(
3968                source_scalar_query_rows,
3969                old_category_results.num_rows(),
3970                "Round {}: Should find old category data with {} rows",
3971                round,
3972                source_scalar_query_rows
3973            );
3974            assert!(
3975                new_category_results.num_rows() > 0,
3976                "Round {}: Should find new category data",
3977                round
3978            );
3979
3980            // Test vector search functionality
3981            let query_vector = generate_random_array(dimensions as usize);
3982            let search_results = round_cloned_dataset
3983                .scan()
3984                .nearest("vector", &query_vector, 10)
3985                .unwrap()
3986                .limit(Some(10), None)
3987                .unwrap()
3988                .try_into_batch()
3989                .await
3990                .unwrap();
3991
3992            assert!(
3993                search_results.num_rows() > 0,
3994                "Round {}: Vector search should return results after optimization",
3995                round
3996            );
3997
3998            // Verify statistic of indexes
3999            let vector_stats: serde_json::Value = serde_json::from_str(
4000                &round_cloned_dataset
4001                    .index_statistics("vector_idx")
4002                    .await
4003                    .unwrap(),
4004            )
4005            .unwrap();
4006            let category_stats: serde_json::Value = serde_json::from_str(
4007                &round_cloned_dataset
4008                    .index_statistics("category_idx")
4009                    .await
4010                    .unwrap(),
4011            )
4012            .unwrap();
4013
4014            assert_eq!(
4015                vector_stats["num_indexed_rows"].as_u64().unwrap(),
4016                expected_rows as u64,
4017                "Round {}: Vector index should have {} indexed rows",
4018                round,
4019                expected_rows
4020            );
4021            assert_eq!(
4022                category_stats["num_indexed_rows"].as_u64().unwrap(),
4023                expected_rows as u64,
4024                "Round {}: Category index should have {} indexed rows",
4025                round,
4026                expected_rows
4027            );
4028
4029            // Prepare for next round: use the cloned dataset as the source for next clone
4030            current_dataset = round_cloned_dataset;
4031        }
4032
4033        // Use the final cloned dataset for any remaining tests
4034        let final_cloned_dataset = current_dataset;
4035
4036        // Verify cloned dataset has indices
4037        let cloned_indices = final_cloned_dataset.load_indices().await.unwrap();
4038        assert_eq!(
4039            cloned_indices.len(),
4040            2,
4041            "Final cloned dataset should have 2 indices"
4042        );
4043        let cloned_index_names: HashSet<String> =
4044            cloned_indices.iter().map(|idx| idx.name.clone()).collect();
4045        assert!(cloned_index_names.contains("vector_idx"));
4046        assert!(cloned_index_names.contains("category_idx"));
4047
4048        // Test vector search on final cloned dataset
4049        let query_vector = generate_random_array(dimensions as usize);
4050        let search_results = final_cloned_dataset
4051            .scan()
4052            .nearest("vector", &query_vector, 5)
4053            .unwrap()
4054            .limit(Some(5), None)
4055            .unwrap()
4056            .try_into_batch()
4057            .await
4058            .unwrap();
4059
4060        assert!(
4061            search_results.num_rows() > 0,
4062            "Vector search should return results on final dataset"
4063        );
4064
4065        // Test scalar query on final cloned dataset
4066        let scalar_results = final_cloned_dataset
4067            .scan()
4068            .filter("category = 'category_0'")
4069            .unwrap()
4070            .try_into_batch()
4071            .await
4072            .unwrap();
4073
4074        assert_eq!(
4075            source_scalar_query_rows,
4076            scalar_results.num_rows(),
4077            "Scalar query should return results on final dataset"
4078        );
4079    }
4080
4081    #[tokio::test]
4082    async fn test_initialize_indices() {
4083        use crate::dataset::Dataset;
4084        use arrow_array::types::Float32Type;
4085        use lance_core::utils::tempfile::TempStrDir;
4086        use lance_datagen::{array, BatchCount, RowCount};
4087        use lance_index::scalar::{InvertedIndexParams, ScalarIndexParams};
4088        use lance_linalg::distance::MetricType;
4089        use std::collections::HashSet;
4090
4091        // Create source dataset with various index types
4092        let test_dir = TempStrDir::default();
4093        let source_uri = format!("{}/{}", test_dir, "source");
4094        let target_uri = format!("{}/{}", test_dir, "target");
4095
4096        // Generate test data using lance_datagen (need at least 256 rows for PQ training)
4097        let source_reader = lance_datagen::gen_batch()
4098            .col("vector", array::rand_vec::<Float32Type>(8.into()))
4099            .col(
4100                "text",
4101                array::cycle_utf8_literals(&["hello world", "foo bar", "test data"]),
4102            )
4103            .col("id", array::step::<Int32Type>())
4104            .into_reader_rows(RowCount::from(300), BatchCount::from(1));
4105
4106        // Create source dataset
4107        let mut source_dataset = Dataset::write(source_reader, &source_uri, None)
4108            .await
4109            .unwrap();
4110
4111        // Create indices on source dataset
4112        // 1. Vector index
4113        let vector_params = VectorIndexParams::ivf_pq(4, 8, 2, MetricType::L2, 10);
4114        source_dataset
4115            .create_index(
4116                &["vector"],
4117                IndexType::Vector,
4118                Some("vec_idx".to_string()),
4119                &vector_params,
4120                false,
4121            )
4122            .await
4123            .unwrap();
4124
4125        // 2. FTS index
4126        let fts_params = InvertedIndexParams::default();
4127        source_dataset
4128            .create_index(
4129                &["text"],
4130                IndexType::Inverted,
4131                Some("text_idx".to_string()),
4132                &fts_params,
4133                false,
4134            )
4135            .await
4136            .unwrap();
4137
4138        // 3. Scalar index
4139        let scalar_params = ScalarIndexParams::default();
4140        source_dataset
4141            .create_index(
4142                &["id"],
4143                IndexType::BTree,
4144                Some("id_idx".to_string()),
4145                &scalar_params,
4146                false,
4147            )
4148            .await
4149            .unwrap();
4150
4151        // Reload source dataset to get updated index metadata
4152        let source_dataset = Dataset::open(&source_uri).await.unwrap();
4153
4154        // Verify source has 3 indices
4155        let source_indices = source_dataset.load_indices().await.unwrap();
4156        assert_eq!(
4157            source_indices.len(),
4158            3,
4159            "Source dataset should have 3 indices"
4160        );
4161
4162        // Create target dataset with same schema but different data (need at least 256 rows for PQ)
4163        let target_reader = lance_datagen::gen_batch()
4164            .col("vector", array::rand_vec::<Float32Type>(8.into()))
4165            .col(
4166                "text",
4167                array::cycle_utf8_literals(&["foo bar", "test data", "hello world"]),
4168            )
4169            .col("id", array::step_custom::<Int32Type>(100, 1))
4170            .into_reader_rows(RowCount::from(300), BatchCount::from(1));
4171        let mut target_dataset = Dataset::write(target_reader, &target_uri, None)
4172            .await
4173            .unwrap();
4174
4175        // Initialize indices from source dataset
4176        target_dataset
4177            .initialize_indices(&source_dataset)
4178            .await
4179            .unwrap();
4180
4181        // Verify target has same indices
4182        let target_indices = target_dataset.load_indices().await.unwrap();
4183        assert_eq!(
4184            target_indices.len(),
4185            3,
4186            "Target dataset should have 3 indices after initialization"
4187        );
4188
4189        // Check index names match
4190        let source_names: HashSet<String> =
4191            source_indices.iter().map(|idx| idx.name.clone()).collect();
4192        let target_names: HashSet<String> =
4193            target_indices.iter().map(|idx| idx.name.clone()).collect();
4194        assert_eq!(
4195            source_names, target_names,
4196            "Index names should match between source and target"
4197        );
4198
4199        // Verify indices are functional by running queries
4200        // 1. Test vector index
4201        let query_vector = generate_random_array(8);
4202        let search_results = target_dataset
4203            .scan()
4204            .nearest("vector", &query_vector, 5)
4205            .unwrap()
4206            .limit(Some(5), None)
4207            .unwrap()
4208            .try_into_batch()
4209            .await
4210            .unwrap();
4211        assert!(
4212            search_results.num_rows() > 0,
4213            "Vector index should be functional"
4214        );
4215
4216        // 2. Test scalar index
4217        let scalar_results = target_dataset
4218            .scan()
4219            .filter("id = 125")
4220            .unwrap()
4221            .try_into_batch()
4222            .await
4223            .unwrap();
4224        assert_eq!(
4225            scalar_results.num_rows(),
4226            1,
4227            "Scalar index should find exact match"
4228        );
4229    }
4230
4231    #[tokio::test]
4232    async fn test_initialize_indices_with_missing_field() {
4233        use crate::dataset::Dataset;
4234        use arrow_array::types::Int32Type;
4235        use lance_core::utils::tempfile::TempStrDir;
4236        use lance_datagen::{array, BatchCount, RowCount};
4237        use lance_index::scalar::ScalarIndexParams;
4238
4239        // Test that initialize_indices handles missing fields gracefully
4240        let test_dir = TempStrDir::default();
4241        let source_uri = format!("{}/{}", test_dir, "source");
4242        let target_uri = format!("{}/{}", test_dir, "target");
4243
4244        // Create source dataset with extra field
4245        let source_reader = lance_datagen::gen_batch()
4246            .col("id", array::step::<Int32Type>())
4247            .col("extra", array::cycle_utf8_literals(&["test"]))
4248            .into_reader_rows(RowCount::from(10), BatchCount::from(1));
4249        let mut source_dataset = Dataset::write(source_reader, &source_uri, None)
4250            .await
4251            .unwrap();
4252
4253        // Create index on extra field in source
4254        source_dataset
4255            .create_index(
4256                &["extra"],
4257                IndexType::BTree,
4258                None,
4259                &ScalarIndexParams::default(),
4260                false,
4261            )
4262            .await
4263            .unwrap();
4264
4265        // Create target dataset without extra field
4266        let target_reader = lance_datagen::gen_batch()
4267            .col("id", array::step_custom::<Int32Type>(10, 1))
4268            .into_reader_rows(RowCount::from(10), BatchCount::from(1));
4269        let mut target_dataset = Dataset::write(target_reader, &target_uri, None)
4270            .await
4271            .unwrap();
4272
4273        // Initialize indices should skip the index on missing field with an error
4274        let result = target_dataset.initialize_indices(&source_dataset).await;
4275
4276        // Should fail when field is missing
4277        assert!(result.is_err(), "Should error when field is missing");
4278        assert!(result
4279            .unwrap_err()
4280            .to_string()
4281            .contains("not found in target dataset"));
4282    }
4283
4284    #[tokio::test]
4285    async fn test_initialize_single_index() {
4286        use crate::dataset::Dataset;
4287        use crate::index::vector::VectorIndexParams;
4288        use arrow_array::types::{Float32Type, Int32Type};
4289        use lance_core::utils::tempfile::TempStrDir;
4290        use lance_datagen::{array, BatchCount, RowCount};
4291        use lance_index::scalar::ScalarIndexParams;
4292        use lance_linalg::distance::MetricType;
4293
4294        let test_dir = TempStrDir::default();
4295        let source_uri = format!("{}/{}", test_dir, "source");
4296        let target_uri = format!("{}/{}", test_dir, "target");
4297
4298        // Create source dataset (need at least 256 rows for PQ training)
4299        let source_reader = lance_datagen::gen_batch()
4300            .col("id", array::step::<Int32Type>())
4301            .col("name", array::rand_utf8(4.into(), false))
4302            .col("vector", array::rand_vec::<Float32Type>(8.into()))
4303            .into_reader_rows(RowCount::from(300), BatchCount::from(1));
4304        let mut source_dataset = Dataset::write(source_reader, &source_uri, None)
4305            .await
4306            .unwrap();
4307
4308        // Create multiple indices on source
4309        let scalar_params = ScalarIndexParams::default();
4310        source_dataset
4311            .create_index(
4312                &["id"],
4313                IndexType::BTree,
4314                Some("id_index".to_string()),
4315                &scalar_params,
4316                false,
4317            )
4318            .await
4319            .unwrap();
4320
4321        let vector_params = VectorIndexParams::ivf_pq(16, 8, 4, MetricType::L2, 50);
4322        source_dataset
4323            .create_index(
4324                &["vector"],
4325                IndexType::Vector,
4326                Some("vector_index".to_string()),
4327                &vector_params,
4328                false,
4329            )
4330            .await
4331            .unwrap();
4332
4333        // Reload source dataset to get updated index metadata
4334        let source_dataset = Dataset::open(&source_uri).await.unwrap();
4335
4336        // Create target dataset with same schema
4337        let target_reader = lance_datagen::gen_batch()
4338            .col("id", array::step::<Int32Type>())
4339            .col("name", array::rand_utf8(4.into(), false))
4340            .col("vector", array::rand_vec::<Float32Type>(8.into()))
4341            .into_reader_rows(RowCount::from(300), BatchCount::from(1));
4342        let mut target_dataset = Dataset::write(target_reader, &target_uri, None)
4343            .await
4344            .unwrap();
4345
4346        // Initialize only the vector index
4347        target_dataset
4348            .initialize_index(&source_dataset, "vector_index")
4349            .await
4350            .unwrap();
4351
4352        // Verify only vector index was created
4353        let target_indices = target_dataset.load_indices().await.unwrap();
4354        assert_eq!(target_indices.len(), 1, "Should have only 1 index");
4355        assert_eq!(
4356            target_indices[0].name, "vector_index",
4357            "Should have the vector index"
4358        );
4359
4360        // Initialize the scalar index
4361        target_dataset
4362            .initialize_index(&source_dataset, "id_index")
4363            .await
4364            .unwrap();
4365
4366        // Verify both indices now exist
4367        let target_indices = target_dataset.load_indices().await.unwrap();
4368        assert_eq!(target_indices.len(), 2, "Should have 2 indices");
4369
4370        let index_names: HashSet<String> =
4371            target_indices.iter().map(|idx| idx.name.clone()).collect();
4372        assert!(
4373            index_names.contains("vector_index"),
4374            "Should have vector index"
4375        );
4376        assert!(index_names.contains("id_index"), "Should have id index");
4377
4378        // Test error case - non-existent index
4379        let result = target_dataset
4380            .initialize_index(&source_dataset, "non_existent")
4381            .await;
4382        assert!(result.is_err(), "Should error for non-existent index");
4383        assert!(result
4384            .unwrap_err()
4385            .to_string()
4386            .contains("not found in source dataset"));
4387    }
4388
4389    #[tokio::test]
4390    async fn test_vector_index_on_nested_field_with_dots() {
4391        let dimensions = 16;
4392        let num_rows = 256;
4393
4394        // Create schema with nested field containing dots in the name
4395        let struct_field = Field::new(
4396            "embedding_data",
4397            DataType::Struct(
4398                vec![
4399                    Field::new(
4400                        "vector.v1", // Field name with dot
4401                        DataType::FixedSizeList(
4402                            Arc::new(Field::new("item", DataType::Float32, true)),
4403                            dimensions,
4404                        ),
4405                        false,
4406                    ),
4407                    Field::new(
4408                        "vector.v2", // Another field name with dot
4409                        DataType::FixedSizeList(
4410                            Arc::new(Field::new("item", DataType::Float32, true)),
4411                            dimensions,
4412                        ),
4413                        false,
4414                    ),
4415                    Field::new("metadata", DataType::Utf8, false),
4416                ]
4417                .into(),
4418            ),
4419            false,
4420        );
4421
4422        let schema = Arc::new(Schema::new(vec![
4423            Field::new("id", DataType::Int32, false),
4424            struct_field,
4425        ]));
4426
4427        // Generate test data
4428        let float_arr_v1 = generate_random_array(num_rows * dimensions as usize);
4429        let vectors_v1 = FixedSizeListArray::try_new_from_values(float_arr_v1, dimensions).unwrap();
4430
4431        let float_arr_v2 = generate_random_array(num_rows * dimensions as usize);
4432        let vectors_v2 = FixedSizeListArray::try_new_from_values(float_arr_v2, dimensions).unwrap();
4433
4434        let ids = Int32Array::from_iter_values(0..num_rows as i32);
4435        let metadata = StringArray::from_iter_values((0..num_rows).map(|i| format!("meta_{}", i)));
4436
4437        let struct_array = arrow_array::StructArray::from(vec![
4438            (
4439                Arc::new(Field::new(
4440                    "vector.v1",
4441                    DataType::FixedSizeList(
4442                        Arc::new(Field::new("item", DataType::Float32, true)),
4443                        dimensions,
4444                    ),
4445                    false,
4446                )),
4447                Arc::new(vectors_v1) as Arc<dyn arrow_array::Array>,
4448            ),
4449            (
4450                Arc::new(Field::new(
4451                    "vector.v2",
4452                    DataType::FixedSizeList(
4453                        Arc::new(Field::new("item", DataType::Float32, true)),
4454                        dimensions,
4455                    ),
4456                    false,
4457                )),
4458                Arc::new(vectors_v2) as Arc<dyn arrow_array::Array>,
4459            ),
4460            (
4461                Arc::new(Field::new("metadata", DataType::Utf8, false)),
4462                Arc::new(metadata) as Arc<dyn arrow_array::Array>,
4463            ),
4464        ]);
4465
4466        let batch =
4467            RecordBatch::try_new(schema.clone(), vec![Arc::new(ids), Arc::new(struct_array)])
4468                .unwrap();
4469
4470        let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone());
4471
4472        let test_dir = TempStrDir::default();
4473        let test_uri = &test_dir;
4474        let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
4475
4476        // Test creating index on nested field with dots using quoted syntax
4477        let nested_column_path_v1 = "embedding_data.`vector.v1`";
4478        let params = VectorIndexParams::ivf_pq(10, 8, 2, MetricType::L2, 10);
4479
4480        dataset
4481            .create_index(
4482                &[nested_column_path_v1],
4483                IndexType::Vector,
4484                Some("vec_v1_idx".to_string()),
4485                &params,
4486                true,
4487            )
4488            .await
4489            .unwrap();
4490
4491        // Verify index was created
4492        let indices = dataset.load_indices().await.unwrap();
4493        assert_eq!(indices.len(), 1);
4494        assert_eq!(indices[0].name, "vec_v1_idx");
4495
4496        // Verify the correct field was indexed
4497        let field_id = indices[0].fields[0];
4498        let field_path = dataset.schema().field_path(field_id).unwrap();
4499        assert_eq!(field_path, "embedding_data.`vector.v1`");
4500
4501        // Test creating index on the second vector field with dots
4502        let nested_column_path_v2 = "embedding_data.`vector.v2`";
4503        dataset
4504            .create_index(
4505                &[nested_column_path_v2],
4506                IndexType::Vector,
4507                Some("vec_v2_idx".to_string()),
4508                &params,
4509                true,
4510            )
4511            .await
4512            .unwrap();
4513
4514        // Verify both indices exist
4515        let indices = dataset.load_indices().await.unwrap();
4516        assert_eq!(indices.len(), 2);
4517
4518        // Verify we can query using the indexed fields and check the plan
4519        let query_vector = generate_random_array(dimensions as usize);
4520
4521        // Check the query plan for the first vector field
4522        let plan_v1 = dataset
4523            .scan()
4524            .nearest(nested_column_path_v1, &query_vector, 5)
4525            .unwrap()
4526            .explain_plan(false)
4527            .await
4528            .unwrap();
4529
4530        // Verify the vector index is being used (should show ANNSubIndex or ANNIvfPartition)
4531        assert!(
4532            plan_v1.contains("ANNSubIndex") || plan_v1.contains("ANNIvfPartition"),
4533            "Query plan should use vector index for nested field with dots. Plan: {}",
4534            plan_v1
4535        );
4536
4537        let search_results_v1 = dataset
4538            .scan()
4539            .nearest(nested_column_path_v1, &query_vector, 5)
4540            .unwrap()
4541            .try_into_batch()
4542            .await
4543            .unwrap();
4544
4545        assert_eq!(search_results_v1.num_rows(), 5);
4546
4547        // Check the query plan for the second vector field
4548        let plan_v2 = dataset
4549            .scan()
4550            .nearest(nested_column_path_v2, &query_vector, 5)
4551            .unwrap()
4552            .explain_plan(false)
4553            .await
4554            .unwrap();
4555
4556        // Verify the vector index is being used
4557        assert!(
4558            plan_v2.contains("ANNSubIndex") || plan_v2.contains("ANNIvfPartition"),
4559            "Query plan should use vector index for second nested field with dots. Plan: {}",
4560            plan_v2
4561        );
4562
4563        let search_results_v2 = dataset
4564            .scan()
4565            .nearest(nested_column_path_v2, &query_vector, 5)
4566            .unwrap()
4567            .try_into_batch()
4568            .await
4569            .unwrap();
4570
4571        assert_eq!(search_results_v2.num_rows(), 5);
4572    }
4573
4574    #[tokio::test]
4575    async fn test_vector_index_on_simple_nested_field() {
4576        // This test reproduces the Python test scenario from test_nested_field_vector_index
4577        // where the nested field path is simple (data.embedding) without dots in field names
4578        let dimensions = 16;
4579        let num_rows = 256;
4580
4581        // Create schema with simple nested field (no dots in field names)
4582        let struct_field = Field::new(
4583            "data",
4584            DataType::Struct(
4585                vec![
4586                    Field::new(
4587                        "embedding",
4588                        DataType::FixedSizeList(
4589                            Arc::new(Field::new("item", DataType::Float32, true)),
4590                            dimensions,
4591                        ),
4592                        false,
4593                    ),
4594                    Field::new("label", DataType::Utf8, false),
4595                ]
4596                .into(),
4597            ),
4598            false,
4599        );
4600
4601        let schema = Arc::new(Schema::new(vec![
4602            Field::new("id", DataType::Int32, false),
4603            struct_field,
4604        ]));
4605
4606        // Generate test data
4607        let float_arr = generate_random_array(num_rows * dimensions as usize);
4608        let vectors = FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap();
4609
4610        let ids = Int32Array::from_iter_values(0..num_rows as i32);
4611        let labels = StringArray::from_iter_values((0..num_rows).map(|i| format!("label_{}", i)));
4612
4613        let struct_array = arrow_array::StructArray::from(vec![
4614            (
4615                Arc::new(Field::new(
4616                    "embedding",
4617                    DataType::FixedSizeList(
4618                        Arc::new(Field::new("item", DataType::Float32, true)),
4619                        dimensions,
4620                    ),
4621                    false,
4622                )),
4623                Arc::new(vectors) as Arc<dyn arrow_array::Array>,
4624            ),
4625            (
4626                Arc::new(Field::new("label", DataType::Utf8, false)),
4627                Arc::new(labels) as Arc<dyn arrow_array::Array>,
4628            ),
4629        ]);
4630
4631        let batch =
4632            RecordBatch::try_new(schema.clone(), vec![Arc::new(ids), Arc::new(struct_array)])
4633                .unwrap();
4634
4635        let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone());
4636
4637        let test_dir = TempStrDir::default();
4638        let test_uri = &test_dir;
4639        let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
4640
4641        // Test creating index on nested field
4642        let nested_column_path = "data.embedding";
4643        let params = VectorIndexParams::ivf_pq(2, 8, 2, MetricType::L2, 10);
4644
4645        dataset
4646            .create_index(
4647                &[nested_column_path],
4648                IndexType::Vector,
4649                Some("vec_idx".to_string()),
4650                &params,
4651                true,
4652            )
4653            .await
4654            .unwrap();
4655
4656        // Verify index was created
4657        let indices = dataset.load_indices().await.unwrap();
4658        assert_eq!(indices.len(), 1);
4659        assert_eq!(indices[0].name, "vec_idx");
4660
4661        // Verify the correct field was indexed
4662        let field_id = indices[0].fields[0];
4663        let field_path = dataset.schema().field_path(field_id).unwrap();
4664        assert_eq!(field_path, "data.embedding");
4665
4666        // Test querying with the index
4667        let query_vector = generate_random_array(dimensions as usize);
4668
4669        let plan = dataset
4670            .scan()
4671            .nearest(nested_column_path, &query_vector, 5)
4672            .unwrap()
4673            .explain_plan(false)
4674            .await
4675            .unwrap();
4676
4677        // Verify the vector index is being used
4678        assert!(
4679            plan.contains("ANNSubIndex") || plan.contains("ANNIvfPartition"),
4680            "Query plan should use vector index for nested field. Plan: {}",
4681            plan
4682        );
4683
4684        let search_results = dataset
4685            .scan()
4686            .nearest(nested_column_path, &query_vector, 5)
4687            .unwrap()
4688            .try_into_batch()
4689            .await
4690            .unwrap();
4691
4692        assert_eq!(search_results.num_rows(), 5);
4693    }
4694
4695    #[tokio::test]
4696    async fn test_btree_index_on_nested_field_with_dots() {
4697        // Test creating BTree index on nested field with dots in the name
4698        let test_dir = TempStrDir::default();
4699        let test_uri = &test_dir;
4700
4701        // Create schema with nested field containing dots
4702        let schema = Arc::new(Schema::new(vec![
4703            Field::new("id", DataType::Int32, false),
4704            Field::new(
4705                "data",
4706                DataType::Struct(
4707                    vec![
4708                        Field::new("value.v1", DataType::Int32, false),
4709                        Field::new("value.v2", DataType::Float32, false),
4710                        Field::new("text", DataType::Utf8, false),
4711                    ]
4712                    .into(),
4713                ),
4714                false,
4715            ),
4716        ]));
4717
4718        // Generate test data
4719        let num_rows = 1000;
4720        let ids = Int32Array::from_iter_values(0..num_rows);
4721        let values_v1 = Int32Array::from_iter_values((0..num_rows).map(|i| i % 100));
4722        let values_v2 = Float32Array::from_iter_values((0..num_rows).map(|i| (i as f32) * 0.1));
4723        let texts = StringArray::from_iter_values((0..num_rows).map(|i| format!("text_{}", i)));
4724
4725        let struct_array = arrow_array::StructArray::from(vec![
4726            (
4727                Arc::new(Field::new("value.v1", DataType::Int32, false)),
4728                Arc::new(values_v1) as Arc<dyn arrow_array::Array>,
4729            ),
4730            (
4731                Arc::new(Field::new("value.v2", DataType::Float32, false)),
4732                Arc::new(values_v2) as Arc<dyn arrow_array::Array>,
4733            ),
4734            (
4735                Arc::new(Field::new("text", DataType::Utf8, false)),
4736                Arc::new(texts) as Arc<dyn arrow_array::Array>,
4737            ),
4738        ]);
4739
4740        let batch =
4741            RecordBatch::try_new(schema.clone(), vec![Arc::new(ids), Arc::new(struct_array)])
4742                .unwrap();
4743
4744        let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone());
4745        let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
4746
4747        // Create BTree index on nested field with dots
4748        let nested_column_path = "data.`value.v1`";
4749        let params = ScalarIndexParams::default();
4750
4751        dataset
4752            .create_index(
4753                &[nested_column_path],
4754                IndexType::BTree,
4755                Some("btree_v1_idx".to_string()),
4756                &params,
4757                true,
4758            )
4759            .await
4760            .unwrap();
4761
4762        // Reload dataset to ensure index is loaded
4763        dataset = Dataset::open(test_uri).await.unwrap();
4764
4765        // Verify index was created
4766        let indices = dataset.load_indices().await.unwrap();
4767        assert_eq!(indices.len(), 1);
4768        assert_eq!(indices[0].name, "btree_v1_idx");
4769
4770        // Verify the correct field was indexed
4771        let field_id = indices[0].fields[0];
4772        let field_path = dataset.schema().field_path(field_id).unwrap();
4773        assert_eq!(field_path, "data.`value.v1`");
4774
4775        // Test querying with the index and verify it's being used
4776        let plan = dataset
4777            .scan()
4778            .filter("data.`value.v1` = 42")
4779            .unwrap()
4780            .prefilter(true)
4781            .explain_plan(false)
4782            .await
4783            .unwrap();
4784
4785        // Verify the query plan (scalar indices on nested fields may use optimized filters)
4786        // The index may be used internally even if not shown as ScalarIndexQuery
4787        assert!(
4788            plan.contains("ScalarIndexQuery"),
4789            "Query plan should show optimized read. Plan: {}",
4790            plan
4791        );
4792
4793        // Also test that the query returns results
4794        let results = dataset
4795            .scan()
4796            .filter("data.`value.v1` = 42")
4797            .unwrap()
4798            .prefilter(true)
4799            .try_into_batch()
4800            .await
4801            .unwrap();
4802
4803        assert!(results.num_rows() > 0);
4804    }
4805
4806    #[tokio::test]
4807    async fn test_bitmap_index_on_nested_field_with_dots() {
4808        // Test creating Bitmap index on nested field with dots in the name
4809        let test_dir = TempStrDir::default();
4810        let test_uri = &test_dir;
4811
4812        // Create schema with nested field containing dots - using low cardinality for bitmap
4813        let schema = Arc::new(Schema::new(vec![
4814            Field::new("id", DataType::Int32, false),
4815            Field::new(
4816                "metadata",
4817                DataType::Struct(
4818                    vec![
4819                        Field::new("status.code", DataType::Int32, false),
4820                        Field::new("category.name", DataType::Utf8, false),
4821                    ]
4822                    .into(),
4823                ),
4824                false,
4825            ),
4826        ]));
4827
4828        // Generate test data with low cardinality (good for bitmap index)
4829        let num_rows = 1000;
4830        let ids = Int32Array::from_iter_values(0..num_rows);
4831        // Only 10 unique status codes
4832        let status_codes = Int32Array::from_iter_values((0..num_rows).map(|i| i % 10));
4833        // Only 5 unique categories
4834        let categories =
4835            StringArray::from_iter_values((0..num_rows).map(|i| format!("category_{}", i % 5)));
4836
4837        let struct_array = arrow_array::StructArray::from(vec![
4838            (
4839                Arc::new(Field::new("status.code", DataType::Int32, false)),
4840                Arc::new(status_codes) as Arc<dyn arrow_array::Array>,
4841            ),
4842            (
4843                Arc::new(Field::new("category.name", DataType::Utf8, false)),
4844                Arc::new(categories) as Arc<dyn arrow_array::Array>,
4845            ),
4846        ]);
4847
4848        let batch =
4849            RecordBatch::try_new(schema.clone(), vec![Arc::new(ids), Arc::new(struct_array)])
4850                .unwrap();
4851
4852        let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone());
4853        let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
4854
4855        // Create Bitmap index on nested field with dots
4856        let nested_column_path = "metadata.`status.code`";
4857        let params = ScalarIndexParams::default();
4858
4859        dataset
4860            .create_index(
4861                &[nested_column_path],
4862                IndexType::Bitmap,
4863                Some("bitmap_status_idx".to_string()),
4864                &params,
4865                true,
4866            )
4867            .await
4868            .unwrap();
4869
4870        // Reload dataset to ensure index is loaded
4871        dataset = Dataset::open(test_uri).await.unwrap();
4872
4873        // Verify index was created
4874        let indices = dataset.load_indices().await.unwrap();
4875        assert_eq!(indices.len(), 1);
4876        assert_eq!(indices[0].name, "bitmap_status_idx");
4877
4878        // Verify the correct field was indexed
4879        let field_id = indices[0].fields[0];
4880        let field_path = dataset.schema().field_path(field_id).unwrap();
4881        assert_eq!(field_path, "metadata.`status.code`");
4882
4883        // Test querying with the index and verify it's being used
4884        let plan = dataset
4885            .scan()
4886            .filter("metadata.`status.code` = 5")
4887            .unwrap()
4888            .explain_plan(false)
4889            .await
4890            .unwrap();
4891
4892        // Verify the query plan (scalar indices on nested fields may use optimized filters)
4893        // The index may be used internally even if not shown as ScalarIndexQuery
4894        assert!(
4895            plan.contains("ScalarIndexQuery"),
4896            "Query plan should show optimized read. Plan: {}",
4897            plan
4898        );
4899
4900        // Also test that the query returns results
4901        let results = dataset
4902            .scan()
4903            .filter("metadata.`status.code` = 5")
4904            .unwrap()
4905            .try_into_batch()
4906            .await
4907            .unwrap();
4908
4909        // Should have ~100 rows with status code 5
4910        assert!(results.num_rows() > 0);
4911        assert_eq!(results.num_rows(), 100);
4912    }
4913
4914    #[tokio::test]
4915    async fn test_inverted_index_on_nested_field_with_dots() {
4916        use lance_index::scalar::inverted::tokenizer::InvertedIndexParams;
4917
4918        // Test creating Inverted index on nested text field with dots in the name
4919        let test_dir = TempStrDir::default();
4920        let test_uri = &test_dir;
4921
4922        // Create schema with nested text field containing dots
4923        let schema = Arc::new(Schema::new(vec![
4924            Field::new("id", DataType::Int32, false),
4925            Field::new(
4926                "document",
4927                DataType::Struct(
4928                    vec![
4929                        Field::new("content.text", DataType::Utf8, false),
4930                        Field::new("content.summary", DataType::Utf8, false),
4931                    ]
4932                    .into(),
4933                ),
4934                false,
4935            ),
4936        ]));
4937
4938        // Generate test data with text content
4939        let num_rows = 100;
4940        let ids = Int32Array::from_iter_values(0..num_rows as i32);
4941        let content_texts = StringArray::from_iter_values((0..num_rows).map(|i| match i % 3 {
4942            0 => format!("The quick brown fox jumps over the lazy dog {}", i),
4943            1 => format!(
4944                "Machine learning and artificial intelligence document {}",
4945                i
4946            ),
4947            _ => format!("Data science and analytics content piece {}", i),
4948        }));
4949        let summaries = StringArray::from_iter_values(
4950            (0..num_rows).map(|i| format!("Summary of document {}", i)),
4951        );
4952
4953        let struct_array = arrow_array::StructArray::from(vec![
4954            (
4955                Arc::new(Field::new("content.text", DataType::Utf8, false)),
4956                Arc::new(content_texts) as Arc<dyn arrow_array::Array>,
4957            ),
4958            (
4959                Arc::new(Field::new("content.summary", DataType::Utf8, false)),
4960                Arc::new(summaries) as Arc<dyn arrow_array::Array>,
4961            ),
4962        ]);
4963
4964        let batch =
4965            RecordBatch::try_new(schema.clone(), vec![Arc::new(ids), Arc::new(struct_array)])
4966                .unwrap();
4967
4968        let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone());
4969        let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
4970
4971        // Create Inverted index on nested text field with dots
4972        let nested_column_path = "document.`content.text`";
4973        let params = InvertedIndexParams::default();
4974
4975        dataset
4976            .create_index(
4977                &[nested_column_path],
4978                IndexType::Inverted,
4979                Some("inverted_content_idx".to_string()),
4980                &params,
4981                true,
4982            )
4983            .await
4984            .unwrap();
4985
4986        // Reload the dataset to ensure the index is loaded
4987        dataset = Dataset::open(test_uri).await.unwrap();
4988
4989        // Verify index was created
4990        let indices = dataset.load_indices().await.unwrap();
4991        assert_eq!(indices.len(), 1);
4992        assert_eq!(indices[0].name, "inverted_content_idx");
4993
4994        // Verify the correct field was indexed
4995        let field_id = indices[0].fields[0];
4996        let field_path = dataset.schema().field_path(field_id).unwrap();
4997        assert_eq!(field_path, "document.`content.text`");
4998
4999        // Test full-text search on the nested field with dots
5000        // Use the field_path that the index reports
5001        let query = FullTextSearchQuery::new("machine learning".to_string())
5002            .with_column(field_path.clone())
5003            .unwrap();
5004
5005        // Check the query plan uses the inverted index
5006        let plan = dataset
5007            .scan()
5008            .full_text_search(query.clone())
5009            .unwrap()
5010            .explain_plan(false)
5011            .await
5012            .unwrap();
5013
5014        // Verify the inverted index is being used
5015        assert!(
5016            plan.contains("MatchQuery") || plan.contains("PhraseQuery"),
5017            "Query plan should use inverted index for nested field with dots. Plan: {}",
5018            plan
5019        );
5020
5021        let results = dataset
5022            .scan()
5023            .full_text_search(query)
5024            .unwrap()
5025            .try_into_stream()
5026            .await
5027            .unwrap()
5028            .try_collect::<Vec<_>>()
5029            .await
5030            .unwrap();
5031
5032        // Verify we get results from the full-text search
5033        assert!(
5034            !results.is_empty(),
5035            "Full-text search should return results"
5036        );
5037
5038        // Check that we found documents containing "machine learning"
5039        let mut found_count = 0;
5040        for batch in results {
5041            found_count += batch.num_rows();
5042        }
5043        // We expect to find approximately 1/3 of documents (those with i % 3 == 1)
5044        assert!(
5045            found_count > 0,
5046            "Should find at least some documents with 'machine learning'"
5047        );
5048        assert!(found_count < num_rows, "Should not match all documents");
5049    }
5050
5051    #[tokio::test]
5052    async fn test_resolve_index_column() {
5053        use lance_datagen::{array, BatchCount, RowCount};
5054
5055        // Create a test dataset with a vector column
5056        let test_dir = tempfile::tempdir().unwrap();
5057        let test_uri = test_dir.path().to_str().unwrap();
5058
5059        let reader = lance_datagen::gen_batch()
5060            .col("id", array::step::<arrow_array::types::Int32Type>())
5061            .col(
5062                "vector",
5063                array::rand_vec::<arrow_array::types::Float32Type>(32.into()),
5064            )
5065            .into_reader_rows(RowCount::from(100), BatchCount::from(1));
5066
5067        let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
5068
5069        // Create an index with a custom name
5070        let params = crate::index::vector::VectorIndexParams::ivf_flat(
5071            4,
5072            lance_linalg::distance::MetricType::L2,
5073        );
5074        dataset
5075            .create_index(
5076                &["vector"],
5077                IndexType::Vector,
5078                Some("my_vector_index".to_string()),
5079                &params,
5080                false,
5081            )
5082            .await
5083            .unwrap();
5084
5085        // Reload dataset to get the index metadata
5086        let dataset = Dataset::open(test_uri).await.unwrap();
5087        let indices = dataset.load_indices().await.unwrap();
5088        assert_eq!(indices.len(), 1);
5089        let index_meta = &indices[0];
5090
5091        // Test 1: Pass the actual column name
5092        let (field_path, field) =
5093            resolve_index_column(dataset.schema(), index_meta, "vector").unwrap();
5094        assert_eq!(field_path, "vector");
5095        assert_eq!(field.name, "vector");
5096
5097        // Test 2: Pass the index name (should resolve to the actual column)
5098        let (field_path2, field2) =
5099            resolve_index_column(dataset.schema(), index_meta, "my_vector_index").unwrap();
5100        assert_eq!(field_path2, "vector");
5101        assert_eq!(field2.name, "vector");
5102
5103        // Test 3: Pass a non-existent column name (should fail)
5104        let result = resolve_index_column(dataset.schema(), index_meta, "nonexistent");
5105        assert!(result.is_err());
5106        assert!(result
5107            .unwrap_err()
5108            .to_string()
5109            .contains("does not exist in the schema"));
5110    }
5111
5112    #[tokio::test]
5113    async fn test_resolve_index_column_error_cases() {
5114        use lance_datagen::{array, BatchCount, RowCount};
5115
5116        // Create a test dataset
5117        let test_dir = tempfile::tempdir().unwrap();
5118        let test_uri = test_dir.path().to_str().unwrap();
5119
5120        let reader = lance_datagen::gen_batch()
5121            .col("id", array::step::<arrow_array::types::Int32Type>())
5122            .col(
5123                "vector",
5124                array::rand_vec::<arrow_array::types::Float32Type>(32.into()),
5125            )
5126            .into_reader_rows(RowCount::from(100), BatchCount::from(1));
5127
5128        let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
5129
5130        // Create an index
5131        let params = crate::index::vector::VectorIndexParams::ivf_flat(
5132            4,
5133            lance_linalg::distance::MetricType::L2,
5134        );
5135        dataset
5136            .create_index(
5137                &["vector"],
5138                IndexType::Vector,
5139                Some("my_index".to_string()),
5140                &params,
5141                false,
5142            )
5143            .await
5144            .unwrap();
5145
5146        // Reload dataset
5147        let dataset = Dataset::open(test_uri).await.unwrap();
5148        let indices = dataset.load_indices().await.unwrap();
5149        let index_meta = &indices[0];
5150
5151        // Test: Pass a column that doesn't exist and is not the index name
5152        let result = resolve_index_column(dataset.schema(), index_meta, "nonexistent_column");
5153        assert!(result.is_err());
5154        let err_msg = result.unwrap_err().to_string();
5155        assert!(
5156            err_msg.contains("does not exist in the schema"),
5157            "Error message should mention column doesn't exist, got: {}",
5158            err_msg
5159        );
5160    }
5161
5162    #[tokio::test]
5163    async fn test_resolve_index_column_nested_field() {
5164        use arrow_array::{RecordBatch, StructArray};
5165        use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema};
5166
5167        // Create a test dataset with nested struct manually
5168        let test_dir = tempfile::tempdir().unwrap();
5169        let test_uri = test_dir.path().to_str().unwrap();
5170
5171        // Create schema with nested structure: data.vector
5172        let vector_field = ArrowField::new(
5173            "vector",
5174            DataType::FixedSizeList(
5175                Arc::new(ArrowField::new("item", DataType::Float32, true)),
5176                8,
5177            ),
5178            false,
5179        );
5180        let struct_field = ArrowField::new(
5181            "data",
5182            DataType::Struct(vec![vector_field.clone()].into()),
5183            false,
5184        );
5185        let schema = Arc::new(ArrowSchema::new(vec![
5186            ArrowField::new("id", DataType::Int32, false),
5187            struct_field,
5188        ]));
5189
5190        // Create data
5191        let id_array = arrow_array::Int32Array::from(vec![1, 2, 3, 4, 5]);
5192
5193        // Create nested vector data
5194        let mut vector_values = Vec::new();
5195        for _ in 0..5 {
5196            for _ in 0..8 {
5197                vector_values.push(rand::random::<f32>());
5198            }
5199        }
5200        let vector_array = arrow_array::FixedSizeListArray::try_new_from_values(
5201            arrow_array::Float32Array::from(vector_values),
5202            8,
5203        )
5204        .unwrap();
5205
5206        let struct_array = StructArray::from(vec![(
5207            Arc::new(vector_field),
5208            Arc::new(vector_array) as arrow_array::ArrayRef,
5209        )]);
5210
5211        let batch = RecordBatch::try_new(
5212            schema.clone(),
5213            vec![Arc::new(id_array), Arc::new(struct_array)],
5214        )
5215        .unwrap();
5216
5217        let reader = Box::new(arrow_array::RecordBatchIterator::new(
5218            vec![Ok(batch)],
5219            schema,
5220        ));
5221
5222        let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
5223
5224        // Create an index on the nested field
5225        let params = crate::index::vector::VectorIndexParams::ivf_flat(
5226            2,
5227            lance_linalg::distance::MetricType::L2,
5228        );
5229        dataset
5230            .create_index(
5231                &["data.vector"],
5232                IndexType::Vector,
5233                Some("nested_vector_index".to_string()),
5234                &params,
5235                false,
5236            )
5237            .await
5238            .unwrap();
5239
5240        // Reload dataset to get the index metadata
5241        let dataset = Dataset::open(test_uri).await.unwrap();
5242        let indices = dataset.load_indices().await.unwrap();
5243        assert_eq!(indices.len(), 1);
5244        let index_meta = &indices[0];
5245
5246        // Test 1: Pass the nested field path directly
5247        let (field_path, field) =
5248            resolve_index_column(dataset.schema(), index_meta, "data.vector").unwrap();
5249        assert_eq!(field_path, "data.vector");
5250        assert_eq!(field.name, "vector");
5251
5252        // Test 2: Pass the index name, should resolve to the nested field path
5253        let (field_path2, field2) =
5254            resolve_index_column(dataset.schema(), index_meta, "nested_vector_index").unwrap();
5255        assert_eq!(field_path2, "data.vector");
5256        assert_eq!(field2.name, "vector");
5257
5258        // Verify the field path is correct for nested access
5259        assert!(
5260            field_path2.contains('.'),
5261            "Field path should contain '.' for nested field"
5262        );
5263    }
5264}