Skip to main content

lancedb/
table.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The LanceDB Authors
3
4//! LanceDB Table APIs
5
6use arrow_array::{RecordBatch, RecordBatchReader};
7use arrow_schema::{DataType, Field, Schema, SchemaRef};
8use async_trait::async_trait;
9use datafusion_execution::TaskContext;
10use datafusion_expr::Expr;
11use datafusion_physical_plan::ExecutionPlan;
12use datafusion_physical_plan::display::DisplayableExecutionPlan;
13use futures::StreamExt;
14use futures::stream::FuturesUnordered;
15pub use lance::dataset::ColumnAlteration;
16pub use lance::dataset::NewColumnTransform;
17pub use lance::dataset::ReadParams;
18pub use lance::dataset::Version;
19use lance::dataset::WriteMode;
20use lance::dataset::builder::DatasetBuilder;
21use lance::dataset::{InsertBuilder, WriteParams};
22use lance::index::DatasetIndexExt;
23use lance::index::vector::VectorIndexParams;
24use lance::index::vector::utils::infer_vector_dim;
25use lance::io::{ObjectStoreParams, WrappingObjectStore};
26use lance_datafusion::utils::StreamingWriteSource;
27use lance_index::IndexType;
28use lance_index::scalar::{BuiltinIndexType, ScalarIndexParams};
29use lance_index::vector::bq::RQBuildParams;
30use lance_index::vector::hnsw::builder::HnswBuildParams;
31use lance_index::vector::ivf::IvfBuildParams;
32use lance_index::vector::pq::PQBuildParams;
33use lance_index::vector::sq::builder::SQBuildParams;
34use lance_io::object_store::{LanceNamespaceStorageOptionsProvider, StorageOptionsAccessor};
35pub use query::AnyQuery;
36
37use lance::io::commit::namespace_manifest::LanceNamespaceExternalManifestStore;
38use lance_namespace::LanceNamespace;
39use lance_namespace::models::DescribeTableRequest;
40use lance_table::format::Manifest;
41use lance_table::io::commit::CommitHandler;
42use lance_table::io::commit::ManifestNamingScheme;
43use lance_table::io::commit::external_manifest::ExternalManifestCommitHandler;
44use serde::{Deserialize, Serialize};
45use std::collections::{HashMap, HashSet};
46use std::format;
47use std::path::Path;
48use std::sync::Arc;
49
50use crate::connection::NamespaceClientPushdownOperation;
51
52use crate::data::scannable::{PeekedScannable, Scannable, estimate_write_partitions};
53use crate::database::Database;
54use crate::embeddings::{EmbeddingDefinition, EmbeddingRegistry, MemoryRegistry};
55use crate::error::{Error, Result};
56use crate::index::IndexStatistics;
57use crate::index::vector::VectorIndex;
58use crate::index::{Index, IndexBuilder, vector::suggested_num_sub_vectors};
59use crate::index::{IndexConfig, IndexStatisticsImpl};
60use crate::query::{IntoQueryVector, Query, QueryExecutionOptions, TakeQuery, VectorQuery};
61use crate::table::datafusion::insert::InsertExec;
62use crate::utils::{
63    PatchReadParam, PatchWriteParam, supported_bitmap_data_type, supported_btree_data_type,
64    supported_fts_data_type, supported_label_list_data_type, supported_vector_data_type,
65};
66
67use self::dataset::DatasetConsistencyWrapper;
68use self::merge::MergeInsertBuilder;
69
70mod add_data;
71pub mod datafusion;
72pub(crate) mod dataset;
73pub mod delete;
74pub mod merge;
75pub mod optimize;
76pub mod query;
77pub mod schema_evolution;
78pub mod update;
79pub mod write_progress;
80use crate::index::waiter::wait_for_index;
81#[cfg(feature = "remote")]
82pub(crate) use add_data::PreprocessingOutput;
83pub use add_data::{AddDataBuilder, AddDataMode, AddResult, NaNVectorBehavior};
84pub use chrono::Duration;
85pub use delete::DeleteResult;
86use futures::future::join_all;
87pub use lance::dataset::refs::{TagContents, Tags as LanceTags};
88pub use lance::dataset::scanner::DatasetRecordBatchStream;
89use lance::dataset::statistics::DatasetStatisticsExt;
90use lance_index::frag_reuse::FRAG_REUSE_INDEX_NAME;
91pub use lance_index::optimize::OptimizeOptions;
92pub use optimize::{CompactionOptions, OptimizeAction, OptimizeStats};
93pub use schema_evolution::{AddColumnsResult, AlterColumnsResult, DropColumnsResult};
94use serde_with::skip_serializing_none;
95pub use update::{UpdateBuilder, UpdateResult};
96
/// Defines the type of column
///
/// The kind is serializable: `TableDefinition::into_rich_schema` stores the
/// column definitions (and therefore their kinds) as JSON in schema metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ColumnKind {
    /// Columns populated by data from the user (this is the most common case)
    Physical,
    /// Columns populated by applying an embedding function to the input
    Embedding(EmbeddingDefinition),
}
105
/// Defines a column in a table
///
/// Currently this only records where the column's data comes from.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ColumnDefinition {
    /// The source of the column data
    pub kind: ColumnKind,
}
112
/// Pairs an Arrow schema with LanceDB-specific column definitions.
#[derive(Debug, Clone)]
pub struct TableDefinition {
    /// The LanceDB definition of each column.
    ///
    /// `new_from_schema` produces one entry per schema field; `new` stores
    /// whatever the caller supplies without checking it against the schema.
    pub column_definitions: Vec<ColumnDefinition>,
    /// The Arrow schema of the table.
    pub schema: SchemaRef,
}
118
119impl TableDefinition {
120    pub fn new(schema: SchemaRef, column_definitions: Vec<ColumnDefinition>) -> Self {
121        Self {
122            column_definitions,
123            schema,
124        }
125    }
126
127    pub fn new_from_schema(schema: SchemaRef) -> Self {
128        let column_definitions = schema
129            .fields()
130            .iter()
131            .map(|_| ColumnDefinition {
132                kind: ColumnKind::Physical,
133            })
134            .collect();
135        Self::new(schema, column_definitions)
136    }
137
138    pub fn try_from_rich_schema(schema: SchemaRef) -> Result<Self> {
139        let column_definitions = schema.metadata.get("lancedb::column_definitions");
140        if let Some(column_definitions) = column_definitions {
141            let column_definitions: Vec<ColumnDefinition> =
142                serde_json::from_str(column_definitions).map_err(|e| Error::Runtime {
143                    message: format!("Failed to deserialize column definitions: {}", e),
144                })?;
145            Ok(Self::new(schema, column_definitions))
146        } else {
147            let column_definitions = schema
148                .fields()
149                .iter()
150                .map(|_| ColumnDefinition {
151                    kind: ColumnKind::Physical,
152                })
153                .collect();
154            Ok(Self::new(schema, column_definitions))
155        }
156    }
157
158    pub fn into_rich_schema(self) -> SchemaRef {
159        // We have full control over the structure of column definitions.  This should
160        // not fail, except for a bug
161        let lancedb_metadata = serde_json::to_string(&self.column_definitions).unwrap();
162        let mut schema_with_metadata = (*self.schema).clone();
163        schema_with_metadata
164            .metadata
165            .insert("lancedb::column_definitions".to_string(), lancedb_metadata);
166        Arc::new(schema_with_metadata)
167    }
168}
169
/// Describes what happens when a vector either contains NaN or
/// does not have enough values
///
/// Not wired into [`WriteOptions`] yet (hence the `dead_code` allowance); see the
/// linked issue.
#[derive(Clone, Debug, Default)]
#[allow(dead_code)] // https://github.com/lancedb/lancedb/issues/992
enum BadVectorHandling {
    /// An error is returned
    #[default]
    Error,
    /// The offending row is dropped
    Drop,
    /// The invalid/missing items are replaced by fill_value
    Fill(f32),
    /// The invalid items are replaced by NULL
    None,
}
185
/// Options to use when writing data
#[derive(Clone, Debug, Default)]
pub struct WriteOptions {
    // Coming soon: https://github.com/lancedb/lancedb/issues/992
    // /// What behavior to take if the data contains invalid vectors
    // pub on_bad_vectors: BadVectorHandling,
    /// Advanced parameters that can be used to customize table creation
    ///
    /// Overlapping `OpenTableBuilder` options (e.g. [AddDataBuilder::mode]) will take
    /// precedence over their counterparts in `WriteOptions` (e.g. [WriteParams::mode]).
    // NOTE(review): the example above names an `AddDataBuilder` option while the
    // sentence talks about `OpenTableBuilder` — confirm which builder is meant.
    pub lance_write_params: Option<WriteParams>,
}
198
/// Filters that can be used to limit the rows returned by a query
///
/// Also accepted by [`BaseTable::count_rows`] to count only the matching rows.
pub enum Filter {
    /// A SQL filter string
    Sql(String),
    /// A Datafusion logical expression
    Datafusion(Expr),
}
206
/// Operations for managing the tags of a table.
///
/// A tag is a name that refers to a specific version of the table. A tag
/// manager is obtained through [`BaseTable::tags`].
#[async_trait]
pub trait Tags: Send + Sync {
    /// List the tags of the table.
    ///
    /// The returned map is keyed by tag name.
    async fn list(&self) -> Result<HashMap<String, TagContents>>;

    /// Get the version of the table referenced by a tag.
    async fn get_version(&self, tag: &str) -> Result<u64>;

    /// Create a new tag for the given version of the table.
    async fn create(&mut self, tag: &str, version: u64) -> Result<()>;

    /// Delete a tag from the table.
    async fn delete(&mut self, tag: &str) -> Result<()>;

    /// Update an existing tag to point to a new version of the table.
    async fn update(&mut self, tag: &str, version: u64) -> Result<()>;
}
224
225pub use self::merge::MergeResult;
226
/// A trait for anything "table-like".  This is used for both native tables (which target
/// Lance datasets) and remote tables (which target LanceDB cloud)
///
/// This trait is still EXPERIMENTAL and subject to change in the future
#[async_trait]
pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync {
    /// Get a reference to std::any::Any
    ///
    /// This allows callers to downcast to the concrete table type.
    fn as_any(&self) -> &dyn std::any::Any;
    /// Get the name of the table.
    fn name(&self) -> &str;
    /// Get the namespace of the table.
    fn namespace(&self) -> &[String];
    /// Get the id of the table
    ///
    /// This is the namespace of the table concatenated with the name
    /// separated by $
    fn id(&self) -> &str;
    /// Get the arrow [Schema] of the table.
    async fn schema(&self) -> Result<SchemaRef>;
    /// Count the number of rows in this table.
    ///
    /// If `filter` is provided, only rows matching the filter are counted.
    async fn count_rows(&self, filter: Option<Filter>) -> Result<usize>;
    /// Create a physical plan for the query.
    async fn create_plan(
        &self,
        query: &AnyQuery,
        options: QueryExecutionOptions,
    ) -> Result<Arc<dyn ExecutionPlan>>;
    /// Execute a query and return the results as a stream of RecordBatches.
    async fn query(
        &self,
        query: &AnyQuery,
        options: QueryExecutionOptions,
    ) -> Result<DatasetRecordBatchStream>;
    /// Explain the plan for a query.
    ///
    /// The default implementation builds the plan with default execution options
    /// and renders it as indented text; `verbose` is forwarded to the renderer.
    async fn explain_plan(&self, query: &AnyQuery, verbose: bool) -> Result<String> {
        // The plan is only created and rendered here; this method does not run it.
        let plan = self.create_plan(query, Default::default()).await?;
        let display = DisplayableExecutionPlan::new(plan.as_ref());

        Ok(format!("{}", display.indent(verbose)))
    }
    /// Analyze the plan for a query and return the analysis as text.
    ///
    /// NOTE(review): presumably this executes the query and reports runtime
    /// metrics alongside the plan — confirm against the implementations.
    async fn analyze_plan(
        &self,
        query: &AnyQuery,
        options: QueryExecutionOptions,
    ) -> Result<String>;

    /// Add new records to the table.
    async fn add(&self, add: AddDataBuilder) -> Result<AddResult>;
    /// Delete rows from the table.
    async fn delete(&self, predicate: &str) -> Result<DeleteResult>;
    /// Update rows in the table.
    async fn update(&self, update: UpdateBuilder) -> Result<UpdateResult>;
    /// Create an index on the provided column(s).
    async fn create_index(&self, index: IndexBuilder) -> Result<()>;
    /// List the indices on the table.
    async fn list_indices(&self) -> Result<Vec<IndexConfig>>;
    /// Drop an index from the table.
    async fn drop_index(&self, name: &str) -> Result<()>;
    /// Prewarm an index in the table.
    async fn prewarm_index(&self, name: &str) -> Result<()>;
    /// Prewarm data for the table.
    ///
    /// Currently only supported on remote tables.
    /// If `columns` is `None`, all columns are prewarmed.
    async fn prewarm_data(&self, columns: Option<Vec<String>>) -> Result<()>;
    /// Get statistics about the index.
    async fn index_stats(&self, index_name: &str) -> Result<Option<IndexStatistics>>;
    /// Merge insert new records into the table.
    async fn merge_insert(
        &self,
        params: MergeInsertBuilder,
        new_data: Box<dyn RecordBatchReader + Send>,
    ) -> Result<MergeResult>;
    /// Gets the table tag manager.
    async fn tags(&self) -> Result<Box<dyn Tags + '_>>;
    /// Optimize the dataset.
    async fn optimize(&self, action: OptimizeAction) -> Result<OptimizeStats>;
    /// Add columns to the table.
    async fn add_columns(
        &self,
        transforms: NewColumnTransform,
        read_columns: Option<Vec<String>>,
    ) -> Result<AddColumnsResult>;
    /// Alter columns in the table.
    async fn alter_columns(&self, alterations: &[ColumnAlteration]) -> Result<AlterColumnsResult>;
    /// Drop columns from the table.
    async fn drop_columns(&self, columns: &[&str]) -> Result<DropColumnsResult>;
    /// Get the version of the table.
    async fn version(&self) -> Result<u64>;
    /// Checkout a specific version of the table.
    async fn checkout(&self, version: u64) -> Result<()>;
    /// Checkout a table version referenced by a tag.
    /// Tags provide a human-readable way to reference specific versions of the table.
    async fn checkout_tag(&self, tag: &str) -> Result<()>;
    /// Checkout the latest version of the table.
    async fn checkout_latest(&self) -> Result<()>;
    /// Restore the table to the currently checked out version.
    async fn restore(&self) -> Result<()>;
    /// List the versions of the table.
    async fn list_versions(&self) -> Result<Vec<Version>>;
    /// Get the table definition.
    async fn table_definition(&self) -> Result<TableDefinition>;
    /// Get the table URI (storage location)
    async fn uri(&self) -> Result<String>;
    /// Get the storage options used when opening this table, if any.
    #[deprecated(since = "0.25.0", note = "Use initial_storage_options() instead")]
    async fn storage_options(&self) -> Option<HashMap<String, String>>;
    /// Get the initial storage options that were passed in when opening this table.
    ///
    /// For dynamically refreshed options (e.g., credential vending), use [`Self::latest_storage_options`].
    async fn initial_storage_options(&self) -> Option<HashMap<String, String>>;
    /// Get the latest storage options, refreshing from provider if configured.
    ///
    /// Returns `Ok(Some(options))` if storage options are available (static or refreshed),
    /// `Ok(None)` if no storage options were configured, or `Err(...)` if refresh failed.
    async fn latest_storage_options(&self) -> Result<Option<HashMap<String, String>>>;
    /// Poll until the columns are fully indexed. Will return Error::Timeout if the columns
    /// are not fully indexed within the timeout.
    async fn wait_for_index(
        &self,
        index_names: &[&str],
        timeout: std::time::Duration,
    ) -> Result<()>;
    /// Get statistics on the table
    async fn stats(&self) -> Result<TableStatistics>;
    /// Create an ExecutionPlan for inserting data into the table.
    ///
    /// This is used by the DataFusion TableProvider implementation to support
    /// INSERT INTO statements.
    ///
    /// The default implementation returns [`Error::NotSupported`]; implementors
    /// that support inserts override it.
    async fn create_insert_exec(
        &self,
        _input: Arc<dyn datafusion_physical_plan::ExecutionPlan>,
        _write_params: WriteParams,
    ) -> Result<Arc<dyn datafusion_physical_plan::ExecutionPlan>> {
        Err(Error::NotSupported {
            message: "create_insert_exec not implemented".to_string(),
        })
    }
}
366
/// A Table is a collection of strongly typed rows.
///
/// The type of each row is defined by an Apache Arrow [Schema].
#[derive(Clone, Debug)]
pub struct Table {
    // The underlying table implementation that every operation is forwarded to.
    inner: Arc<dyn BaseTable>,
    // The database this table belongs to. `None` when the table was built
    // directly from an `Arc<dyn BaseTable>` via `From`.
    database: Option<Arc<dyn Database>>,
    // Registry handed to the add-data builder created by `Table::add`.
    embedding_registry: Arc<dyn EmbeddingRegistry>,
}
376
/// Test-only constructors that build a [`Table`] on top of mocked remote
/// table/database clients driven by a request handler closure.
#[cfg(all(test, feature = "remote"))]
mod test_utils {
    use super::*;

    impl Table {
        /// Build a mock remote table named `name` whose requests are answered by `handler`.
        pub fn new_with_handler<T>(
            name: impl Into<String>,
            handler: impl Fn(reqwest::Request) -> http::Response<T> + Clone + Send + Sync + 'static,
        ) -> Self
        where
            T: Into<reqwest::Body>,
        {
            let table = crate::remote::table::RemoteTable::new_mock(
                name.into(),
                handler.clone(),
                None,
            );
            let db = crate::remote::db::RemoteDatabase::new_mock(handler);
            // `Table::new` installs a fresh memory registry; the registry is unused.
            Self::new(Arc::new(table), Arc::new(db))
        }

        /// Like [`Table::new_with_handler`], but passes `version` to the mock table.
        pub fn new_with_handler_version<T>(
            name: impl Into<String>,
            version: semver::Version,
            handler: impl Fn(reqwest::Request) -> http::Response<T> + Clone + Send + Sync + 'static,
        ) -> Self
        where
            T: Into<reqwest::Body>,
        {
            let table = crate::remote::table::RemoteTable::new_mock(
                name.into(),
                handler.clone(),
                Some(version),
            );
            let db = crate::remote::db::RemoteDatabase::new_mock(handler);
            // `Table::new` installs a fresh memory registry; the registry is unused.
            Self::new(Arc::new(table), Arc::new(db))
        }

        /// Like [`Table::new_with_handler`], but the mock table and database use `config`.
        pub fn new_with_handler_and_config<T>(
            name: impl Into<String>,
            handler: impl Fn(reqwest::Request) -> http::Response<T> + Clone + Send + Sync + 'static,
            config: crate::remote::ClientConfig,
        ) -> Self
        where
            T: Into<reqwest::Body>,
        {
            let table = crate::remote::table::RemoteTable::new_mock_with_config(
                name.into(),
                handler.clone(),
                config.clone(),
            );
            let db = crate::remote::db::RemoteDatabase::new_mock_with_config(handler, config);
            // `Table::new` installs a fresh memory registry; the registry is unused.
            Self::new(Arc::new(table), Arc::new(db))
        }

        /// Combination of [`Table::new_with_handler_version`] and
        /// [`Table::new_with_handler_and_config`].
        pub fn new_with_handler_version_and_config<T>(
            name: impl Into<String>,
            version: semver::Version,
            handler: impl Fn(reqwest::Request) -> http::Response<T> + Clone + Send + Sync + 'static,
            config: crate::remote::ClientConfig,
        ) -> Self
        where
            T: Into<reqwest::Body>,
        {
            let table = crate::remote::table::RemoteTable::new_mock_with_version_and_config(
                name.into(),
                handler.clone(),
                Some(version),
                config.clone(),
            );
            let db = crate::remote::db::RemoteDatabase::new_mock_with_config(handler, config);
            // `Table::new` installs a fresh memory registry; the registry is unused.
            Self::new(Arc::new(table), Arc::new(db))
        }
    }
}
478
479impl std::fmt::Display for Table {
480    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
481        write!(f, "{}", self.inner)
482    }
483}
484
485impl From<Arc<dyn BaseTable>> for Table {
486    fn from(inner: Arc<dyn BaseTable>) -> Self {
487        Self {
488            inner,
489            database: None,
490            embedding_registry: Arc::new(MemoryRegistry::new()),
491        }
492    }
493}
494
495impl Table {
496    pub fn new(inner: Arc<dyn BaseTable>, database: Arc<dyn Database>) -> Self {
497        Self {
498            inner,
499            database: Some(database),
500            embedding_registry: Arc::new(MemoryRegistry::new()),
501        }
502    }
503
    /// Get the underlying [`BaseTable`] implementation this `Table` wraps.
    pub fn base_table(&self) -> &Arc<dyn BaseTable> {
        &self.inner
    }
507
508    pub fn database(&self) -> &Arc<dyn Database> {
509        self.database.as_ref().unwrap()
510    }
511
    /// Get the embedding registry associated with this table.
    pub fn embedding_registry(&self) -> &Arc<dyn EmbeddingRegistry> {
        &self.embedding_registry
    }
515
    /// Create a `Table` backed by `inner` that belongs to `database` and uses the
    /// supplied `embedding_registry` instead of a freshly created one.
    pub(crate) fn new_with_embedding_registry(
        inner: Arc<dyn BaseTable>,
        database: Arc<dyn Database>,
        embedding_registry: Arc<dyn EmbeddingRegistry>,
    ) -> Self {
        Self {
            inner,
            database: Some(database),
            embedding_registry,
        }
    }
527
    /// Cast as [`NativeTable`], or return None if it is not a [`NativeTable`].
    ///
    /// Warning: This function will be removed soon (features exclusive to NativeTable
    ///          will be added to Table)
    pub fn as_native(&self) -> Option<&NativeTable> {
        self.inner.as_native()
    }
535
    /// Get the name of the table.
    ///
    /// Forwarded to the underlying [`BaseTable`].
    pub fn name(&self) -> &str {
        self.inner.name()
    }
540
    /// Get the namespace of the table.
    ///
    /// Forwarded to the underlying [`BaseTable`].
    pub fn namespace(&self) -> &[String] {
        self.inner.namespace()
    }
545
    /// Get the ID of the table (namespace + name joined by '$').
    ///
    /// Forwarded to the underlying [`BaseTable`].
    pub fn id(&self) -> &str {
        self.inner.id()
    }
550
551    /// Get the dataset of the table if it is a native table
552    ///
553    /// Returns None otherwise
554    pub fn dataset(&self) -> Option<&dataset::DatasetConsistencyWrapper> {
555        self.inner.as_native().map(|t| &t.dataset)
556    }
557
    /// Get the arrow [Schema] of the table.
    ///
    /// Forwarded to the underlying [`BaseTable`].
    pub async fn schema(&self) -> Result<SchemaRef> {
        self.inner.schema().await
    }
562
563    /// Count the number of rows in this dataset.
564    ///
565    /// # Arguments
566    ///
567    /// * `filter` if present, only count rows matching the filter
568    pub async fn count_rows(&self, filter: Option<String>) -> Result<usize> {
569        self.inner.count_rows(filter.map(Filter::Sql)).await
570    }
571
    /// Insert new records into this Table
    ///
    /// Returns an [`AddDataBuilder`]; options that control how the data is added
    /// are set on the returned builder.
    ///
    /// # Arguments
    ///
    /// * `data` data to be added to the Table
    pub fn add<T: Scannable + 'static>(&self, data: T) -> AddDataBuilder {
        AddDataBuilder::new(
            self.inner.clone(),
            Box::new(data),
            // The table's embedding registry is handed to the builder.
            Some(self.embedding_registry.clone()),
        )
    }
585
    /// Update existing records in the Table
    ///
    /// An update operation can be used to adjust existing values.  Use the
    /// returned builder to specify which columns to update.  The new value
    /// can be a literal value (e.g. replacing nulls with some default value)
    /// or an expression applied to the old value (e.g. incrementing a value)
    ///
    /// An optional condition can be specified (e.g. "only update if the old
    /// value is 0")
    ///
    /// Note: if your condition is something like "some_id_column == 7" and
    /// you are updating many rows (with different ids) then you will get
    /// better performance with a single [`merge_insert`] call instead of
    /// repeatedly calling this method.
    pub fn update(&self) -> UpdateBuilder {
        UpdateBuilder::new(self.inner.clone())
    }
603
    /// Delete the rows from table that match the predicate.
    ///
    /// The call is forwarded to the underlying [`BaseTable`], whose result is
    /// returned unchanged.
    ///
    /// # Arguments
    /// - `predicate` - The SQL predicate string to filter the rows to be deleted.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use std::sync::Arc;
    /// # use arrow_array::{FixedSizeListArray, types::Float32Type, RecordBatch,
    /// #   RecordBatchIterator, Int32Array};
    /// # use arrow_schema::{Schema, Field, DataType};
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
    /// let tmpdir = tempfile::tempdir().unwrap();
    /// let db = lancedb::connect(tmpdir.path().to_str().unwrap())
    ///     .execute()
    ///     .await
    ///     .unwrap();
    /// let schema = Arc::new(Schema::new(vec![
    ///     Field::new("id", DataType::Int32, false),
    ///     Field::new("vector", DataType::FixedSizeList(
    ///         Arc::new(Field::new("item", DataType::Float32, true)), 128), true),
    /// ]));
    /// let data = RecordBatch::try_new(
    ///     schema.clone(),
    ///     vec![
    ///         Arc::new(Int32Array::from_iter_values(0..10)),
    ///         Arc::new(
    ///             FixedSizeListArray::from_iter_primitive::<Float32Type, _, _>(
    ///                 (0..10).map(|_| Some(vec![Some(1.0); 128])),
    ///                 128,
    ///             ),
    ///         ),
    ///     ],
    /// )
    /// .unwrap();
    /// let tbl = db
    ///     .create_table("delete_test", data)
    ///     .execute()
    ///     .await
    ///     .unwrap();
    /// tbl.delete("id > 5").await.unwrap();
    /// # });
    /// ```
    pub async fn delete(&self, predicate: &str) -> Result<DeleteResult> {
        self.inner.delete(predicate).await
    }
651
652    /// Create an index on the provided column(s).
653    ///
654    /// Indices are used to speed up searches and are often needed when the size of the table
655    /// becomes large (the exact size depends on many factors but somewhere between 100K rows
656    /// and 1M rows is a good rule of thumb)
657    ///
658    /// There are a variety of indices available.  They are described more in
659    /// [`crate::index::Index`].  The simplest thing to do is to use `index::Index::Auto` which
660    /// will attempt to create the most useful index based on the column type and column
661    /// statistics. `BTree` index is created by default for numeric, temporal, and
662    /// string columns.
663    ///
664    /// Once an index is created it will remain until the data is overwritten (e.g. an
665    /// add operation with mode overwrite) or the indexed column is dropped.
666    ///
667    /// Indices are not automatically updated with new data.  If you add new data to the
668    /// table then the index will not include the new rows.  However, a table search will
669    /// still consider the unindexed rows.  Searches will issue both an indexed search (on
670    /// the data covered by the index) and a flat search (on the unindexed data) and the
671    /// results will be combined.
672    ///
673    /// If there is enough unindexed data then the flat search will become slow and the index
674    /// should be optimized.  Optimizing an index will add any unindexed data to the existing
675    /// index without rerunning the full index creation process.  For more details see
676    /// [Table::optimize].
677    ///
678    /// Note: Multi-column (composite) indices are not currently supported.  However, they will
679    /// be supported in the future and the API is designed to be compatible with them.
680    ///
681    /// # Examples
682    ///
683    /// ```no_run
684    /// # use std::sync::Arc;
685    /// # use arrow_array::{FixedSizeListArray, types::Float32Type, RecordBatch,
686    /// #   RecordBatchIterator, Int32Array};
687    /// # use arrow_schema::{Schema, Field, DataType};
688    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
689    /// use lancedb::index::Index;
690    /// let tmpdir = tempfile::tempdir().unwrap();
691    /// let db = lancedb::connect(tmpdir.path().to_str().unwrap())
692    ///     .execute()
693    ///     .await
694    ///     .unwrap();
695    /// # let tbl = db.open_table("idx_test").execute().await.unwrap();
696    /// // Create IVF PQ index on the "vector" column by default.
697    /// tbl.create_index(&["vector"], Index::Auto)
698    ///    .execute()
699    ///    .await
700    ///    .unwrap();
701    /// // Create a BTree index on the "id" column.
702    /// tbl.create_index(&["id"], Index::Auto)
703    ///     .execute()
704    ///     .await
705    ///     .unwrap();
706    /// // Create a LabelList index on the "tags" column.
707    /// tbl.create_index(&["tags"], Index::LabelList(Default::default()))
708    ///     .execute()
709    ///     .await
710    ///     .unwrap();
711    /// # });
712    /// ```
713    pub fn create_index(&self, columns: &[impl AsRef<str>], index: Index) -> IndexBuilder {
714        IndexBuilder::new(
715            self.inner.clone(),
716            columns
717                .iter()
718                .map(|val| val.as_ref().to_string())
719                .collect::<Vec<_>>(),
720            index,
721        )
722    }
723
724    /// See [Table::create_index]
725    /// For remote tables, this allows an optional wait_timeout to poll until asynchronous indexing is complete
726    pub fn create_index_with_timeout(
727        &self,
728        columns: &[impl AsRef<str>],
729        index: Index,
730        wait_timeout: Option<std::time::Duration>,
731    ) -> IndexBuilder {
732        let mut builder = IndexBuilder::new(
733            self.inner.clone(),
734            columns
735                .iter()
736                .map(|val| val.as_ref().to_string())
737                .collect::<Vec<_>>(),
738            index,
739        );
740        if let Some(timeout) = wait_timeout {
741            builder = builder.wait_timeout(timeout);
742        }
743        builder
744    }
745
746    /// Create a builder for a merge insert operation
747    ///
748    /// This operation can add rows, update rows, and remove rows all in a single
749    /// transaction. It is a very generic tool that can be used to create
750    /// behaviors like "insert if not exists", "update or insert (i.e. upsert)",
751    /// or even replace a portion of existing data with new data (e.g. replace
752    /// all data where month="january")
753    ///
754    /// The merge insert operation works by combining new data from a
755    /// **source table** with existing data in a **target table** by using a
756    /// join.  There are three categories of records.
757    ///
758    /// "Matched" records are records that exist in both the source table and
759    /// the target table. "Not matched" records exist only in the source table
760    /// (e.g. these are new data) "Not matched by source" records exist only
761    /// in the target table (this is old data)
762    ///
763    /// The builder returned by this method can be used to customize what
764    /// should happen for each category of data.
765    ///
766    /// Please note that the data may appear to be reordered as part of this
767    /// operation.  This is because updated rows will be deleted from the
768    /// dataset and then reinserted at the end with the new values.
769    ///
770    /// # Arguments
771    ///
772    /// * `on` One or more columns to join on.  This is how records from the
773    ///   source table and target table are matched.  Typically this is some
774    ///   kind of key or id column.
775    ///
776    /// # Examples
777    ///
778    /// ```no_run
779    /// # use std::sync::Arc;
780    /// # use arrow_array::{FixedSizeListArray, types::Float32Type, RecordBatch,
781    /// #   RecordBatchIterator, Int32Array};
782    /// # use arrow_schema::{Schema, Field, DataType};
783    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
784    /// let tmpdir = tempfile::tempdir().unwrap();
785    /// let db = lancedb::connect(tmpdir.path().to_str().unwrap())
786    ///     .execute()
787    ///     .await
788    ///     .unwrap();
789    /// # let tbl = db.open_table("idx_test").execute().await.unwrap();
790    /// # let schema = Arc::new(Schema::new(vec![
791    /// #  Field::new("id", DataType::Int32, false),
792    /// #  Field::new("vector", DataType::FixedSizeList(
793    /// #    Arc::new(Field::new("item", DataType::Float32, true)), 128), true),
794    /// # ]));
795    /// let new_data = RecordBatchIterator::new(
796    ///     vec![RecordBatch::try_new(
797    ///         schema.clone(),
798    ///         vec![
799    ///             Arc::new(Int32Array::from_iter_values(0..10)),
800    ///             Arc::new(
801    ///                 FixedSizeListArray::from_iter_primitive::<Float32Type, _, _>(
802    ///                     (0..10).map(|_| Some(vec![Some(1.0); 128])),
803    ///                     128,
804    ///                 ),
805    ///             ),
806    ///         ],
807    ///     )
808    ///     .unwrap()]
809    ///     .into_iter()
810    ///     .map(Ok),
811    ///     schema.clone(),
812    /// );
813    /// // Perform an upsert operation
814    /// let mut merge_insert = tbl.merge_insert(&["id"]);
815    /// merge_insert
816    ///     .when_matched_update_all(None)
817    ///     .when_not_matched_insert_all();
818    /// merge_insert.execute(Box::new(new_data)).await.unwrap();
819    /// # });
820    /// ```
821    pub fn merge_insert(&self, on: &[&str]) -> MergeInsertBuilder {
822        MergeInsertBuilder::new(
823            self.inner.clone(),
824            on.iter().map(|s| s.to_string()).collect(),
825        )
826    }
827
828    /// Create a [`Query`] Builder.
829    ///
830    /// Queries allow you to search your existing data.  By default the query will
831    /// return all the data in the table in no particular order.  The builder
832    /// returned by this method can be used to control the query using filtering,
833    /// vector similarity, sorting, and more.
834    ///
835    /// Note: By default, all columns are returned.  For best performance, you should
836    /// only fetch the columns you need.  See [`Query::select_with_projection`] for
837    /// more details.
838    ///
839    /// When appropriate, various indices and statistics will be used to accelerate
840    /// the query.
841    ///
842    /// # Examples
843    ///
844    /// ## Vector search
845    ///
846    /// This example will find the 10 rows whose value in the "vector" column are
847    /// closest to the query vector [1.0, 2.0, 3.0].  If an index has been created
848    /// on the "vector" column then this will perform an ANN search.
849    ///
850    /// The [`Query::refine_factor`] and [`Query::nprobes`] methods are used to
851    /// control the recall / latency tradeoff of the search.
852    ///
853    /// ```no_run
854    /// # use arrow_array::RecordBatch;
855    /// # use futures::TryStreamExt;
856    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
857    /// # let conn = lancedb::connect("/tmp").execute().await.unwrap();
858    /// # let tbl = conn.open_table("tbl").execute().await.unwrap();
859    /// use crate::lancedb::Table;
860    /// use crate::lancedb::query::ExecutableQuery;
861    /// let stream = tbl
862    ///     .query()
863    ///     .nearest_to(&[1.0, 2.0, 3.0])
864    ///     .unwrap()
865    ///     .refine_factor(5)
866    ///     .nprobes(10)
867    ///     .execute()
868    ///     .await
869    ///     .unwrap();
870    /// let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
871    /// # });
872    /// ```
873    ///
874    /// ## SQL-style filter
875    ///
876    /// This query will return up to 1000 rows whose value in the `id` column
877    /// is greater than 5.  LanceDb supports a broad set of filtering functions.
878    ///
879    /// ```no_run
880    /// # use arrow_array::RecordBatch;
881    /// # use futures::TryStreamExt;
882    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
883    /// # let conn = lancedb::connect("/tmp").execute().await.unwrap();
884    /// # let tbl = conn.open_table("tbl").execute().await.unwrap();
885    /// use crate::lancedb::Table;
886    /// use crate::lancedb::query::{ExecutableQuery, QueryBase};
887    /// let stream = tbl
888    ///     .query()
889    ///     .only_if("id > 5")
890    ///     .limit(1000)
891    ///     .execute()
892    ///     .await
893    ///     .unwrap();
894    /// let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
895    /// # });
896    /// ```
897    ///
898    /// ## Full scan
899    ///
900    /// This query will return everything in the table in no particular
901    /// order.
902    ///
903    /// ```no_run
904    /// # use arrow_array::RecordBatch;
905    /// # use futures::TryStreamExt;
906    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
907    /// # let conn = lancedb::connect("/tmp").execute().await.unwrap();
908    /// # let tbl = conn.open_table("tbl").execute().await.unwrap();
909    /// use crate::lancedb::Table;
910    /// use crate::lancedb::query::ExecutableQuery;
911    /// let stream = tbl.query().execute().await.unwrap();
912    /// let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
913    /// # });
914    /// ```
915    pub fn query(&self) -> Query {
916        Query::new(self.inner.clone())
917    }
918
919    /// Extract rows from the dataset using dataset offsets.
920    ///
921    /// Dataset offsets are 0-indexed and relative to the current version of the table.
922    /// They are not stable.  A row with an offset of N may have a different offset in a
923    /// different version of the table (e.g. if an earlier row is deleted).
924    ///
925    /// Offsets are useful for sampling as the set of all valid offsets is easily
926    /// known in advance to be [0, len(table)).
927    ///
928    /// No guarantees are made regarding the order in which results are returned.  If you
929    /// desire an output order that matches the order of the given offsets, you will need
930    /// to add the row offset column to the output and align it yourself.
931    ///
932    /// Parameters
933    /// ----------
934    /// offsets: list[int]
935    ///     The offsets to take.
936    ///
937    /// Returns
938    /// -------
939    /// pa.RecordBatch
940    ///     A record batch containing the rows at the given offsets.
941    pub fn take_offsets(&self, offsets: Vec<u64>) -> TakeQuery {
942        TakeQuery::from_offsets(self.inner.clone(), offsets)
943    }
944
945    /// Extract rows from the dataset using row ids.
946    ///
947    /// Row ids are not stable and are relative to the current version of the table.
948    /// They can change due to compaction and updates.
949    ///
950    /// Even so, row ids are more stable than offsets and can be useful in some situations.
951    ///
952    /// There is an ongoing effort to make row ids stable which is tracked at
953    /// https://github.com/lancedb/lancedb/issues/1120
954    ///
955    /// No guarantees are made regarding the order in which results are returned.  If you
956    /// desire an output order that matches the order of the given ids, you will need
957    /// to add the row id column to the output and align it yourself.
958    /// Parameters
959    /// ----------
960    /// row_ids: list[int]
961    ///     The row ids to take.
962    ///
963    pub fn take_row_ids(&self, row_ids: Vec<u64>) -> TakeQuery {
964        TakeQuery::from_row_ids(self.inner.clone(), row_ids)
965    }
966
967    /// Search the table with a given query vector.
968    ///
969    /// This is a convenience method for preparing a vector query and
970    /// is the same thing as calling `nearest_to` on the builder returned
971    /// by `query`.  See [`Query::nearest_to`] for more details.
972    pub fn vector_search(&self, query: impl IntoQueryVector) -> Result<VectorQuery> {
973        self.query().nearest_to(query)
974    }
975
976    /// Optimize the on-disk data and indices for better performance.
977    ///
978    /// Modeled after ``VACUUM`` in PostgreSQL.
979    ///
980    /// Optimization is discussed in more detail in the [OptimizeAction] documentation
981    /// and covers three operations:
982    ///
983    ///  * Compaction: Merges small files into larger ones
984    ///  * Prune: Removes old versions of the dataset
985    ///  * Index: Optimizes the indices, adding new data to existing indices
986    ///
987    /// The frequency an application should call optimize is based on the frequency of
988    /// data modifications.  If data is frequently added, deleted, or updated then
989    /// optimize should be run frequently.  A good rule of thumb is to run optimize if
990    /// you have added or modified 100,000 or more records or run more than 20 data
991    /// modification operations.
992    pub async fn optimize(&self, action: OptimizeAction) -> Result<OptimizeStats> {
993        self.inner.optimize(action).await
994    }
995
996    /// Add new columns to the table, providing values to fill in.
997    pub async fn add_columns(
998        &self,
999        transforms: NewColumnTransform,
1000        read_columns: Option<Vec<String>>,
1001    ) -> Result<AddColumnsResult> {
1002        self.inner.add_columns(transforms, read_columns).await
1003    }
1004
1005    /// Change a column's name or nullability.
1006    pub async fn alter_columns(
1007        &self,
1008        alterations: &[ColumnAlteration],
1009    ) -> Result<AlterColumnsResult> {
1010        self.inner.alter_columns(alterations).await
1011    }
1012
1013    /// Remove columns from the table.
1014    pub async fn drop_columns(&self, columns: &[&str]) -> Result<DropColumnsResult> {
1015        self.inner.drop_columns(columns).await
1016    }
1017
1018    /// Retrieve the version of the table
1019    ///
1020    /// LanceDb supports versioning.  Every operation that modifies the table increases
1021    /// version.  As long as a version hasn't been deleted you can `[Self::checkout]` that
1022    /// version to view the data at that point.  In addition, you can `[Self::restore]` the
1023    /// version to replace the current table with a previous version.
1024    pub async fn version(&self) -> Result<u64> {
1025        self.inner.version().await
1026    }
1027
1028    /// Checks out a specific version of the Table
1029    ///
1030    /// Any read operation on the table will now access the data at the checked out version.
1031    /// As a consequence, calling this method will disable any read consistency interval
1032    /// that was previously set.
1033    ///
1034    /// This is a read-only operation that turns the table into a sort of "view"
1035    /// or "detached head".  Other table instances will not be affected.  To make the change
1036    /// permanent you can use the `[Self::restore]` method.
1037    ///
1038    /// Any operation that modifies the table will fail while the table is in a checked
1039    /// out state.
1040    ///
1041    /// To return the table to a normal state use `[Self::checkout_latest]`
1042    pub async fn checkout(&self, version: u64) -> Result<()> {
1043        self.inner.checkout(version).await
1044    }
1045
1046    /// Checks out a specific version of the Table by tag
1047    ///
1048    /// Any read operation on the table will now access the data at the version referenced by the tag.
1049    /// As a consequence, calling this method will disable any read consistency interval
1050    /// that was previously set.
1051    ///
1052    /// This is a read-only operation that turns the table into a sort of "view"
1053    /// or "detached head".  Other table instances will not be affected.  To make the change
1054    /// permanent you can use the `[Self::restore]` method.
1055    ///
1056    /// Any operation that modifies the table will fail while the table is in a checked
1057    /// out state.
1058    ///
1059    /// To return the table to a normal state use `[Self::checkout_latest]`
1060    pub async fn checkout_tag(&self, tag: &str) -> Result<()> {
1061        self.inner.checkout_tag(tag).await
1062    }
1063
1064    /// Ensures the table is pointing at the latest version
1065    ///
1066    /// This can be used to manually update a table when the read_consistency_interval is None
1067    /// It can also be used to undo a `[Self::checkout]` operation
1068    pub async fn checkout_latest(&self) -> Result<()> {
1069        self.inner.checkout_latest().await
1070    }
1071
1072    /// Restore the table to the currently checked out version
1073    ///
1074    /// This operation will fail if checkout has not been called previously
1075    ///
1076    /// This operation will overwrite the latest version of the table with a
1077    /// previous version.  Any changes made since the checked out version will
1078    /// no longer be visible.
1079    ///
1080    /// Once the operation concludes the table will no longer be in a checked
1081    /// out state and the read_consistency_interval, if any, will apply.
1082    pub async fn restore(&self) -> Result<()> {
1083        self.inner.restore().await
1084    }
1085
1086    /// List all the versions of the table
1087    pub async fn list_versions(&self) -> Result<Vec<Version>> {
1088        self.inner.list_versions().await
1089    }
1090
1091    /// List all indices that have been created with [`Self::create_index`]
1092    pub async fn list_indices(&self) -> Result<Vec<IndexConfig>> {
1093        self.inner.list_indices().await
1094    }
1095
1096    /// Get the table URI (storage location)
1097    ///
1098    /// Returns the full storage location of the table (e.g., S3/GCS path).
1099    /// For remote tables, this fetches the location from the server via describe.
1100    pub async fn uri(&self) -> Result<String> {
1101        self.inner.uri().await
1102    }
1103
1104    /// Get the storage options used when opening this table, if any.
1105    ///
1106    /// Warning: This is an internal API and the return value is subject to change.
1107    #[deprecated(since = "0.25.0", note = "Use initial_storage_options() instead")]
1108    pub async fn storage_options(&self) -> Option<HashMap<String, String>> {
1109        #[allow(deprecated)]
1110        self.inner.storage_options().await
1111    }
1112
1113    /// Get the initial storage options that were passed in when opening this table.
1114    ///
1115    /// For dynamically refreshed options (e.g., credential vending), use [`Self::latest_storage_options`].
1116    ///
1117    /// Warning: This is an internal API and the return value is subject to change.
1118    pub async fn initial_storage_options(&self) -> Option<HashMap<String, String>> {
1119        self.inner.initial_storage_options().await
1120    }
1121
1122    /// Get the latest storage options, refreshing from provider if configured.
1123    ///
1124    /// This method is useful for credential vending scenarios where storage options
1125    /// may be refreshed dynamically. If no dynamic provider is configured, this
1126    /// returns the initial static options.
1127    ///
1128    /// Warning: This is an internal API and the return value is subject to change.
1129    pub async fn latest_storage_options(&self) -> Result<Option<HashMap<String, String>>> {
1130        self.inner.latest_storage_options().await
1131    }
1132
1133    /// Get statistics about an index.
1134    /// Returns None if the index does not exist.
1135    pub async fn index_stats(
1136        &self,
1137        index_name: impl AsRef<str>,
1138    ) -> Result<Option<IndexStatistics>> {
1139        self.inner.index_stats(index_name.as_ref()).await
1140    }
1141
1142    /// Drop an index from the table.
1143    ///
1144    /// Note: This is not yet available in LanceDB cloud.
1145    ///
1146    /// This does not delete the index from disk, it just removes it from the table.
1147    /// To delete the index, run [`Self::optimize()`] after dropping the index.
1148    ///
1149    /// Use [`Self::list_indices()`] to find the names of the indices.
1150    pub async fn drop_index(&self, name: &str) -> Result<()> {
1151        self.inner.drop_index(name).await
1152    }
1153
1154    /// Prewarm an index in the table.
1155    ///
1156    /// This is a hint to the database that the index will be accessed in the
1157    /// future and should be loaded into memory if possible.  This can reduce
1158    /// cold-start latency for subsequent queries.
1159    ///
1160    /// This call initiates prewarming and returns once the request is accepted.
1161    /// It is idempotent and safe to call from multiple clients concurrently.
1162    ///
1163    /// It is generally wasteful to call this if the index does not fit into the
1164    /// available cache.  Not all index types support prewarming; unsupported
1165    /// indices will silently ignore the request.
1166    ///
1167    /// Use [`Self::list_indices()`] to find the names of the indices.
1168    pub async fn prewarm_index(&self, name: &str) -> Result<()> {
1169        self.inner.prewarm_index(name).await
1170    }
1171
1172    /// Prewarm data for the table.
1173    ///
1174    /// This is a hint to the database that the given columns will be accessed in
1175    /// the future and the database should prefetch the data if possible.  This
1176    /// can reduce cold-start latency for subsequent queries.  Currently only
1177    /// supported on remote tables.
1178    ///
1179    /// This call initiates prewarming and returns once the request is accepted.
1180    /// It is idempotent and safe to call from multiple clients concurrently —
1181    /// calling it on already-prewarmed columns is a no-op on the server.
1182    ///
1183    /// This operation has a large upfront cost but can speed up future queries
1184    /// that need to fetch the given columns.  Large columns such as embeddings
1185    /// or binary data may not be practical to prewarm.  This feature is intended
1186    /// for workloads that issue many queries against the same columns.
1187    ///
1188    /// If `columns` is `None`, all columns are prewarmed.
1189    pub async fn prewarm_data(&self, columns: Option<Vec<String>>) -> Result<()> {
1190        self.inner.prewarm_data(columns).await
1191    }
1192
1193    /// Poll until the columns are fully indexed. Will return Error::Timeout if the columns
1194    /// are not fully indexed within the timeout.
1195    pub async fn wait_for_index(
1196        &self,
1197        index_names: &[&str],
1198        timeout: std::time::Duration,
1199    ) -> Result<()> {
1200        self.inner.wait_for_index(index_names, timeout).await
1201    }
1202
1203    /// Get the tags manager.
1204    pub async fn tags(&self) -> Result<Box<dyn Tags + '_>> {
1205        self.inner.tags().await
1206    }
1207
1208    /// Retrieve statistics on the table
1209    pub async fn stats(&self) -> Result<TableStatistics> {
1210        self.inner.stats().await
1211    }
1212}
1213
/// [`Tags`] implementation that operates on the tags of a dataset held in a
/// `DatasetConsistencyWrapper`.
pub struct NativeTags {
    // Wrapper through which the dataset is obtained before every tag operation.
    dataset: dataset::DatasetConsistencyWrapper,
}
1217#[async_trait]
1218impl Tags for NativeTags {
1219    async fn list(&self) -> Result<HashMap<String, TagContents>> {
1220        let dataset = self.dataset.get().await?;
1221        Ok(dataset.tags().list().await?)
1222    }
1223
1224    async fn get_version(&self, tag: &str) -> Result<u64> {
1225        let dataset = self.dataset.get().await?;
1226        Ok(dataset.tags().get_version(tag).await?)
1227    }
1228
1229    async fn create(&mut self, tag: &str, version: u64) -> Result<()> {
1230        let dataset = self.dataset.get().await?;
1231        dataset.tags().create(tag, version).await?;
1232        Ok(())
1233    }
1234
1235    async fn delete(&mut self, tag: &str) -> Result<()> {
1236        let dataset = self.dataset.get().await?;
1237        dataset.tags().delete(tag).await?;
1238        Ok(())
1239    }
1240
1241    async fn update(&mut self, tag: &str, version: u64) -> Result<()> {
1242        let dataset = self.dataset.get().await?;
1243        dataset.tags().update(tag, version).await?;
1244        Ok(())
1245    }
1246}
1247
/// Extension trait for downcasting a table handle to a [`NativeTable`].
pub trait NativeTableExt {
    /// Cast as [`NativeTable`], or return `None` if it is not a [`NativeTable`].
    fn as_native(&self) -> Option<&NativeTable>;
}
1252
1253impl NativeTableExt for Arc<dyn BaseTable> {
1254    fn as_native(&self) -> Option<&NativeTable> {
1255        self.as_any().downcast_ref::<NativeTable>()
1256    }
1257}
1258
/// A table in a LanceDB database.
#[derive(Clone)]
pub struct NativeTable {
    // The table name.
    name: String,
    // The namespace path of the table (empty when the table has no namespace).
    namespace: Vec<String>,
    // Identifier built from the namespace segments and the table name joined
    // with "$" (just the name when the namespace is empty).
    id: String,
    // The URI of the underlying dataset.
    uri: String,
    // Consistency wrapper around the underlying Lance dataset.
    pub(crate) dataset: dataset::DatasetConsistencyWrapper,
    // This comes from the connection options. We store here so we can pass down
    // to the dataset when we recreate it (for example, in checkout_latest).
    read_consistency_interval: Option<std::time::Duration>,
    // Optional namespace client for namespace operations (e.g., managed versioning).
    // pub(crate) so query.rs can access the field for server-side query execution.
    pub(crate) namespace_client: Option<Arc<dyn LanceNamespace>>,
    // Operations to push down to the namespace server.
    // pub(crate) so query.rs can access the field for server-side query execution.
    pub(crate) pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
}
1277
1278impl std::fmt::Debug for NativeTable {
1279    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1280        f.debug_struct("NativeTable")
1281            .field("name", &self.name)
1282            .field("namespace", &self.namespace)
1283            .field("id", &self.id)
1284            .field("uri", &self.uri)
1285            .field("read_consistency_interval", &self.read_consistency_interval)
1286            .field("namespace_client", &self.namespace_client)
1287            .field("pushdown_operations", &self.pushdown_operations)
1288            .finish()
1289    }
1290}
1291
1292impl std::fmt::Display for NativeTable {
1293    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1294        write!(
1295            f,
1296            "NativeTable({}, uri={}, read_consistency_interval={})",
1297            self.name,
1298            self.uri,
1299            match self.read_consistency_interval {
1300                None => {
1301                    "None".to_string()
1302                }
1303                Some(duration) => {
1304                    format!("{}s", duration.as_secs_f64())
1305                }
1306            }
1307        )
1308    }
1309}
1310
1311impl NativeTable {
1312    /// Opens an existing Table
1313    ///
1314    /// # Arguments
1315    ///
1316    /// * `uri` - The uri to a [NativeTable]
1317    /// * `name` - The table name
1318    ///
1319    /// # Returns
1320    ///
1321    /// * A [NativeTable] object.
1322    pub async fn open(uri: &str) -> Result<Self> {
1323        let name = Self::get_table_name(uri)?;
1324        Self::open_with_params(
1325            uri,
1326            &name,
1327            vec![],
1328            None,
1329            None,
1330            None,
1331            None,
1332            HashSet::new(),
1333            None,
1334        )
1335        .await
1336    }
1337
1338    /// Opens an existing Table
1339    ///
1340    /// # Arguments
1341    ///
1342    /// * `base_path` - The base path where the table is located
1343    /// * `name` The Table name
1344    /// * `params` The [ReadParams] to use when opening the table
1345    /// * `namespace_client` - Optional namespace client for namespace operations
1346    /// * `pushdown_operations` - Operations to push down to the namespace server
1347    /// * `managed_versioning` - Whether managed versioning is enabled. If None and namespace_client
1348    ///   is provided, the value will be fetched via describe_table.
1349    ///
1350    /// # Returns
1351    ///
1352    /// * A [NativeTable] object.
1353    #[allow(clippy::too_many_arguments)]
1354    pub async fn open_with_params(
1355        uri: &str,
1356        name: &str,
1357        namespace: Vec<String>,
1358        write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
1359        params: Option<ReadParams>,
1360        read_consistency_interval: Option<std::time::Duration>,
1361        namespace_client: Option<Arc<dyn LanceNamespace>>,
1362        pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
1363        managed_versioning: Option<bool>,
1364    ) -> Result<Self> {
1365        let params = params.unwrap_or_default();
1366        // patch the params if we have a write store wrapper
1367        let params = match write_store_wrapper.clone() {
1368            Some(wrapper) => params.patch_with_store_wrapper(wrapper)?,
1369            None => params,
1370        };
1371
1372        // Build table_id from namespace + name
1373        let mut table_id = namespace.clone();
1374        table_id.push(name.to_string());
1375
1376        // Determine if managed_versioning is enabled
1377        // Use the provided value if available, otherwise query the namespace
1378        let managed_versioning = match managed_versioning {
1379            Some(value) => value,
1380            None if namespace_client.is_some() => {
1381                let ns_client = namespace_client.as_ref().unwrap();
1382                let describe_request = DescribeTableRequest {
1383                    id: Some(table_id.clone()),
1384                    ..Default::default()
1385                };
1386                let response = ns_client
1387                    .describe_table(describe_request)
1388                    .await
1389                    .map_err(|e| Error::Runtime {
1390                        message: format!(
1391                            "Failed to describe table via namespace client: {}. \
1392                             If you don't need managed versioning, don't pass namespace_client.",
1393                            e
1394                        ),
1395                    })?;
1396                response.managed_versioning == Some(true)
1397            }
1398            None => false,
1399        };
1400
1401        let mut builder = DatasetBuilder::from_uri(uri).with_read_params(params);
1402
1403        // Set up commit handler when managed_versioning is enabled
1404        if managed_versioning && let Some(ref ns_client) = namespace_client {
1405            let external_store =
1406                LanceNamespaceExternalManifestStore::new(ns_client.clone(), table_id.clone());
1407            let commit_handler: Arc<dyn CommitHandler> = Arc::new(ExternalManifestCommitHandler {
1408                external_manifest_store: Arc::new(external_store),
1409            });
1410            builder = builder.with_commit_handler(commit_handler);
1411        }
1412
1413        let dataset = builder.load().await.map_err(|e| match e {
1414            lance::Error::DatasetNotFound { .. } => Error::TableNotFound {
1415                name: name.to_string(),
1416                source: Box::new(e),
1417            },
1418            e => e.into(),
1419        })?;
1420
1421        let dataset = DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval);
1422        let id = Self::build_id(&namespace, name);
1423
1424        Ok(Self {
1425            name: name.to_string(),
1426            namespace,
1427            id,
1428            uri: uri.to_string(),
1429            dataset,
1430            read_consistency_interval,
1431            namespace_client,
1432            pushdown_operations,
1433        })
1434    }
1435
1436    /// Set the namespace client for server-side query execution.
1437    ///
1438    /// When set, queries will be executed on the namespace server instead of locally.
1439    pub fn with_namespace_client(mut self, namespace_client: Arc<dyn LanceNamespace>) -> Self {
1440        self.namespace_client = Some(namespace_client);
1441        self
1442    }
1443
1444    /// Opens an existing Table using a namespace client.
1445    ///
1446    /// This method uses `DatasetBuilder::from_namespace` to open the table, which
1447    /// automatically fetches the table location and storage options from the namespace.
1448    /// This eliminates the need to pre-fetch and merge storage options before opening.
1449    ///
1450    /// # Arguments
1451    ///
1452    /// * `namespace_client` - The namespace client to use for fetching table metadata
1453    /// * `name` - The table name
1454    /// * `namespace` - The namespace path (e.g., vec!["parent", "child"])
1455    /// * `write_store_wrapper` - Optional wrapper for the object store on write path
1456    /// * `params` - Optional read parameters
1457    /// * `read_consistency_interval` - Optional interval for read consistency
1458    /// * `pushdown_operations` - Operations to push down to the namespace server.
1459    ///   When `QueryTable` is included, queries will be executed on the namespace server.
1460    /// * `session` - Optional session for object stores and caching
1461    ///
1462    /// # Returns
1463    ///
1464    /// * A [NativeTable] object.
1465    #[allow(clippy::too_many_arguments)]
1466    pub async fn open_from_namespace(
1467        namespace_client: Arc<dyn LanceNamespace>,
1468        name: &str,
1469        namespace: Vec<String>,
1470        write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
1471        params: Option<ReadParams>,
1472        read_consistency_interval: Option<std::time::Duration>,
1473        pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
1474        session: Option<Arc<lance::session::Session>>,
1475    ) -> Result<Self> {
1476        let mut params = params.unwrap_or_default();
1477
1478        // Set the session in read params
1479        if let Some(sess) = session {
1480            params.session(sess);
1481        }
1482
1483        // patch the params if we have a write store wrapper
1484        let params = match write_store_wrapper.clone() {
1485            Some(wrapper) => params.patch_with_store_wrapper(wrapper)?,
1486            None => params,
1487        };
1488
1489        // Build table_id from namespace + name
1490        let mut table_id = namespace.clone();
1491        table_id.push(name.to_string());
1492
1493        // Use DatasetBuilder::from_namespace which automatically fetches location
1494        // and storage options from the namespace
1495        let builder = DatasetBuilder::from_namespace(namespace_client.clone(), table_id)
1496            .await
1497            .map_err(|e| match e {
1498                lance::Error::Namespace { source, .. } => Error::Runtime {
1499                    message: format!("Failed to get table info from namespace: {:?}", source),
1500                },
1501                e => e.into(),
1502            })?;
1503
1504        let dataset = builder
1505            .with_read_params(params)
1506            .load()
1507            .await
1508            .map_err(|e| match e {
1509                lance::Error::DatasetNotFound { .. } => Error::TableNotFound {
1510                    name: name.to_string(),
1511                    source: Box::new(e),
1512                },
1513                e => e.into(),
1514            })?;
1515
1516        let uri = dataset.uri().to_string();
1517        let dataset = DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval);
1518        let id = Self::build_id(&namespace, name);
1519
1520        let stored_namespace_client =
1521            if pushdown_operations.contains(&NamespaceClientPushdownOperation::QueryTable) {
1522                Some(namespace_client)
1523            } else {
1524                None
1525            };
1526
1527        Ok(Self {
1528            name: name.to_string(),
1529            namespace,
1530            id,
1531            uri,
1532            dataset,
1533            read_consistency_interval,
1534            namespace_client: stored_namespace_client,
1535            pushdown_operations,
1536        })
1537    }
1538
1539    fn get_table_name(uri: &str) -> Result<String> {
1540        let path = Path::new(uri);
1541        let name = path
1542            .file_stem()
1543            .ok_or(Error::TableNotFound {
1544                name: uri.to_string(),
1545                source: format!("Could not extract table name from URI: '{}'", uri).into(),
1546            })?
1547            .to_str()
1548            .ok_or(Error::InvalidTableName {
1549                name: uri.to_string(),
1550                reason: "Table name is not valid URL".to_string(),
1551            })?;
1552        Ok(name.to_string())
1553    }
1554
1555    fn build_id(namespace: &[String], name: &str) -> String {
1556        if namespace.is_empty() {
1557            name.to_string()
1558        } else {
1559            let mut parts = namespace.to_vec();
1560            parts.push(name.to_string());
1561            parts.join("$")
1562        }
1563    }
1564
1565    /// Creates a new Table
1566    ///
1567    /// # Arguments
1568    ///
1569    /// * `uri` - The URI to the table. When namespace is not empty, the caller must
1570    ///   provide an explicit URI (location) rather than deriving it from the table name.
1571    /// * `name` The Table name
1572    /// * `namespace` - The namespace path. When non-empty, an explicit URI must be provided.
1573    /// * `batches` RecordBatch to be saved in the database.
1574    /// * `params` - Write parameters.
1575    /// * `namespace_client` - Optional namespace client for namespace operations
1576    /// * `pushdown_operations` - Operations to push down to the namespace server
1577    ///
1578    /// # Returns
1579    ///
1580    /// * A [TableImpl] object.
1581    #[allow(clippy::too_many_arguments)]
1582    pub async fn create(
1583        uri: &str,
1584        name: &str,
1585        namespace: Vec<String>,
1586        batches: impl StreamingWriteSource,
1587        write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
1588        params: Option<WriteParams>,
1589        read_consistency_interval: Option<std::time::Duration>,
1590        namespace_client: Option<Arc<dyn LanceNamespace>>,
1591        pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
1592    ) -> Result<Self> {
1593        // Default params uses format v1.
1594        let params = params.unwrap_or(WriteParams {
1595            ..Default::default()
1596        });
1597        // patch the params if we have a write store wrapper
1598        let params = match write_store_wrapper.clone() {
1599            Some(wrapper) => params.patch_with_store_wrapper(wrapper)?,
1600            None => params,
1601        };
1602
1603        let insert_builder = InsertBuilder::new(uri).with_params(&params);
1604        let dataset = insert_builder
1605            .execute_stream(batches)
1606            .await
1607            .map_err(|e| match e {
1608                lance::Error::DatasetAlreadyExists { .. } => Error::TableAlreadyExists {
1609                    name: name.to_string(),
1610                },
1611                e => e.into(),
1612            })?;
1613
1614        let id = Self::build_id(&namespace, name);
1615
1616        Ok(Self {
1617            name: name.to_string(),
1618            namespace,
1619            id,
1620            uri: uri.to_string(),
1621            dataset: DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval),
1622            read_consistency_interval,
1623            namespace_client,
1624            pushdown_operations,
1625        })
1626    }
1627
1628    #[allow(clippy::too_many_arguments)]
1629    pub async fn create_empty(
1630        uri: &str,
1631        name: &str,
1632        namespace: Vec<String>,
1633        schema: SchemaRef,
1634        write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
1635        params: Option<WriteParams>,
1636        read_consistency_interval: Option<std::time::Duration>,
1637        namespace_client: Option<Arc<dyn LanceNamespace>>,
1638        pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
1639    ) -> Result<Self> {
1640        let data: Box<dyn Scannable> = Box::new(RecordBatch::new_empty(schema));
1641        Self::create(
1642            uri,
1643            name,
1644            namespace,
1645            data,
1646            write_store_wrapper,
1647            params,
1648            read_consistency_interval,
1649            namespace_client,
1650            pushdown_operations,
1651        )
1652        .await
1653    }
1654
1655    /// Creates a new Table using a namespace client for storage options.
1656    ///
1657    /// This method sets up a `StorageOptionsProvider` from the namespace client,
1658    /// enabling automatic credential refresh for cloud storage. The namespace
1659    /// is used for:
1660    /// 1. Setting up storage options provider for credential vending
1661    /// 2. Optionally enabling server-side query execution
1662    ///
1663    /// # Arguments
1664    ///
1665    /// * `namespace_client` - The namespace client to use for storage options
1666    /// * `uri` - The URI to the table (obtained from create_empty_table response)
1667    /// * `name` - The table name
1668    /// * `namespace` - The namespace path (e.g., vec!["parent", "child"])
1669    /// * `batches` - RecordBatch to be saved in the database
1670    /// * `write_store_wrapper` - Optional wrapper for the object store on write path
1671    /// * `params` - Optional write parameters
1672    /// * `read_consistency_interval` - Optional interval for read consistency
1673    /// * `pushdown_operations` - Operations to push down to the namespace server
1674    ///
1675    /// # Returns
1676    ///
1677    /// * A [NativeTable] object.
1678    #[allow(clippy::too_many_arguments)]
1679    pub async fn create_from_namespace(
1680        namespace_client: Arc<dyn LanceNamespace>,
1681        uri: &str,
1682        name: &str,
1683        namespace: Vec<String>,
1684        batches: impl StreamingWriteSource,
1685        write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
1686        params: Option<WriteParams>,
1687        read_consistency_interval: Option<std::time::Duration>,
1688        pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
1689        session: Option<Arc<lance::session::Session>>,
1690    ) -> Result<Self> {
1691        // Build table_id from namespace + name for the storage options provider
1692        let mut table_id = namespace.clone();
1693        table_id.push(name.to_string());
1694
1695        // Set up storage options provider from namespace
1696        let storage_options_provider = Arc::new(LanceNamespaceStorageOptionsProvider::new(
1697            namespace_client.clone(),
1698            table_id,
1699        ));
1700
1701        // Start with provided params or defaults
1702        let mut params = params.unwrap_or_default();
1703
1704        // Set the session in write params
1705        if let Some(sess) = session {
1706            params.session = Some(sess);
1707        }
1708
1709        // Ensure store_params exists and set the storage options provider
1710        let store_params = params
1711            .store_params
1712            .get_or_insert_with(ObjectStoreParams::default);
1713        let accessor = match store_params.storage_options().cloned() {
1714            Some(options) => {
1715                StorageOptionsAccessor::with_initial_and_provider(options, storage_options_provider)
1716            }
1717            None => StorageOptionsAccessor::with_provider(storage_options_provider),
1718        };
1719        store_params.storage_options_accessor = Some(Arc::new(accessor));
1720
1721        // Patch the params if we have a write store wrapper
1722        let params = match write_store_wrapper.clone() {
1723            Some(wrapper) => params.patch_with_store_wrapper(wrapper)?,
1724            None => params,
1725        };
1726
1727        let insert_builder = InsertBuilder::new(uri).with_params(&params);
1728        let dataset = insert_builder
1729            .execute_stream(batches)
1730            .await
1731            .map_err(|e| match e {
1732                lance::Error::DatasetAlreadyExists { .. } => Error::TableAlreadyExists {
1733                    name: name.to_string(),
1734                },
1735                e => e.into(),
1736            })?;
1737
1738        let id = Self::build_id(&namespace, name);
1739
1740        let stored_namespace_client =
1741            if pushdown_operations.contains(&NamespaceClientPushdownOperation::QueryTable) {
1742                Some(namespace_client)
1743            } else {
1744                None
1745            };
1746
1747        Ok(Self {
1748            name: name.to_string(),
1749            namespace,
1750            id,
1751            uri: uri.to_string(),
1752            dataset: DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval),
1753            read_consistency_interval,
1754            namespace_client: stored_namespace_client,
1755            pushdown_operations,
1756        })
1757    }
1758
1759    /// Merge new data into this table.
1760    pub async fn merge(
1761        &mut self,
1762        batches: impl RecordBatchReader + Send + 'static,
1763        left_on: &str,
1764        right_on: &str,
1765    ) -> Result<()> {
1766        self.dataset.ensure_mutable()?;
1767        let mut dataset = (*self.dataset.get().await?).clone();
1768        dataset.merge(batches, left_on, right_on).await?;
1769        self.dataset.update(dataset);
1770        Ok(())
1771    }
1772
1773    // TODO: why are these individual methods and not some single "get_stats" method?
1774    pub async fn count_fragments(&self) -> Result<usize> {
1775        Ok(self.dataset.get().await?.count_fragments())
1776    }
1777
1778    pub async fn count_deleted_rows(&self) -> Result<usize> {
1779        Ok(self.dataset.get().await?.count_deleted_rows().await?)
1780    }
1781
1782    pub async fn num_small_files(&self, max_rows_per_group: usize) -> Result<usize> {
1783        Ok(self
1784            .dataset
1785            .get()
1786            .await?
1787            .num_small_files(max_rows_per_group)
1788            .await)
1789    }
1790
1791    pub async fn load_indices(&self) -> Result<Vec<VectorIndex>> {
1792        let dataset = self.dataset.get().await?;
1793        let mf = dataset.manifest();
1794        let indices = dataset.load_indices().await?;
1795        Ok(indices
1796            .iter()
1797            .map(|i| VectorIndex::new_from_format(mf, i))
1798            .collect())
1799    }
1800
1801    // Helper to validate index type compatibility with field data type
1802    fn validate_index_type(
1803        field: &Field,
1804        index_name: &str,
1805        supported_fn: impl Fn(&DataType) -> bool,
1806    ) -> Result<()> {
1807        if !supported_fn(field.data_type()) {
1808            return Err(Error::Schema {
1809                message: format!(
1810                    "A {} index cannot be created on the field `{}` which has data type {}",
1811                    index_name,
1812                    field.name(),
1813                    field.data_type()
1814                ),
1815            });
1816        }
1817        Ok(())
1818    }
1819
1820    // Helper to build IVF params honoring table options.
1821    fn build_ivf_params(
1822        num_partitions: Option<u32>,
1823        target_partition_size: Option<u32>,
1824        sample_rate: u32,
1825        max_iterations: u32,
1826    ) -> IvfBuildParams {
1827        let mut ivf_params = match (num_partitions, target_partition_size) {
1828            (Some(num_partitions), _) => IvfBuildParams::new(num_partitions as usize),
1829            (None, Some(target_partition_size)) => {
1830                IvfBuildParams::with_target_partition_size(target_partition_size as usize)
1831            }
1832            (None, None) => IvfBuildParams::default(),
1833        };
1834        ivf_params.sample_rate = sample_rate as usize;
1835        ivf_params.max_iters = max_iterations as usize;
1836        ivf_params
1837    }
1838
1839    // Helper to get num_sub_vectors with default calculation
1840    fn get_num_sub_vectors(provided: Option<u32>, dim: u32, num_bits: Option<u32>) -> u32 {
1841        if let Some(provided) = provided {
1842            return provided;
1843        }
1844        let suggested = suggested_num_sub_vectors(dim);
1845        if num_bits.is_some_and(|num_bits| num_bits == 4) && !suggested.is_multiple_of(2) {
1846            // num_sub_vectors must be even when 4 bits are used
1847            suggested + 1
1848        } else {
1849            suggested
1850        }
1851    }
1852
1853    // Helper to extract vector dimension from field
1854    fn get_vector_dimension(field: &Field) -> Result<u32> {
1855        match field.data_type() {
1856            arrow_schema::DataType::FixedSizeList(_, n) => Ok(*n as u32),
1857            _ => Ok(infer_vector_dim(field.data_type())? as u32),
1858        }
1859    }
1860
1861    // Convert LanceDB Index to Lance IndexParams
1862    async fn make_index_params(
1863        &self,
1864        field: &Field,
1865        index_opts: Index,
1866    ) -> Result<Box<dyn lance::index::IndexParams>> {
1867        match index_opts {
1868            Index::Auto => {
1869                if supported_vector_data_type(field.data_type()) {
1870                    // Use IvfPq as the default for auto vector indices
1871                    let dim = Self::get_vector_dimension(field)?;
1872                    let ivf_params = lance_index::vector::ivf::IvfBuildParams::default();
1873                    let num_sub_vectors = Self::get_num_sub_vectors(None, dim, None);
1874                    let pq_params =
1875                        lance_index::vector::pq::PQBuildParams::new(num_sub_vectors as usize, 8);
1876                    let lance_idx_params =
1877                        lance::index::vector::VectorIndexParams::with_ivf_pq_params(
1878                            lance_linalg::distance::MetricType::L2,
1879                            ivf_params,
1880                            pq_params,
1881                        );
1882                    Ok(Box::new(lance_idx_params))
1883                } else if supported_btree_data_type(field.data_type()) {
1884                    Ok(Box::new(ScalarIndexParams::for_builtin(
1885                        BuiltinIndexType::BTree,
1886                    )))
1887                } else {
1888                    Err(Error::InvalidInput {
1889                        message: format!(
1890                            "there are no indices supported for the field `{}` with the data type {}",
1891                            field.name(),
1892                            field.data_type()
1893                        ),
1894                    })?
1895                }
1896            }
1897            Index::BTree(_) => {
1898                Self::validate_index_type(field, "BTree", supported_btree_data_type)?;
1899                Ok(Box::new(ScalarIndexParams::for_builtin(
1900                    BuiltinIndexType::BTree,
1901                )))
1902            }
1903            Index::Bitmap(_) => {
1904                Self::validate_index_type(field, "Bitmap", supported_bitmap_data_type)?;
1905                Ok(Box::new(ScalarIndexParams::for_builtin(
1906                    BuiltinIndexType::Bitmap,
1907                )))
1908            }
1909            Index::LabelList(_) => {
1910                Self::validate_index_type(field, "LabelList", supported_label_list_data_type)?;
1911                Ok(Box::new(ScalarIndexParams::for_builtin(
1912                    BuiltinIndexType::LabelList,
1913                )))
1914            }
1915            Index::FTS(fts_opts) => {
1916                Self::validate_index_type(field, "FTS", supported_fts_data_type)?;
1917                Ok(Box::new(fts_opts))
1918            }
1919            Index::IvfFlat(index) => {
1920                Self::validate_index_type(field, "IVF Flat", supported_vector_data_type)?;
1921                let ivf_params = Self::build_ivf_params(
1922                    index.num_partitions,
1923                    index.target_partition_size,
1924                    index.sample_rate,
1925                    index.max_iterations,
1926                );
1927                let lance_idx_params =
1928                    VectorIndexParams::with_ivf_flat_params(index.distance_type.into(), ivf_params);
1929                Ok(Box::new(lance_idx_params))
1930            }
1931            Index::IvfSq(index) => {
1932                Self::validate_index_type(field, "IVF SQ", supported_vector_data_type)?;
1933                let ivf_params = Self::build_ivf_params(
1934                    index.num_partitions,
1935                    index.target_partition_size,
1936                    index.sample_rate,
1937                    index.max_iterations,
1938                );
1939                let sq_params = SQBuildParams {
1940                    sample_rate: index.sample_rate as usize,
1941                    ..Default::default()
1942                };
1943                let lance_idx_params = VectorIndexParams::with_ivf_sq_params(
1944                    index.distance_type.into(),
1945                    ivf_params,
1946                    sq_params,
1947                );
1948                Ok(Box::new(lance_idx_params))
1949            }
1950            Index::IvfPq(index) => {
1951                Self::validate_index_type(field, "IVF PQ", supported_vector_data_type)?;
1952                let dim = Self::get_vector_dimension(field)?;
1953                let ivf_params = Self::build_ivf_params(
1954                    index.num_partitions,
1955                    index.target_partition_size,
1956                    index.sample_rate,
1957                    index.max_iterations,
1958                );
1959                let num_sub_vectors =
1960                    Self::get_num_sub_vectors(index.num_sub_vectors, dim, index.num_bits);
1961                let num_bits = index.num_bits.unwrap_or(8) as usize;
1962                let mut pq_params = PQBuildParams::new(num_sub_vectors as usize, num_bits);
1963                pq_params.max_iters = index.max_iterations as usize;
1964                let lance_idx_params = VectorIndexParams::with_ivf_pq_params(
1965                    index.distance_type.into(),
1966                    ivf_params,
1967                    pq_params,
1968                );
1969                Ok(Box::new(lance_idx_params))
1970            }
1971            Index::IvfRq(index) => {
1972                Self::validate_index_type(field, "IVF RQ", supported_vector_data_type)?;
1973                let ivf_params = Self::build_ivf_params(
1974                    index.num_partitions,
1975                    index.target_partition_size,
1976                    index.sample_rate,
1977                    index.max_iterations,
1978                );
1979                let rq_params = RQBuildParams::new(index.num_bits.unwrap_or(1) as u8);
1980                let lance_idx_params = VectorIndexParams::with_ivf_rq_params(
1981                    index.distance_type.into(),
1982                    ivf_params,
1983                    rq_params,
1984                );
1985                Ok(Box::new(lance_idx_params))
1986            }
1987            Index::IvfHnswPq(index) => {
1988                Self::validate_index_type(field, "IVF HNSW PQ", supported_vector_data_type)?;
1989                let dim = Self::get_vector_dimension(field)?;
1990                let ivf_params = Self::build_ivf_params(
1991                    index.num_partitions,
1992                    index.target_partition_size,
1993                    index.sample_rate,
1994                    index.max_iterations,
1995                );
1996                let num_sub_vectors =
1997                    Self::get_num_sub_vectors(index.num_sub_vectors, dim, index.num_bits);
1998                let hnsw_params = HnswBuildParams::default()
1999                    .num_edges(index.m as usize)
2000                    .ef_construction(index.ef_construction as usize);
2001                let pq_params = PQBuildParams::new(
2002                    num_sub_vectors as usize,
2003                    index.num_bits.unwrap_or(8) as usize,
2004                );
2005                let lance_idx_params = VectorIndexParams::with_ivf_hnsw_pq_params(
2006                    index.distance_type.into(),
2007                    ivf_params,
2008                    hnsw_params,
2009                    pq_params,
2010                );
2011                Ok(Box::new(lance_idx_params))
2012            }
2013            Index::IvfHnswSq(index) => {
2014                Self::validate_index_type(field, "IVF HNSW SQ", supported_vector_data_type)?;
2015                let ivf_params = Self::build_ivf_params(
2016                    index.num_partitions,
2017                    index.target_partition_size,
2018                    index.sample_rate,
2019                    index.max_iterations,
2020                );
2021                let hnsw_params = HnswBuildParams::default()
2022                    .num_edges(index.m as usize)
2023                    .ef_construction(index.ef_construction as usize);
2024                let sq_params = SQBuildParams {
2025                    sample_rate: index.sample_rate as usize,
2026                    ..Default::default()
2027                };
2028                let lance_idx_params = VectorIndexParams::with_ivf_hnsw_sq_params(
2029                    index.distance_type.into(),
2030                    ivf_params,
2031                    hnsw_params,
2032                    sq_params,
2033                );
2034                Ok(Box::new(lance_idx_params))
2035            }
2036            Index::IvfHnswFlat(index) => {
2037                Self::validate_index_type(field, "IVF HNSW FLAT", supported_vector_data_type)?;
2038                let ivf_params = Self::build_ivf_params(
2039                    index.num_partitions,
2040                    index.target_partition_size,
2041                    index.sample_rate,
2042                    index.max_iterations,
2043                );
2044                let hnsw_params = HnswBuildParams::default()
2045                    .num_edges(index.m as usize)
2046                    .ef_construction(index.ef_construction as usize);
2047                let lance_idx_params = VectorIndexParams::ivf_hnsw(
2048                    index.distance_type.into(),
2049                    ivf_params,
2050                    hnsw_params,
2051                );
2052                Ok(Box::new(lance_idx_params))
2053            }
2054        }
2055    }
2056
2057    // Helper method to get the correct IndexType based on the Index variant and field data type
2058    fn get_index_type_for_field(&self, field: &Field, index: &Index) -> IndexType {
2059        match index {
2060            Index::Auto => {
2061                if supported_vector_data_type(field.data_type()) {
2062                    IndexType::Vector
2063                } else if supported_btree_data_type(field.data_type()) {
2064                    IndexType::BTree
2065                } else {
2066                    // This should not happen since make_index_params would have failed
2067                    IndexType::BTree
2068                }
2069            }
2070            Index::BTree(_) => IndexType::BTree,
2071            Index::Bitmap(_) => IndexType::Bitmap,
2072            Index::LabelList(_) => IndexType::LabelList,
2073            Index::FTS(_) => IndexType::Inverted,
2074            Index::IvfFlat(_)
2075            | Index::IvfSq(_)
2076            | Index::IvfPq(_)
2077            | Index::IvfRq(_)
2078            | Index::IvfHnswPq(_)
2079            | Index::IvfHnswSq(_)
2080            | Index::IvfHnswFlat(_) => IndexType::Vector,
2081        }
2082    }
2083
2084    /// Check whether the table uses V2 manifest paths.
2085    ///
2086    /// See [Self::migrate_manifest_paths_v2] and [ManifestNamingScheme] for
2087    /// more information.
2088    pub async fn uses_v2_manifest_paths(&self) -> Result<bool> {
2089        let dataset = self.dataset.get().await?;
2090        Ok(dataset.manifest_location().naming_scheme == ManifestNamingScheme::V2)
2091    }
2092
2093    /// Migrate the table to use the new manifest path scheme.
2094    ///
2095    /// This function will rename all V1 manifests to V2 manifest paths.
2096    /// These paths provide more efficient opening of datasets with many versions
2097    /// on object stores.
2098    ///
2099    /// This function is idempotent, and can be run multiple times without
2100    /// changing the state of the object store.
2101    ///
2102    /// However, it should not be run while other concurrent operations are happening.
2103    /// And it should also run until completion before resuming other operations.
2104    ///
2105    /// You can use [Self::uses_v2_manifest_paths] to check if the table is already
2106    /// using V2 manifest paths.
2107    pub async fn migrate_manifest_paths_v2(&self) -> Result<()> {
2108        self.dataset.ensure_mutable()?;
2109        let mut dataset = (*self.dataset.get().await?).clone();
2110        dataset.migrate_manifest_paths_v2().await?;
2111        self.dataset.update(dataset);
2112        Ok(())
2113    }
2114
2115    /// Get the table manifest
2116    pub async fn manifest(&self) -> Result<Manifest> {
2117        let dataset = self.dataset.get().await?;
2118        Ok(dataset.manifest().clone())
2119    }
2120
2121    /// Update key-value pairs in config.
2122    pub async fn update_config(
2123        &self,
2124        upsert_values: impl IntoIterator<Item = (String, String)>,
2125    ) -> Result<()> {
2126        self.dataset.ensure_mutable()?;
2127        let mut dataset = (*self.dataset.get().await?).clone();
2128        dataset.update_config(upsert_values).await?;
2129        self.dataset.update(dataset);
2130        Ok(())
2131    }
2132
2133    /// Delete keys from the config
2134    pub async fn delete_config_keys(&self, delete_keys: &[&str]) -> Result<()> {
2135        self.dataset.ensure_mutable()?;
2136        let mut dataset = (*self.dataset.get().await?).clone();
2137        // TODO: update this when we implement metadata APIs
2138        #[allow(deprecated)]
2139        dataset.delete_config_keys(delete_keys).await?;
2140        self.dataset.update(dataset);
2141        Ok(())
2142    }
2143
2144    /// Update schema metadata
2145    pub async fn replace_schema_metadata(
2146        &self,
2147        upsert_values: impl IntoIterator<Item = (String, String)>,
2148    ) -> Result<()> {
2149        self.dataset.ensure_mutable()?;
2150        let mut dataset = (*self.dataset.get().await?).clone();
2151        // TODO: update this when we implement metadata APIs
2152        #[allow(deprecated)]
2153        dataset.replace_schema_metadata(upsert_values).await?;
2154        self.dataset.update(dataset);
2155        Ok(())
2156    }
2157
2158    /// Update field metadata
2159    ///
2160    /// # Arguments:
2161    /// * `new_values` - An iterator of tuples where the first element is the
2162    ///   field id and the second element is a hashmap of metadata key-value
2163    ///   pairs.
2164    ///
2165    pub async fn replace_field_metadata(
2166        &self,
2167        new_values: impl IntoIterator<Item = (u32, HashMap<String, String>)>,
2168    ) -> Result<()> {
2169        self.dataset.ensure_mutable()?;
2170        let mut dataset = (*self.dataset.get().await?).clone();
2171        dataset.replace_field_metadata(new_values).await?;
2172        self.dataset.update(dataset);
2173        Ok(())
2174    }
2175}
2176
2177#[async_trait::async_trait]
2178impl BaseTable for NativeTable {
    /// Returns this table as a `&dyn Any`.
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
2182
2183    fn name(&self) -> &str {
2184        self.name.as_str()
2185    }
2186
2187    fn namespace(&self) -> &[String] {
2188        &self.namespace
2189    }
2190
2191    fn id(&self) -> &str {
2192        &self.id
2193    }
2194
2195    async fn version(&self) -> Result<u64> {
2196        Ok(self.dataset.get().await?.version().version)
2197    }
2198
    /// Checks out the given `version` by switching the dataset wrapper into
    /// time-travel mode for that version number.
    async fn checkout(&self, version: u64) -> Result<()> {
        self.dataset.as_time_travel(version).await
    }
2202
    /// Checks out the version referenced by `tag` by switching the dataset
    /// wrapper into time-travel mode for that tag.
    async fn checkout_tag(&self, tag: &str) -> Result<()> {
        self.dataset.as_time_travel(tag).await
    }
2206
2207    async fn checkout_latest(&self) -> Result<()> {
2208        self.dataset.as_latest().await?;
2209        self.dataset.reload().await
2210    }
2211
2212    async fn list_versions(&self) -> Result<Vec<Version>> {
2213        Ok(self.dataset.get().await?.versions().await?)
2214    }
2215
2216    async fn restore(&self) -> Result<()> {
2217        let version = self
2218            .dataset
2219            .time_travel_version()
2220            .ok_or_else(|| Error::InvalidInput {
2221                message: "you must run checkout before running restore".to_string(),
2222            })?;
2223        {
2224            // restore is the only "write" operation allowed in time travel mode
2225            let mut dataset = (*self.dataset.get().await?).clone();
2226            debug_assert_eq!(dataset.version().version, version);
2227            dataset.restore().await?;
2228        }
2229        self.dataset.as_latest().await?;
2230        Ok(())
2231    }
2232
2233    async fn schema(&self) -> Result<SchemaRef> {
2234        let lance_schema = self.dataset.get().await?.schema().clone();
2235        Ok(Arc::new(Schema::from(&lance_schema)))
2236    }
2237
2238    async fn table_definition(&self) -> Result<TableDefinition> {
2239        let schema = self.schema().await?;
2240        TableDefinition::try_from_rich_schema(schema)
2241    }
2242
2243    async fn count_rows(&self, filter: Option<Filter>) -> Result<usize> {
2244        let dataset = self.dataset.get().await?;
2245        match filter {
2246            None => Ok(dataset.count_rows(None).await?),
2247            Some(Filter::Sql(sql)) => Ok(dataset.count_rows(Some(sql)).await?),
2248            Some(Filter::Datafusion(_)) => Err(Error::NotSupported {
2249                message: "Datafusion filters are not yet supported".to_string(),
2250            }),
2251        }
2252    }
2253
2254    async fn add(&self, mut add: AddDataBuilder) -> Result<AddResult> {
2255        let table_def = self.table_definition().await?;
2256
2257        self.dataset.ensure_mutable()?;
2258        let ds_wrapper = self.dataset.clone();
2259        let ds = self.dataset.get().await?;
2260
2261        let table_schema = Schema::from(&ds.schema().clone());
2262
2263        let num_partitions = if let Some(parallelism) = add.write_parallelism {
2264            parallelism
2265        } else {
2266            // Peek at the first batch to estimate a good partition count for
2267            // write parallelism.
2268            let mut peeked = PeekedScannable::new(add.data);
2269            let n = if let Some(first_batch) = peeked.peek().await {
2270                let max_partitions = lance_core::utils::tokio::get_num_compute_intensive_cpus();
2271                estimate_write_partitions(
2272                    first_batch.get_array_memory_size(),
2273                    first_batch.num_rows(),
2274                    peeked.num_rows(),
2275                    max_partitions,
2276                )
2277            } else {
2278                1
2279            };
2280            add.data = Box::new(peeked);
2281            n
2282        };
2283
2284        let output = add.into_plan(&table_schema, &table_def)?;
2285
2286        let lance_params = output
2287            .write_options
2288            .lance_write_params
2289            .unwrap_or(WriteParams {
2290                mode: match output.mode {
2291                    AddDataMode::Append => WriteMode::Append,
2292                    AddDataMode::Overwrite => WriteMode::Overwrite,
2293                },
2294                ..Default::default()
2295            });
2296
2297        // Repartition for write parallelism if beneficial.
2298        let plan = if num_partitions > 1 {
2299            Arc::new(
2300                datafusion_physical_plan::repartition::RepartitionExec::try_new(
2301                    output.plan,
2302                    datafusion_physical_plan::Partitioning::RoundRobinBatch(num_partitions),
2303                )?,
2304            ) as Arc<dyn ExecutionPlan>
2305        } else {
2306            output.plan
2307        };
2308
2309        let insert_exec = Arc::new(InsertExec::new(ds_wrapper.clone(), ds, plan, lance_params));
2310
2311        let tracker_for_tasks = output.tracker.clone();
2312        if let Some(ref t) = tracker_for_tasks {
2313            t.set_total_tasks(num_partitions);
2314        }
2315        let _finish = write_progress::FinishOnDrop(output.tracker);
2316
2317        // Execute all partitions in parallel.
2318        let task_ctx = Arc::new(TaskContext::default());
2319        let handles = FuturesUnordered::new();
2320        for partition in 0..num_partitions {
2321            let exec = insert_exec.clone();
2322            let ctx = task_ctx.clone();
2323            let tracker = tracker_for_tasks.clone();
2324            handles.push(tokio::spawn(async move {
2325                let _guard = tracker.as_ref().map(|t| t.track_task());
2326                let mut stream = exec
2327                    .execute(partition, ctx)
2328                    .map_err(|e| -> Error { e.into() })?;
2329                while let Some(batch) = stream.next().await {
2330                    batch.map_err(|e| -> Error { e.into() })?;
2331                }
2332                Ok::<_, Error>(())
2333            }));
2334        }
2335        for handle in handles {
2336            handle.await.map_err(|e| Error::Runtime {
2337                message: format!("Insert task panicked: {}", e),
2338            })??;
2339        }
2340
2341        let version = ds_wrapper.get().await?.manifest().version;
2342        Ok(AddResult { version })
2343    }
2344
2345    async fn create_index(&self, opts: IndexBuilder) -> Result<()> {
2346        if opts.columns.len() != 1 {
2347            return Err(Error::Schema {
2348                message: "Multi-column (composite) indices are not yet supported".to_string(),
2349            });
2350        }
2351        let schema = self.schema().await?;
2352
2353        let field = schema.field_with_name(&opts.columns[0])?;
2354
2355        let lance_idx_params = self.make_index_params(field, opts.index.clone()).await?;
2356        let index_type = self.get_index_type_for_field(field, &opts.index);
2357        let columns = [field.name().as_str()];
2358        self.dataset.ensure_mutable()?;
2359        let mut dataset = (*self.dataset.get().await?).clone();
2360        let mut builder = dataset
2361            .create_index_builder(&columns, index_type, lance_idx_params.as_ref())
2362            .train(opts.train)
2363            .replace(opts.replace);
2364
2365        if let Some(name) = opts.name {
2366            builder = builder.name(name);
2367        }
2368        builder.await?;
2369        self.dataset.update(dataset);
2370        Ok(())
2371    }
2372
    /// Drops the index named `index_name`.
    ///
    /// # Errors
    /// Propagates failures from `ensure_mutable`, from obtaining the dataset,
    /// and from the drop itself.
    async fn drop_index(&self, index_name: &str) -> Result<()> {
        self.dataset.ensure_mutable()?;
        // Work on a clone of the current dataset, then publish the modified
        // copy back through the wrapper.
        let mut dataset = (*self.dataset.get().await?).clone();
        dataset.drop_index(index_name).await?;
        self.dataset.update(dataset);
        Ok(())
    }
2380
2381    async fn prewarm_index(&self, index_name: &str) -> Result<()> {
2382        let dataset = self.dataset.get().await?;
2383        Ok(dataset.prewarm_index(index_name).await?)
2384    }
2385
    /// Not implemented for this table type.
    ///
    /// # Errors
    /// Always returns `Error::NotSupported`; the `_columns` argument is ignored.
    async fn prewarm_data(&self, _columns: Option<Vec<String>>) -> Result<()> {
        Err(Error::NotSupported {
            message: "prewarm_data is currently only supported on remote tables.".into(),
        })
    }
2391
    /// Runs the update described by `update` against this table.
    ///
    /// The work is done by `update::execute_update`; its result and errors are
    /// returned unchanged.
    async fn update(&self, update: UpdateBuilder) -> Result<UpdateResult> {
        // Delegate to the submodule implementation
        update::execute_update(self, update).await
    }
2396
    /// Builds the execution plan for `query` under the given `options`.
    ///
    /// Delegates to `query::create_plan`; its result and errors are returned
    /// unchanged.
    async fn create_plan(
        &self,
        query: &AnyQuery,
        options: QueryExecutionOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        query::create_plan(self, query, options).await
    }
2404
    /// Executes `query` under the given `options` and returns the resulting
    /// record batch stream.
    ///
    /// Delegates to `query::execute_query`; its result and errors are returned
    /// unchanged.
    async fn query(
        &self,
        query: &AnyQuery,
        options: QueryExecutionOptions,
    ) -> Result<DatasetRecordBatchStream> {
        query::execute_query(self, query, options).await
    }
2412
    /// Returns a textual analysis of the plan for `query` under `options`.
    ///
    /// Delegates to `query::analyze_query_plan`; its result and errors are
    /// returned unchanged.
    async fn analyze_plan(
        &self,
        query: &AnyQuery,
        options: QueryExecutionOptions,
    ) -> Result<String> {
        query::analyze_query_plan(self, query, options).await
    }
2420
    /// Merges `new_data` into the table according to `params`.
    ///
    /// Delegates to `merge::execute_merge_insert`; its result and errors are
    /// returned unchanged.
    async fn merge_insert(
        &self,
        params: MergeInsertBuilder,
        new_data: Box<dyn RecordBatchReader + Send>,
    ) -> Result<MergeResult> {
        merge::execute_merge_insert(self, params, new_data).await
    }
2428
    /// Delete rows from the table
    ///
    /// `predicate` is forwarded unchanged to `delete::execute_delete`, whose
    /// result and errors are returned as-is.
    async fn delete(&self, predicate: &str) -> Result<DeleteResult> {
        // Delegate to the submodule implementation
        delete::execute_delete(self, predicate).await
    }
2434
    /// Returns a tag manager for this table.
    ///
    /// The returned `NativeTags` holds a clone of this table's `dataset` field.
    /// This method itself never fails; it always returns `Ok`.
    async fn tags(&self) -> Result<Box<dyn Tags + '_>> {
        Ok(Box::new(NativeTags {
            dataset: self.dataset.clone(),
        }))
    }
2440
    /// Runs the optimization described by `action` on this table.
    ///
    /// The work is done by `optimize::execute_optimize`; its result and errors
    /// are returned unchanged.
    async fn optimize(&self, action: OptimizeAction) -> Result<OptimizeStats> {
        // Delegate to the submodule implementation
        optimize::execute_optimize(self, action).await
    }
2445
    /// Adds new columns to the table as described by `transforms`.
    ///
    /// Both arguments are forwarded unchanged to
    /// `schema_evolution::execute_add_columns`, whose result and errors are
    /// returned as-is.
    async fn add_columns(
        &self,
        transforms: NewColumnTransform,
        read_columns: Option<Vec<String>>,
    ) -> Result<AddColumnsResult> {
        schema_evolution::execute_add_columns(self, transforms, read_columns).await
    }
2453
    /// Applies the given column `alterations` to the table.
    ///
    /// Delegates to `schema_evolution::execute_alter_columns`; its result and
    /// errors are returned unchanged.
    async fn alter_columns(&self, alterations: &[ColumnAlteration]) -> Result<AlterColumnsResult> {
        schema_evolution::execute_alter_columns(self, alterations).await
    }
2457
    /// Drops the named `columns` from the table.
    ///
    /// Delegates to `schema_evolution::execute_drop_columns`; its result and
    /// errors are returned unchanged.
    async fn drop_columns(&self, columns: &[&str]) -> Result<DropColumnsResult> {
        schema_evolution::execute_drop_columns(self, columns).await
    }
2461
2462    async fn list_indices(&self) -> Result<Vec<IndexConfig>> {
2463        let dataset = self.dataset.get().await?;
2464        let indices = dataset.load_indices().await?;
2465        let results = futures::stream::iter(indices.as_slice()).then(|idx| async {
2466
2467            // skip Lance internal indexes
2468            if idx.name == FRAG_REUSE_INDEX_NAME {
2469                return None;
2470            }
2471
2472            let stats = match dataset.index_statistics(idx.name.as_str()).await {
2473                Ok(stats) => stats,
2474                Err(e) => {
2475                    log::warn!("Failed to get statistics for index {} ({}): {}", idx.name, idx.uuid, e);
2476                    return None;
2477                }
2478            };
2479
2480            let stats: serde_json::Value = match serde_json::from_str(&stats) {
2481                Ok(stats) => stats,
2482                Err(e) => {
2483                    log::warn!("Failed to deserialize index statistics for index {} ({}): {}", idx.name, idx.uuid, e);
2484                    return None;
2485                }
2486            };
2487
2488            let Some(index_type) = stats.get("index_type").and_then(|v| v.as_str()) else {
2489                log::warn!("Index statistics was missing 'index_type' field for index {} ({})", idx.name, idx.uuid);
2490                return None;
2491            };
2492
2493            let index_type: crate::index::IndexType = match index_type.parse() {
2494                Ok(index_type) => index_type,
2495                Err(e) => {
2496                    log::warn!("Failed to parse index type for index {} ({}): {}", idx.name, idx.uuid, e);
2497                    return None;
2498                }
2499            };
2500
2501            let mut columns = Vec::with_capacity(idx.fields.len());
2502            for field_id in &idx.fields {
2503                let Some(field) = dataset.schema().field_by_id(*field_id) else {
2504                    log::warn!("The index {} ({}) referenced a field with id {} which does not exist in the schema", idx.name, idx.uuid, field_id);
2505                    return None;
2506                };
2507                columns.push(field.name.clone());
2508            }
2509
2510            let name = idx.name.clone();
2511            Some(IndexConfig { index_type, columns, name })
2512        }).collect::<Vec<_>>().await;
2513
2514        Ok(results.into_iter().flatten().collect())
2515    }
2516
    /// Returns a copy of the URI stored on this table. Never fails.
    async fn uri(&self) -> Result<String> {
        Ok(self.uri.clone())
    }
2520
    /// Returns the table's storage options.
    ///
    /// This is an alias for `initial_storage_options` and returns exactly what
    /// that method returns.
    async fn storage_options(&self) -> Option<HashMap<String, String>> {
        self.initial_storage_options().await
    }
2524
2525    async fn initial_storage_options(&self) -> Option<HashMap<String, String>> {
2526        self.dataset
2527            .get()
2528            .await
2529            .ok()
2530            .and_then(|dataset| dataset.initial_storage_options().cloned())
2531    }
2532
2533    async fn latest_storage_options(&self) -> Result<Option<HashMap<String, String>>> {
2534        let dataset = self.dataset.get().await?;
2535        Ok(dataset.latest_storage_options().await?.map(|o| o.0))
2536    }
2537
2538    async fn index_stats(&self, index_name: &str) -> Result<Option<IndexStatistics>> {
2539        let stats = match self
2540            .dataset
2541            .get()
2542            .await?
2543            .index_statistics(index_name.as_ref())
2544            .await
2545        {
2546            Ok(stats) => stats,
2547            Err(lance_core::Error::IndexNotFound { .. }) => return Ok(None),
2548            Err(e) => return Err(Error::from(e)),
2549        };
2550
2551        let mut stats: IndexStatisticsImpl =
2552            serde_json::from_str(&stats).map_err(|e| Error::InvalidInput {
2553                message: format!("error deserializing index statistics: {}", e),
2554            })?;
2555
2556        let first_index = stats.indices.pop().ok_or_else(|| Error::InvalidInput {
2557            message: "index statistics is empty".to_string(),
2558        })?;
2559        // Index type should be present at one of the levels.
2560        let index_type =
2561            stats
2562                .index_type
2563                .or(first_index.index_type)
2564                .ok_or_else(|| Error::InvalidInput {
2565                    message: "index statistics was missing index type".to_string(),
2566                })?;
2567        let loss = stats
2568            .indices
2569            .iter()
2570            .map(|index| index.loss.unwrap_or_default())
2571            .sum::<f64>();
2572
2573        let loss = first_index.loss.map(|first_loss| first_loss + loss);
2574        Ok(Some(IndexStatistics {
2575            num_indexed_rows: stats.num_indexed_rows,
2576            num_unindexed_rows: stats.num_unindexed_rows,
2577            index_type,
2578            distance_type: first_index.metric_type,
2579            num_indices: stats.num_indices,
2580            loss,
2581        }))
2582    }
2583
    /// Poll until the columns are fully indexed. Will return Error::Timeout if the columns
    /// are not fully indexed within the timeout.
    ///
    /// This method forwards to the free function `wait_for_index` with the same
    /// `index_names` and `timeout`.
    async fn wait_for_index(
        &self,
        index_names: &[&str],
        timeout: std::time::Duration,
    ) -> Result<()> {
        wait_for_index(self, index_names, timeout).await
    }
2593
2594    async fn stats(&self) -> Result<TableStatistics> {
2595        let num_rows = self.count_rows(None).await?;
2596        let num_indices = self.list_indices().await?.len();
2597        let ds = self.dataset.get().await?;
2598        let ds_clone = (*ds).clone();
2599        let ds_stats = Arc::new(ds_clone).calculate_data_stats().await?;
2600        let total_bytes = ds_stats.fields.iter().map(|f| f.bytes_on_disk).sum::<u64>() as usize;
2601
2602        let frags = ds.get_fragments();
2603        let mut sorted_sizes = join_all(
2604            frags
2605                .iter()
2606                .map(|frag| async move { frag.physical_rows().await.unwrap_or(0) }),
2607        )
2608        .await;
2609        sorted_sizes.sort();
2610
2611        let small_frag_threshold = 100000;
2612        let num_fragments = sorted_sizes.len();
2613        let num_small_fragments = sorted_sizes
2614            .iter()
2615            .filter(|&&size| size < small_frag_threshold)
2616            .count();
2617
2618        let p25 = *sorted_sizes.get(num_fragments / 4).unwrap_or(&0);
2619        let p50 = *sorted_sizes.get(num_fragments / 2).unwrap_or(&0);
2620        let p75 = *sorted_sizes.get(num_fragments * 3 / 4).unwrap_or(&0);
2621        let p99 = *sorted_sizes.get(num_fragments * 99 / 100).unwrap_or(&0);
2622        let min = sorted_sizes.first().copied().unwrap_or(0);
2623        let max = sorted_sizes.last().copied().unwrap_or(0);
2624        let mean = if num_fragments == 0 {
2625            0
2626        } else {
2627            sorted_sizes.iter().copied().sum::<usize>() / num_fragments
2628        };
2629
2630        let frag_stats = FragmentStatistics {
2631            num_fragments,
2632            num_small_fragments,
2633            lengths: FragmentSummaryStats {
2634                min,
2635                max,
2636                mean,
2637                p25,
2638                p50,
2639                p75,
2640                p99,
2641            },
2642        };
2643        let stats = TableStatistics {
2644            total_bytes,
2645            num_rows,
2646            num_indices,
2647            fragment_stats: frag_stats,
2648        };
2649        Ok(stats)
2650    }
2651
2652    async fn create_insert_exec(
2653        &self,
2654        input: Arc<dyn datafusion_physical_plan::ExecutionPlan>,
2655        write_params: WriteParams,
2656    ) -> Result<Arc<dyn datafusion_physical_plan::ExecutionPlan>> {
2657        let ds = self.dataset.get().await?;
2658        let dataset = Arc::new((*ds).clone());
2659        Ok(Arc::new(datafusion::insert::InsertExec::new(
2660            self.dataset.clone(),
2661            dataset,
2662            input,
2663            write_params,
2664        )))
2665    }
2666}
2667
/// Table-level statistics, as returned by the table `stats` method.
#[skip_serializing_none]
#[derive(Debug, Deserialize, PartialEq)]
pub struct TableStatistics {
    /// The total number of bytes in the table
    ///
    /// The native implementation in this file computes it as the sum of the
    /// per-field `bytes_on_disk` values from the dataset's data statistics.
    pub total_bytes: usize,

    /// The number of rows in the table
    pub num_rows: usize,

    /// The number of indices in the table
    pub num_indices: usize,

    /// Statistics on table fragments
    pub fragment_stats: FragmentStatistics,
}
2683
/// Fragment-level statistics for a table.
#[skip_serializing_none]
#[derive(Debug, Deserialize, PartialEq)]
pub struct FragmentStatistics {
    /// The number of fragments in the table
    pub num_fragments: usize,

    /// The number of uncompacted fragments in the table
    ///
    /// The native implementation in this file counts fragments with fewer than
    /// 100000 physical rows.
    pub num_small_fragments: usize,

    /// Statistics on the number of rows in the table fragments
    pub lengths: FragmentSummaryStats,
    // todo: add size statistics
    // /// Statistics on the number of bytes in the table fragments
    // sizes: FragmentStats,
}
2699
/// Summary of a per-fragment measurement across all fragments of a table.
///
/// The native `stats` implementation in this file fills it with physical row
/// counts per fragment. Each percentile is the value found at the matching
/// index of the ascending-sorted sizes, and all fields are 0 when there are no
/// fragments.
#[skip_serializing_none]
#[derive(Debug, Deserialize, PartialEq)]
pub struct FragmentSummaryStats {
    /// Smallest value.
    pub min: usize,
    /// Largest value.
    pub max: usize,
    /// Mean value, computed with integer division (rounded down).
    pub mean: usize,
    /// Value at sorted index `num_fragments / 4`.
    pub p25: usize,
    /// Value at sorted index `num_fragments / 2`.
    pub p50: usize,
    /// Value at sorted index `num_fragments * 3 / 4`.
    pub p75: usize,
    /// Value at sorted index `num_fragments * 99 / 100`.
    pub p99: usize,
}
2711
2712#[cfg(test)]
2713#[allow(deprecated)]
2714mod tests {
2715    use std::sync::Arc;
2716    use std::sync::atomic::{AtomicBool, Ordering};
2717    use std::time::Duration;
2718
2719    use arrow_array::{
2720        Array, BooleanArray, FixedSizeListArray, Int32Array, LargeStringArray, RecordBatch,
2721        RecordBatchIterator, RecordBatchReader, StringArray,
2722        builder::{ListBuilder, StringBuilder},
2723    };
2724    use arrow_array::{BinaryArray, LargeBinaryArray};
2725    use arrow_data::ArrayDataBuilder;
2726    use arrow_schema::{DataType, Field, Schema};
2727    use futures::TryStreamExt;
2728    use lance::Dataset;
2729    use lance::io::{ObjectStoreParams, WrappingObjectStore};
2730    use tempfile::tempdir;
2731
2732    use super::*;
2733    use crate::connect;
2734    use crate::connection::ConnectBuilder;
2735    use crate::index::scalar::{BTreeIndexBuilder, BitmapIndexBuilder};
2736    use crate::index::vector::{IvfHnswPqIndexBuilder, IvfHnswSqIndexBuilder};
2737    use crate::query::Select;
2738    use crate::query::{ExecutableQuery, QueryBase};
2739    use crate::test_utils::connection::new_test_connection;
2740    #[tokio::test]
2741    async fn test_open() {
2742        let tmp_dir = tempdir().unwrap();
2743        let dataset_path = tmp_dir.path().join("test.lance");
2744
2745        let batch = make_test_batches();
2746        let reader = RecordBatchIterator::new(vec![Ok(batch.clone())], batch.schema());
2747        Dataset::write(reader, dataset_path.to_str().unwrap(), None)
2748            .await
2749            .unwrap();
2750
2751        let table = NativeTable::open(dataset_path.to_str().unwrap())
2752            .await
2753            .unwrap();
2754
2755        assert_eq!(table.name, "test")
2756    }
2757
2758    #[tokio::test]
2759    async fn test_open_not_found() {
2760        let tmp_dir = tempdir().unwrap();
2761        let uri = tmp_dir.path().to_str().unwrap();
2762        let table = NativeTable::open(uri).await;
2763        assert!(matches!(table.unwrap_err(), Error::TableNotFound { .. }));
2764    }
2765
2766    #[test]
2767    #[cfg(not(windows))]
2768    fn test_object_store_path() {
2769        use std::path::Path as StdPath;
2770        let p = StdPath::new("s3://bucket/path/to/file");
2771        let c = p.join("subfile");
2772        assert_eq!(c.to_str().unwrap(), "s3://bucket/path/to/file/subfile");
2773    }
2774
2775    #[tokio::test]
2776    async fn test_count_rows() {
2777        let tmp_dir = tempdir().unwrap();
2778        let uri = tmp_dir.path().to_str().unwrap();
2779
2780        let batch = make_test_batches();
2781        let reader: Box<dyn RecordBatchReader + Send> = Box::new(RecordBatchIterator::new(
2782            vec![Ok(batch.clone())],
2783            batch.schema(),
2784        ));
2785        let table = NativeTable::create(
2786            uri,
2787            "test",
2788            vec![],
2789            reader,
2790            None,
2791            None,
2792            None,
2793            None,
2794            HashSet::new(),
2795        )
2796        .await
2797        .unwrap();
2798
2799        assert_eq!(table.count_rows(None).await.unwrap(), 10);
2800        assert_eq!(
2801            table
2802                .count_rows(Some(Filter::Sql("i >= 5".to_string())))
2803                .await
2804                .unwrap(),
2805            5
2806        );
2807    }
2808
    /// Test double that records, via an atomic flag, whether it has been used.
    #[derive(Default, Debug)]
    struct NoOpCacheWrapper {
        /// Starts as `false` (the `Default` value).
        called: AtomicBool,
    }
2813
    impl NoOpCacheWrapper {
        /// Returns the current value of the `called` flag (relaxed load).
        fn called(&self) -> bool {
            self.called.load(Ordering::Relaxed)
        }
    }
2819
    impl WrappingObjectStore for NoOpCacheWrapper {
        /// Sets the `called` flag and returns `original` unchanged; the store
        /// prefix is ignored.
        fn wrap(
            &self,
            _store_prefix: &str,
            original: Arc<dyn object_store::ObjectStore>,
        ) -> Arc<dyn object_store::ObjectStore> {
            self.called.store(true, Ordering::Relaxed);
            original
        }
    }
2830
2831    #[tokio::test]
2832    async fn test_open_table_options() {
2833        let tmp_dir = tempdir().unwrap();
2834        let dataset_path = tmp_dir.path().join("test.lance");
2835        let uri = dataset_path.to_str().unwrap();
2836        let conn = connect(uri).execute().await.unwrap();
2837
2838        let batches = make_test_batches();
2839
2840        conn.create_table("my_table", batches)
2841            .execute()
2842            .await
2843            .unwrap();
2844
2845        let wrapper = Arc::new(NoOpCacheWrapper::default());
2846
2847        let object_store_params = ObjectStoreParams {
2848            object_store_wrapper: Some(wrapper.clone()),
2849            ..Default::default()
2850        };
2851        let param = ReadParams {
2852            store_options: Some(object_store_params),
2853            ..Default::default()
2854        };
2855        assert!(!wrapper.called());
2856        conn.open_table("my_table")
2857            .lance_read_params(param)
2858            .execute()
2859            .await
2860            .unwrap();
2861        assert!(wrapper.called());
2862    }
2863
2864    fn make_test_batches() -> RecordBatch {
2865        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)]));
2866        RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from_iter_values(0..10))]).unwrap()
2867    }
2868
    /// Exercises the tag lifecycle: create, list, look up, update, delete, and
    /// checking out a tagged version. Each step depends on the table version
    /// produced by the steps before it.
    #[tokio::test]
    async fn test_tags() {
        let tmp_dir = tempdir().unwrap();
        let uri = tmp_dir.path().to_str().unwrap();

        let conn = ConnectBuilder::new(uri)
            .read_consistency_interval(Duration::from_secs(0))
            .execute()
            .await
            .unwrap();
        let table = conn
            .create_table("my_table", some_sample_data())
            .execute()
            .await
            .unwrap();
        // Creating the table gives version 1; each `add` bumps it by one.
        assert_eq!(table.version().await.unwrap(), 1);
        table.add(some_sample_data()).execute().await.unwrap();
        assert_eq!(table.version().await.unwrap(), 2);
        let mut tags_manager = table.tags().await.unwrap();
        let tags = tags_manager.list().await.unwrap();
        assert!(tags.is_empty(), "Tags should be empty initially");
        // Tag version 1, then version 2, checking the listing after each.
        let tag1 = "tag1";
        tags_manager.create(tag1, 1).await.unwrap();
        assert_eq!(tags_manager.get_version(tag1).await.unwrap(), 1);
        let tags = tags_manager.list().await.unwrap();
        assert_eq!(tags.len(), 1);
        assert!(tags.contains_key(tag1));
        assert_eq!(tags.get(tag1).unwrap().version, 1);
        tags_manager.create("tag2", 2).await.unwrap();
        assert_eq!(tags_manager.get_version("tag2").await.unwrap(), 2);
        let tags = tags_manager.list().await.unwrap();
        assert_eq!(tags.len(), 2);
        assert!(tags.contains_key(tag1));
        assert_eq!(tags.get(tag1).unwrap().version, 1);
        assert!(tags.contains_key("tag2"));
        assert_eq!(tags.get("tag2").unwrap().version, 2);
        // Test update and delete
        table.add(some_sample_data()).execute().await.unwrap();
        tags_manager.update(tag1, 3).await.unwrap();
        assert_eq!(tags_manager.get_version(tag1).await.unwrap(), 3);
        tags_manager.delete("tag2").await.unwrap();
        let tags = tags_manager.list().await.unwrap();
        assert_eq!(tags.len(), 1);
        assert!(tags.contains_key(tag1));
        assert_eq!(tags.get(tag1).unwrap().version, 3);
        // Test checkout tag
        table.add(some_sample_data()).execute().await.unwrap();
        assert_eq!(table.version().await.unwrap(), 4);
        table.checkout_tag(tag1).await.unwrap();
        assert_eq!(table.version().await.unwrap(), 3);
        table.checkout_latest().await.unwrap();
        assert_eq!(table.version().await.unwrap(), 4);
    }
2922
    /// Creates an `Index::Auto` index on a 16-dimensional float vector column
    /// with 512 random rows, then checks the listed index, its statistics, and
    /// that dropping it leaves no indices behind.
    #[tokio::test]
    async fn test_create_index() {
        use arrow_array::RecordBatch;
        use arrow_schema::{DataType, Field, Schema as ArrowSchema};
        use rand;
        use std::iter::repeat_with;

        use arrow_array::Float32Array;

        let tmp_dir = tempdir().unwrap();
        let uri = tmp_dir.path().to_str().unwrap();
        let conn = connect(uri).execute().await.unwrap();

        let dimension = 16;
        let schema = Arc::new(ArrowSchema::new(vec![Field::new(
            "embeddings",
            DataType::FixedSizeList(
                Arc::new(Field::new("item", DataType::Float32, true)),
                dimension,
            ),
            false,
        )]));

        // 512 vectors of `dimension` random floats, stored as one flat array.
        let float_arr = Float32Array::from(
            repeat_with(rand::random::<f32>)
                .take(512 * dimension as usize)
                .collect::<Vec<f32>>(),
        );

        let vectors = Arc::new(create_fixed_size_list(float_arr, dimension).unwrap());
        let batch = RecordBatch::try_new(schema.clone(), vec![vectors.clone()]).unwrap();

        let table = conn.create_table("test", batch).execute().await.unwrap();

        // Statistics for an index that does not exist are `None`, not an error.
        assert_eq!(table.index_stats("my_index").await.unwrap(), None);

        table
            .create_index(&["embeddings"], Index::Auto)
            .execute()
            .await
            .unwrap();

        let index_configs = table.list_indices().await.unwrap();
        assert_eq!(index_configs.len(), 1);
        let index = index_configs.into_iter().next().unwrap();
        assert_eq!(index.index_type, crate::index::IndexType::IvfPq);
        assert_eq!(index.columns, vec!["embeddings".to_string()]);
        assert_eq!(table.count_rows(None).await.unwrap(), 512);
        assert_eq!(table.name(), "test");

        // Look the index up by the name the table actually assigned to it.
        let indices = table.as_native().unwrap().load_indices().await.unwrap();
        let index_name = &indices[0].index_name;
        let stats = table.index_stats(index_name).await.unwrap().unwrap();
        assert_eq!(stats.num_indexed_rows, 512);
        assert_eq!(stats.num_unindexed_rows, 0);
        assert_eq!(stats.index_type, crate::index::IndexType::IvfPq);
        assert_eq!(stats.distance_type, Some(crate::DistanceType::L2));
        assert!(stats.loss.is_some());

        table.drop_index(index_name).await.unwrap();
        assert_eq!(table.list_indices().await.unwrap().len(), 0);
    }
2985
2986    #[tokio::test]
2987    async fn test_dynamic_select() {
2988        let tc = new_test_connection().await.unwrap();
2989        let db = tc.connection;
2990
2991        let table = db
2992            .create_table("test", some_sample_data())
2993            .execute()
2994            .await
2995            .unwrap();
2996
2997        let query = table.query().select(Select::dynamic(&[("i_alias", "i")]));
2998
2999        let result = query.execute().await;
3000        let batches = result
3001            .expect("should have result")
3002            .try_collect::<Vec<_>>()
3003            .await
3004            .unwrap();
3005
3006        for batch in batches {
3007            assert!(batch.column_by_name("i_alias").is_some());
3008        }
3009    }
3010
    /// Builds an IVF-PQ index with a default builder over `2 * PARTITION_SIZE`
    /// rows and checks that the resulting IVF model has
    /// `num_rows / PARTITION_SIZE` partitions.
    #[tokio::test]
    async fn test_ivf_pq_uses_default_partition_size_for_num_partitions() {
        use arrow_array::{Float32Array, RecordBatch};
        use arrow_schema::{DataType, Field, Schema as ArrowSchema};

        use crate::index::vector::IvfPqIndexBuilder;

        let tmp_dir = tempdir().unwrap();
        let uri = tmp_dir.path().to_str().unwrap();
        let conn = connect(uri).execute().await.unwrap();

        // NOTE(review): presumably mirrors the builder's default target
        // partition size; confirm against the index builder defaults.
        const PARTITION_SIZE: usize = 8192;
        let num_rows = PARTITION_SIZE * 2;
        let dimension = 8usize;
        let schema = Arc::new(ArrowSchema::new(vec![Field::new(
            "embeddings",
            DataType::FixedSizeList(
                Arc::new(Field::new("item", DataType::Float32, true)),
                dimension as i32,
            ),
            false,
        )]));

        // Deterministic data: the flat values 0, 1, 2, ... as f32.
        let float_arr =
            Float32Array::from_iter_values((0..(num_rows * dimension)).map(|v| v as f32));
        let vectors = Arc::new(create_fixed_size_list(float_arr, dimension as i32).unwrap());
        let batch = RecordBatch::try_new(schema.clone(), vec![vectors]).unwrap();

        let table = conn.create_table("test", batch).execute().await.unwrap();
        let native_table = table.as_native().unwrap();
        let builder = IvfPqIndexBuilder::default();
        table
            .create_index(&["embeddings"], Index::IvfPq(builder))
            .execute()
            .await
            .unwrap();
        table
            .wait_for_index(&["embeddings_idx"], std::time::Duration::from_secs(30))
            .await
            .unwrap();

        use lance::index::DatasetIndexInternalExt;
        use lance::index::vector::ivf::v2::IvfPq as LanceIvfPq;
        use lance_index::metrics::NoOpMetricsCollector;
        use lance_index::vector::VectorIndex as LanceVectorIndex;

        let indices = native_table.load_indices().await.unwrap();
        let index_uuid = indices[0].index_uuid.clone();

        // Take an owned copy of the dataset and release the handle before
        // opening the index.
        let dataset_guard = native_table.dataset.get().await.unwrap();
        let dataset = (*dataset_guard).clone();
        drop(dataset_guard);

        // Open the index through Lance and downcast it to read the IVF model.
        let lance_index = dataset
            .open_vector_index("embeddings", &index_uuid, &NoOpMetricsCollector)
            .await
            .unwrap();
        let ivf_index = lance_index
            .as_any()
            .downcast_ref::<LanceIvfPq>()
            .expect("expected IvfPq index");
        let partition_count = ivf_index.ivf_model().num_partitions();

        let expected_partitions = num_rows / PARTITION_SIZE;
        assert_eq!(partition_count, expected_partitions);
    }
3077
    /// Creates an IVF-HNSW-SQ index with a default builder on a 16-dimensional
    /// float vector column with 512 random rows, then checks the listed index
    /// and its row statistics.
    #[tokio::test]
    async fn test_create_index_ivf_hnsw_sq() {
        use arrow_array::RecordBatch;
        use arrow_schema::{DataType, Field, Schema as ArrowSchema};
        use rand;
        use std::iter::repeat_with;

        use arrow_array::Float32Array;

        let tmp_dir = tempdir().unwrap();
        let uri = tmp_dir.path().to_str().unwrap();
        let conn = connect(uri).execute().await.unwrap();

        let dimension = 16;
        let schema = Arc::new(ArrowSchema::new(vec![Field::new(
            "embeddings",
            DataType::FixedSizeList(
                Arc::new(Field::new("item", DataType::Float32, true)),
                dimension,
            ),
            false,
        )]));

        // 512 vectors of `dimension` random floats, stored as one flat array.
        let float_arr = Float32Array::from(
            repeat_with(rand::random::<f32>)
                .take(512 * dimension as usize)
                .collect::<Vec<f32>>(),
        );

        let vectors = Arc::new(create_fixed_size_list(float_arr, dimension).unwrap());
        let batch = RecordBatch::try_new(schema.clone(), vec![vectors.clone()]).unwrap();

        let table = conn.create_table("test", batch).execute().await.unwrap();

        // No index exists yet, so the lookup yields `None`.
        let stats = table.index_stats("my_index").await.unwrap();
        assert!(stats.is_none());

        let index = IvfHnswSqIndexBuilder::default();
        table
            .create_index(&["embeddings"], Index::IvfHnswSq(index))
            .execute()
            .await
            .unwrap();

        let index_configs = table.list_indices().await.unwrap();
        assert_eq!(index_configs.len(), 1);
        let index = index_configs.into_iter().next().unwrap();
        assert_eq!(index.index_type, crate::index::IndexType::IvfHnswSq);
        assert_eq!(index.columns, vec!["embeddings".to_string()]);
        assert_eq!(table.count_rows(None).await.unwrap(), 512);
        assert_eq!(table.name(), "test");

        // Look the index up by the name the table actually assigned to it.
        let indices = table.as_native().unwrap().load_indices().await.unwrap();
        let index_name = &indices[0].index_name;
        let stats = table.index_stats(index_name).await.unwrap().unwrap();
        assert_eq!(stats.num_indexed_rows, 512);
        assert_eq!(stats.num_unindexed_rows, 0);
    }
3136
3137    #[tokio::test]
3138    async fn test_create_index_ivf_hnsw_pq() {
3139        use arrow_array::RecordBatch;
3140        use arrow_schema::{DataType, Field, Schema as ArrowSchema};
3141        use rand;
3142        use std::iter::repeat_with;
3143
3144        use arrow_array::Float32Array;
3145
3146        let tmp_dir = tempdir().unwrap();
3147        let uri = tmp_dir.path().to_str().unwrap();
3148        let conn = connect(uri).execute().await.unwrap();
3149
3150        let dimension = 16;
3151        let schema = Arc::new(ArrowSchema::new(vec![Field::new(
3152            "embeddings",
3153            DataType::FixedSizeList(
3154                Arc::new(Field::new("item", DataType::Float32, true)),
3155                dimension,
3156            ),
3157            false,
3158        )]));
3159
3160        let float_arr = Float32Array::from(
3161            repeat_with(rand::random::<f32>)
3162                .take(512 * dimension as usize)
3163                .collect::<Vec<f32>>(),
3164        );
3165
3166        let vectors = Arc::new(create_fixed_size_list(float_arr, dimension).unwrap());
3167        let batch = RecordBatch::try_new(schema.clone(), vec![vectors.clone()]).unwrap();
3168
3169        let table = conn.create_table("test", batch).execute().await.unwrap();
3170        let stats = table.index_stats("my_index").await.unwrap();
3171        assert!(stats.is_none());
3172
3173        let index = IvfHnswPqIndexBuilder::default();
3174        table
3175            .create_index(&["embeddings"], Index::IvfHnswPq(index))
3176            .execute()
3177            .await
3178            .unwrap();
3179        table
3180            .wait_for_index(&["embeddings_idx"], Duration::from_millis(10))
3181            .await
3182            .unwrap();
3183        let index_configs = table.list_indices().await.unwrap();
3184        assert_eq!(index_configs.len(), 1);
3185        let index = index_configs.into_iter().next().unwrap();
3186        assert_eq!(index.index_type, crate::index::IndexType::IvfHnswPq);
3187        assert_eq!(index.columns, vec!["embeddings".to_string()]);
3188        assert_eq!(table.count_rows(None).await.unwrap(), 512);
3189        assert_eq!(table.name(), "test");
3190
3191        let indices: Vec<VectorIndex> = table.as_native().unwrap().load_indices().await.unwrap();
3192        let index_name = &indices[0].index_name;
3193        let stats = table.index_stats(index_name).await.unwrap().unwrap();
3194        assert_eq!(stats.num_indexed_rows, 512);
3195        assert_eq!(stats.num_unindexed_rows, 0);
3196    }
3197
3198    #[tokio::test]
3199    async fn test_create_index_ivf_hnsw_flat() {
3200        use arrow_array::RecordBatch;
3201        use arrow_schema::{DataType, Field, Schema as ArrowSchema};
3202        use rand;
3203        use std::iter::repeat_with;
3204
3205        use crate::index::vector::IvfHnswFlatIndexBuilder;
3206        use arrow_array::Float32Array;
3207
3208        let tmp_dir = tempdir().unwrap();
3209        let uri = tmp_dir.path().to_str().unwrap();
3210        let conn = connect(uri).execute().await.unwrap();
3211
3212        let dimension = 16;
3213        let schema = Arc::new(ArrowSchema::new(vec![Field::new(
3214            "embeddings",
3215            DataType::FixedSizeList(
3216                Arc::new(Field::new("item", DataType::Float32, true)),
3217                dimension,
3218            ),
3219            false,
3220        )]));
3221
3222        let float_arr = Float32Array::from(
3223            repeat_with(rand::random::<f32>)
3224                .take(512 * dimension as usize)
3225                .collect::<Vec<f32>>(),
3226        );
3227
3228        let vectors = Arc::new(create_fixed_size_list(float_arr, dimension).unwrap());
3229        let batch = RecordBatch::try_new(schema.clone(), vec![vectors.clone()]).unwrap();
3230
3231        let table = conn.create_table("test", batch).execute().await.unwrap();
3232
3233        let index = IvfHnswFlatIndexBuilder::default();
3234        table
3235            .create_index(&["embeddings"], Index::IvfHnswFlat(index))
3236            .execute()
3237            .await
3238            .unwrap();
3239
3240        let index_configs = table.list_indices().await.unwrap();
3241        assert_eq!(index_configs.len(), 1);
3242        let index = index_configs.into_iter().next().unwrap();
3243        assert_eq!(index.index_type, crate::index::IndexType::IvfHnswFlat);
3244        assert_eq!(index.columns, vec!["embeddings".to_string()]);
3245        assert_eq!(table.count_rows(None).await.unwrap(), 512);
3246    }
3247
3248    fn create_fixed_size_list<T: Array>(values: T, list_size: i32) -> Result<FixedSizeListArray> {
3249        let list_type = DataType::FixedSizeList(
3250            Arc::new(Field::new("item", values.data_type().clone(), true)),
3251            list_size,
3252        );
3253        let data = ArrayDataBuilder::new(list_type)
3254            .len(values.len() / list_size as usize)
3255            .add_child_data(values.into_data())
3256            .build()
3257            .unwrap();
3258
3259        Ok(FixedSizeListArray::from(data))
3260    }
3261
3262    fn some_sample_data() -> Box<dyn arrow_array::RecordBatchReader + Send> {
3263        let batch = RecordBatch::try_new(
3264            Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)])),
3265            vec![Arc::new(Int32Array::from(vec![1]))],
3266        )
3267        .unwrap();
3268        let schema = batch.schema().clone();
3269        let batch = Ok(batch);
3270
3271        Box::new(RecordBatchIterator::new(vec![batch], schema))
3272    }
3273
3274    #[tokio::test]
3275    async fn test_create_scalar_index() {
3276        let tmp_dir = tempdir().unwrap();
3277        let uri = tmp_dir.path().to_str().unwrap();
3278
3279        let batch = RecordBatch::try_new(
3280            Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)])),
3281            vec![Arc::new(Int32Array::from(vec![1]))],
3282        )
3283        .unwrap();
3284        let conn = ConnectBuilder::new(uri).execute().await.unwrap();
3285        let table = conn
3286            .create_table("my_table", batch.clone())
3287            .execute()
3288            .await
3289            .unwrap();
3290
3291        // Can create an index on a scalar column (will default to btree)
3292        table
3293            .create_index(&["i"], Index::Auto)
3294            .execute()
3295            .await
3296            .unwrap();
3297        table
3298            .wait_for_index(&["i_idx"], Duration::from_millis(10))
3299            .await
3300            .unwrap();
3301        let index_configs = table.list_indices().await.unwrap();
3302        assert_eq!(index_configs.len(), 1);
3303        let index = index_configs.into_iter().next().unwrap();
3304        assert_eq!(index.index_type, crate::index::IndexType::BTree);
3305        assert_eq!(index.columns, vec!["i".to_string()]);
3306
3307        // Can also specify btree
3308        table
3309            .create_index(&["i"], Index::BTree(BTreeIndexBuilder::default()))
3310            .execute()
3311            .await
3312            .unwrap();
3313
3314        let index_configs = table.list_indices().await.unwrap();
3315        assert_eq!(index_configs.len(), 1);
3316        let index = index_configs.into_iter().next().unwrap();
3317        assert_eq!(index.index_type, crate::index::IndexType::BTree);
3318        assert_eq!(index.columns, vec!["i".to_string()]);
3319
3320        let indices = table.as_native().unwrap().load_indices().await.unwrap();
3321        let index_name = &indices[0].index_name;
3322        let stats = table.index_stats(index_name).await.unwrap().unwrap();
3323        assert_eq!(stats.num_indexed_rows, 1);
3324        assert_eq!(stats.num_unindexed_rows, 0);
3325    }
3326
3327    #[tokio::test]
3328    async fn test_create_bitmap_index() {
3329        let tmp_dir = tempdir().unwrap();
3330        let uri = tmp_dir.path().to_str().unwrap();
3331
3332        let conn = ConnectBuilder::new(uri).execute().await.unwrap();
3333
3334        let schema = Arc::new(Schema::new(vec![
3335            Field::new("id", DataType::Int32, false),
3336            Field::new("category", DataType::Utf8, true),
3337            Field::new("large_category", DataType::LargeUtf8, true),
3338            Field::new("is_active", DataType::Boolean, true),
3339            Field::new("data", DataType::Binary, true),
3340            Field::new("large_data", DataType::LargeBinary, true),
3341        ]));
3342
3343        let batch = RecordBatch::try_new(
3344            schema.clone(),
3345            vec![
3346                Arc::new(Int32Array::from_iter_values(0..100)),
3347                Arc::new(StringArray::from_iter_values(
3348                    (0..100).map(|i| format!("category_{}", i % 5)),
3349                )),
3350                Arc::new(LargeStringArray::from_iter_values(
3351                    (0..100).map(|i| format!("large_category_{}", i % 5)),
3352                )),
3353                Arc::new(BooleanArray::from_iter((0..100).map(|i| Some(i % 2 == 0)))),
3354                Arc::new(BinaryArray::from_iter_values(
3355                    (0_u32..100).map(|i| i.to_le_bytes()),
3356                )),
3357                Arc::new(LargeBinaryArray::from_iter_values(
3358                    (0_u32..100).map(|i| i.to_le_bytes()),
3359                )),
3360            ],
3361        )
3362        .unwrap();
3363
3364        let table = conn
3365            .create_table("test_bitmap", batch.clone())
3366            .execute()
3367            .await
3368            .unwrap();
3369
3370        // Create bitmap index on the "category" column
3371        table
3372            .create_index(&["category"], Index::Bitmap(Default::default()))
3373            .execute()
3374            .await
3375            .unwrap();
3376
3377        // Create bitmap index on the "is_active" column
3378        table
3379            .create_index(&["is_active"], Index::Bitmap(Default::default()))
3380            .execute()
3381            .await
3382            .unwrap();
3383
3384        // Create bitmap index on the "data" column
3385        table
3386            .create_index(&["data"], Index::Bitmap(Default::default()))
3387            .execute()
3388            .await
3389            .unwrap();
3390
3391        // Create bitmap index on the "large_data" column
3392        table
3393            .create_index(&["large_data"], Index::Bitmap(Default::default()))
3394            .execute()
3395            .await
3396            .unwrap();
3397
3398        // Create bitmap index on the "large_category" column
3399        table
3400            .create_index(&["large_category"], Index::Bitmap(Default::default()))
3401            .execute()
3402            .await
3403            .unwrap();
3404
3405        // Verify the index was created
3406        let index_configs = table.list_indices().await.unwrap();
3407        assert_eq!(index_configs.len(), 5);
3408
3409        let mut configs_iter = index_configs.into_iter();
3410        let index = configs_iter.next().unwrap();
3411        assert_eq!(index.index_type, crate::index::IndexType::Bitmap);
3412        assert_eq!(index.columns, vec!["category".to_string()]);
3413
3414        let index = configs_iter.next().unwrap();
3415        assert_eq!(index.index_type, crate::index::IndexType::Bitmap);
3416        assert_eq!(index.columns, vec!["is_active".to_string()]);
3417
3418        let index = configs_iter.next().unwrap();
3419        assert_eq!(index.index_type, crate::index::IndexType::Bitmap);
3420        assert_eq!(index.columns, vec!["data".to_string()]);
3421
3422        let index = configs_iter.next().unwrap();
3423        assert_eq!(index.index_type, crate::index::IndexType::Bitmap);
3424        assert_eq!(index.columns, vec!["large_data".to_string()]);
3425
3426        let index = configs_iter.next().unwrap();
3427        assert_eq!(index.index_type, crate::index::IndexType::Bitmap);
3428        assert_eq!(index.columns, vec!["large_category".to_string()]);
3429    }
3430
3431    #[tokio::test]
3432    async fn test_create_label_list_index() {
3433        let tmp_dir = tempdir().unwrap();
3434        let uri = tmp_dir.path().to_str().unwrap();
3435
3436        let conn = ConnectBuilder::new(uri).execute().await.unwrap();
3437
3438        let schema = Arc::new(Schema::new(vec![
3439            Field::new("id", DataType::Int32, false),
3440            Field::new(
3441                "tags",
3442                DataType::List(Field::new("item", DataType::Utf8, true).into()),
3443                true,
3444            ),
3445        ]));
3446
3447        const TAGS: [&str; 3] = ["cat", "dog", "fish"];
3448
3449        let values_builder = StringBuilder::new();
3450        let mut builder = ListBuilder::new(values_builder);
3451        for i in 0..120 {
3452            builder.values().append_value(TAGS[i % 3]);
3453            if i % 3 == 0 {
3454                builder.append(true)
3455            }
3456        }
3457        let tags = Arc::new(builder.finish());
3458
3459        let batch = RecordBatch::try_new(
3460            schema.clone(),
3461            vec![Arc::new(Int32Array::from_iter_values(0..40)), tags],
3462        )
3463        .unwrap();
3464
3465        let table = conn
3466            .create_table("test_bitmap", batch.clone())
3467            .execute()
3468            .await
3469            .unwrap();
3470
3471        // Can not create btree or bitmap index on list column
3472        assert!(
3473            table
3474                .create_index(&["tags"], Index::BTree(Default::default()))
3475                .execute()
3476                .await
3477                .is_err()
3478        );
3479        assert!(
3480            table
3481                .create_index(&["tags"], Index::Bitmap(Default::default()))
3482                .execute()
3483                .await
3484                .is_err()
3485        );
3486
3487        // Create bitmap index on the "category" column
3488        table
3489            .create_index(&["tags"], Index::LabelList(Default::default()))
3490            .execute()
3491            .await
3492            .unwrap();
3493
3494        // Verify the index was created
3495        let index_configs = table.list_indices().await.unwrap();
3496        assert_eq!(index_configs.len(), 1);
3497        let index = index_configs.into_iter().next().unwrap();
3498        assert_eq!(index.index_type, crate::index::IndexType::LabelList);
3499        assert_eq!(index.columns, vec!["tags".to_string()]);
3500    }
3501
3502    #[tokio::test]
3503    async fn test_create_inverted_index() {
3504        let tmp_dir = tempdir().unwrap();
3505        let uri = tmp_dir.path().to_str().unwrap();
3506
3507        let conn = ConnectBuilder::new(uri).execute().await.unwrap();
3508        const WORDS: [&str; 3] = ["cat", "dog", "fish"];
3509        let mut text_builder = StringBuilder::new();
3510        let num_rows = 120;
3511        for i in 0..num_rows {
3512            text_builder.append_value(WORDS[i % 3]);
3513        }
3514        let text = Arc::new(text_builder.finish());
3515
3516        let schema = Arc::new(Schema::new(vec![
3517            Field::new("id", DataType::Int32, false),
3518            Field::new("text", DataType::Utf8, true),
3519        ]));
3520        let batch = RecordBatch::try_new(
3521            schema.clone(),
3522            vec![
3523                Arc::new(Int32Array::from_iter_values(0..num_rows as i32)),
3524                text,
3525            ],
3526        )
3527        .unwrap();
3528
3529        let table = conn
3530            .create_table("test_bitmap", batch.clone())
3531            .execute()
3532            .await
3533            .unwrap();
3534
3535        table
3536            .create_index(&["text"], Index::FTS(Default::default()))
3537            .execute()
3538            .await
3539            .unwrap();
3540        let index_configs = table.list_indices().await.unwrap();
3541        assert_eq!(index_configs.len(), 1);
3542        let index = index_configs.into_iter().next().unwrap();
3543        assert_eq!(index.index_type, crate::index::IndexType::FTS);
3544        assert_eq!(index.columns, vec!["text".to_string()]);
3545        assert_eq!(index.name, "text_idx");
3546
3547        let stats = table.index_stats("text_idx").await.unwrap().unwrap();
3548        assert_eq!(stats.num_indexed_rows, num_rows);
3549        assert_eq!(stats.num_unindexed_rows, 0);
3550        assert_eq!(stats.index_type, crate::index::IndexType::FTS);
3551        assert_eq!(stats.distance_type, None);
3552
3553        // Make sure we can call prewarm without error
3554        table.prewarm_index("text_idx").await.unwrap();
3555    }
3556
3557    // Windows does not support precise sleep durations due to timer resolution limitations.
3558    #[cfg(not(target_os = "windows"))]
3559    #[tokio::test]
3560    async fn test_read_consistency_interval() {
3561        let intervals = vec![
3562            None,
3563            Some(0),
3564            Some(100), // 100 ms
3565        ];
3566
3567        for interval in intervals {
3568            let data = some_sample_data();
3569
3570            let tmp_dir = tempdir().unwrap();
3571            let uri = tmp_dir.path().to_str().unwrap();
3572
3573            let conn1 = ConnectBuilder::new(uri).execute().await.unwrap();
3574            let table1 = conn1
3575                .create_empty_table("my_table", RecordBatchReader::schema(&data))
3576                .execute()
3577                .await
3578                .unwrap();
3579
3580            let mut conn2 = ConnectBuilder::new(uri);
3581            if let Some(interval) = interval {
3582                conn2 = conn2.read_consistency_interval(std::time::Duration::from_millis(interval));
3583            }
3584            let conn2 = conn2.execute().await.unwrap();
3585            let table2 = conn2.open_table("my_table").execute().await.unwrap();
3586
3587            assert_eq!(table1.count_rows(None).await.unwrap(), 0);
3588            assert_eq!(table2.count_rows(None).await.unwrap(), 0);
3589
3590            table1.add(data).execute().await.unwrap();
3591            assert_eq!(table1.count_rows(None).await.unwrap(), 1);
3592
3593            match interval {
3594                None => {
3595                    assert_eq!(table2.count_rows(None).await.unwrap(), 0);
3596                    table2.checkout_latest().await.unwrap();
3597                    assert_eq!(table2.count_rows(None).await.unwrap(), 1);
3598                }
3599                Some(0) => {
3600                    assert_eq!(table2.count_rows(None).await.unwrap(), 1);
3601                }
3602                Some(100) => {
3603                    assert_eq!(table2.count_rows(None).await.unwrap(), 0);
3604                    tokio::time::sleep(Duration::from_millis(100)).await;
3605                    assert_eq!(table2.count_rows(None).await.unwrap(), 1);
3606                }
3607                _ => unreachable!(),
3608            }
3609        }
3610    }
3611
3612    #[tokio::test]
3613    async fn test_time_travel_write() {
3614        let tmp_dir = tempdir().unwrap();
3615        let uri = tmp_dir.path().to_str().unwrap();
3616
3617        let conn = ConnectBuilder::new(uri)
3618            .read_consistency_interval(Duration::from_secs(0))
3619            .execute()
3620            .await
3621            .unwrap();
3622        let table = conn
3623            .create_table("my_table", some_sample_data())
3624            .execute()
3625            .await
3626            .unwrap();
3627        let version = table.version().await.unwrap();
3628        table.add(some_sample_data()).execute().await.unwrap();
3629        table.checkout(version).await.unwrap();
3630        assert!(table.add(some_sample_data()).execute().await.is_err())
3631    }
3632
3633    #[tokio::test]
3634    async fn test_update_dataset_config() {
3635        let tmp_dir = tempdir().unwrap();
3636        let uri = tmp_dir.path().to_str().unwrap();
3637
3638        let conn = ConnectBuilder::new(uri)
3639            .read_consistency_interval(Duration::from_secs(0))
3640            .execute()
3641            .await
3642            .unwrap();
3643
3644        let table = conn
3645            .create_table("my_table", some_sample_data())
3646            .execute()
3647            .await
3648            .unwrap();
3649        let native_tbl = table.as_native().unwrap();
3650
3651        let manifest = native_tbl.manifest().await.unwrap();
3652        let base_config_len = manifest.config.len();
3653
3654        native_tbl
3655            .update_config(vec![("test_key1".to_string(), "test_val1".to_string())])
3656            .await
3657            .unwrap();
3658
3659        let manifest = native_tbl.manifest().await.unwrap();
3660        assert_eq!(manifest.config.len(), 1 + base_config_len);
3661        assert_eq!(
3662            manifest.config.get("test_key1"),
3663            Some(&"test_val1".to_string())
3664        );
3665
3666        native_tbl
3667            .update_config(vec![("test_key2".to_string(), "test_val2".to_string())])
3668            .await
3669            .unwrap();
3670        let manifest = native_tbl.manifest().await.unwrap();
3671        assert_eq!(manifest.config.len(), 2 + base_config_len);
3672        assert_eq!(
3673            manifest.config.get("test_key1"),
3674            Some(&"test_val1".to_string())
3675        );
3676        assert_eq!(
3677            manifest.config.get("test_key2"),
3678            Some(&"test_val2".to_string())
3679        );
3680
3681        native_tbl
3682            .update_config(vec![(
3683                "test_key2".to_string(),
3684                "test_val2_update".to_string(),
3685            )])
3686            .await
3687            .unwrap();
3688        let manifest = native_tbl.manifest().await.unwrap();
3689        assert_eq!(manifest.config.len(), 2 + base_config_len);
3690        assert_eq!(
3691            manifest.config.get("test_key1"),
3692            Some(&"test_val1".to_string())
3693        );
3694        assert_eq!(
3695            manifest.config.get("test_key2"),
3696            Some(&"test_val2_update".to_string())
3697        );
3698
3699        native_tbl.delete_config_keys(&["test_key1"]).await.unwrap();
3700        let manifest = native_tbl.manifest().await.unwrap();
3701        assert_eq!(manifest.config.len(), 1 + base_config_len);
3702        assert_eq!(
3703            manifest.config.get("test_key2"),
3704            Some(&"test_val2_update".to_string())
3705        );
3706    }
3707
3708    #[tokio::test]
3709    async fn test_schema_metadata_config() {
3710        let tmp_dir = tempdir().unwrap();
3711        let uri = tmp_dir.path().to_str().unwrap();
3712
3713        let conn = ConnectBuilder::new(uri)
3714            .read_consistency_interval(Duration::from_secs(0))
3715            .execute()
3716            .await
3717            .unwrap();
3718        let table = conn
3719            .create_table("my_table", some_sample_data())
3720            .execute()
3721            .await
3722            .unwrap();
3723
3724        let native_tbl = table.as_native().unwrap();
3725        let schema = native_tbl.schema().await.unwrap();
3726        let metadata = schema.metadata();
3727        assert_eq!(metadata.len(), 0);
3728
3729        native_tbl
3730            .replace_schema_metadata(vec![("test_key1".to_string(), "test_val1".to_string())])
3731            .await
3732            .unwrap();
3733
3734        let schema = native_tbl.schema().await.unwrap();
3735        let metadata = schema.metadata();
3736        assert_eq!(metadata.len(), 1);
3737        assert_eq!(metadata.get("test_key1"), Some(&"test_val1".to_string()));
3738
3739        native_tbl
3740            .replace_schema_metadata(vec![
3741                ("test_key1".to_string(), "test_val1_update".to_string()),
3742                ("test_key2".to_string(), "test_val2".to_string()),
3743            ])
3744            .await
3745            .unwrap();
3746        let schema = native_tbl.schema().await.unwrap();
3747        let metadata = schema.metadata();
3748        assert_eq!(metadata.len(), 2);
3749        assert_eq!(
3750            metadata.get("test_key1"),
3751            Some(&"test_val1_update".to_string())
3752        );
3753        assert_eq!(metadata.get("test_key2"), Some(&"test_val2".to_string()));
3754
3755        native_tbl
3756            .replace_schema_metadata(vec![(
3757                "test_key2".to_string(),
3758                "test_val2_update".to_string(),
3759            )])
3760            .await
3761            .unwrap();
3762        let schema = native_tbl.schema().await.unwrap();
3763        let metadata = schema.metadata();
3764        assert_eq!(
3765            metadata.get("test_key2"),
3766            Some(&"test_val2_update".to_string())
3767        );
3768    }
3769
3770    #[tokio::test]
3771    pub async fn test_field_metadata_update() {
3772        let tmp_dir = tempdir().unwrap();
3773        let uri = tmp_dir.path().to_str().unwrap();
3774
3775        let conn = ConnectBuilder::new(uri)
3776            .read_consistency_interval(Duration::from_secs(0))
3777            .execute()
3778            .await
3779            .unwrap();
3780        let table = conn
3781            .create_table("my_table", some_sample_data())
3782            .execute()
3783            .await
3784            .unwrap();
3785
3786        let native_tbl = table.as_native().unwrap();
3787        let schema = native_tbl.manifest().await.unwrap().schema;
3788
3789        let field = schema.field("i").unwrap();
3790        assert_eq!(field.metadata.len(), 0);
3791
3792        native_tbl
3793            .replace_schema_metadata(vec![(
3794                "test_key2".to_string(),
3795                "test_val2_update".to_string(),
3796            )])
3797            .await
3798            .unwrap();
3799
3800        let schema = native_tbl.schema().await.unwrap();
3801        let metadata = schema.metadata();
3802        assert_eq!(metadata.len(), 1);
3803        assert_eq!(
3804            metadata.get("test_key2"),
3805            Some(&"test_val2_update".to_string())
3806        );
3807
3808        let mut new_field_metadata = HashMap::<String, String>::new();
3809        new_field_metadata.insert("test_field_key1".into(), "test_field_val1".into());
3810        native_tbl
3811            .replace_field_metadata(vec![(field.id as u32, new_field_metadata)])
3812            .await
3813            .unwrap();
3814
3815        let schema = native_tbl.manifest().await.unwrap().schema;
3816        let field = schema.field("i").unwrap();
3817        assert_eq!(field.metadata.len(), 1);
3818        assert_eq!(
3819            field.metadata.get("test_field_key1"),
3820            Some(&"test_field_val1".to_string())
3821        );
3822    }
3823
3824    #[tokio::test]
3825    pub async fn test_stats() {
3826        let tmp_dir = tempdir().unwrap();
3827        let uri = tmp_dir.path().to_str().unwrap();
3828
3829        let conn = ConnectBuilder::new(uri).execute().await.unwrap();
3830
3831        let schema = Arc::new(Schema::new(vec![
3832            Field::new("id", DataType::Int32, false),
3833            Field::new("foo", DataType::Int32, true),
3834        ]));
3835        let batch = RecordBatch::try_new(
3836            schema.clone(),
3837            vec![
3838                Arc::new(Int32Array::from_iter_values(0..100)),
3839                Arc::new(Int32Array::from_iter_values(0..100)),
3840            ],
3841        )
3842        .unwrap();
3843
3844        let table = conn
3845            .create_table("test_stats", batch.clone())
3846            .execute()
3847            .await
3848            .unwrap();
3849        for _ in 0..10 {
3850            let batch = RecordBatch::try_new(
3851                schema.clone(),
3852                vec![
3853                    Arc::new(Int32Array::from_iter_values(0..15)),
3854                    Arc::new(Int32Array::from_iter_values(0..15)),
3855                ],
3856            )
3857            .unwrap();
3858            table.add(batch.clone()).execute().await.unwrap();
3859        }
3860
3861        let empty_table = conn
3862            .create_table("test_stats_empty", RecordBatch::new_empty(batch.schema()))
3863            .execute()
3864            .await
3865            .unwrap();
3866
3867        let res = table.stats().await.unwrap();
3868        println!("{:#?}", res);
3869        assert_eq!(
3870            res,
3871            TableStatistics {
3872                num_rows: 250,
3873                num_indices: 0,
3874                total_bytes: 2300,
3875                fragment_stats: FragmentStatistics {
3876                    num_fragments: 11,
3877                    num_small_fragments: 11,
3878                    lengths: FragmentSummaryStats {
3879                        min: 15,
3880                        max: 100,
3881                        mean: 22,
3882                        p25: 15,
3883                        p50: 15,
3884                        p75: 15,
3885                        p99: 100,
3886                    },
3887                },
3888            }
3889        );
3890        let res = empty_table.stats().await.unwrap();
3891        println!("{:#?}", res);
3892        assert_eq!(
3893            res,
3894            TableStatistics {
3895                num_rows: 0,
3896                num_indices: 0,
3897                total_bytes: 0,
3898                fragment_stats: FragmentStatistics {
3899                    num_fragments: 0,
3900                    num_small_fragments: 0,
3901                    lengths: FragmentSummaryStats {
3902                        min: 0,
3903                        max: 0,
3904                        mean: 0,
3905                        p25: 0,
3906                        p50: 0,
3907                        p75: 0,
3908                        p99: 0,
3909                    },
3910                },
3911            }
3912        )
3913    }
3914
3915    #[tokio::test]
3916    pub async fn test_list_indices_skip_frag_reuse() {
3917        let tmp_dir = tempdir().unwrap();
3918        let uri = tmp_dir.path().to_str().unwrap();
3919
3920        let conn = ConnectBuilder::new(uri).execute().await.unwrap();
3921
3922        let schema = Arc::new(Schema::new(vec![
3923            Field::new("id", DataType::Int32, false),
3924            Field::new("foo", DataType::Int32, true),
3925        ]));
3926        let batch = RecordBatch::try_new(
3927            schema.clone(),
3928            vec![
3929                Arc::new(Int32Array::from_iter_values(0..100)),
3930                Arc::new(Int32Array::from_iter_values(0..100)),
3931            ],
3932        )
3933        .unwrap();
3934
3935        let table = conn
3936            .create_table("test_list_indices_skip_frag_reuse", batch.clone())
3937            .execute()
3938            .await
3939            .unwrap();
3940
3941        table.add(batch.clone()).execute().await.unwrap();
3942
3943        table
3944            .create_index(&["id"], Index::Bitmap(BitmapIndexBuilder {}))
3945            .execute()
3946            .await
3947            .unwrap();
3948
3949        table
3950            .optimize(OptimizeAction::Compact {
3951                options: CompactionOptions {
3952                    target_rows_per_fragment: 2_000,
3953                    defer_index_remap: true,
3954                    ..Default::default()
3955                },
3956                remap_options: None,
3957            })
3958            .await
3959            .unwrap();
3960
3961        let result = table.list_indices().await.unwrap();
3962        assert_eq!(result.len(), 1);
3963        assert_eq!(result[0].index_type, crate::index::IndexType::Bitmap);
3964    }
3965}