// lancedb/table.rs
1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The LanceDB Authors
3
4//! LanceDB Table APIs
5
6use arrow::array::{AsArray, FixedSizeListBuilder, Float32Builder};
7use arrow::datatypes::{Float32Type, UInt8Type};
8use arrow_array::{RecordBatchIterator, RecordBatchReader};
9use arrow_schema::{DataType, Field, Schema, SchemaRef};
10use async_trait::async_trait;
11use datafusion_expr::Expr;
12use datafusion_physical_plan::display::DisplayableExecutionPlan;
13use datafusion_physical_plan::projection::ProjectionExec;
14use datafusion_physical_plan::repartition::RepartitionExec;
15use datafusion_physical_plan::union::UnionExec;
16use datafusion_physical_plan::ExecutionPlan;
17use futures::{FutureExt, StreamExt, TryFutureExt};
18use lance::dataset::builder::DatasetBuilder;
19use lance::dataset::cleanup::RemovalStats;
20use lance::dataset::optimize::{compact_files, CompactionMetrics, IndexRemapperOptions};
21use lance::dataset::scanner::Scanner;
22pub use lance::dataset::ColumnAlteration;
23pub use lance::dataset::NewColumnTransform;
24pub use lance::dataset::ReadParams;
25pub use lance::dataset::Version;
26use lance::dataset::{InsertBuilder, WhenMatched, WriteMode, WriteParams};
27use lance::dataset::{MergeInsertBuilder as LanceMergeInsertBuilder, WhenNotMatchedBySource};
28use lance::index::vector::utils::infer_vector_dim;
29use lance::index::vector::VectorIndexParams;
30use lance::io::{ObjectStoreParams, WrappingObjectStore};
31use lance_datafusion::exec::{analyze_plan as lance_analyze_plan, execute_plan};
32use lance_datafusion::utils::StreamingWriteSource;
33use lance_index::scalar::{BuiltinIndexType, ScalarIndexParams};
34use lance_index::vector::bq::RQBuildParams;
35use lance_index::vector::hnsw::builder::HnswBuildParams;
36use lance_index::vector::ivf::IvfBuildParams;
37use lance_index::vector::pq::PQBuildParams;
38use lance_index::vector::sq::builder::SQBuildParams;
39use lance_index::DatasetIndexExt;
40use lance_index::IndexType;
41use lance_io::object_store::{LanceNamespaceStorageOptionsProvider, StorageOptionsAccessor};
42use lance_namespace::models::{
43    QueryTableRequest as NsQueryTableRequest, QueryTableRequestColumns,
44    QueryTableRequestFullTextQuery, QueryTableRequestVector, StringFtsQuery,
45};
46use lance_namespace::LanceNamespace;
47use lance_table::format::Manifest;
48use lance_table::io::commit::ManifestNamingScheme;
49use log::info;
50use serde::{Deserialize, Serialize};
51use std::collections::HashMap;
52use std::format;
53use std::path::Path;
54use std::sync::Arc;
55
56use crate::arrow::IntoArrow;
57use crate::connection::NoData;
58use crate::database::Database;
59use crate::embeddings::{EmbeddingDefinition, EmbeddingRegistry, MaybeEmbedded, MemoryRegistry};
60use crate::error::{Error, Result};
61use crate::index::vector::VectorIndex;
62use crate::index::IndexStatistics;
63use crate::index::{vector::suggested_num_sub_vectors, Index, IndexBuilder};
64use crate::index::{IndexConfig, IndexStatisticsImpl};
65use crate::query::{
66    IntoQueryVector, Query, QueryExecutionOptions, QueryFilter, QueryRequest, Select, TakeQuery,
67    VectorQuery, VectorQueryRequest, DEFAULT_TOP_K,
68};
69use crate::utils::{
70    default_vector_column, supported_bitmap_data_type, supported_btree_data_type,
71    supported_fts_data_type, supported_label_list_data_type, supported_vector_data_type,
72    PatchReadParam, PatchWriteParam, TimeoutStream,
73};
74
75use self::dataset::DatasetConsistencyWrapper;
76use self::merge::MergeInsertBuilder;
77
78pub mod datafusion;
79pub(crate) mod dataset;
80pub mod delete;
81pub mod merge;
82pub mod schema_evolution;
83pub mod update;
84
85use crate::index::waiter::wait_for_index;
86pub use chrono::Duration;
87pub use delete::DeleteResult;
88use futures::future::{join_all, Either};
89pub use lance::dataset::optimize::CompactionOptions;
90pub use lance::dataset::refs::{TagContents, Tags as LanceTags};
91pub use lance::dataset::scanner::DatasetRecordBatchStream;
92use lance::dataset::statistics::DatasetStatisticsExt;
93use lance_index::frag_reuse::FRAG_REUSE_INDEX_NAME;
94pub use lance_index::optimize::OptimizeOptions;
95pub use schema_evolution::{AddColumnsResult, AlterColumnsResult, DropColumnsResult};
96use serde_with::skip_serializing_none;
97pub use update::{UpdateBuilder, UpdateResult};
98
/// Defines the type of column
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ColumnKind {
    /// Columns populated by data supplied directly by the user (this is the most common case)
    Physical,
    /// Columns populated by applying an embedding function to the input data
    Embedding(EmbeddingDefinition),
}
107
/// Defines a column in a table
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ColumnDefinition {
    /// The source of the column data (physical user data or an embedding function)
    pub kind: ColumnKind,
}
114
/// The definition of a table: its Arrow schema plus one [`ColumnDefinition`] per field.
#[derive(Debug, Clone)]
pub struct TableDefinition {
    /// One definition per field in `schema`, in the same order
    pub column_definitions: Vec<ColumnDefinition>,
    /// The Arrow schema of the table
    pub schema: SchemaRef,
}
120
121impl TableDefinition {
122    pub fn new(schema: SchemaRef, column_definitions: Vec<ColumnDefinition>) -> Self {
123        Self {
124            column_definitions,
125            schema,
126        }
127    }
128
129    pub fn new_from_schema(schema: SchemaRef) -> Self {
130        let column_definitions = schema
131            .fields()
132            .iter()
133            .map(|_| ColumnDefinition {
134                kind: ColumnKind::Physical,
135            })
136            .collect();
137        Self::new(schema, column_definitions)
138    }
139
140    pub fn try_from_rich_schema(schema: SchemaRef) -> Result<Self> {
141        let column_definitions = schema.metadata.get("lancedb::column_definitions");
142        if let Some(column_definitions) = column_definitions {
143            let column_definitions: Vec<ColumnDefinition> =
144                serde_json::from_str(column_definitions).map_err(|e| Error::Runtime {
145                    message: format!("Failed to deserialize column definitions: {}", e),
146                })?;
147            Ok(Self::new(schema, column_definitions))
148        } else {
149            let column_definitions = schema
150                .fields()
151                .iter()
152                .map(|_| ColumnDefinition {
153                    kind: ColumnKind::Physical,
154                })
155                .collect();
156            Ok(Self::new(schema, column_definitions))
157        }
158    }
159
160    pub fn into_rich_schema(self) -> SchemaRef {
161        // We have full control over the structure of column definitions.  This should
162        // not fail, except for a bug
163        let lancedb_metadata = serde_json::to_string(&self.column_definitions).unwrap();
164        let mut schema_with_metadata = (*self.schema).clone();
165        schema_with_metadata
166            .metadata
167            .insert("lancedb::column_definitions".to_string(), lancedb_metadata);
168        Arc::new(schema_with_metadata)
169    }
170}
171
/// Optimize the dataset.
///
/// Similar to `VACUUM` in PostgreSQL, it offers different options to
/// optimize different parts of the table on disk.
///
/// By default, it optimizes everything, as [`OptimizeAction::All`].
pub enum OptimizeAction {
    /// Run all optimizations with default values
    All,
    /// Compacts files in the dataset
    ///
    /// LanceDb uses a read-only filesystem for performance and safe concurrency.  Every time
    /// new data is added it will be added into new files.  Small files
    /// can hurt both read and write performance.  Compaction will merge small files
    /// into larger ones.
    ///
    /// All operations that modify data (add, delete, update, merge insert, etc.) will create
    /// new files.  If these operations are run frequently then compaction should run frequently.
    ///
    /// If these operations are never run (search only) then compaction is not necessary.
    Compact {
        /// Options controlling which files are compacted and how
        options: CompactionOptions,
        /// Optional strategy for remapping indices after compaction
        remap_options: Option<Arc<dyn IndexRemapperOptions>>,
    },
    /// Prune old version of datasets
    ///
    /// Every change in LanceDb is additive.  When data is removed from a dataset a new version is
    /// created that doesn't contain the removed data.  However, the old version, which does contain
    /// the removed data, is left in place.  This is necessary for consistency and concurrency and
    /// also enables time travel functionality like the ability to checkout an older version of the
    /// dataset to undo changes.
    ///
    /// Over time, these old versions can consume a lot of disk space.  The prune operation will
    /// remove versions of the dataset that are older than a certain age.  This will free up the
    /// space used by that old data.
    ///
    /// Once a version is pruned it can no longer be checked out.
    Prune {
        /// The duration of time to keep versions of the dataset.
        older_than: Option<Duration>,
        /// Because they may be part of an in-progress transaction, files newer than 7 days old are not deleted by default.
        /// If you are sure that there are no in-progress transactions, then you can set this to True to delete all files older than `older_than`.
        delete_unverified: Option<bool>,
        /// If true, an error will be returned if there are any old versions that are still tagged.
        error_if_tagged_old_versions: Option<bool>,
    },
    /// Optimize the indices
    ///
    /// This operation optimizes all indices in the table.  When new data is added to LanceDb
    /// it is not added to the indices.  However, it can still turn up in searches because the search
    /// function will scan both the indexed data and the unindexed data in parallel.  Over time, the
    /// unindexed data can become large enough that the search performance is slow.  This operation
    /// will add the unindexed data to the indices without rerunning the full index creation process.
    ///
    /// Optimizing an index is faster than re-training the index but it does not typically adjust the
    /// underlying model relied upon by the index.  This can eventually lead to poor search accuracy
    /// and so users may still want to occasionally retrain the index after adding a large amount of
    /// data.
    ///
    /// For example, when using IVF, an index will create clusters.  Optimizing an index assigns unindexed
    /// data to the existing clusters, but it does not move the clusters or create new clusters.
    Index(OptimizeOptions),
}
235
236impl Default for OptimizeAction {
237    fn default() -> Self {
238        Self::All
239    }
240}
241
/// Statistics about the optimization.
///
/// Each field is `None` if the corresponding step was not run.
pub struct OptimizeStats {
    /// Stats of the file compaction.
    pub compaction: Option<CompactionMetrics>,

    /// Stats of the version pruning
    pub prune: Option<RemovalStats>,
}
250
/// Describes what happens when a vector either contains NaN or
/// does not have enough values
#[derive(Clone, Debug, Default)]
#[allow(dead_code)] // https://github.com/lancedb/lancedb/issues/992
enum BadVectorHandling {
    /// An error is returned
    #[default]
    Error,
    /// The offending row is dropped
    Drop,
    /// The invalid/missing items are replaced by fill_value
    Fill(f32),
    /// The invalid items are replaced by NULL
    None,
}
266
/// Options to use when writing data
#[derive(Clone, Debug, Default)]
pub struct WriteOptions {
    // Coming soon: https://github.com/lancedb/lancedb/issues/992
    // /// What behavior to take if the data contains invalid vectors
    // pub on_bad_vectors: BadVectorHandling,
    /// Advanced parameters that can be used to customize table creation
    ///
    /// Overlapping `OpenTableBuilder` options (e.g. [AddDataBuilder::mode]) will take
    /// precedence over their counterparts in `WriteOptions` (e.g. [WriteParams::mode]).
    pub lance_write_params: Option<WriteParams>,
}
279
/// Controls how newly added data interacts with existing rows in the table.
#[derive(Debug, Clone, Default)]
pub enum AddDataMode {
    /// Rows will be appended to the table (the default)
    #[default]
    Append,
    /// The existing table will be overwritten with the new data
    Overwrite,
}
288
/// A builder for configuring a [`crate::connection::Connection::create_table`] or [`Table::add`]
/// operation
pub struct AddDataBuilder<T: IntoArrow> {
    // The table the data will be written into
    parent: Arc<dyn BaseTable>,
    // The user-supplied data, converted to Arrow when `execute` is called
    pub(crate) data: T,
    // Whether to append to or overwrite the table
    pub(crate) mode: AddDataMode,
    // Advanced write configuration
    pub(crate) write_options: WriteOptions,
    // Registry used to resolve embedding functions, when configured
    embedding_registry: Option<Arc<dyn EmbeddingRegistry>>,
}
298
299impl<T: IntoArrow> std::fmt::Debug for AddDataBuilder<T> {
300    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
301        f.debug_struct("AddDataBuilder")
302            .field("parent", &self.parent)
303            .field("mode", &self.mode)
304            .field("write_options", &self.write_options)
305            .finish()
306    }
307}
308
309impl<T: IntoArrow> AddDataBuilder<T> {
310    pub fn mode(mut self, mode: AddDataMode) -> Self {
311        self.mode = mode;
312        self
313    }
314
315    pub fn write_options(mut self, options: WriteOptions) -> Self {
316        self.write_options = options;
317        self
318    }
319
320    pub async fn execute(self) -> Result<AddResult> {
321        let parent = self.parent.clone();
322        let data = self.data.into_arrow()?;
323        let without_data = AddDataBuilder::<NoData> {
324            data: NoData {},
325            mode: self.mode,
326            parent: self.parent,
327            write_options: self.write_options,
328            embedding_registry: self.embedding_registry,
329        };
330        parent.add(without_data, data).await
331    }
332}
333
/// Filters that can be used to limit the rows returned by a query
pub enum Filter {
    /// A SQL filter string
    Sql(String),
    /// A Datafusion logical expression
    Datafusion(Expr),
}
341
/// A query that can be used to search a LanceDB table
#[derive(Debug, Clone)]
pub enum AnyQuery {
    /// A plain (non-vector) query
    Query(QueryRequest),
    /// A vector similarity query
    VectorQuery(VectorQueryRequest),
}
348
/// Manages the tags of a table.
///
/// Tags are human-readable names that point at specific versions of the table.
#[async_trait]
pub trait Tags: Send + Sync {
    /// List the tags of the table.
    async fn list(&self) -> Result<HashMap<String, TagContents>>;

    /// Get the version of the table referenced by a tag.
    async fn get_version(&self, tag: &str) -> Result<u64>;

    /// Create a new tag for the given version of the table.
    async fn create(&mut self, tag: &str, version: u64) -> Result<()>;

    /// Delete a tag from the table.
    async fn delete(&mut self, tag: &str) -> Result<()>;

    /// Update an existing tag to point to a new version of the table.
    async fn update(&mut self, tag: &str, version: u64) -> Result<()>;
}
366
/// The result of an add (insert) operation.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct AddResult {
    /// The commit version associated with the operation.
    ///
    /// A version of `0` indicates compatibility with legacy servers that do not return
    /// a commit version.
    #[serde(default)]
    pub version: u64,
}
375
/// The result of a merge-insert operation.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct MergeResult {
    /// The commit version associated with the operation.
    ///
    /// A version of `0` indicates compatibility with legacy servers that do not return
    /// a commit version.
    #[serde(default)]
    pub version: u64,
    /// Number of inserted rows (for user statistics)
    #[serde(default)]
    pub num_inserted_rows: u64,
    /// Number of updated rows (for user statistics)
    #[serde(default)]
    pub num_updated_rows: u64,
    /// Number of deleted rows (for user statistics)
    ///
    /// Note: This is different from internal references to 'deleted_rows', since we technically "delete" updated rows during processing.
    /// However those rows are not shared with the user.
    #[serde(default)]
    pub num_deleted_rows: u64,
    /// Number of attempts performed during the merge operation.
    ///
    /// This includes the initial attempt plus any retries due to transaction conflicts.
    /// A value of 1 means the operation succeeded on the first try.
    #[serde(default)]
    pub num_attempts: u32,
}
400
/// A trait for anything "table-like".  This is used for both native tables (which target
/// Lance datasets) and remote tables (which target LanceDB cloud)
///
/// This trait is still EXPERIMENTAL and subject to change in the future
#[async_trait]
pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync {
    /// Get a reference to std::any::Any (used for downcasting to a concrete table type)
    fn as_any(&self) -> &dyn std::any::Any;
    /// Get the name of the table.
    fn name(&self) -> &str;
    /// Get the namespace of the table.
    fn namespace(&self) -> &[String];
    /// Get the id of the table
    ///
    /// This is the namespace of the table concatenated with the name
    /// separated by $
    fn id(&self) -> &str;
    /// Get the arrow [Schema] of the table.
    async fn schema(&self) -> Result<SchemaRef>;
    /// Count the number of rows in this table.
    ///
    /// If `filter` is provided, only rows matching it are counted.
    async fn count_rows(&self, filter: Option<Filter>) -> Result<usize>;
    /// Create a physical plan for the query.
    async fn create_plan(
        &self,
        query: &AnyQuery,
        options: QueryExecutionOptions,
    ) -> Result<Arc<dyn ExecutionPlan>>;
    /// Execute a query and return the results as a stream of RecordBatches.
    async fn query(
        &self,
        query: &AnyQuery,
        options: QueryExecutionOptions,
    ) -> Result<DatasetRecordBatchStream>;
    /// Explain the plan for a query.
    ///
    /// Returns a textual rendering of the physical plan built by [`Self::create_plan`];
    /// `verbose` controls the amount of per-operator detail.
    async fn explain_plan(&self, query: &AnyQuery, verbose: bool) -> Result<String> {
        let plan = self.create_plan(query, Default::default()).await?;
        let display = DisplayableExecutionPlan::new(plan.as_ref());

        Ok(format!("{}", display.indent(verbose)))
    }
    /// Analyze the execution plan for a query, returning a textual report.
    async fn analyze_plan(
        &self,
        query: &AnyQuery,
        options: QueryExecutionOptions,
    ) -> Result<String>;

    /// Add new records to the table.
    async fn add(
        &self,
        add: AddDataBuilder<NoData>,
        data: Box<dyn arrow_array::RecordBatchReader + Send>,
    ) -> Result<AddResult>;
    /// Delete rows from the table.
    async fn delete(&self, predicate: &str) -> Result<DeleteResult>;
    /// Update rows in the table.
    async fn update(&self, update: UpdateBuilder) -> Result<UpdateResult>;
    /// Create an index on the provided column(s).
    async fn create_index(&self, index: IndexBuilder) -> Result<()>;
    /// List the indices on the table.
    async fn list_indices(&self) -> Result<Vec<IndexConfig>>;
    /// Drop an index from the table.
    async fn drop_index(&self, name: &str) -> Result<()>;
    /// Prewarm an index in the table
    async fn prewarm_index(&self, name: &str) -> Result<()>;
    /// Get statistics about the index.
    ///
    /// Returns `None` if no index with the given name exists.
    async fn index_stats(&self, index_name: &str) -> Result<Option<IndexStatistics>>;
    /// Merge insert new records into the table.
    async fn merge_insert(
        &self,
        params: MergeInsertBuilder,
        new_data: Box<dyn RecordBatchReader + Send>,
    ) -> Result<MergeResult>;
    /// Gets the table tag manager.
    async fn tags(&self) -> Result<Box<dyn Tags + '_>>;
    /// Optimize the dataset.
    async fn optimize(&self, action: OptimizeAction) -> Result<OptimizeStats>;
    /// Add columns to the table.
    async fn add_columns(
        &self,
        transforms: NewColumnTransform,
        read_columns: Option<Vec<String>>,
    ) -> Result<AddColumnsResult>;
    /// Alter columns in the table.
    async fn alter_columns(&self, alterations: &[ColumnAlteration]) -> Result<AlterColumnsResult>;
    /// Drop columns from the table.
    async fn drop_columns(&self, columns: &[&str]) -> Result<DropColumnsResult>;
    /// Get the version of the table.
    async fn version(&self) -> Result<u64>;
    /// Checkout a specific version of the table.
    async fn checkout(&self, version: u64) -> Result<()>;
    /// Checkout a table version referenced by a tag.
    /// Tags provide a human-readable way to reference specific versions of the table.
    async fn checkout_tag(&self, tag: &str) -> Result<()>;
    /// Checkout the latest version of the table.
    async fn checkout_latest(&self) -> Result<()>;
    /// Restore the table to the currently checked out version.
    async fn restore(&self) -> Result<()>;
    /// List the versions of the table.
    async fn list_versions(&self) -> Result<Vec<Version>>;
    /// Get the table definition.
    async fn table_definition(&self) -> Result<TableDefinition>;
    /// Get the table URI (storage location)
    async fn uri(&self) -> Result<String>;
    /// Get the storage options used when opening this table, if any.
    #[deprecated(since = "0.25.0", note = "Use initial_storage_options() instead")]
    async fn storage_options(&self) -> Option<HashMap<String, String>>;
    /// Get the initial storage options that were passed in when opening this table.
    ///
    /// For dynamically refreshed options (e.g., credential vending), use [`Self::latest_storage_options`].
    async fn initial_storage_options(&self) -> Option<HashMap<String, String>>;
    /// Get the latest storage options, refreshing from provider if configured.
    ///
    /// Returns `Ok(Some(options))` if storage options are available (static or refreshed),
    /// `Ok(None)` if no storage options were configured, or `Err(...)` if refresh failed.
    async fn latest_storage_options(&self) -> Result<Option<HashMap<String, String>>>;
    /// Poll until the columns are fully indexed. Will return Error::Timeout if the columns
    /// are not fully indexed within the timeout.
    async fn wait_for_index(
        &self,
        index_names: &[&str],
        timeout: std::time::Duration,
    ) -> Result<()>;
    /// Get statistics on the table
    async fn stats(&self) -> Result<TableStatistics>;
    /// Create an ExecutionPlan for inserting data into the table.
    ///
    /// This is used by the DataFusion TableProvider implementation to support
    /// INSERT INTO statements.
    ///
    /// The default implementation returns [`Error::NotSupported`]; implementors
    /// that can accept DataFusion insert plans should override it.
    async fn create_insert_exec(
        &self,
        _input: Arc<dyn datafusion_physical_plan::ExecutionPlan>,
        _write_params: WriteParams,
    ) -> Result<Arc<dyn datafusion_physical_plan::ExecutionPlan>> {
        Err(Error::NotSupported {
            message: "create_insert_exec not implemented".to_string(),
        })
    }
}
539
/// A Table is a collection of strong typed Rows.
///
/// The type of the each row is defined in Apache Arrow [Schema].
#[derive(Clone, Debug)]
pub struct Table {
    // The underlying table implementation (native or remote)
    inner: Arc<dyn BaseTable>,
    // The database this table belongs to; `None` when constructed via
    // `From<Arc<dyn BaseTable>>`
    database: Option<Arc<dyn Database>>,
    // Registry used to resolve embedding functions for this table
    embedding_registry: Arc<dyn EmbeddingRegistry>,
}
549
#[cfg(all(test, feature = "remote"))]
mod test_utils {
    use super::*;

    impl Table {
        /// Shared implementation for the mock constructors below: builds a `Table`
        /// backed by a mock remote table/database pair that routes every HTTP
        /// request through `handler`.
        fn new_mock_with_handler<T>(
            name: String,
            version: Option<semver::Version>,
            handler: impl Fn(reqwest::Request) -> http::Response<T> + Clone + Send + Sync + 'static,
        ) -> Self
        where
            T: Into<reqwest::Body>,
        {
            let inner = Arc::new(crate::remote::table::RemoteTable::new_mock(
                name,
                handler.clone(),
                version,
            ));
            let database = Arc::new(crate::remote::db::RemoteDatabase::new_mock(handler));
            Self {
                inner,
                database: Some(database),
                // Registry is unused.
                embedding_registry: Arc::new(MemoryRegistry::new()),
            }
        }

        /// Create a mock table with no server version attached.
        pub fn new_with_handler<T>(
            name: impl Into<String>,
            handler: impl Fn(reqwest::Request) -> http::Response<T> + Clone + Send + Sync + 'static,
        ) -> Self
        where
            T: Into<reqwest::Body>,
        {
            Self::new_mock_with_handler(name.into(), None, handler)
        }

        /// Create a mock table that reports the given server version.
        pub fn new_with_handler_version<T>(
            name: impl Into<String>,
            version: semver::Version,
            handler: impl Fn(reqwest::Request) -> http::Response<T> + Clone + Send + Sync + 'static,
        ) -> Self
        where
            T: Into<reqwest::Body>,
        {
            Self::new_mock_with_handler(name.into(), Some(version), handler)
        }
    }
}
599
600impl std::fmt::Display for Table {
601    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
602        write!(f, "{}", self.inner)
603    }
604}
605
606impl From<Arc<dyn BaseTable>> for Table {
607    fn from(inner: Arc<dyn BaseTable>) -> Self {
608        Self {
609            inner,
610            database: None,
611            embedding_registry: Arc::new(MemoryRegistry::new()),
612        }
613    }
614}
615
616impl Table {
617    pub fn new(inner: Arc<dyn BaseTable>, database: Arc<dyn Database>) -> Self {
618        Self {
619            inner,
620            database: Some(database),
621            embedding_registry: Arc::new(MemoryRegistry::new()),
622        }
623    }
624
    /// Get the underlying [`BaseTable`] implementation.
    pub fn base_table(&self) -> &Arc<dyn BaseTable> {
        &self.inner
    }
628
629    pub fn database(&self) -> &Arc<dyn Database> {
630        self.database.as_ref().unwrap()
631    }
632
    /// Get the embedding registry used by this table.
    pub fn embedding_registry(&self) -> &Arc<dyn EmbeddingRegistry> {
        &self.embedding_registry
    }
636
    /// Create a `Table` with an explicitly provided embedding registry.
    pub(crate) fn new_with_embedding_registry(
        inner: Arc<dyn BaseTable>,
        database: Arc<dyn Database>,
        embedding_registry: Arc<dyn EmbeddingRegistry>,
    ) -> Self {
        Self {
            inner,
            database: Some(database),
            embedding_registry,
        }
    }
648
    /// Cast as [`NativeTable`], or return None it if is not a [`NativeTable`].
    ///
    /// Warning: This function will be removed soon (features exclusive to NativeTable
    ///          will be added to Table)
    pub fn as_native(&self) -> Option<&NativeTable> {
        self.inner.as_native()
    }
656
    /// Get the name of the table.
    pub fn name(&self) -> &str {
        self.inner.name()
    }
661
    /// Get the namespace of the table.
    pub fn namespace(&self) -> &[String] {
        self.inner.namespace()
    }
666
    /// Get the ID of the table (namespace + name joined by '$').
    pub fn id(&self) -> &str {
        self.inner.id()
    }
671
    /// Get the dataset of the table if it is a native table
    ///
    /// Returns None otherwise
    pub fn dataset(&self) -> Option<&dataset::DatasetConsistencyWrapper> {
        self.inner.as_native().map(|t| &t.dataset)
    }
678
    /// Get the arrow [Schema] of the table.
    pub async fn schema(&self) -> Result<SchemaRef> {
        self.inner.schema().await
    }
683
684    /// Count the number of rows in this dataset.
685    ///
686    /// # Arguments
687    ///
688    /// * `filter` if present, only count rows matching the filter
689    pub async fn count_rows(&self, filter: Option<String>) -> Result<usize> {
690        self.inner.count_rows(filter.map(Filter::Sql)).await
691    }
692
693    /// Insert new records into this Table
694    ///
695    /// # Arguments
696    ///
697    /// * `batches` data to be added to the Table
698    /// * `options` options to control how data is added
699    pub fn add<T: IntoArrow>(&self, batches: T) -> AddDataBuilder<T> {
700        AddDataBuilder {
701            parent: self.inner.clone(),
702            data: batches,
703            mode: AddDataMode::Append,
704            write_options: WriteOptions::default(),
705            embedding_registry: Some(self.embedding_registry.clone()),
706        }
707    }
708
    /// Update existing records in the Table
    ///
    /// An update operation can be used to adjust existing values.  Use the
    /// returned builder to specify which columns to update.  The new value
    /// can be a literal value (e.g. replacing nulls with some default value)
    /// or an expression applied to the old value (e.g. incrementing a value)
    ///
    /// An optional condition can be specified (e.g. "only update if the old
    /// value is 0")
    ///
    /// Note: if your condition is something like "some_id_column == 7" and
    /// you are updating many rows (with different ids) then you will get
    /// better performance with a single [`merge_insert`] call instead of
    /// repeatedly calling this method.
    pub fn update(&self) -> UpdateBuilder {
        UpdateBuilder::new(self.inner.clone())
    }
726
    /// Delete the rows from table that match the predicate.
    ///
    /// # Arguments
    /// - `predicate` - The SQL predicate string to filter the rows to be deleted.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use std::sync::Arc;
    /// # use arrow_array::{FixedSizeListArray, types::Float32Type, RecordBatch,
    /// #   RecordBatchIterator, Int32Array};
    /// # use arrow_schema::{Schema, Field, DataType};
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
    /// let tmpdir = tempfile::tempdir().unwrap();
    /// let db = lancedb::connect(tmpdir.path().to_str().unwrap())
    ///     .execute()
    ///     .await
    ///     .unwrap();
    /// # let schema = Arc::new(Schema::new(vec![
    /// #  Field::new("id", DataType::Int32, false),
    /// #  Field::new("vector", DataType::FixedSizeList(
    /// #    Arc::new(Field::new("item", DataType::Float32, true)), 128), true),
    /// # ]));
    /// let batches = RecordBatchIterator::new(
    ///     vec![RecordBatch::try_new(
    ///         schema.clone(),
    ///         vec![
    ///             Arc::new(Int32Array::from_iter_values(0..10)),
    ///             Arc::new(
    ///                 FixedSizeListArray::from_iter_primitive::<Float32Type, _, _>(
    ///                     (0..10).map(|_| Some(vec![Some(1.0); 128])),
    ///                     128,
    ///                 ),
    ///             ),
    ///         ],
    ///     )
    ///     .unwrap()]
    ///     .into_iter()
    ///     .map(Ok),
    ///     schema.clone(),
    /// );
    /// let tbl = db
    ///     .create_table("delete_test", Box::new(batches))
    ///     .execute()
    ///     .await
    ///     .unwrap();
    /// tbl.delete("id > 5").await.unwrap();
    /// # });
    /// ```
    pub async fn delete(&self, predicate: &str) -> Result<DeleteResult> {
        self.inner.delete(predicate).await
    }
779
780    /// Create an index on the provided column(s).
781    ///
782    /// Indices are used to speed up searches and are often needed when the size of the table
783    /// becomes large (the exact size depends on many factors but somewhere between 100K rows
784    /// and 1M rows is a good rule of thumb)
785    ///
786    /// There are a variety of indices available.  They are described more in
787    /// [`crate::index::Index`].  The simplest thing to do is to use `index::Index::Auto` which
788    /// will attempt to create the most useful index based on the column type and column
789    /// statistics. `BTree` index is created by default for numeric, temporal, and
790    /// string columns.
791    ///
792    /// Once an index is created it will remain until the data is overwritten (e.g. an
793    /// add operation with mode overwrite) or the indexed column is dropped.
794    ///
795    /// Indices are not automatically updated with new data.  If you add new data to the
796    /// table then the index will not include the new rows.  However, a table search will
797    /// still consider the unindexed rows.  Searches will issue both an indexed search (on
798    /// the data covered by the index) and a flat search (on the unindexed data) and the
799    /// results will be combined.
800    ///
801    /// If there is enough unindexed data then the flat search will become slow and the index
802    /// should be optimized.  Optimizing an index will add any unindexed data to the existing
803    /// index without rerunning the full index creation process.  For more details see
804    /// [Table::optimize].
805    ///
806    /// Note: Multi-column (composite) indices are not currently supported.  However, they will
807    /// be supported in the future and the API is designed to be compatible with them.
808    ///
809    /// # Examples
810    ///
811    /// ```no_run
812    /// # use std::sync::Arc;
813    /// # use arrow_array::{FixedSizeListArray, types::Float32Type, RecordBatch,
814    /// #   RecordBatchIterator, Int32Array};
815    /// # use arrow_schema::{Schema, Field, DataType};
816    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
817    /// use lancedb::index::Index;
818    /// let tmpdir = tempfile::tempdir().unwrap();
819    /// let db = lancedb::connect(tmpdir.path().to_str().unwrap())
820    ///     .execute()
821    ///     .await
822    ///     .unwrap();
823    /// # let tbl = db.open_table("idx_test").execute().await.unwrap();
824    /// // Create IVF PQ index on the "vector" column by default.
825    /// tbl.create_index(&["vector"], Index::Auto)
826    ///    .execute()
827    ///    .await
828    ///    .unwrap();
829    /// // Create a BTree index on the "id" column.
830    /// tbl.create_index(&["id"], Index::Auto)
831    ///     .execute()
832    ///     .await
833    ///     .unwrap();
834    /// // Create a LabelList index on the "tags" column.
835    /// tbl.create_index(&["tags"], Index::LabelList(Default::default()))
836    ///     .execute()
837    ///     .await
838    ///     .unwrap();
839    /// # });
840    /// ```
841    pub fn create_index(&self, columns: &[impl AsRef<str>], index: Index) -> IndexBuilder {
842        IndexBuilder::new(
843            self.inner.clone(),
844            columns
845                .iter()
846                .map(|val| val.as_ref().to_string())
847                .collect::<Vec<_>>(),
848            index,
849        )
850    }
851
852    /// See [Table::create_index]
853    /// For remote tables, this allows an optional wait_timeout to poll until asynchronous indexing is complete
854    pub fn create_index_with_timeout(
855        &self,
856        columns: &[impl AsRef<str>],
857        index: Index,
858        wait_timeout: Option<std::time::Duration>,
859    ) -> IndexBuilder {
860        let mut builder = IndexBuilder::new(
861            self.inner.clone(),
862            columns
863                .iter()
864                .map(|val| val.as_ref().to_string())
865                .collect::<Vec<_>>(),
866            index,
867        );
868        if let Some(timeout) = wait_timeout {
869            builder = builder.wait_timeout(timeout);
870        }
871        builder
872    }
873
874    /// Create a builder for a merge insert operation
875    ///
876    /// This operation can add rows, update rows, and remove rows all in a single
877    /// transaction. It is a very generic tool that can be used to create
878    /// behaviors like "insert if not exists", "update or insert (i.e. upsert)",
879    /// or even replace a portion of existing data with new data (e.g. replace
880    /// all data where month="january")
881    ///
882    /// The merge insert operation works by combining new data from a
883    /// **source table** with existing data in a **target table** by using a
884    /// join.  There are three categories of records.
885    ///
886    /// "Matched" records are records that exist in both the source table and
887    /// the target table. "Not matched" records exist only in the source table
888    /// (e.g. these are new data) "Not matched by source" records exist only
889    /// in the target table (this is old data)
890    ///
891    /// The builder returned by this method can be used to customize what
892    /// should happen for each category of data.
893    ///
894    /// Please note that the data may appear to be reordered as part of this
895    /// operation.  This is because updated rows will be deleted from the
896    /// dataset and then reinserted at the end with the new values.
897    ///
898    /// # Arguments
899    ///
900    /// * `on` One or more columns to join on.  This is how records from the
901    ///   source table and target table are matched.  Typically this is some
902    ///   kind of key or id column.
903    ///
904    /// # Examples
905    ///
906    /// ```no_run
907    /// # use std::sync::Arc;
908    /// # use arrow_array::{FixedSizeListArray, types::Float32Type, RecordBatch,
909    /// #   RecordBatchIterator, Int32Array};
910    /// # use arrow_schema::{Schema, Field, DataType};
911    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
912    /// let tmpdir = tempfile::tempdir().unwrap();
913    /// let db = lancedb::connect(tmpdir.path().to_str().unwrap())
914    ///     .execute()
915    ///     .await
916    ///     .unwrap();
917    /// # let tbl = db.open_table("idx_test").execute().await.unwrap();
918    /// # let schema = Arc::new(Schema::new(vec![
919    /// #  Field::new("id", DataType::Int32, false),
920    /// #  Field::new("vector", DataType::FixedSizeList(
921    /// #    Arc::new(Field::new("item", DataType::Float32, true)), 128), true),
922    /// # ]));
923    /// let new_data = RecordBatchIterator::new(
924    ///     vec![RecordBatch::try_new(
925    ///         schema.clone(),
926    ///         vec![
927    ///             Arc::new(Int32Array::from_iter_values(0..10)),
928    ///             Arc::new(
929    ///                 FixedSizeListArray::from_iter_primitive::<Float32Type, _, _>(
930    ///                     (0..10).map(|_| Some(vec![Some(1.0); 128])),
931    ///                     128,
932    ///                 ),
933    ///             ),
934    ///         ],
935    ///     )
936    ///     .unwrap()]
937    ///     .into_iter()
938    ///     .map(Ok),
939    ///     schema.clone(),
940    /// );
941    /// // Perform an upsert operation
942    /// let mut merge_insert = tbl.merge_insert(&["id"]);
943    /// merge_insert
944    ///     .when_matched_update_all(None)
945    ///     .when_not_matched_insert_all();
946    /// merge_insert.execute(Box::new(new_data)).await.unwrap();
947    /// # });
948    /// ```
949    pub fn merge_insert(&self, on: &[&str]) -> MergeInsertBuilder {
950        MergeInsertBuilder::new(
951            self.inner.clone(),
952            on.iter().map(|s| s.to_string()).collect(),
953        )
954    }
955
956    /// Create a [`Query`] Builder.
957    ///
958    /// Queries allow you to search your existing data.  By default the query will
959    /// return all the data in the table in no particular order.  The builder
960    /// returned by this method can be used to control the query using filtering,
961    /// vector similarity, sorting, and more.
962    ///
963    /// Note: By default, all columns are returned.  For best performance, you should
964    /// only fetch the columns you need.  See [`Query::select_with_projection`] for
965    /// more details.
966    ///
967    /// When appropriate, various indices and statistics will be used to accelerate
968    /// the query.
969    ///
970    /// # Examples
971    ///
972    /// ## Vector search
973    ///
974    /// This example will find the 10 rows whose value in the "vector" column are
975    /// closest to the query vector [1.0, 2.0, 3.0].  If an index has been created
976    /// on the "vector" column then this will perform an ANN search.
977    ///
978    /// The [`Query::refine_factor`] and [`Query::nprobes`] methods are used to
979    /// control the recall / latency tradeoff of the search.
980    ///
981    /// ```no_run
982    /// # use arrow_array::RecordBatch;
983    /// # use futures::TryStreamExt;
984    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
985    /// # let conn = lancedb::connect("/tmp").execute().await.unwrap();
986    /// # let tbl = conn.open_table("tbl").execute().await.unwrap();
987    /// use crate::lancedb::Table;
988    /// use crate::lancedb::query::ExecutableQuery;
989    /// let stream = tbl
990    ///     .query()
991    ///     .nearest_to(&[1.0, 2.0, 3.0])
992    ///     .unwrap()
993    ///     .refine_factor(5)
994    ///     .nprobes(10)
995    ///     .execute()
996    ///     .await
997    ///     .unwrap();
998    /// let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
999    /// # });
1000    /// ```
1001    ///
1002    /// ## SQL-style filter
1003    ///
1004    /// This query will return up to 1000 rows whose value in the `id` column
1005    /// is greater than 5.  LanceDb supports a broad set of filtering functions.
1006    ///
1007    /// ```no_run
1008    /// # use arrow_array::RecordBatch;
1009    /// # use futures::TryStreamExt;
1010    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
1011    /// # let conn = lancedb::connect("/tmp").execute().await.unwrap();
1012    /// # let tbl = conn.open_table("tbl").execute().await.unwrap();
1013    /// use crate::lancedb::Table;
1014    /// use crate::lancedb::query::{ExecutableQuery, QueryBase};
1015    /// let stream = tbl
1016    ///     .query()
1017    ///     .only_if("id > 5")
1018    ///     .limit(1000)
1019    ///     .execute()
1020    ///     .await
1021    ///     .unwrap();
1022    /// let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
1023    /// # });
1024    /// ```
1025    ///
1026    /// ## Full scan
1027    ///
1028    /// This query will return everything in the table in no particular
1029    /// order.
1030    ///
1031    /// ```no_run
1032    /// # use arrow_array::RecordBatch;
1033    /// # use futures::TryStreamExt;
1034    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
1035    /// # let conn = lancedb::connect("/tmp").execute().await.unwrap();
1036    /// # let tbl = conn.open_table("tbl").execute().await.unwrap();
1037    /// use crate::lancedb::Table;
1038    /// use crate::lancedb::query::ExecutableQuery;
1039    /// let stream = tbl.query().execute().await.unwrap();
1040    /// let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
1041    /// # });
1042    /// ```
1043    pub fn query(&self) -> Query {
1044        Query::new(self.inner.clone())
1045    }
1046
    /// Extract rows from the dataset using dataset offsets.
    ///
    /// Dataset offsets are 0-indexed and relative to the current version of the table.
    /// They are not stable.  A row with an offset of N may have a different offset in a
    /// different version of the table (e.g. if an earlier row is deleted).
    ///
    /// Offsets are useful for sampling as the set of all valid offsets is easily
    /// known in advance to be [0, len(table)).
    ///
    /// No guarantees are made regarding the order in which results are returned.  If you
    /// desire an output order that matches the order of the given offsets, you will need
    /// to add the row offset column to the output and align it yourself.
    ///
    /// # Arguments
    ///
    /// * `offsets` - The offsets of the rows to take.
    ///
    /// # Returns
    ///
    /// A [`TakeQuery`] builder that, when executed, yields the rows at the
    /// given offsets.
    pub fn take_offsets(&self, offsets: Vec<u64>) -> TakeQuery {
        TakeQuery::from_offsets(self.inner.clone(), offsets)
    }
1072
    /// Extract rows from the dataset using row ids.
    ///
    /// Row ids are not stable and are relative to the current version of the table.
    /// They can change due to compaction and updates.
    ///
    /// Even so, row ids are more stable than offsets and can be useful in some situations.
    ///
    /// There is an ongoing effort to make row ids stable which is tracked at
    /// <https://github.com/lancedb/lancedb/issues/1120>
    ///
    /// No guarantees are made regarding the order in which results are returned.  If you
    /// desire an output order that matches the order of the given ids, you will need
    /// to add the row id column to the output and align it yourself.
    ///
    /// # Arguments
    ///
    /// * `row_ids` - The row ids of the rows to take.
    pub fn take_row_ids(&self, row_ids: Vec<u64>) -> TakeQuery {
        TakeQuery::from_row_ids(self.inner.clone(), row_ids)
    }
1094
1095    /// Search the table with a given query vector.
1096    ///
1097    /// This is a convenience method for preparing a vector query and
1098    /// is the same thing as calling `nearest_to` on the builder returned
1099    /// by `query`.  See [`Query::nearest_to`] for more details.
1100    pub fn vector_search(&self, query: impl IntoQueryVector) -> Result<VectorQuery> {
1101        self.query().nearest_to(query)
1102    }
1103
    /// Optimize the on-disk data and indices for better performance.
    ///
    /// Modeled after ``VACUUM`` in PostgreSQL.
    ///
    /// Optimization is discussed in more detail in the [OptimizeAction] documentation
    /// and covers three operations:
    ///
    ///  * Compaction: Merges small files into larger ones
    ///  * Prune: Removes old versions of the dataset
    ///  * Index: Optimizes the indices, adding new data to existing indices
    ///
    /// <section class="warning">Experimental API</section>
    ///
    /// The optimization process is undergoing active development and may change.
    /// Our goal with these changes is to improve the performance of optimization and
    /// reduce the complexity.
    ///
    /// That being said, it is essential today to run optimize if you want the best
    /// performance.  It should be stable and safe to use in production, but it is our
    /// hope that the API may be simplified (or not even need to be called) in the future.
    ///
    /// The frequency an application should call optimize is based on the frequency of
    /// data modifications.  If data is frequently added, deleted, or updated then
    /// optimize should be run frequently.  A good rule of thumb is to run optimize if
    /// you have added or modified 100,000 or more records or run more than 20 data
    /// modification operations.
    pub async fn optimize(&self, action: OptimizeAction) -> Result<OptimizeStats> {
        self.inner.optimize(action).await
    }
1133
    /// Add new columns to the table, providing values to fill in.
    ///
    /// # Arguments
    ///
    /// * `transforms` - Describes the new columns and how their values are computed;
    ///   see [`NewColumnTransform`].
    /// * `read_columns` - Optional list of existing columns to read while computing
    ///   the new values (NOTE(review): semantics assumed from the signature —
    ///   confirm against the `NewColumnTransform` docs).
    pub async fn add_columns(
        &self,
        transforms: NewColumnTransform,
        read_columns: Option<Vec<String>>,
    ) -> Result<AddColumnsResult> {
        self.inner.add_columns(transforms, read_columns).await
    }
1142
    /// Change a column's name or nullability.
    ///
    /// # Arguments
    ///
    /// * `alterations` - One entry per column to modify; see [`ColumnAlteration`].
    pub async fn alter_columns(
        &self,
        alterations: &[ColumnAlteration],
    ) -> Result<AlterColumnsResult> {
        self.inner.alter_columns(alterations).await
    }
1150
    /// Remove columns from the table.
    ///
    /// # Arguments
    ///
    /// * `columns` - The names of the columns to drop.
    pub async fn drop_columns(&self, columns: &[&str]) -> Result<DropColumnsResult> {
        self.inner.drop_columns(columns).await
    }
1155
    /// Retrieve the version of the table
    ///
    /// LanceDb supports versioning.  Every operation that modifies the table increases
    /// version.  As long as a version hasn't been deleted you can [`Self::checkout`] that
    /// version to view the data at that point.  In addition, you can [`Self::restore`] the
    /// version to replace the current table with a previous version.
    pub async fn version(&self) -> Result<u64> {
        self.inner.version().await
    }
1165
    /// Checks out a specific version of the Table
    ///
    /// Any read operation on the table will now access the data at the checked out version.
    /// As a consequence, calling this method will disable any read consistency interval
    /// that was previously set.
    ///
    /// This is a read-only operation that turns the table into a sort of "view"
    /// or "detached head".  Other table instances will not be affected.  To make the change
    /// permanent you can use the [`Self::restore`] method.
    ///
    /// Any operation that modifies the table will fail while the table is in a checked
    /// out state.
    ///
    /// To return the table to a normal state use [`Self::checkout_latest`]
    pub async fn checkout(&self, version: u64) -> Result<()> {
        self.inner.checkout(version).await
    }
1183
    /// Checks out a specific version of the Table by tag
    ///
    /// Any read operation on the table will now access the data at the version referenced by the tag.
    /// As a consequence, calling this method will disable any read consistency interval
    /// that was previously set.
    ///
    /// This is a read-only operation that turns the table into a sort of "view"
    /// or "detached head".  Other table instances will not be affected.  To make the change
    /// permanent you can use the [`Self::restore`] method.
    ///
    /// Any operation that modifies the table will fail while the table is in a checked
    /// out state.
    ///
    /// To return the table to a normal state use [`Self::checkout_latest`]
    pub async fn checkout_tag(&self, tag: &str) -> Result<()> {
        self.inner.checkout_tag(tag).await
    }
1201
    /// Ensures the table is pointing at the latest version
    ///
    /// This can be used to manually update a table when the read_consistency_interval is None
    /// It can also be used to undo a [`Self::checkout`] operation
    pub async fn checkout_latest(&self) -> Result<()> {
        self.inner.checkout_latest().await
    }
1209
    /// Restore the table to the currently checked out version
    ///
    /// This operation will fail if [`Self::checkout`] has not been called previously
    ///
    /// This operation will overwrite the latest version of the table with a
    /// previous version.  Any changes made since the checked out version will
    /// no longer be visible.
    ///
    /// Once the operation concludes the table will no longer be in a checked
    /// out state and the read_consistency_interval, if any, will apply.
    pub async fn restore(&self) -> Result<()> {
        self.inner.restore().await
    }
1223
    /// List all the versions of the table
    ///
    /// Use [`Self::checkout`] to view the data of a listed version.
    pub async fn list_versions(&self) -> Result<Vec<Version>> {
        self.inner.list_versions().await
    }
1228
    /// List all indices that have been created with [`Self::create_index`]
    ///
    /// Use [`Self::index_stats`] to get statistics for a specific index.
    pub async fn list_indices(&self) -> Result<Vec<IndexConfig>> {
        self.inner.list_indices().await
    }
1233
    /// Get the table URI (storage location)
    ///
    /// Returns the full storage location of the table (e.g., S3/GCS path).
    /// For remote tables, this fetches the location from the server via describe.
    pub async fn uri(&self) -> Result<String> {
        // Delegates to the concrete table implementation.
        self.inner.uri().await
    }
1241
    /// Get the storage options used when opening this table, if any.
    ///
    /// Warning: This is an internal API and the return value is subject to change.
    ///
    /// Deprecated: use [`Self::initial_storage_options`] instead.
    #[deprecated(since = "0.25.0", note = "Use initial_storage_options() instead")]
    pub async fn storage_options(&self) -> Option<HashMap<String, String>> {
        // The inner method carries the same deprecation; silence it here.
        #[allow(deprecated)]
        self.inner.storage_options().await
    }
1250
    /// Get the initial storage options that were passed in when opening this table.
    ///
    /// These are the static options captured at open time; they are not refreshed.
    /// For dynamically refreshed options (e.g., credential vending), use [`Self::latest_storage_options`].
    ///
    /// Warning: This is an internal API and the return value is subject to change.
    pub async fn initial_storage_options(&self) -> Option<HashMap<String, String>> {
        self.inner.initial_storage_options().await
    }
1259
    /// Get the latest storage options, refreshing from provider if configured.
    ///
    /// This method is useful for credential vending scenarios where storage options
    /// may be refreshed dynamically. If no dynamic provider is configured, this
    /// returns the initial static options.
    ///
    /// Warning: This is an internal API and the return value is subject to change.
    pub async fn latest_storage_options(&self) -> Result<Option<HashMap<String, String>>> {
        // Unlike initial_storage_options, refreshing can fail — hence the Result.
        self.inner.latest_storage_options().await
    }
1270
1271    /// Get statistics about an index.
1272    /// Returns None if the index does not exist.
1273    pub async fn index_stats(
1274        &self,
1275        index_name: impl AsRef<str>,
1276    ) -> Result<Option<IndexStatistics>> {
1277        self.inner.index_stats(index_name.as_ref()).await
1278    }
1279
    /// Drop an index from the table.
    ///
    /// Note: This is not yet available in LanceDB cloud.
    ///
    /// This does not delete the index from disk, it just removes it from the table.
    /// To delete the index, run [`Self::optimize()`] after dropping the index.
    ///
    /// Use [`Self::list_indices()`] to find the names of the indices.
    ///
    /// # Arguments
    ///
    /// * `name` - The name of the index to drop.
    pub async fn drop_index(&self, name: &str) -> Result<()> {
        self.inner.drop_index(name).await
    }
1291
    /// Prewarm an index in the table
    ///
    /// This is a hint to fully load the index into memory.  It can be used to
    /// avoid cold starts
    ///
    /// It is generally wasteful to call this if the index does not fit into the
    /// available cache.
    ///
    /// Note: This function is not yet supported on all indices, in which case it
    /// may do nothing.
    ///
    /// Use [`Self::list_indices()`] to find the names of the indices.
    ///
    /// # Arguments
    ///
    /// * `name` - The name of the index to prewarm.
    pub async fn prewarm_index(&self, name: &str) -> Result<()> {
        self.inner.prewarm_index(name).await
    }
1307
    /// Poll until the columns are fully indexed. Will return Error::Timeout if the columns
    /// are not fully indexed within the timeout.
    ///
    /// # Arguments
    ///
    /// * `index_names` - The names of the indices to wait for (see [`Self::list_indices`]).
    /// * `timeout` - Maximum time to wait before giving up.
    pub async fn wait_for_index(
        &self,
        index_names: &[&str],
        timeout: std::time::Duration,
    ) -> Result<()> {
        self.inner.wait_for_index(index_names, timeout).await
    }
1317
    /// Get the tags manager.
    ///
    /// Tags give names to specific versions of the table; see [`Tags`] for the
    /// available operations (list, create, update, delete).
    pub async fn tags(&self) -> Result<Box<dyn Tags + '_>> {
        self.inner.tags().await
    }
1322
1323    // Take many execution plans and map them into a single plan that adds
1324    // a query_index column and unions them.
1325    pub(crate) fn multi_vector_plan(
1326        plans: Vec<Arc<dyn ExecutionPlan>>,
1327    ) -> Result<Arc<dyn ExecutionPlan>> {
1328        if plans.is_empty() {
1329            return Err(Error::InvalidInput {
1330                message: "No plans provided".to_string(),
1331            });
1332        }
1333        // Projection to keeping all existing columns
1334        let first_plan = plans[0].clone();
1335        let project_all_columns = first_plan
1336            .schema()
1337            .fields()
1338            .iter()
1339            .enumerate()
1340            .map(|(i, field)| {
1341                let expr =
1342                    datafusion_physical_plan::expressions::Column::new(field.name().as_str(), i);
1343                let expr = Arc::new(expr) as Arc<dyn datafusion_physical_plan::PhysicalExpr>;
1344                (expr, field.name().clone())
1345            })
1346            .collect::<Vec<_>>();
1347
1348        let projected_plans = plans
1349            .into_iter()
1350            .enumerate()
1351            .map(|(plan_i, plan)| {
1352                let query_index = datafusion_common::ScalarValue::Int32(Some(plan_i as i32));
1353                let query_index_expr =
1354                    datafusion_physical_plan::expressions::Literal::new(query_index);
1355                let query_index_expr =
1356                    Arc::new(query_index_expr) as Arc<dyn datafusion_physical_plan::PhysicalExpr>;
1357                let mut projections = vec![(query_index_expr, "query_index".to_string())];
1358                projections.extend_from_slice(&project_all_columns);
1359                let projection = ProjectionExec::try_new(projections, plan).unwrap();
1360                Arc::new(projection) as Arc<dyn datafusion_physical_plan::ExecutionPlan>
1361            })
1362            .collect::<Vec<_>>();
1363
1364        let unioned = UnionExec::try_new(projected_plans).map_err(|err| Error::Runtime {
1365            message: err.to_string(),
1366        })?;
1367        // We require 1 partition in the final output
1368        let repartitioned = RepartitionExec::try_new(
1369            unioned,
1370            datafusion_physical_plan::Partitioning::RoundRobinBatch(1),
1371        )
1372        .unwrap();
1373        Ok(Arc::new(repartitioned))
1374    }
1375
    /// Retrieve statistics on the table
    ///
    /// See [`TableStatistics`] for the statistics that are reported.
    pub async fn stats(&self) -> Result<TableStatistics> {
        self.inner.stats().await
    }
1380}
1381
/// Implementation of the [`Tags`] trait backed by a local Lance dataset.
pub struct NativeTags {
    /// Consistency-aware handle to the underlying Lance dataset.
    dataset: dataset::DatasetConsistencyWrapper,
}
#[async_trait]
impl Tags for NativeTags {
    /// List all tags on the dataset along with their contents.
    async fn list(&self) -> Result<HashMap<String, TagContents>> {
        let dataset = self.dataset.get().await?;
        Ok(dataset.tags().list().await?)
    }

    /// Get the version number that `tag` points at.
    async fn get_version(&self, tag: &str) -> Result<u64> {
        let dataset = self.dataset.get().await?;
        Ok(dataset.tags().get_version(tag).await?)
    }

    /// Create a new tag pointing at `version`.
    async fn create(&mut self, tag: &str, version: u64) -> Result<()> {
        let dataset = self.dataset.get().await?;
        dataset.tags().create(tag, version).await?;
        Ok(())
    }

    /// Delete the tag named `tag`.
    async fn delete(&mut self, tag: &str) -> Result<()> {
        let dataset = self.dataset.get().await?;
        dataset.tags().delete(tag).await?;
        Ok(())
    }

    /// Repoint an existing tag at `version`.
    async fn update(&mut self, tag: &str, version: u64) -> Result<()> {
        let dataset = self.dataset.get().await?;
        dataset.tags().update(tag, version).await?;
        Ok(())
    }
}
1415
/// Extension trait for downcasting a type-erased table to a [`NativeTable`].
pub trait NativeTableExt {
    /// Cast as [`NativeTable`], or return None if it is not a [`NativeTable`].
    fn as_native(&self) -> Option<&NativeTable>;
}
1420
1421impl NativeTableExt for Arc<dyn BaseTable> {
1422    fn as_native(&self) -> Option<&NativeTable> {
1423        self.as_any().downcast_ref::<NativeTable>()
1424    }
1425}
1426
/// A table in a LanceDB database.
#[derive(Clone)]
pub struct NativeTable {
    /// Human-readable table name.
    name: String,
    /// Namespace path containing the table.
    namespace: Vec<String>,
    /// Identifier built from the namespace and the name (see `Self::build_id`).
    id: String,
    /// Storage location of the underlying Lance dataset.
    uri: String,
    /// Consistency-aware handle to the underlying Lance dataset.
    pub(crate) dataset: dataset::DatasetConsistencyWrapper,
    // This comes from the connection options. We store here so we can pass down
    // to the dataset when we recreate it (for example, in checkout_latest).
    read_consistency_interval: Option<std::time::Duration>,
    // Optional namespace client for server-side query execution.
    // When set, queries will be executed on the namespace server instead of locally.
    namespace_client: Option<Arc<dyn LanceNamespace>>,
}
1442
impl std::fmt::Debug for NativeTable {
    // Note: the `dataset` field is omitted from the debug representation.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("NativeTable")
            .field("name", &self.name)
            .field("namespace", &self.namespace)
            .field("id", &self.id)
            .field("uri", &self.uri)
            .field("read_consistency_interval", &self.read_consistency_interval)
            .field("namespace_client", &self.namespace_client)
            .finish()
    }
}
1455
1456impl std::fmt::Display for NativeTable {
1457    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1458        write!(
1459            f,
1460            "NativeTable({}, uri={}, read_consistency_interval={})",
1461            self.name,
1462            self.uri,
1463            match self.read_consistency_interval {
1464                None => {
1465                    "None".to_string()
1466                }
1467                Some(duration) => {
1468                    format!("{}s", duration.as_secs_f64())
1469                }
1470            }
1471        )
1472    }
1473}
1474
1475impl NativeTable {
    /// Opens an existing Table
    ///
    /// The table name is derived from the uri (via `Self::get_table_name`).
    ///
    /// # Arguments
    ///
    /// * `uri` - The uri to a [NativeTable]
    ///
    /// # Returns
    ///
    /// * A [NativeTable] object.
    pub async fn open(uri: &str) -> Result<Self> {
        let name = Self::get_table_name(uri)?;
        Self::open_with_params(uri, &name, vec![], None, None, None, None).await
    }
1490
    /// Opens an existing Table
    ///
    /// # Arguments
    ///
    /// * `uri` - The uri of the table's Lance dataset
    /// * `name` - The table name
    /// * `namespace` - The namespace path containing the table
    /// * `write_store_wrapper` - Optional object store wrapper applied on the write path
    /// * `params` - The [ReadParams] to use when opening the table
    /// * `read_consistency_interval` - Optional read-consistency interval, passed
    ///   through to the dataset consistency wrapper
    /// * `namespace_client` - Optional namespace client for server-side query execution
    ///
    /// # Returns
    ///
    /// * A [NativeTable] object.
    #[allow(clippy::too_many_arguments)]
    pub async fn open_with_params(
        uri: &str,
        name: &str,
        namespace: Vec<String>,
        write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
        params: Option<ReadParams>,
        read_consistency_interval: Option<std::time::Duration>,
        namespace_client: Option<Arc<dyn LanceNamespace>>,
    ) -> Result<Self> {
        let params = params.unwrap_or_default();
        // patch the params if we have a write store wrapper
        let params = match write_store_wrapper.clone() {
            Some(wrapper) => params.patch_with_store_wrapper(wrapper)?,
            None => params,
        };

        // A missing dataset is surfaced as TableNotFound so callers can
        // distinguish it from other Lance errors.
        let dataset = DatasetBuilder::from_uri(uri)
            .with_read_params(params)
            .load()
            .await
            .map_err(|e| match e {
                lance::Error::DatasetNotFound { .. } => Error::TableNotFound {
                    name: name.to_string(),
                    source: Box::new(e),
                },
                source => Error::Lance { source },
            })?;

        let dataset = DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval);
        let id = Self::build_id(&namespace, name);

        Ok(Self {
            name: name.to_string(),
            namespace,
            id,
            uri: uri.to_string(),
            dataset,
            read_consistency_interval,
            namespace_client,
        })
    }
1545
1546    /// Set the namespace client for server-side query execution.
1547    ///
1548    /// When set, queries will be executed on the namespace server instead of locally.
1549    pub fn with_namespace_client(mut self, namespace_client: Arc<dyn LanceNamespace>) -> Self {
1550        self.namespace_client = Some(namespace_client);
1551        self
1552    }
1553
1554    /// Opens an existing Table using a namespace client.
1555    ///
1556    /// This method uses `DatasetBuilder::from_namespace` to open the table, which
1557    /// automatically fetches the table location and storage options from the namespace.
1558    /// This eliminates the need to pre-fetch and merge storage options before opening.
1559    ///
1560    /// # Arguments
1561    ///
1562    /// * `namespace_client` - The namespace client to use for fetching table metadata
1563    /// * `name` - The table name
1564    /// * `namespace` - The namespace path (e.g., vec!["parent", "child"])
1565    /// * `write_store_wrapper` - Optional wrapper for the object store on write path
1566    /// * `params` - Optional read parameters
1567    /// * `read_consistency_interval` - Optional interval for read consistency
1568    /// * `server_side_query_enabled` - Whether to enable server-side query execution.
1569    ///   When true, the namespace_client will be stored and queries will be executed
1570    ///   on the namespace server. When false, the namespace is only used for opening
1571    ///   the table, and queries are executed locally.
1572    /// * `session` - Optional session for object stores and caching
1573    ///
1574    /// # Returns
1575    ///
1576    /// * A [NativeTable] object.
1577    #[allow(clippy::too_many_arguments)]
1578    pub async fn open_from_namespace(
1579        namespace_client: Arc<dyn LanceNamespace>,
1580        name: &str,
1581        namespace: Vec<String>,
1582        write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
1583        params: Option<ReadParams>,
1584        read_consistency_interval: Option<std::time::Duration>,
1585        server_side_query_enabled: bool,
1586        session: Option<Arc<lance::session::Session>>,
1587    ) -> Result<Self> {
1588        let mut params = params.unwrap_or_default();
1589
1590        // Set the session in read params
1591        if let Some(sess) = session {
1592            params.session(sess);
1593        }
1594
1595        // patch the params if we have a write store wrapper
1596        let params = match write_store_wrapper.clone() {
1597            Some(wrapper) => params.patch_with_store_wrapper(wrapper)?,
1598            None => params,
1599        };
1600
1601        // Build table_id from namespace + name
1602        let mut table_id = namespace.clone();
1603        table_id.push(name.to_string());
1604
1605        // Use DatasetBuilder::from_namespace which automatically fetches location
1606        // and storage options from the namespace
1607        let builder = DatasetBuilder::from_namespace(namespace_client.clone(), table_id)
1608            .await
1609            .map_err(|e| match e {
1610                lance::Error::Namespace { source, .. } => Error::Runtime {
1611                    message: format!("Failed to get table info from namespace: {:?}", source),
1612                },
1613                source => Error::Lance { source },
1614            })?;
1615
1616        let dataset = builder
1617            .with_read_params(params)
1618            .load()
1619            .await
1620            .map_err(|e| match e {
1621                lance::Error::DatasetNotFound { .. } => Error::TableNotFound {
1622                    name: name.to_string(),
1623                    source: Box::new(e),
1624                },
1625                source => Error::Lance { source },
1626            })?;
1627
1628        let uri = dataset.uri().to_string();
1629        let dataset = DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval);
1630        let id = Self::build_id(&namespace, name);
1631
1632        let stored_namespace_client = if server_side_query_enabled {
1633            Some(namespace_client)
1634        } else {
1635            None
1636        };
1637
1638        Ok(Self {
1639            name: name.to_string(),
1640            namespace,
1641            id,
1642            uri,
1643            dataset,
1644            read_consistency_interval,
1645            namespace_client: stored_namespace_client,
1646        })
1647    }
1648
1649    fn get_table_name(uri: &str) -> Result<String> {
1650        let path = Path::new(uri);
1651        let name = path
1652            .file_stem()
1653            .ok_or(Error::TableNotFound {
1654                name: uri.to_string(),
1655                source: format!("Could not extract table name from URI: '{}'", uri).into(),
1656            })?
1657            .to_str()
1658            .ok_or(Error::InvalidTableName {
1659                name: uri.to_string(),
1660                reason: "Table name is not valid URL".to_string(),
1661            })?;
1662        Ok(name.to_string())
1663    }
1664
1665    fn build_id(namespace: &[String], name: &str) -> String {
1666        if namespace.is_empty() {
1667            name.to_string()
1668        } else {
1669            let mut parts = namespace.to_vec();
1670            parts.push(name.to_string());
1671            parts.join("$")
1672        }
1673    }
1674
1675    /// Creates a new Table
1676    ///
1677    /// # Arguments
1678    ///
1679    /// * `uri` - The URI to the table. When namespace is not empty, the caller must
1680    ///   provide an explicit URI (location) rather than deriving it from the table name.
1681    /// * `name` The Table name
1682    /// * `namespace` - The namespace path. When non-empty, an explicit URI must be provided.
1683    /// * `batches` RecordBatch to be saved in the database.
1684    /// * `params` - Write parameters.
1685    /// * `namespace_client` - Optional namespace client for server-side query execution
1686    ///
1687    /// # Returns
1688    ///
1689    /// * A [TableImpl] object.
1690    #[allow(clippy::too_many_arguments)]
1691    pub async fn create(
1692        uri: &str,
1693        name: &str,
1694        namespace: Vec<String>,
1695        batches: impl StreamingWriteSource,
1696        write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
1697        params: Option<WriteParams>,
1698        read_consistency_interval: Option<std::time::Duration>,
1699        namespace_client: Option<Arc<dyn LanceNamespace>>,
1700    ) -> Result<Self> {
1701        // Default params uses format v1.
1702        let params = params.unwrap_or(WriteParams {
1703            ..Default::default()
1704        });
1705        // patch the params if we have a write store wrapper
1706        let params = match write_store_wrapper.clone() {
1707            Some(wrapper) => params.patch_with_store_wrapper(wrapper)?,
1708            None => params,
1709        };
1710
1711        let insert_builder = InsertBuilder::new(uri).with_params(&params);
1712        let dataset = insert_builder
1713            .execute_stream(batches)
1714            .await
1715            .map_err(|e| match e {
1716                lance::Error::DatasetAlreadyExists { .. } => Error::TableAlreadyExists {
1717                    name: name.to_string(),
1718                },
1719                source => Error::Lance { source },
1720            })?;
1721
1722        let id = Self::build_id(&namespace, name);
1723
1724        Ok(Self {
1725            name: name.to_string(),
1726            namespace,
1727            id,
1728            uri: uri.to_string(),
1729            dataset: DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval),
1730            read_consistency_interval,
1731            namespace_client,
1732        })
1733    }
1734
1735    #[allow(clippy::too_many_arguments)]
1736    pub async fn create_empty(
1737        uri: &str,
1738        name: &str,
1739        namespace: Vec<String>,
1740        schema: SchemaRef,
1741        write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
1742        params: Option<WriteParams>,
1743        read_consistency_interval: Option<std::time::Duration>,
1744        namespace_client: Option<Arc<dyn LanceNamespace>>,
1745    ) -> Result<Self> {
1746        let batches = RecordBatchIterator::new(vec![], schema);
1747        Self::create(
1748            uri,
1749            name,
1750            namespace,
1751            batches,
1752            write_store_wrapper,
1753            params,
1754            read_consistency_interval,
1755            namespace_client,
1756        )
1757        .await
1758    }
1759
1760    /// Creates a new Table using a namespace client for storage options.
1761    ///
1762    /// This method sets up a `StorageOptionsProvider` from the namespace client,
1763    /// enabling automatic credential refresh for cloud storage. The namespace
1764    /// is used for:
1765    /// 1. Setting up storage options provider for credential vending
1766    /// 2. Optionally enabling server-side query execution
1767    ///
1768    /// # Arguments
1769    ///
1770    /// * `namespace_client` - The namespace client to use for storage options
1771    /// * `uri` - The URI to the table (obtained from create_empty_table response)
1772    /// * `name` - The table name
1773    /// * `namespace` - The namespace path (e.g., vec!["parent", "child"])
1774    /// * `batches` - RecordBatch to be saved in the database
1775    /// * `write_store_wrapper` - Optional wrapper for the object store on write path
1776    /// * `params` - Optional write parameters
1777    /// * `read_consistency_interval` - Optional interval for read consistency
1778    /// * `server_side_query_enabled` - Whether to enable server-side query execution
1779    ///
1780    /// # Returns
1781    ///
1782    /// * A [NativeTable] object.
1783    #[allow(clippy::too_many_arguments)]
1784    pub async fn create_from_namespace(
1785        namespace_client: Arc<dyn LanceNamespace>,
1786        uri: &str,
1787        name: &str,
1788        namespace: Vec<String>,
1789        batches: impl StreamingWriteSource,
1790        write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
1791        params: Option<WriteParams>,
1792        read_consistency_interval: Option<std::time::Duration>,
1793        server_side_query_enabled: bool,
1794        session: Option<Arc<lance::session::Session>>,
1795    ) -> Result<Self> {
1796        // Build table_id from namespace + name for the storage options provider
1797        let mut table_id = namespace.clone();
1798        table_id.push(name.to_string());
1799
1800        // Set up storage options provider from namespace
1801        let storage_options_provider = Arc::new(LanceNamespaceStorageOptionsProvider::new(
1802            namespace_client.clone(),
1803            table_id,
1804        ));
1805
1806        // Start with provided params or defaults
1807        let mut params = params.unwrap_or_default();
1808
1809        // Set the session in write params
1810        if let Some(sess) = session {
1811            params.session = Some(sess);
1812        }
1813
1814        // Ensure store_params exists and set the storage options provider
1815        let store_params = params
1816            .store_params
1817            .get_or_insert_with(ObjectStoreParams::default);
1818        let accessor = match store_params.storage_options().cloned() {
1819            Some(options) => {
1820                StorageOptionsAccessor::with_initial_and_provider(options, storage_options_provider)
1821            }
1822            None => StorageOptionsAccessor::with_provider(storage_options_provider),
1823        };
1824        store_params.storage_options_accessor = Some(Arc::new(accessor));
1825
1826        // Patch the params if we have a write store wrapper
1827        let params = match write_store_wrapper.clone() {
1828            Some(wrapper) => params.patch_with_store_wrapper(wrapper)?,
1829            None => params,
1830        };
1831
1832        let insert_builder = InsertBuilder::new(uri).with_params(&params);
1833        let dataset = insert_builder
1834            .execute_stream(batches)
1835            .await
1836            .map_err(|e| match e {
1837                lance::Error::DatasetAlreadyExists { .. } => Error::TableAlreadyExists {
1838                    name: name.to_string(),
1839                },
1840                source => Error::Lance { source },
1841            })?;
1842
1843        let id = Self::build_id(&namespace, name);
1844
1845        let stored_namespace_client = if server_side_query_enabled {
1846            Some(namespace_client)
1847        } else {
1848            None
1849        };
1850
1851        Ok(Self {
1852            name: name.to_string(),
1853            namespace,
1854            id,
1855            uri: uri.to_string(),
1856            dataset: DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval),
1857            read_consistency_interval,
1858            namespace_client: stored_namespace_client,
1859        })
1860    }
1861
1862    async fn optimize_indices(&self, options: &OptimizeOptions) -> Result<()> {
1863        info!("LanceDB: optimizing indices: {:?}", options);
1864        self.dataset
1865            .get_mut()
1866            .await?
1867            .optimize_indices(options)
1868            .await?;
1869        Ok(())
1870    }
1871
1872    /// Merge new data into this table.
1873    pub async fn merge(
1874        &mut self,
1875        batches: impl RecordBatchReader + Send + 'static,
1876        left_on: &str,
1877        right_on: &str,
1878    ) -> Result<()> {
1879        self.dataset
1880            .get_mut()
1881            .await?
1882            .merge(batches, left_on, right_on)
1883            .await?;
1884        Ok(())
1885    }
1886
1887    /// Remove old versions of the dataset from disk.
1888    ///
1889    /// # Arguments
1890    /// * `older_than` - The duration of time to keep versions of the dataset.
1891    /// * `delete_unverified` - Because they may be part of an in-progress
1892    ///   transaction, files newer than 7 days old are not deleted by default.
1893    ///   If you are sure that there are no in-progress transactions, then you
1894    ///   can set this to True to delete all files older than `older_than`.
1895    ///
1896    /// This calls into [lance::dataset::Dataset::cleanup_old_versions] and
1897    /// returns the result.
1898    async fn cleanup_old_versions(
1899        &self,
1900        older_than: Duration,
1901        delete_unverified: Option<bool>,
1902        error_if_tagged_old_versions: Option<bool>,
1903    ) -> Result<RemovalStats> {
1904        Ok(self
1905            .dataset
1906            .get_mut()
1907            .await?
1908            .cleanup_old_versions(older_than, delete_unverified, error_if_tagged_old_versions)
1909            .await?)
1910    }
1911
1912    /// Compact files in the dataset.
1913    ///
1914    /// This can be run after making several small appends to optimize the table
1915    /// for faster reads.
1916    ///
1917    /// This calls into [lance::dataset::optimize::compact_files].
1918    async fn compact_files(
1919        &self,
1920        options: CompactionOptions,
1921        remap_options: Option<Arc<dyn IndexRemapperOptions>>,
1922    ) -> Result<CompactionMetrics> {
1923        let mut dataset_mut = self.dataset.get_mut().await?;
1924        let metrics = compact_files(&mut dataset_mut, options, remap_options).await?;
1925        Ok(metrics)
1926    }
1927
1928    // TODO: why are these individual methods and not some single "get_stats" method?
1929    pub async fn count_fragments(&self) -> Result<usize> {
1930        Ok(self.dataset.get().await?.count_fragments())
1931    }
1932
1933    pub async fn count_deleted_rows(&self) -> Result<usize> {
1934        Ok(self.dataset.get().await?.count_deleted_rows().await?)
1935    }
1936
1937    pub async fn num_small_files(&self, max_rows_per_group: usize) -> Result<usize> {
1938        Ok(self
1939            .dataset
1940            .get()
1941            .await?
1942            .num_small_files(max_rows_per_group)
1943            .await)
1944    }
1945
1946    pub async fn load_indices(&self) -> Result<Vec<VectorIndex>> {
1947        let dataset = self.dataset.get().await?;
1948        let mf = dataset.manifest();
1949        let indices = dataset.load_indices().await?;
1950        Ok(indices
1951            .iter()
1952            .map(|i| VectorIndex::new_from_format(mf, i))
1953            .collect())
1954    }
1955
1956    // Helper to validate index type compatibility with field data type
1957    fn validate_index_type(
1958        field: &Field,
1959        index_name: &str,
1960        supported_fn: impl Fn(&DataType) -> bool,
1961    ) -> Result<()> {
1962        if !supported_fn(field.data_type()) {
1963            return Err(Error::Schema {
1964                message: format!(
1965                    "A {} index cannot be created on the field `{}` which has data type {}",
1966                    index_name,
1967                    field.name(),
1968                    field.data_type()
1969                ),
1970            });
1971        }
1972        Ok(())
1973    }
1974
1975    // Helper to build IVF params honoring table options.
1976    fn build_ivf_params(
1977        num_partitions: Option<u32>,
1978        target_partition_size: Option<u32>,
1979        sample_rate: u32,
1980        max_iterations: u32,
1981    ) -> IvfBuildParams {
1982        let mut ivf_params = match (num_partitions, target_partition_size) {
1983            (Some(num_partitions), _) => IvfBuildParams::new(num_partitions as usize),
1984            (None, Some(target_partition_size)) => {
1985                IvfBuildParams::with_target_partition_size(target_partition_size as usize)
1986            }
1987            (None, None) => IvfBuildParams::default(),
1988        };
1989        ivf_params.sample_rate = sample_rate as usize;
1990        ivf_params.max_iters = max_iterations as usize;
1991        ivf_params
1992    }
1993
1994    // Helper to get num_sub_vectors with default calculation
1995    fn get_num_sub_vectors(provided: Option<u32>, dim: u32, num_bits: Option<u32>) -> u32 {
1996        if let Some(provided) = provided {
1997            return provided;
1998        }
1999        let suggested = suggested_num_sub_vectors(dim);
2000        if num_bits.is_some_and(|num_bits| num_bits == 4) && !suggested.is_multiple_of(2) {
2001            // num_sub_vectors must be even when 4 bits are used
2002            suggested + 1
2003        } else {
2004            suggested
2005        }
2006    }
2007
2008    // Helper to extract vector dimension from field
2009    fn get_vector_dimension(field: &Field) -> Result<u32> {
2010        match field.data_type() {
2011            arrow_schema::DataType::FixedSizeList(_, n) => Ok(*n as u32),
2012            _ => Ok(infer_vector_dim(field.data_type())? as u32),
2013        }
2014    }
2015
    // Convert LanceDB Index to Lance IndexParams.
    //
    // Each arm first validates that the requested index kind is compatible
    // with the field's data type, then assembles the Lance-side build
    // parameters. `Index::Auto` picks the index from the data type: vector
    // columns get an IVF_PQ index with default parameters, scalar columns that
    // support it get a BTree, and anything else is rejected.
    async fn make_index_params(
        &self,
        field: &Field,
        index_opts: Index,
    ) -> Result<Box<dyn lance::index::IndexParams>> {
        match index_opts {
            Index::Auto => {
                if supported_vector_data_type(field.data_type()) {
                    // Use IvfPq as the default for auto vector indices
                    let dim = Self::get_vector_dimension(field)?;
                    let ivf_params = lance_index::vector::ivf::IvfBuildParams::default();
                    let num_sub_vectors = Self::get_num_sub_vectors(None, dim, None);
                    let pq_params =
                        lance_index::vector::pq::PQBuildParams::new(num_sub_vectors as usize, 8);
                    // NOTE(review): auto mode hard-codes the L2 metric; an
                    // explicit Ivf* variant is needed for other distances.
                    let lance_idx_params =
                        lance::index::vector::VectorIndexParams::with_ivf_pq_params(
                            lance_linalg::distance::MetricType::L2,
                            ivf_params,
                            pq_params,
                        );
                    Ok(Box::new(lance_idx_params))
                } else if supported_btree_data_type(field.data_type()) {
                    Ok(Box::new(ScalarIndexParams::for_builtin(
                        BuiltinIndexType::BTree,
                    )))
                } else {
                    Err(Error::InvalidInput {
                        message: format!(
                            "there are no indices supported for the field `{}` with the data type {}",
                            field.name(),
                            field.data_type()
                        ),
                    })?
                }
            }
            // Scalar index kinds: validate the column type, then use the
            // builtin Lance scalar index parameters.
            Index::BTree(_) => {
                Self::validate_index_type(field, "BTree", supported_btree_data_type)?;
                Ok(Box::new(ScalarIndexParams::for_builtin(
                    BuiltinIndexType::BTree,
                )))
            }
            Index::Bitmap(_) => {
                Self::validate_index_type(field, "Bitmap", supported_bitmap_data_type)?;
                Ok(Box::new(ScalarIndexParams::for_builtin(
                    BuiltinIndexType::Bitmap,
                )))
            }
            Index::LabelList(_) => {
                Self::validate_index_type(field, "LabelList", supported_label_list_data_type)?;
                Ok(Box::new(ScalarIndexParams::for_builtin(
                    BuiltinIndexType::LabelList,
                )))
            }
            // Full-text search: the user-facing options double as the Lance
            // index params and are passed through unchanged.
            Index::FTS(fts_opts) => {
                Self::validate_index_type(field, "FTS", supported_fts_data_type)?;
                Ok(Box::new(fts_opts))
            }
            Index::IvfFlat(index) => {
                Self::validate_index_type(field, "IVF Flat", supported_vector_data_type)?;
                let ivf_params = Self::build_ivf_params(
                    index.num_partitions,
                    index.target_partition_size,
                    index.sample_rate,
                    index.max_iterations,
                );
                let lance_idx_params =
                    VectorIndexParams::with_ivf_flat_params(index.distance_type.into(), ivf_params);
                Ok(Box::new(lance_idx_params))
            }
            Index::IvfSq(index) => {
                Self::validate_index_type(field, "IVF SQ", supported_vector_data_type)?;
                let ivf_params = Self::build_ivf_params(
                    index.num_partitions,
                    index.target_partition_size,
                    index.sample_rate,
                    index.max_iterations,
                );
                // The same sample rate drives both IVF training and SQ training.
                let sq_params = SQBuildParams {
                    sample_rate: index.sample_rate as usize,
                    ..Default::default()
                };
                let lance_idx_params = VectorIndexParams::with_ivf_sq_params(
                    index.distance_type.into(),
                    ivf_params,
                    sq_params,
                );
                Ok(Box::new(lance_idx_params))
            }
            Index::IvfPq(index) => {
                Self::validate_index_type(field, "IVF PQ", supported_vector_data_type)?;
                let dim = Self::get_vector_dimension(field)?;
                let ivf_params = Self::build_ivf_params(
                    index.num_partitions,
                    index.target_partition_size,
                    index.sample_rate,
                    index.max_iterations,
                );
                // num_sub_vectors defaults from the vector dimension when not
                // provided (forced even when 4-bit codes are requested).
                let num_sub_vectors =
                    Self::get_num_sub_vectors(index.num_sub_vectors, dim, index.num_bits);
                let num_bits = index.num_bits.unwrap_or(8) as usize;
                let mut pq_params = PQBuildParams::new(num_sub_vectors as usize, num_bits);
                pq_params.max_iters = index.max_iterations as usize;
                let lance_idx_params = VectorIndexParams::with_ivf_pq_params(
                    index.distance_type.into(),
                    ivf_params,
                    pq_params,
                );
                Ok(Box::new(lance_idx_params))
            }
            Index::IvfRq(index) => {
                Self::validate_index_type(field, "IVF RQ", supported_vector_data_type)?;
                let ivf_params = Self::build_ivf_params(
                    index.num_partitions,
                    index.target_partition_size,
                    index.sample_rate,
                    index.max_iterations,
                );
                // RQ defaults to 1 bit when the caller does not specify.
                let rq_params = RQBuildParams::new(index.num_bits.unwrap_or(1) as u8);
                let lance_idx_params = VectorIndexParams::with_ivf_rq_params(
                    index.distance_type.into(),
                    ivf_params,
                    rq_params,
                );
                Ok(Box::new(lance_idx_params))
            }
            Index::IvfHnswPq(index) => {
                Self::validate_index_type(field, "IVF HNSW PQ", supported_vector_data_type)?;
                let dim = Self::get_vector_dimension(field)?;
                let ivf_params = Self::build_ivf_params(
                    index.num_partitions,
                    index.target_partition_size,
                    index.sample_rate,
                    index.max_iterations,
                );
                let num_sub_vectors =
                    Self::get_num_sub_vectors(index.num_sub_vectors, dim, index.num_bits);
                let hnsw_params = HnswBuildParams::default()
                    .num_edges(index.m as usize)
                    .ef_construction(index.ef_construction as usize);
                let pq_params = PQBuildParams::new(
                    num_sub_vectors as usize,
                    index.num_bits.unwrap_or(8) as usize,
                );
                let lance_idx_params = VectorIndexParams::with_ivf_hnsw_pq_params(
                    index.distance_type.into(),
                    ivf_params,
                    hnsw_params,
                    pq_params,
                );
                Ok(Box::new(lance_idx_params))
            }
            Index::IvfHnswSq(index) => {
                Self::validate_index_type(field, "IVF HNSW SQ", supported_vector_data_type)?;
                let ivf_params = Self::build_ivf_params(
                    index.num_partitions,
                    index.target_partition_size,
                    index.sample_rate,
                    index.max_iterations,
                );
                let hnsw_params = HnswBuildParams::default()
                    .num_edges(index.m as usize)
                    .ef_construction(index.ef_construction as usize);
                let sq_params = SQBuildParams {
                    sample_rate: index.sample_rate as usize,
                    ..Default::default()
                };
                let lance_idx_params = VectorIndexParams::with_ivf_hnsw_sq_params(
                    index.distance_type.into(),
                    ivf_params,
                    hnsw_params,
                    sq_params,
                );
                Ok(Box::new(lance_idx_params))
            }
        }
    }
2193
2194    // Helper method to get the correct IndexType based on the Index variant and field data type
2195    fn get_index_type_for_field(&self, field: &Field, index: &Index) -> IndexType {
2196        match index {
2197            Index::Auto => {
2198                if supported_vector_data_type(field.data_type()) {
2199                    IndexType::Vector
2200                } else if supported_btree_data_type(field.data_type()) {
2201                    IndexType::BTree
2202                } else {
2203                    // This should not happen since make_index_params would have failed
2204                    IndexType::BTree
2205                }
2206            }
2207            Index::BTree(_) => IndexType::BTree,
2208            Index::Bitmap(_) => IndexType::Bitmap,
2209            Index::LabelList(_) => IndexType::LabelList,
2210            Index::FTS(_) => IndexType::Inverted,
2211            Index::IvfFlat(_)
2212            | Index::IvfSq(_)
2213            | Index::IvfPq(_)
2214            | Index::IvfRq(_)
2215            | Index::IvfHnswPq(_)
2216            | Index::IvfHnswSq(_) => IndexType::Vector,
2217        }
2218    }
2219
2220    async fn generic_query(
2221        &self,
2222        query: &AnyQuery,
2223        options: QueryExecutionOptions,
2224    ) -> Result<DatasetRecordBatchStream> {
2225        let plan = self.create_plan(query, options.clone()).await?;
2226        let inner = execute_plan(plan, Default::default())?;
2227        let inner = if let Some(timeout) = options.timeout {
2228            TimeoutStream::new_boxed(inner, timeout)
2229        } else {
2230            inner
2231        };
2232        Ok(DatasetRecordBatchStream::new(inner))
2233    }
2234
2235    /// Execute a query on the namespace server instead of locally.
2236    async fn namespace_query(
2237        &self,
2238        namespace_client: Arc<dyn LanceNamespace>,
2239        query: &AnyQuery,
2240        _options: QueryExecutionOptions,
2241    ) -> Result<DatasetRecordBatchStream> {
2242        // Build table_id from namespace + table name
2243        let mut table_id = self.namespace.clone();
2244        table_id.push(self.name.clone());
2245
2246        // Convert AnyQuery to namespace QueryTableRequest
2247        let mut ns_request = self.convert_to_namespace_query(query)?;
2248        // Set the table ID on the request
2249        ns_request.id = Some(table_id);
2250
2251        // Call the namespace query_table API
2252        let response_bytes = namespace_client
2253            .query_table(ns_request)
2254            .await
2255            .map_err(|e| Error::Runtime {
2256                message: format!("Failed to execute server-side query: {}", e),
2257            })?;
2258
2259        // Parse the Arrow IPC response into a RecordBatchStream
2260        self.parse_arrow_ipc_response(response_bytes).await
2261    }
2262
2263    /// Convert a QueryFilter to a SQL string for the namespace API.
2264    fn filter_to_sql(&self, filter: &QueryFilter) -> Result<String> {
2265        match filter {
2266            QueryFilter::Sql(sql) => Ok(sql.clone()),
2267            QueryFilter::Substrait(_) => Err(Error::NotSupported {
2268                message: "Substrait filters are not supported for server-side queries".to_string(),
2269            }),
2270            QueryFilter::Datafusion(_) => Err(Error::NotSupported {
2271                message: "Datafusion expression filters are not supported for server-side queries. Use SQL filter instead.".to_string(),
2272            }),
2273        }
2274    }
2275
    /// Convert an AnyQuery to the namespace QueryTableRequest format.
    ///
    /// Returns `NotSupported` for features that have no server-side
    /// representation: rerankers, dynamic column selection, and non-SQL
    /// filters (via [`Self::filter_to_sql`]).
    fn convert_to_namespace_query(&self, query: &AnyQuery) -> Result<NsQueryTableRequest> {
        match query {
            AnyQuery::VectorQuery(vq) => {
                // Extract the query vector(s) into the wire format.
                let vector = self.extract_query_vector(&vq.query_vector)?;

                // Convert filter to SQL string (only SQL filters survive).
                let filter = match &vq.base.filter {
                    Some(f) => Some(self.filter_to_sql(f)?),
                    None => None,
                };

                // Convert select to columns list; `All` means "no explicit
                // projection" on the wire.
                let columns = match &vq.base.select {
                    Select::All => None,
                    Select::Columns(cols) => Some(Box::new(QueryTableRequestColumns {
                        column_names: Some(cols.clone()),
                        column_aliases: None,
                    })),
                    Select::Dynamic(_) => {
                        return Err(Error::NotSupported {
                            message:
                                "Dynamic column selection is not supported for server-side queries"
                                    .to_string(),
                        });
                    }
                };

                // Check for unsupported features.
                if vq.base.reranker.is_some() {
                    return Err(Error::NotSupported {
                        message: "Reranker is not supported for server-side queries".to_string(),
                    });
                }

                // Convert FTS query if present. An empty column list is sent
                // as None (server default: search all indexed columns —
                // TODO confirm against the namespace API).
                let full_text_query = vq.base.full_text_search.as_ref().map(|fts| {
                    let columns = fts.columns();
                    let columns_vec = if columns.is_empty() {
                        None
                    } else {
                        Some(columns.into_iter().collect())
                    };
                    Box::new(QueryTableRequestFullTextQuery {
                        string_query: Some(Box::new(StringFtsQuery {
                            query: fts.query.to_string(),
                            columns: columns_vec,
                        })),
                        structured_query: None,
                    })
                });

                Ok(NsQueryTableRequest {
                    id: None, // Will be set in namespace_query
                    // Default of 10 mirrors the local default top-k —
                    // NOTE(review): keep in sync with DEFAULT_TOP_K.
                    k: vq.base.limit.unwrap_or(10) as i32,
                    vector: Box::new(vector),
                    vector_column: vq.column.clone(),
                    filter,
                    columns,
                    offset: vq.base.offset.map(|o| o as i32),
                    distance_type: vq.distance_type.map(|dt| dt.to_string()),
                    // Only minimum_nprobes is forwarded; the request format
                    // has no field for maximum_nprobes.
                    nprobes: Some(vq.minimum_nprobes as i32),
                    ef: vq.ef.map(|e| e as i32),
                    refine_factor: vq.refine_factor.map(|r| r as i32),
                    lower_bound: vq.lower_bound,
                    upper_bound: vq.upper_bound,
                    prefilter: Some(vq.base.prefilter),
                    fast_search: Some(vq.base.fast_search),
                    with_row_id: Some(vq.base.with_row_id),
                    bypass_vector_index: Some(!vq.use_index),
                    full_text_query,
                    ..Default::default()
                })
            }
            AnyQuery::Query(q) => {
                // For non-vector queries, pass an empty vector (similar to remote table implementation)
                if q.reranker.is_some() {
                    return Err(Error::NotSupported {
                        message: "Reranker is not supported for server-side query execution"
                            .to_string(),
                    });
                }

                // Only SQL filters can be forwarded.
                let filter = q
                    .filter
                    .as_ref()
                    .map(|f| self.filter_to_sql(f))
                    .transpose()?;

                let columns = match &q.select {
                    Select::All => None,
                    Select::Columns(cols) => Some(Box::new(QueryTableRequestColumns {
                        column_names: Some(cols.clone()),
                        column_aliases: None,
                    })),
                    Select::Dynamic(_) => {
                        return Err(Error::NotSupported {
                            message: "Dynamic columns are not supported for server-side query"
                                .to_string(),
                        });
                    }
                };

                // Handle full text search if present.
                let full_text_query = q.full_text_search.as_ref().map(|fts| {
                    let columns_vec = if fts.columns().is_empty() {
                        None
                    } else {
                        Some(fts.columns().iter().cloned().collect())
                    };
                    Box::new(QueryTableRequestFullTextQuery {
                        string_query: Some(Box::new(StringFtsQuery {
                            query: fts.query.to_string(),
                            columns: columns_vec,
                        })),
                        structured_query: None,
                    })
                });

                // Empty vector for non-vector queries.
                let vector = Box::new(QueryTableRequestVector {
                    single_vector: Some(vec![]),
                    multi_vector: None,
                });

                Ok(NsQueryTableRequest {
                    id: None, // Will be set by caller
                    vector,
                    k: q.limit.unwrap_or(10) as i32,
                    filter,
                    columns,
                    prefilter: Some(q.prefilter),
                    offset: q.offset.map(|o| o as i32),
                    vector_column: None, // No vector column for plain queries
                    with_row_id: Some(q.with_row_id),
                    bypass_vector_index: Some(true), // No vector index for plain queries
                    full_text_query,
                    ..Default::default()
                })
            }
        }
    }
2419
2420    /// Extract query vector(s) from Arrow arrays into the namespace format.
2421    fn extract_query_vector(
2422        &self,
2423        query_vectors: &[Arc<dyn arrow_array::Array>],
2424    ) -> Result<QueryTableRequestVector> {
2425        if query_vectors.is_empty() {
2426            return Err(Error::InvalidInput {
2427                message: "Query vector is required for vector search".to_string(),
2428            });
2429        }
2430
2431        // Handle single vector case
2432        if query_vectors.len() == 1 {
2433            let arr = &query_vectors[0];
2434            let single_vector = self.array_to_f32_vec(arr)?;
2435            Ok(QueryTableRequestVector {
2436                single_vector: Some(single_vector),
2437                multi_vector: None,
2438            })
2439        } else {
2440            // Handle multi-vector case
2441            let multi_vector: Result<Vec<Vec<f32>>> = query_vectors
2442                .iter()
2443                .map(|arr| self.array_to_f32_vec(arr))
2444                .collect();
2445            Ok(QueryTableRequestVector {
2446                single_vector: None,
2447                multi_vector: Some(multi_vector?),
2448            })
2449        }
2450    }
2451
2452    /// Convert an Arrow array to a Vec<f32>.
2453    fn array_to_f32_vec(&self, arr: &Arc<dyn arrow_array::Array>) -> Result<Vec<f32>> {
2454        // Handle FixedSizeList (common for vectors)
2455        if let Some(fsl) = arr
2456            .as_any()
2457            .downcast_ref::<arrow_array::FixedSizeListArray>()
2458        {
2459            let values = fsl.values();
2460            if let Some(f32_arr) = values.as_any().downcast_ref::<arrow_array::Float32Array>() {
2461                return Ok(f32_arr.values().to_vec());
2462            }
2463        }
2464
2465        // Handle direct Float32Array
2466        if let Some(f32_arr) = arr.as_any().downcast_ref::<arrow_array::Float32Array>() {
2467            return Ok(f32_arr.values().to_vec());
2468        }
2469
2470        Err(Error::InvalidInput {
2471            message: "Query vector must be Float32 type".to_string(),
2472        })
2473    }
2474
2475    /// Parse Arrow IPC response from the namespace server.
2476    async fn parse_arrow_ipc_response(
2477        &self,
2478        bytes: bytes::Bytes,
2479    ) -> Result<DatasetRecordBatchStream> {
2480        use arrow_ipc::reader::StreamReader;
2481        use std::io::Cursor;
2482
2483        let cursor = Cursor::new(bytes);
2484        let reader = StreamReader::try_new(cursor, None).map_err(|e| Error::Runtime {
2485            message: format!("Failed to parse Arrow IPC response: {}", e),
2486        })?;
2487
2488        // Collect all record batches
2489        let schema = reader.schema();
2490        let batches: Vec<_> = reader
2491            .into_iter()
2492            .collect::<std::result::Result<Vec<_>, _>>()
2493            .map_err(|e| Error::Runtime {
2494                message: format!("Failed to read Arrow IPC batches: {}", e),
2495            })?;
2496
2497        // Create a stream from the batches
2498        let stream = futures::stream::iter(batches.into_iter().map(Ok));
2499        let record_batch_stream = Box::pin(
2500            datafusion_physical_plan::stream::RecordBatchStreamAdapter::new(schema, stream),
2501        );
2502
2503        Ok(DatasetRecordBatchStream::new(record_batch_stream))
2504    }
2505
2506    /// Check whether the table uses V2 manifest paths.
2507    ///
2508    /// See [Self::migrate_manifest_paths_v2] and [ManifestNamingScheme] for
2509    /// more information.
2510    pub async fn uses_v2_manifest_paths(&self) -> Result<bool> {
2511        let dataset = self.dataset.get().await?;
2512        Ok(dataset.manifest_location().naming_scheme == ManifestNamingScheme::V2)
2513    }
2514
2515    /// Migrate the table to use the new manifest path scheme.
2516    ///
2517    /// This function will rename all V1 manifests to V2 manifest paths.
2518    /// These paths provide more efficient opening of datasets with many versions
2519    /// on object stores.
2520    ///
2521    /// This function is idempotent, and can be run multiple times without
2522    /// changing the state of the object store.
2523    ///
2524    /// However, it should not be run while other concurrent operations are happening.
2525    /// And it should also run until completion before resuming other operations.
2526    ///
2527    /// You can use [Self::uses_v2_manifest_paths] to check if the table is already
2528    /// using V2 manifest paths.
2529    pub async fn migrate_manifest_paths_v2(&self) -> Result<()> {
2530        let mut dataset = self.dataset.get_mut().await?;
2531        dataset.migrate_manifest_paths_v2().await?;
2532        Ok(())
2533    }
2534
2535    /// Get the table manifest
2536    pub async fn manifest(&self) -> Result<Manifest> {
2537        let dataset = self.dataset.get().await?;
2538        Ok(dataset.manifest().clone())
2539    }
2540
2541    /// Update key-value pairs in config.
2542    pub async fn update_config(
2543        &self,
2544        upsert_values: impl IntoIterator<Item = (String, String)>,
2545    ) -> Result<()> {
2546        let mut dataset = self.dataset.get_mut().await?;
2547        dataset.update_config(upsert_values).await?;
2548        Ok(())
2549    }
2550
2551    /// Delete keys from the config
2552    pub async fn delete_config_keys(&self, delete_keys: &[&str]) -> Result<()> {
2553        let mut dataset = self.dataset.get_mut().await?;
2554        // TODO: update this when we implement metadata APIs
2555        #[allow(deprecated)]
2556        dataset.delete_config_keys(delete_keys).await?;
2557        Ok(())
2558    }
2559
2560    /// Update schema metadata
2561    pub async fn replace_schema_metadata(
2562        &self,
2563        upsert_values: impl IntoIterator<Item = (String, String)>,
2564    ) -> Result<()> {
2565        let mut dataset = self.dataset.get_mut().await?;
2566        // TODO: update this when we implement metadata APIs
2567        #[allow(deprecated)]
2568        dataset.replace_schema_metadata(upsert_values).await?;
2569        Ok(())
2570    }
2571
2572    /// Update field metadata
2573    ///
2574    /// # Arguments:
2575    /// * `new_values` - An iterator of tuples where the first element is the
2576    ///   field id and the second element is a hashmap of metadata key-value
2577    ///   pairs.
2578    ///
2579    pub async fn replace_field_metadata(
2580        &self,
2581        new_values: impl IntoIterator<Item = (u32, HashMap<String, String>)>,
2582    ) -> Result<()> {
2583        let mut dataset = self.dataset.get_mut().await?;
2584        dataset.replace_field_metadata(new_values).await?;
2585        Ok(())
2586    }
2587}
2588
2589#[async_trait::async_trait]
2590impl BaseTable for NativeTable {
    /// Expose the concrete table type for downcasting through the
    /// `BaseTable` trait object.
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
2594
2595    fn name(&self) -> &str {
2596        self.name.as_str()
2597    }
2598
2599    fn namespace(&self) -> &[String] {
2600        &self.namespace
2601    }
2602
2603    fn id(&self) -> &str {
2604        &self.id
2605    }
2606
2607    async fn version(&self) -> Result<u64> {
2608        Ok(self.dataset.get().await?.version().version)
2609    }
2610
    /// Check out a specific historical version, putting the table into
    /// time-travel mode.
    async fn checkout(&self, version: u64) -> Result<()> {
        self.dataset.as_time_travel(version).await
    }
2614
    /// Check out the version referenced by `tag`, putting the table into
    /// time-travel mode.
    async fn checkout_tag(&self, tag: &str) -> Result<()> {
        self.dataset.as_time_travel(tag).await
    }
2618
    /// Leave time-travel mode: track the latest version again, then reload
    /// the dataset so subsequent reads see it.
    async fn checkout_latest(&self) -> Result<()> {
        self.dataset
            .as_latest(self.read_consistency_interval)
            .await?;
        self.dataset.reload().await
    }
2625
2626    async fn list_versions(&self) -> Result<Vec<Version>> {
2627        Ok(self.dataset.get().await?.versions().await?)
2628    }
2629
    /// Restore the currently checked-out (time-travel) version as the
    /// latest version of the table.
    ///
    /// Errors if the table is not in time-travel mode (i.e. `checkout` was
    /// never called).
    async fn restore(&self) -> Result<()> {
        let version =
            self.dataset
                .time_travel_version()
                .await
                .ok_or_else(|| Error::InvalidInput {
                    message: "you must run checkout before running restore".to_string(),
                })?;
        {
            // Use get_mut_unchecked as restore is the only "write" operation that is allowed
            // when the table is in time travel mode.
            // Also, drop the guard after .restore because as_latest will need it
            let mut dataset = self.dataset.get_mut_unchecked().await?;
            debug_assert_eq!(dataset.version().version, version);
            dataset.restore().await?;
        }
        // Switch back to tracking the latest version now that the restore
        // has been committed.
        self.dataset
            .as_latest(self.read_consistency_interval)
            .await?;
        Ok(())
    }
2651
2652    async fn schema(&self) -> Result<SchemaRef> {
2653        let lance_schema = self.dataset.get().await?.schema().clone();
2654        Ok(Arc::new(Schema::from(&lance_schema)))
2655    }
2656
2657    async fn table_definition(&self) -> Result<TableDefinition> {
2658        let schema = self.schema().await?;
2659        TableDefinition::try_from_rich_schema(schema)
2660    }
2661
2662    async fn count_rows(&self, filter: Option<Filter>) -> Result<usize> {
2663        let dataset = self.dataset.get().await?;
2664        match filter {
2665            None => Ok(dataset.count_rows(None).await?),
2666            Some(Filter::Sql(sql)) => Ok(dataset.count_rows(Some(sql)).await?),
2667            Some(Filter::Datafusion(_)) => Err(Error::NotSupported {
2668                message: "Datafusion filters are not yet supported".to_string(),
2669            }),
2670        }
2671    }
2672
    /// Append or overwrite data in the table, returning the new version.
    async fn add(
        &self,
        add: AddDataBuilder<NoData>,
        data: Box<dyn RecordBatchReader + Send>,
    ) -> Result<AddResult> {
        // Wrap the reader so embedding columns (if any are declared in the
        // table definition) are populated while streaming.
        let data = Box::new(MaybeEmbedded::try_new(
            data,
            self.table_definition().await?,
            add.embedding_registry,
        )?) as Box<dyn RecordBatchReader + Send>;

        // Explicit lance write params take precedence; otherwise derive the
        // write mode (append vs overwrite) from the builder.
        let lance_params = add.write_options.lance_write_params.unwrap_or(WriteParams {
            mode: match add.mode {
                AddDataMode::Append => WriteMode::Append,
                AddDataMode::Overwrite => WriteMode::Overwrite,
            },
            ..Default::default()
        });

        let dataset = {
            // Limited scope for the mutable borrow of self.dataset avoids deadlock.
            let ds = self.dataset.get_mut().await?;
            InsertBuilder::new(Arc::new(ds.clone()))
                .with_params(&lance_params)
                .execute_stream(data)
                .await?
        };
        // Publish the written dataset as the latest and report its version.
        let version = dataset.manifest().version;
        self.dataset.set_latest(dataset).await;
        Ok(AddResult { version })
    }
2704
2705    async fn create_index(&self, opts: IndexBuilder) -> Result<()> {
2706        if opts.columns.len() != 1 {
2707            return Err(Error::Schema {
2708                message: "Multi-column (composite) indices are not yet supported".to_string(),
2709            });
2710        }
2711        let schema = self.schema().await?;
2712
2713        let field = schema.field_with_name(&opts.columns[0])?;
2714
2715        let lance_idx_params = self.make_index_params(field, opts.index.clone()).await?;
2716        let index_type = self.get_index_type_for_field(field, &opts.index);
2717        let columns = [field.name().as_str()];
2718        let mut dataset = self.dataset.get_mut().await?;
2719        let mut builder = dataset
2720            .create_index_builder(&columns, index_type, lance_idx_params.as_ref())
2721            .train(opts.train)
2722            .replace(opts.replace);
2723
2724        if let Some(name) = opts.name {
2725            builder = builder.name(name);
2726        }
2727        builder.await?;
2728        Ok(())
2729    }
2730
2731    async fn drop_index(&self, index_name: &str) -> Result<()> {
2732        let mut dataset = self.dataset.get_mut().await?;
2733        dataset.drop_index(index_name).await?;
2734        Ok(())
2735    }
2736
2737    async fn prewarm_index(&self, index_name: &str) -> Result<()> {
2738        let dataset = self.dataset.get().await?;
2739        Ok(dataset.prewarm_index(index_name).await?)
2740    }
2741
    /// Update rows in the table.
    async fn update(&self, update: UpdateBuilder) -> Result<UpdateResult> {
        // Delegate to the update submodule implementation.
        update::execute_update(self, update).await
    }
2746
2747    async fn create_plan(
2748        &self,
2749        query: &AnyQuery,
2750        options: QueryExecutionOptions,
2751    ) -> Result<Arc<dyn ExecutionPlan>> {
2752        let query = match query {
2753            AnyQuery::VectorQuery(query) => query.clone(),
2754            AnyQuery::Query(query) => VectorQueryRequest::from_plain_query(query.clone()),
2755        };
2756
2757        let ds_ref = self.dataset.get().await?;
2758        let schema = ds_ref.schema();
2759        let mut column = query.column.clone();
2760
2761        let mut query_vector = query.query_vector.first().cloned();
2762        if query.query_vector.len() > 1 {
2763            if column.is_none() {
2764                // Infer a vector column with the same dimension of the query vector.
2765                let arrow_schema = Schema::from(ds_ref.schema());
2766                column = Some(default_vector_column(
2767                    &arrow_schema,
2768                    Some(query.query_vector[0].len() as i32),
2769                )?);
2770            }
2771            let vector_field = schema.field(column.as_ref().unwrap()).unwrap();
2772            if let DataType::List(_) = vector_field.data_type() {
2773                // it's multivector, then the vectors should be treated as single query
2774                // concatenate the vectors into a FixedSizeList<FixedSizeList<_>>
2775                // it's also possible to concatenate the vectors into a List<FixedSizeList<_>>,
2776                // but FixedSizeList is more efficient and easier to construct
2777                let vectors = query
2778                    .query_vector
2779                    .iter()
2780                    .map(|arr| arr.as_ref())
2781                    .collect::<Vec<_>>();
2782                let dim = vectors[0].len();
2783                let mut fsl_builder = FixedSizeListBuilder::with_capacity(
2784                    Float32Builder::with_capacity(dim),
2785                    dim as i32,
2786                    vectors.len(),
2787                );
2788                for vec in vectors {
2789                    fsl_builder
2790                        .values()
2791                        .append_slice(vec.as_primitive::<Float32Type>().values());
2792                    fsl_builder.append(true);
2793                }
2794                query_vector = Some(Arc::new(fsl_builder.finish()));
2795            } else {
2796                // If there are multiple query vectors, create a plan for each of them and union them.
2797                let query_vecs = query.query_vector.clone();
2798                let plan_futures = query_vecs
2799                    .into_iter()
2800                    .map(|query_vector| {
2801                        let mut sub_query = query.clone();
2802                        sub_query.query_vector = vec![query_vector];
2803                        let options_ref = options.clone();
2804                        async move {
2805                            self.create_plan(&AnyQuery::VectorQuery(sub_query), options_ref)
2806                                .await
2807                        }
2808                    })
2809                    .collect::<Vec<_>>();
2810                let plans = futures::future::try_join_all(plan_futures).await?;
2811                return Table::multi_vector_plan(plans);
2812            }
2813        }
2814
2815        let mut scanner: Scanner = ds_ref.scan();
2816
2817        if let Some(query_vector) = query_vector {
2818            // If there is a vector query, default to limit=10 if unspecified
2819            let column = if let Some(col) = column {
2820                col
2821            } else {
2822                // Infer a vector column with the same dimension of the query vector.
2823                let arrow_schema = Schema::from(ds_ref.schema());
2824                default_vector_column(&arrow_schema, Some(query_vector.len() as i32))?
2825            };
2826
2827            let (_, element_type) = lance::index::vector::utils::get_vector_type(schema, &column)?;
2828            let is_binary = matches!(element_type, DataType::UInt8);
2829            let top_k = query.base.limit.unwrap_or(DEFAULT_TOP_K) + query.base.offset.unwrap_or(0);
2830            if is_binary {
2831                let query_vector = arrow::compute::cast(&query_vector, &DataType::UInt8)?;
2832                let query_vector = query_vector.as_primitive::<UInt8Type>();
2833                scanner.nearest(&column, query_vector, top_k)?;
2834            } else {
2835                scanner.nearest(&column, query_vector.as_ref(), top_k)?;
2836            }
2837            scanner.minimum_nprobes(query.minimum_nprobes);
2838            if let Some(maximum_nprobes) = query.maximum_nprobes {
2839                scanner.maximum_nprobes(maximum_nprobes);
2840            }
2841        }
2842        scanner.limit(
2843            query.base.limit.map(|limit| limit as i64),
2844            query.base.offset.map(|offset| offset as i64),
2845        )?;
2846        if let Some(ef) = query.ef {
2847            scanner.ef(ef);
2848        }
2849        scanner.distance_range(query.lower_bound, query.upper_bound);
2850        scanner.use_index(query.use_index);
2851        scanner.prefilter(query.base.prefilter);
2852        match query.base.select {
2853            Select::Columns(ref columns) => {
2854                scanner.project(columns.as_slice())?;
2855            }
2856            Select::Dynamic(ref select_with_transform) => {
2857                scanner.project_with_transform(select_with_transform.as_slice())?;
2858            }
2859            Select::All => {}
2860        }
2861
2862        if query.base.with_row_id {
2863            scanner.with_row_id();
2864        }
2865
2866        scanner.batch_size(options.max_batch_length as usize);
2867
2868        if query.base.fast_search {
2869            scanner.fast_search();
2870        }
2871
2872        match &query.base.select {
2873            Select::Columns(select) => {
2874                scanner.project(select.as_slice())?;
2875            }
2876            Select::Dynamic(select_with_transform) => {
2877                scanner.project_with_transform(select_with_transform.as_slice())?;
2878            }
2879            Select::All => { /* Do nothing */ }
2880        }
2881
2882        if let Some(filter) = &query.base.filter {
2883            match filter {
2884                QueryFilter::Sql(sql) => {
2885                    scanner.filter(sql)?;
2886                }
2887                QueryFilter::Substrait(substrait) => {
2888                    scanner.filter_substrait(substrait)?;
2889                }
2890                QueryFilter::Datafusion(expr) => {
2891                    scanner.filter_expr(expr.clone());
2892                }
2893            }
2894        }
2895
2896        if let Some(fts) = &query.base.full_text_search {
2897            scanner.full_text_search(fts.clone())?;
2898        }
2899
2900        if let Some(refine_factor) = query.refine_factor {
2901            scanner.refine(refine_factor);
2902        }
2903
2904        if let Some(distance_type) = query.distance_type {
2905            scanner.distance_metric(distance_type.into());
2906        }
2907
2908        if query.base.disable_scoring_autoprojection {
2909            scanner.disable_scoring_autoprojection();
2910        }
2911
2912        Ok(scanner.create_plan().await?)
2913    }
2914
2915    async fn query(
2916        &self,
2917        query: &AnyQuery,
2918        options: QueryExecutionOptions,
2919    ) -> Result<DatasetRecordBatchStream> {
2920        // If namespace client is configured, use server-side query execution
2921        if let Some(ref namespace_client) = self.namespace_client {
2922            return self
2923                .namespace_query(namespace_client.clone(), query, options)
2924                .await;
2925        }
2926        self.generic_query(query, options).await
2927    }
2928
2929    async fn analyze_plan(
2930        &self,
2931        query: &AnyQuery,
2932        options: QueryExecutionOptions,
2933    ) -> Result<String> {
2934        let plan = self.create_plan(query, options).await?;
2935        Ok(lance_analyze_plan(plan, Default::default()).await?)
2936    }
2937
    /// Run a merge-insert (upsert) of `new_data` into the table, honoring
    /// the matched/not-matched behaviors configured in `params`.
    async fn merge_insert(
        &self,
        params: MergeInsertBuilder,
        new_data: Box<dyn RecordBatchReader + Send>,
    ) -> Result<MergeResult> {
        let dataset = Arc::new(self.dataset.get().await?.clone());
        let mut builder = LanceMergeInsertBuilder::try_new(dataset.clone(), params.on)?;
        // Matched rows: do nothing, update all, or update conditionally.
        match (
            params.when_matched_update_all,
            params.when_matched_update_all_filt,
        ) {
            (false, _) => builder.when_matched(WhenMatched::DoNothing),
            (true, None) => builder.when_matched(WhenMatched::UpdateAll),
            (true, Some(filt)) => builder.when_matched(WhenMatched::update_if(&dataset, &filt)?),
        };
        // Source rows with no match in the table: insert or skip.
        if params.when_not_matched_insert_all {
            builder.when_not_matched(lance::dataset::WhenNotMatched::InsertAll);
        } else {
            builder.when_not_matched(lance::dataset::WhenNotMatched::DoNothing);
        }
        // Table rows with no match in the source: delete (optionally
        // filtered) or keep.
        if params.when_not_matched_by_source_delete {
            let behavior = if let Some(filter) = params.when_not_matched_by_source_delete_filt {
                WhenNotMatchedBySource::delete_if(dataset.as_ref(), &filter)?
            } else {
                WhenNotMatchedBySource::Delete
            };
            builder.when_not_matched_by_source(behavior);
        } else {
            builder.when_not_matched_by_source(WhenNotMatchedBySource::Keep);
        }
        builder.use_index(params.use_index);

        let future = if let Some(timeout) = params.timeout {
            // The default retry timeout is 30s, so we pass the full timeout down
            // as well in case it is longer than that.
            let future = builder
                .retry_timeout(timeout)
                .try_build()?
                .execute_reader(new_data);
            // Race the merge against the deadline; a timeout surfaces as a
            // Runtime error.
            Either::Left(tokio::time::timeout(timeout, future).map(|res| match res {
                Ok(Ok((new_dataset, stats))) => Ok((new_dataset, stats)),
                Ok(Err(e)) => Err(e.into()),
                Err(_) => Err(Error::Runtime {
                    message: "merge insert timed out".to_string(),
                }),
            }))
        } else {
            let job = builder.try_build()?;
            Either::Right(job.execute_reader(new_data).map_err(|e| e.into()))
        };
        let (new_dataset, stats) = future.await?;
        // Publish the merged dataset as the latest and report its stats.
        let version = new_dataset.manifest().version;
        self.dataset.set_latest(new_dataset.as_ref().clone()).await;
        Ok(MergeResult {
            version,
            num_updated_rows: stats.num_updated_rows,
            num_inserted_rows: stats.num_inserted_rows,
            num_deleted_rows: stats.num_deleted_rows,
            num_attempts: stats.num_attempts,
        })
    }
2999
    /// Delete rows from the table
    ///
    /// `predicate` is a SQL boolean expression selecting the rows to remove.
    async fn delete(&self, predicate: &str) -> Result<DeleteResult> {
        // Delegate to the delete submodule implementation.
        delete::execute_delete(self, predicate).await
    }
3005
3006    async fn tags(&self) -> Result<Box<dyn Tags + '_>> {
3007        Ok(Box::new(NativeTags {
3008            dataset: self.dataset.clone(),
3009        }))
3010    }
3011
    /// Run a maintenance action and report what it did.
    ///
    /// `OptimizeAction::All` recursively performs compaction, pruning
    /// (with defaults), and index optimization, in that order; the returned
    /// stats carry the compaction and prune results (index optimization
    /// reports no stats here).
    async fn optimize(&self, action: OptimizeAction) -> Result<OptimizeStats> {
        let mut stats = OptimizeStats {
            compaction: None,
            prune: None,
        };
        match action {
            OptimizeAction::All => {
                // Compact first so pruning can clean up superseded files.
                stats.compaction = self
                    .optimize(OptimizeAction::Compact {
                        options: CompactionOptions::default(),
                        remap_options: None,
                    })
                    .await?
                    .compaction;
                stats.prune = self
                    .optimize(OptimizeAction::Prune {
                        older_than: None,
                        delete_unverified: None,
                        error_if_tagged_old_versions: None,
                    })
                    .await?
                    .prune;
                self.optimize(OptimizeAction::Index(OptimizeOptions::default()))
                    .await?;
            }
            OptimizeAction::Compact {
                options,
                remap_options,
            } => {
                stats.compaction = Some(self.compact_files(options, remap_options).await?);
            }
            OptimizeAction::Prune {
                older_than,
                delete_unverified,
                error_if_tagged_old_versions,
            } => {
                // Versions older than 7 days are pruned unless a threshold
                // is given.
                stats.prune = Some(
                    self.cleanup_old_versions(
                        older_than.unwrap_or(Duration::try_days(7).expect("valid delta")),
                        delete_unverified,
                        error_if_tagged_old_versions,
                    )
                    .await?,
                );
            }
            OptimizeAction::Index(options) => {
                self.optimize_indices(&options).await?;
            }
        }
        Ok(stats)
    }
3063
    /// Add new columns to the table.
    ///
    /// `transforms` describes how the new column values are produced, and
    /// `read_columns` optionally limits which existing columns are read while
    /// computing them (presumably an optimization — see
    /// `schema_evolution::execute_add_columns` for the exact semantics).
    async fn add_columns(
        &self,
        transforms: NewColumnTransform,
        read_columns: Option<Vec<String>>,
    ) -> Result<AddColumnsResult> {
        schema_evolution::execute_add_columns(self, transforms, read_columns).await
    }
3071
    /// Alter existing columns (see [`ColumnAlteration`] for the supported
    /// changes). Delegates to the `schema_evolution` submodule.
    async fn alter_columns(&self, alterations: &[ColumnAlteration]) -> Result<AlterColumnsResult> {
        schema_evolution::execute_alter_columns(self, alterations).await
    }
3075
    /// Drop the named columns from the table. Delegates to the
    /// `schema_evolution` submodule.
    async fn drop_columns(&self, columns: &[&str]) -> Result<DropColumnsResult> {
        schema_evolution::execute_drop_columns(self, columns).await
    }
3079
3080    async fn list_indices(&self) -> Result<Vec<IndexConfig>> {
3081        let dataset = self.dataset.get().await?;
3082        let indices = dataset.load_indices().await?;
3083        let results = futures::stream::iter(indices.as_slice()).then(|idx| async {
3084
3085            // skip Lance internal indexes
3086            if idx.name == FRAG_REUSE_INDEX_NAME {
3087                return None;
3088            }
3089
3090            let stats = match dataset.index_statistics(idx.name.as_str()).await {
3091                Ok(stats) => stats,
3092                Err(e) => {
3093                    log::warn!("Failed to get statistics for index {} ({}): {}", idx.name, idx.uuid, e);
3094                    return None;
3095                }
3096            };
3097
3098            let stats: serde_json::Value = match serde_json::from_str(&stats) {
3099                Ok(stats) => stats,
3100                Err(e) => {
3101                    log::warn!("Failed to deserialize index statistics for index {} ({}): {}", idx.name, idx.uuid, e);
3102                    return None;
3103                }
3104            };
3105
3106            let Some(index_type) = stats.get("index_type").and_then(|v| v.as_str()) else {
3107                log::warn!("Index statistics was missing 'index_type' field for index {} ({})", idx.name, idx.uuid);
3108                return None;
3109            };
3110
3111            let index_type: crate::index::IndexType = match index_type.parse() {
3112                Ok(index_type) => index_type,
3113                Err(e) => {
3114                    log::warn!("Failed to parse index type for index {} ({}): {}", idx.name, idx.uuid, e);
3115                    return None;
3116                }
3117            };
3118
3119            let mut columns = Vec::with_capacity(idx.fields.len());
3120            for field_id in &idx.fields {
3121                let Some(field) = dataset.schema().field_by_id(*field_id) else {
3122                    log::warn!("The index {} ({}) referenced a field with id {} which does not exist in the schema", idx.name, idx.uuid, field_id);
3123                    return None;
3124                };
3125                columns.push(field.name.clone());
3126            }
3127
3128            let name = idx.name.clone();
3129            Some(IndexConfig { index_type, columns, name })
3130        }).collect::<Vec<_>>().await;
3131
3132        Ok(results.into_iter().flatten().collect())
3133    }
3134
    /// Returns the URI this table was opened with.
    async fn uri(&self) -> Result<String> {
        Ok(self.uri.clone())
    }
3138
    /// The storage options for this table; currently an alias for
    /// [`Self::initial_storage_options`].
    async fn storage_options(&self) -> Option<HashMap<String, String>> {
        self.initial_storage_options().await
    }
3142
3143    async fn initial_storage_options(&self) -> Option<HashMap<String, String>> {
3144        self.dataset
3145            .get()
3146            .await
3147            .ok()
3148            .and_then(|dataset| dataset.initial_storage_options().cloned())
3149    }
3150
3151    async fn latest_storage_options(&self) -> Result<Option<HashMap<String, String>>> {
3152        let dataset = self.dataset.get().await?;
3153        Ok(dataset.latest_storage_options().await?.map(|o| o.0))
3154    }
3155
    /// Fetch statistics for a single index by name.
    ///
    /// Returns `Ok(None)` when the index does not exist, so callers can use
    /// this to probe for index existence without handling an error.
    async fn index_stats(&self, index_name: &str) -> Result<Option<IndexStatistics>> {
        let stats = match self
            .dataset
            .get()
            .await?
            .index_statistics(index_name.as_ref())
            .await
        {
            Ok(stats) => stats,
            // A missing index is not an error for this API.
            Err(lance_core::Error::IndexNotFound { .. }) => return Ok(None),
            Err(e) => return Err(Error::from(e)),
        };

        // The statistics arrive as a JSON string; an unparsable payload is an
        // input error rather than a storage error.
        let mut stats: IndexStatisticsImpl =
            serde_json::from_str(&stats).map_err(|e| Error::InvalidInput {
                message: format!("error deserializing index statistics: {}", e),
            })?;

        // NOTE(review): `pop()` removes the *last* entry of `indices` even
        // though the variable is named `first_index` — confirm which delta is
        // intended to supply `index_type`, `metric_type`, and the base loss.
        let first_index = stats.indices.pop().ok_or_else(|| Error::InvalidInput {
            message: "index statistics is empty".to_string(),
        })?;
        // Index type should be present at one of the levels.
        let index_type =
            stats
                .index_type
                .or(first_index.index_type)
                .ok_or_else(|| Error::InvalidInput {
                    message: "index statistics was missing index type".to_string(),
                })?;
        // Sum the loss over the remaining deltas (missing losses count as 0);
        // the popped entry's own loss is folded in below.
        let loss = stats
            .indices
            .iter()
            .map(|index| index.loss.unwrap_or_default())
            .sum::<f64>();

        // If the popped entry reports no loss, the total is `None` even when
        // other deltas had losses.
        let loss = first_index.loss.map(|first_loss| first_loss + loss);
        Ok(Some(IndexStatistics {
            num_indexed_rows: stats.num_indexed_rows,
            num_unindexed_rows: stats.num_unindexed_rows,
            index_type,
            distance_type: first_index.metric_type,
            num_indices: stats.num_indices,
            loss,
        }))
    }
3201
    /// Poll until the columns are fully indexed. Will return Error::Timeout if the columns
    /// are not fully indexed within the timeout.
    ///
    /// `index_names` are the names of the indices to wait on; the polling
    /// loop itself lives in the free function [`wait_for_index`].
    async fn wait_for_index(
        &self,
        index_names: &[&str],
        timeout: std::time::Duration,
    ) -> Result<()> {
        wait_for_index(self, index_names, timeout).await
    }
3211
3212    async fn stats(&self) -> Result<TableStatistics> {
3213        let num_rows = self.count_rows(None).await?;
3214        let num_indices = self.list_indices().await?.len();
3215        let ds = self.dataset.get().await?;
3216        let ds_clone = (*ds).clone();
3217        let ds_stats = Arc::new(ds_clone).calculate_data_stats().await?;
3218        let total_bytes = ds_stats.fields.iter().map(|f| f.bytes_on_disk).sum::<u64>() as usize;
3219
3220        let frags = ds.get_fragments();
3221        let mut sorted_sizes = join_all(
3222            frags
3223                .iter()
3224                .map(|frag| async move { frag.physical_rows().await.unwrap_or(0) }),
3225        )
3226        .await;
3227        sorted_sizes.sort();
3228
3229        let small_frag_threshold = 100000;
3230        let num_fragments = sorted_sizes.len();
3231        let num_small_fragments = sorted_sizes
3232            .iter()
3233            .filter(|&&size| size < small_frag_threshold)
3234            .count();
3235
3236        let p25 = *sorted_sizes.get(num_fragments / 4).unwrap_or(&0);
3237        let p50 = *sorted_sizes.get(num_fragments / 2).unwrap_or(&0);
3238        let p75 = *sorted_sizes.get(num_fragments * 3 / 4).unwrap_or(&0);
3239        let p99 = *sorted_sizes.get(num_fragments * 99 / 100).unwrap_or(&0);
3240        let min = sorted_sizes.first().copied().unwrap_or(0);
3241        let max = sorted_sizes.last().copied().unwrap_or(0);
3242        let mean = if num_fragments == 0 {
3243            0
3244        } else {
3245            sorted_sizes.iter().copied().sum::<usize>() / num_fragments
3246        };
3247
3248        let frag_stats = FragmentStatistics {
3249            num_fragments,
3250            num_small_fragments,
3251            lengths: FragmentSummaryStats {
3252                min,
3253                max,
3254                mean,
3255                p25,
3256                p50,
3257                p75,
3258                p99,
3259            },
3260        };
3261        let stats = TableStatistics {
3262            total_bytes,
3263            num_rows,
3264            num_indices,
3265            fragment_stats: frag_stats,
3266        };
3267        Ok(stats)
3268    }
3269
3270    async fn create_insert_exec(
3271        &self,
3272        input: Arc<dyn datafusion_physical_plan::ExecutionPlan>,
3273        write_params: WriteParams,
3274    ) -> Result<Arc<dyn datafusion_physical_plan::ExecutionPlan>> {
3275        let ds = self.dataset.get().await?;
3276        let dataset = Arc::new((*ds).clone());
3277        Ok(Arc::new(datafusion::insert::InsertExec::new(
3278            self.dataset.clone(),
3279            dataset,
3280            input,
3281            write_params,
3282        )))
3283    }
3284}
3285
/// Table-level statistics (size, row count, index count, and fragment
/// layout) as produced by the table `stats` API.
#[skip_serializing_none]
#[derive(Debug, Deserialize, PartialEq)]
pub struct TableStatistics {
    /// The total number of bytes in the table
    /// (sum of per-field bytes on disk)
    pub total_bytes: usize,

    /// The number of rows in the table
    pub num_rows: usize,

    /// The number of indices in the table
    pub num_indices: usize,

    /// Statistics on table fragments
    pub fragment_stats: FragmentStatistics,
}
3301
/// Statistics about the data fragments backing a table.
#[skip_serializing_none]
#[derive(Debug, Deserialize, PartialEq)]
pub struct FragmentStatistics {
    /// The number of fragments in the table
    pub num_fragments: usize,

    /// The number of uncompacted fragments in the table
    /// (fragments below the small-fragment row threshold)
    pub num_small_fragments: usize,

    /// Statistics on the number of rows in the table fragments
    pub lengths: FragmentSummaryStats,
    // todo: add size statistics
    // /// Statistics on the number of bytes in the table fragments
    // sizes: FragmentStats,
}
3317
/// Summary statistics (min/max/mean and nearest-rank percentiles) over the
/// per-fragment row counts of a table. All values are 0 for an empty table.
#[skip_serializing_none]
#[derive(Debug, Deserialize, PartialEq)]
pub struct FragmentSummaryStats {
    /// Smallest fragment length in rows
    pub min: usize,
    /// Largest fragment length in rows
    pub max: usize,
    /// Mean fragment length in rows
    pub mean: usize,
    /// 25th percentile of fragment lengths
    pub p25: usize,
    /// Median fragment length
    pub p50: usize,
    /// 75th percentile of fragment lengths
    pub p75: usize,
    /// 99th percentile of fragment lengths
    pub p99: usize,
}
3329
3330#[cfg(test)]
3331#[allow(deprecated)]
3332mod tests {
3333    use std::sync::atomic::{AtomicBool, Ordering};
3334    use std::sync::Arc;
3335    use std::time::Duration;
3336
3337    use arrow_array::{
3338        builder::{ListBuilder, StringBuilder},
3339        Array, BooleanArray, FixedSizeListArray, Float32Array, Int32Array, LargeStringArray,
3340        RecordBatch, RecordBatchIterator, RecordBatchReader, StringArray,
3341    };
3342    use arrow_array::{BinaryArray, LargeBinaryArray};
3343    use arrow_data::ArrayDataBuilder;
3344    use arrow_schema::{DataType, Field, Schema};
3345    use lance::dataset::WriteMode;
3346    use lance::io::{ObjectStoreParams, WrappingObjectStore};
3347    use lance::Dataset;
3348    use rand::Rng;
3349    use tempfile::tempdir;
3350
3351    use super::*;
3352    use crate::connect;
3353    use crate::connection::ConnectBuilder;
3354    use crate::index::scalar::{BTreeIndexBuilder, BitmapIndexBuilder};
3355    use crate::index::vector::{IvfHnswPqIndexBuilder, IvfHnswSqIndexBuilder};
3356
3357    #[tokio::test]
3358    async fn test_open() {
3359        let tmp_dir = tempdir().unwrap();
3360        let dataset_path = tmp_dir.path().join("test.lance");
3361
3362        let batches = make_test_batches();
3363        Dataset::write(batches, dataset_path.to_str().unwrap(), None)
3364            .await
3365            .unwrap();
3366
3367        let table = NativeTable::open(dataset_path.to_str().unwrap())
3368            .await
3369            .unwrap();
3370
3371        assert_eq!(table.name, "test")
3372    }
3373
3374    #[tokio::test]
3375    async fn test_open_not_found() {
3376        let tmp_dir = tempdir().unwrap();
3377        let uri = tmp_dir.path().to_str().unwrap();
3378        let table = NativeTable::open(uri).await;
3379        assert!(matches!(table.unwrap_err(), Error::TableNotFound { .. }));
3380    }
3381
3382    #[test]
3383    #[cfg(not(windows))]
3384    fn test_object_store_path() {
3385        use std::path::Path as StdPath;
3386        let p = StdPath::new("s3://bucket/path/to/file");
3387        let c = p.join("subfile");
3388        assert_eq!(c.to_str().unwrap(), "s3://bucket/path/to/file/subfile");
3389    }
3390
3391    #[tokio::test]
3392    async fn test_count_rows() {
3393        let tmp_dir = tempdir().unwrap();
3394        let uri = tmp_dir.path().to_str().unwrap();
3395
3396        let batches = make_test_batches();
3397        let batches = Box::new(batches) as Box<dyn RecordBatchReader + Send>;
3398        let table = NativeTable::create(uri, "test", vec![], batches, None, None, None, None)
3399            .await
3400            .unwrap();
3401
3402        assert_eq!(table.count_rows(None).await.unwrap(), 10);
3403        assert_eq!(
3404            table
3405                .count_rows(Some(Filter::Sql("i >= 5".to_string())))
3406                .await
3407                .unwrap(),
3408            5
3409        );
3410    }
3411
3412    #[tokio::test]
3413    async fn test_add() {
3414        let tmp_dir = tempdir().unwrap();
3415        let uri = tmp_dir.path().to_str().unwrap();
3416        let conn = connect(uri).execute().await.unwrap();
3417
3418        let batches = make_test_batches();
3419        let schema = batches.schema().clone();
3420        let table = conn.create_table("test", batches).execute().await.unwrap();
3421        assert_eq!(table.count_rows(None).await.unwrap(), 10);
3422
3423        let new_batches = RecordBatchIterator::new(
3424            vec![RecordBatch::try_new(
3425                schema.clone(),
3426                vec![Arc::new(Int32Array::from_iter_values(100..110))],
3427            )
3428            .unwrap()]
3429            .into_iter()
3430            .map(Ok),
3431            schema.clone(),
3432        );
3433
3434        table.add(new_batches).execute().await.unwrap();
3435        assert_eq!(table.count_rows(None).await.unwrap(), 20);
3436        assert_eq!(table.name(), "test");
3437    }
3438
    /// Exercises the three merge-insert modes in sequence: insert-if-not-
    /// exists, unconditional matched-update, and conditional matched-update.
    #[tokio::test]
    async fn test_merge_insert() {
        let tmp_dir = tempdir().unwrap();
        let uri = tmp_dir.path().to_str().unwrap();
        let conn = connect(uri).execute().await.unwrap();

        // Create a dataset with i=0..10
        let batches = merge_insert_test_batches(0, 0);
        let table = conn
            .create_table("my_table", batches)
            .execute()
            .await
            .unwrap();
        assert_eq!(table.count_rows(None).await.unwrap(), 10);

        // Create new data with i=5..15
        let new_batches = Box::new(merge_insert_test_batches(5, 1));

        // Perform a "insert if not exists"
        let mut merge_insert_builder = table.merge_insert(&["i"]);
        merge_insert_builder.when_not_matched_insert_all();
        let result = merge_insert_builder.execute(new_batches).await.unwrap();
        // Only 5 rows should actually be inserted (i=10..15; i=5..10 already exist)
        assert_eq!(table.count_rows(None).await.unwrap(), 15);
        assert_eq!(result.num_inserted_rows, 5);
        assert_eq!(result.num_updated_rows, 0);
        assert_eq!(result.num_deleted_rows, 0);
        assert_eq!(result.num_attempts, 1);

        // Create new data with i=15..25 (no id matches)
        let new_batches = Box::new(merge_insert_test_batches(15, 2));
        // Perform a "bulk update" (should not affect anything)
        let mut merge_insert_builder = table.merge_insert(&["i"]);
        merge_insert_builder.when_matched_update_all(None);
        merge_insert_builder.execute(new_batches).await.unwrap();
        // No new rows should have been inserted
        assert_eq!(table.count_rows(None).await.unwrap(), 15);
        assert_eq!(
            table.count_rows(Some("age = 2".to_string())).await.unwrap(),
            0
        );

        // Conditional update that only replaces the age=0 data
        let new_batches = Box::new(merge_insert_test_batches(5, 3));
        let mut merge_insert_builder = table.merge_insert(&["i"]);
        merge_insert_builder.when_matched_update_all(Some("target.age = 0".to_string()));
        merge_insert_builder.execute(new_batches).await.unwrap();
        assert_eq!(
            table.count_rows(Some("age = 3".to_string())).await.unwrap(),
            5
        );
    }
3491
    /// Merge-insert must produce the same results whether the key lookup
    /// goes through an index (`use_index(true)`, the default) or a full
    /// table scan (`use_index(false)`).
    #[tokio::test]
    async fn test_merge_insert_use_index() {
        let tmp_dir = tempdir().unwrap();
        let uri = tmp_dir.path().to_str().unwrap();
        let conn = connect(uri).execute().await.unwrap();

        // Create a dataset with i=0..10
        let batches = merge_insert_test_batches(0, 0);
        let table = conn
            .create_table("my_table", batches)
            .execute()
            .await
            .unwrap();
        assert_eq!(table.count_rows(None).await.unwrap(), 10);

        // Test use_index=true (default behavior)
        let new_batches = Box::new(merge_insert_test_batches(5, 1));
        let mut merge_insert_builder = table.merge_insert(&["i"]);
        merge_insert_builder.when_not_matched_insert_all();
        merge_insert_builder.use_index(true);
        merge_insert_builder.execute(new_batches).await.unwrap();
        assert_eq!(table.count_rows(None).await.unwrap(), 15);

        // Test use_index=false (force table scan)
        let new_batches = Box::new(merge_insert_test_batches(15, 2));
        let mut merge_insert_builder = table.merge_insert(&["i"]);
        merge_insert_builder.when_not_matched_insert_all();
        merge_insert_builder.use_index(false);
        merge_insert_builder.execute(new_batches).await.unwrap();
        assert_eq!(table.count_rows(None).await.unwrap(), 25);
    }
3523
    /// Overwriting on add can be requested either via `AddDataMode::Overwrite`
    /// or via the lower-level `WriteParams`; the latter takes precedence over
    /// the mode when both are supplied.
    #[tokio::test]
    async fn test_add_overwrite() {
        let tmp_dir = tempdir().unwrap();
        let uri = tmp_dir.path().to_str().unwrap();
        let conn = connect(uri).execute().await.unwrap();

        let batches = make_test_batches();
        let schema = batches.schema().clone();
        let table = conn.create_table("test", batches).execute().await.unwrap();
        assert_eq!(table.count_rows(None).await.unwrap(), 10);

        // Ten replacement rows (100..110) reused for both overwrite paths below.
        let batches = vec![RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int32Array::from_iter_values(100..110))],
        )
        .unwrap()]
        .into_iter()
        .map(Ok);

        let new_batches = RecordBatchIterator::new(batches.clone(), schema.clone());

        // Can overwrite using AddDataOptions::mode
        table
            .add(new_batches)
            .mode(AddDataMode::Overwrite)
            .execute()
            .await
            .unwrap();
        assert_eq!(table.count_rows(None).await.unwrap(), 10);
        assert_eq!(table.name(), "test");

        // Can overwrite using underlying WriteParams (which
        // take precedence over AddDataOptions::mode)

        let param: WriteParams = WriteParams {
            mode: WriteMode::Overwrite,
            ..Default::default()
        };

        let new_batches = RecordBatchIterator::new(batches.clone(), schema.clone());
        table
            .add(new_batches)
            .write_options(WriteOptions {
                lance_write_params: Some(param),
            })
            .mode(AddDataMode::Append)
            .execute()
            .await
            .unwrap();
        // Still 10 rows: the WriteParams overwrite won over the Append mode.
        assert_eq!(table.count_rows(None).await.unwrap(), 10);
        assert_eq!(table.name(), "test");
    }
3576
    /// Object-store wrapper that records whether it was invoked but returns
    /// the store unchanged. Used to verify that custom store options are
    /// threaded through table opening.
    #[derive(Default, Debug)]
    struct NoOpCacheWrapper {
        // Set to true the first time `wrap` is called.
        called: AtomicBool,
    }
3581
    impl NoOpCacheWrapper {
        /// Whether `wrap` has been called at least once.
        fn called(&self) -> bool {
            self.called.load(Ordering::Relaxed)
        }
    }
3587
    impl WrappingObjectStore for NoOpCacheWrapper {
        // Record the invocation and hand back the original store untouched.
        fn wrap(
            &self,
            _store_prefix: &str,
            original: Arc<dyn object_store::ObjectStore>,
        ) -> Arc<dyn object_store::ObjectStore> {
            self.called.store(true, Ordering::Relaxed);
            original
        }
    }
3598
    /// Opening a table with custom `ReadParams` must route object-store
    /// construction through the provided wrapper.
    #[tokio::test]
    async fn test_open_table_options() {
        let tmp_dir = tempdir().unwrap();
        let dataset_path = tmp_dir.path().join("test.lance");
        let uri = dataset_path.to_str().unwrap();
        let conn = connect(uri).execute().await.unwrap();

        let batches = make_test_batches();

        conn.create_table("my_table", batches)
            .execute()
            .await
            .unwrap();

        // A wrapper that only records that it was used.
        let wrapper = Arc::new(NoOpCacheWrapper::default());

        let object_store_params = ObjectStoreParams {
            object_store_wrapper: Some(wrapper.clone()),
            ..Default::default()
        };
        let param = ReadParams {
            store_options: Some(object_store_params),
            ..Default::default()
        };
        // Creating the table above must NOT have gone through the wrapper;
        // only the open below should.
        assert!(!wrapper.called());
        conn.open_table("my_table")
            .lance_read_params(param)
            .execute()
            .await
            .unwrap();
        assert!(wrapper.called());
    }
3631
3632    fn merge_insert_test_batches(
3633        offset: i32,
3634        age: i32,
3635    ) -> impl RecordBatchReader + Send + Sync + 'static {
3636        let schema = Arc::new(Schema::new(vec![
3637            Field::new("i", DataType::Int32, false),
3638            Field::new("age", DataType::Int32, false),
3639        ]));
3640        RecordBatchIterator::new(
3641            vec![RecordBatch::try_new(
3642                schema.clone(),
3643                vec![
3644                    Arc::new(Int32Array::from_iter_values(offset..(offset + 10))),
3645                    Arc::new(Int32Array::from_iter_values(std::iter::repeat_n(age, 10))),
3646                ],
3647            )],
3648            schema,
3649        )
3650    }
3651
3652    fn make_test_batches() -> impl RecordBatchReader + Send + Sync + 'static {
3653        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)]));
3654        RecordBatchIterator::new(
3655            vec![RecordBatch::try_new(
3656                schema.clone(),
3657                vec![Arc::new(Int32Array::from_iter_values(0..10))],
3658            )],
3659            schema,
3660        )
3661    }
3662
    /// Full lifecycle of the tags API: list, create, get_version, update,
    /// delete, and checking out a tagged version.
    #[tokio::test]
    async fn test_tags() {
        let tmp_dir = tempdir().unwrap();
        let uri = tmp_dir.path().to_str().unwrap();

        // Zero consistency interval so version reads always see the latest state.
        let conn = ConnectBuilder::new(uri)
            .read_consistency_interval(Duration::from_secs(0))
            .execute()
            .await
            .unwrap();
        let table = conn
            .create_table("my_table", some_sample_data())
            .execute()
            .await
            .unwrap();
        assert_eq!(table.version().await.unwrap(), 1);
        table.add(some_sample_data()).execute().await.unwrap();
        assert_eq!(table.version().await.unwrap(), 2);
        let mut tags_manager = table.tags().await.unwrap();
        let tags = tags_manager.list().await.unwrap();
        assert!(tags.is_empty(), "Tags should be empty initially");
        let tag1 = "tag1";
        tags_manager.create(tag1, 1).await.unwrap();
        assert_eq!(tags_manager.get_version(tag1).await.unwrap(), 1);
        let tags = tags_manager.list().await.unwrap();
        assert_eq!(tags.len(), 1);
        assert!(tags.contains_key(tag1));
        assert_eq!(tags.get(tag1).unwrap().version, 1);
        tags_manager.create("tag2", 2).await.unwrap();
        assert_eq!(tags_manager.get_version("tag2").await.unwrap(), 2);
        let tags = tags_manager.list().await.unwrap();
        assert_eq!(tags.len(), 2);
        assert!(tags.contains_key(tag1));
        assert_eq!(tags.get(tag1).unwrap().version, 1);
        assert!(tags.contains_key("tag2"));
        assert_eq!(tags.get("tag2").unwrap().version, 2);
        // Test update and delete
        table.add(some_sample_data()).execute().await.unwrap();
        tags_manager.update(tag1, 3).await.unwrap();
        assert_eq!(tags_manager.get_version(tag1).await.unwrap(), 3);
        tags_manager.delete("tag2").await.unwrap();
        let tags = tags_manager.list().await.unwrap();
        assert_eq!(tags.len(), 1);
        assert!(tags.contains_key(tag1));
        assert_eq!(tags.get(tag1).unwrap().version, 3);
        // Test checkout tag
        table.add(some_sample_data()).execute().await.unwrap();
        assert_eq!(table.version().await.unwrap(), 4);
        table.checkout_tag(tag1).await.unwrap();
        assert_eq!(table.version().await.unwrap(), 3);
        table.checkout_latest().await.unwrap();
        assert_eq!(table.version().await.unwrap(), 4);
    }
3716
    /// Creating an `Index::Auto` on a 512-row vector column yields an IVF_PQ
    /// index with fully indexed rows and L2 distance; dropping it empties the
    /// index list again.
    #[tokio::test]
    async fn test_create_index() {
        use arrow_array::RecordBatch;
        use arrow_schema::{DataType, Field, Schema as ArrowSchema};
        use rand;
        use std::iter::repeat_with;

        use arrow_array::Float32Array;

        let tmp_dir = tempdir().unwrap();
        let uri = tmp_dir.path().to_str().unwrap();
        let conn = connect(uri).execute().await.unwrap();

        // 512 random 16-dimensional vectors in one column "embeddings".
        let dimension = 16;
        let schema = Arc::new(ArrowSchema::new(vec![Field::new(
            "embeddings",
            DataType::FixedSizeList(
                Arc::new(Field::new("item", DataType::Float32, true)),
                dimension,
            ),
            false,
        )]));

        let mut rng = rand::thread_rng();
        let float_arr = Float32Array::from(
            repeat_with(|| rng.gen::<f32>())
                .take(512 * dimension as usize)
                .collect::<Vec<f32>>(),
        );

        let vectors = Arc::new(create_fixed_size_list(float_arr, dimension).unwrap());
        let batches = RecordBatchIterator::new(
            vec![RecordBatch::try_new(schema.clone(), vec![vectors.clone()]).unwrap()]
                .into_iter()
                .map(Ok),
            schema,
        );

        let table = conn.create_table("test", batches).execute().await.unwrap();

        // No index yet: stats for an unknown name are None, not an error.
        assert_eq!(table.index_stats("my_index").await.unwrap(), None);

        table
            .create_index(&["embeddings"], Index::Auto)
            .execute()
            .await
            .unwrap();

        let index_configs = table.list_indices().await.unwrap();
        assert_eq!(index_configs.len(), 1);
        let index = index_configs.into_iter().next().unwrap();
        assert_eq!(index.index_type, crate::index::IndexType::IvfPq);
        assert_eq!(index.columns, vec!["embeddings".to_string()]);
        assert_eq!(table.count_rows(None).await.unwrap(), 512);
        assert_eq!(table.name(), "test");

        let indices = table.as_native().unwrap().load_indices().await.unwrap();
        let index_name = &indices[0].index_name;
        let stats = table.index_stats(index_name).await.unwrap().unwrap();
        assert_eq!(stats.num_indexed_rows, 512);
        assert_eq!(stats.num_unindexed_rows, 0);
        assert_eq!(stats.index_type, crate::index::IndexType::IvfPq);
        assert_eq!(stats.distance_type, Some(crate::DistanceType::L2));
        assert!(stats.loss.is_some());

        table.drop_index(index_name).await.unwrap();
        assert_eq!(table.list_indices().await.unwrap().len(), 0);
    }
3785
    /// With no explicit partition count, IVF_PQ should size its partitions
    /// from the default partition size: num_rows / PARTITION_SIZE partitions.
    #[tokio::test]
    async fn test_ivf_pq_uses_default_partition_size_for_num_partitions() {
        use arrow_array::{Float32Array, RecordBatch};
        use arrow_schema::{DataType, Field, Schema as ArrowSchema};

        use crate::index::vector::IvfPqIndexBuilder;

        let tmp_dir = tempdir().unwrap();
        let uri = tmp_dir.path().to_str().unwrap();
        let conn = connect(uri).execute().await.unwrap();

        // Two partitions' worth of rows of 8-dimensional vectors.
        const PARTITION_SIZE: usize = 8192;
        let num_rows = PARTITION_SIZE * 2;
        let dimension = 8usize;
        let schema = Arc::new(ArrowSchema::new(vec![Field::new(
            "embeddings",
            DataType::FixedSizeList(
                Arc::new(Field::new("item", DataType::Float32, true)),
                dimension as i32,
            ),
            false,
        )]));

        let float_arr =
            Float32Array::from_iter_values((0..(num_rows * dimension)).map(|v| v as f32));
        let vectors = Arc::new(create_fixed_size_list(float_arr, dimension as i32).unwrap());
        let batches = RecordBatchIterator::new(
            vec![RecordBatch::try_new(schema.clone(), vec![vectors]).unwrap()]
                .into_iter()
                .map(Ok),
            schema,
        );

        let table = conn.create_table("test", batches).execute().await.unwrap();
        let native_table = table.as_native().unwrap();
        // Default builder: no num_partitions specified.
        let builder = IvfPqIndexBuilder::default();
        table
            .create_index(&["embeddings"], Index::IvfPq(builder))
            .execute()
            .await
            .unwrap();
        table
            .wait_for_index(&["embeddings_idx"], std::time::Duration::from_secs(30))
            .await
            .unwrap();

        use lance::index::vector::ivf::v2::IvfPq as LanceIvfPq;
        use lance::index::DatasetIndexInternalExt;
        use lance_index::metrics::NoOpMetricsCollector;
        use lance_index::vector::VectorIndex as LanceVectorIndex;

        let indices = native_table.load_indices().await.unwrap();
        let index_uuid = indices[0].index_uuid.clone();

        // Clone the dataset out of the guard so the lock is not held while
        // opening the index.
        let dataset_guard = native_table.dataset.get().await.unwrap();
        let dataset = (*dataset_guard).clone();
        drop(dataset_guard);

        // Open the raw lance index to inspect the IVF model directly.
        let lance_index = dataset
            .open_vector_index("embeddings", &index_uuid, &NoOpMetricsCollector)
            .await
            .unwrap();
        let ivf_index = lance_index
            .as_any()
            .downcast_ref::<LanceIvfPq>()
            .expect("expected IvfPq index");
        let partition_count = ivf_index.ivf_model().num_partitions();

        let expected_partitions = num_rows / PARTITION_SIZE;
        assert_eq!(partition_count, expected_partitions);
    }
3857
3858    #[tokio::test]
3859    async fn test_create_index_ivf_hnsw_sq() {
3860        use arrow_array::RecordBatch;
3861        use arrow_schema::{DataType, Field, Schema as ArrowSchema};
3862        use rand;
3863        use std::iter::repeat_with;
3864
3865        use arrow_array::Float32Array;
3866
3867        let tmp_dir = tempdir().unwrap();
3868        let uri = tmp_dir.path().to_str().unwrap();
3869        let conn = connect(uri).execute().await.unwrap();
3870
3871        let dimension = 16;
3872        let schema = Arc::new(ArrowSchema::new(vec![Field::new(
3873            "embeddings",
3874            DataType::FixedSizeList(
3875                Arc::new(Field::new("item", DataType::Float32, true)),
3876                dimension,
3877            ),
3878            false,
3879        )]));
3880
3881        let mut rng = rand::thread_rng();
3882        let float_arr = Float32Array::from(
3883            repeat_with(|| rng.gen::<f32>())
3884                .take(512 * dimension as usize)
3885                .collect::<Vec<f32>>(),
3886        );
3887
3888        let vectors = Arc::new(create_fixed_size_list(float_arr, dimension).unwrap());
3889        let batches = RecordBatchIterator::new(
3890            vec![RecordBatch::try_new(schema.clone(), vec![vectors.clone()]).unwrap()]
3891                .into_iter()
3892                .map(Ok),
3893            schema,
3894        );
3895
3896        let table = conn.create_table("test", batches).execute().await.unwrap();
3897
3898        let stats = table.index_stats("my_index").await.unwrap();
3899        assert!(stats.is_none());
3900
3901        let index = IvfHnswSqIndexBuilder::default();
3902        table
3903            .create_index(&["embeddings"], Index::IvfHnswSq(index))
3904            .execute()
3905            .await
3906            .unwrap();
3907
3908        let index_configs = table.list_indices().await.unwrap();
3909        assert_eq!(index_configs.len(), 1);
3910        let index = index_configs.into_iter().next().unwrap();
3911        assert_eq!(index.index_type, crate::index::IndexType::IvfHnswSq);
3912        assert_eq!(index.columns, vec!["embeddings".to_string()]);
3913        assert_eq!(table.count_rows(None).await.unwrap(), 512);
3914        assert_eq!(table.name(), "test");
3915
3916        let indices = table.as_native().unwrap().load_indices().await.unwrap();
3917        let index_name = &indices[0].index_name;
3918        let stats = table.index_stats(index_name).await.unwrap().unwrap();
3919        assert_eq!(stats.num_indexed_rows, 512);
3920        assert_eq!(stats.num_unindexed_rows, 0);
3921    }
3922
3923    #[tokio::test]
3924    async fn test_create_index_ivf_hnsw_pq() {
3925        use arrow_array::RecordBatch;
3926        use arrow_schema::{DataType, Field, Schema as ArrowSchema};
3927        use rand;
3928        use std::iter::repeat_with;
3929
3930        use arrow_array::Float32Array;
3931
3932        let tmp_dir = tempdir().unwrap();
3933        let uri = tmp_dir.path().to_str().unwrap();
3934        let conn = connect(uri).execute().await.unwrap();
3935
3936        let dimension = 16;
3937        let schema = Arc::new(ArrowSchema::new(vec![Field::new(
3938            "embeddings",
3939            DataType::FixedSizeList(
3940                Arc::new(Field::new("item", DataType::Float32, true)),
3941                dimension,
3942            ),
3943            false,
3944        )]));
3945
3946        let mut rng = rand::thread_rng();
3947        let float_arr = Float32Array::from(
3948            repeat_with(|| rng.gen::<f32>())
3949                .take(512 * dimension as usize)
3950                .collect::<Vec<f32>>(),
3951        );
3952
3953        let vectors = Arc::new(create_fixed_size_list(float_arr, dimension).unwrap());
3954        let batches = RecordBatchIterator::new(
3955            vec![RecordBatch::try_new(schema.clone(), vec![vectors.clone()]).unwrap()]
3956                .into_iter()
3957                .map(Ok),
3958            schema,
3959        );
3960
3961        let table = conn.create_table("test", batches).execute().await.unwrap();
3962        let stats = table.index_stats("my_index").await.unwrap();
3963        assert!(stats.is_none());
3964
3965        let index = IvfHnswPqIndexBuilder::default();
3966        table
3967            .create_index(&["embeddings"], Index::IvfHnswPq(index))
3968            .execute()
3969            .await
3970            .unwrap();
3971        table
3972            .wait_for_index(&["embeddings_idx"], Duration::from_millis(10))
3973            .await
3974            .unwrap();
3975        let index_configs = table.list_indices().await.unwrap();
3976        assert_eq!(index_configs.len(), 1);
3977        let index = index_configs.into_iter().next().unwrap();
3978        assert_eq!(index.index_type, crate::index::IndexType::IvfHnswPq);
3979        assert_eq!(index.columns, vec!["embeddings".to_string()]);
3980        assert_eq!(table.count_rows(None).await.unwrap(), 512);
3981        assert_eq!(table.name(), "test");
3982
3983        let indices: Vec<VectorIndex> = table.as_native().unwrap().load_indices().await.unwrap();
3984        let index_name = &indices[0].index_name;
3985        let stats = table.index_stats(index_name).await.unwrap().unwrap();
3986        assert_eq!(stats.num_indexed_rows, 512);
3987        assert_eq!(stats.num_unindexed_rows, 0);
3988    }
3989
3990    fn create_fixed_size_list<T: Array>(values: T, list_size: i32) -> Result<FixedSizeListArray> {
3991        let list_type = DataType::FixedSizeList(
3992            Arc::new(Field::new("item", values.data_type().clone(), true)),
3993            list_size,
3994        );
3995        let data = ArrayDataBuilder::new(list_type)
3996            .len(values.len() / list_size as usize)
3997            .add_child_data(values.into_data())
3998            .build()
3999            .unwrap();
4000
4001        Ok(FixedSizeListArray::from(data))
4002    }
4003
4004    fn some_sample_data() -> Box<dyn RecordBatchReader + Send> {
4005        let batch = RecordBatch::try_new(
4006            Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)])),
4007            vec![Arc::new(Int32Array::from(vec![1]))],
4008        )
4009        .unwrap();
4010        let schema = batch.schema().clone();
4011        let batch = Ok(batch);
4012
4013        Box::new(RecordBatchIterator::new(vec![batch], schema))
4014    }
4015
4016    #[tokio::test]
4017    async fn test_create_scalar_index() {
4018        let tmp_dir = tempdir().unwrap();
4019        let uri = tmp_dir.path().to_str().unwrap();
4020
4021        let batch = RecordBatch::try_new(
4022            Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)])),
4023            vec![Arc::new(Int32Array::from(vec![1]))],
4024        )
4025        .unwrap();
4026        let conn = ConnectBuilder::new(uri).execute().await.unwrap();
4027        let table = conn
4028            .create_table(
4029                "my_table",
4030                RecordBatchIterator::new(vec![Ok(batch.clone())], batch.schema()),
4031            )
4032            .execute()
4033            .await
4034            .unwrap();
4035
4036        // Can create an index on a scalar column (will default to btree)
4037        table
4038            .create_index(&["i"], Index::Auto)
4039            .execute()
4040            .await
4041            .unwrap();
4042        table
4043            .wait_for_index(&["i_idx"], Duration::from_millis(10))
4044            .await
4045            .unwrap();
4046        let index_configs = table.list_indices().await.unwrap();
4047        assert_eq!(index_configs.len(), 1);
4048        let index = index_configs.into_iter().next().unwrap();
4049        assert_eq!(index.index_type, crate::index::IndexType::BTree);
4050        assert_eq!(index.columns, vec!["i".to_string()]);
4051
4052        // Can also specify btree
4053        table
4054            .create_index(&["i"], Index::BTree(BTreeIndexBuilder::default()))
4055            .execute()
4056            .await
4057            .unwrap();
4058
4059        let index_configs = table.list_indices().await.unwrap();
4060        assert_eq!(index_configs.len(), 1);
4061        let index = index_configs.into_iter().next().unwrap();
4062        assert_eq!(index.index_type, crate::index::IndexType::BTree);
4063        assert_eq!(index.columns, vec!["i".to_string()]);
4064
4065        let indices = table.as_native().unwrap().load_indices().await.unwrap();
4066        let index_name = &indices[0].index_name;
4067        let stats = table.index_stats(index_name).await.unwrap().unwrap();
4068        assert_eq!(stats.num_indexed_rows, 1);
4069        assert_eq!(stats.num_unindexed_rows, 0);
4070    }
4071
4072    #[tokio::test]
4073    async fn test_create_bitmap_index() {
4074        let tmp_dir = tempdir().unwrap();
4075        let uri = tmp_dir.path().to_str().unwrap();
4076
4077        let conn = ConnectBuilder::new(uri).execute().await.unwrap();
4078
4079        let schema = Arc::new(Schema::new(vec![
4080            Field::new("id", DataType::Int32, false),
4081            Field::new("category", DataType::Utf8, true),
4082            Field::new("large_category", DataType::LargeUtf8, true),
4083            Field::new("is_active", DataType::Boolean, true),
4084            Field::new("data", DataType::Binary, true),
4085            Field::new("large_data", DataType::LargeBinary, true),
4086        ]));
4087
4088        let batch = RecordBatch::try_new(
4089            schema.clone(),
4090            vec![
4091                Arc::new(Int32Array::from_iter_values(0..100)),
4092                Arc::new(StringArray::from_iter_values(
4093                    (0..100).map(|i| format!("category_{}", i % 5)),
4094                )),
4095                Arc::new(LargeStringArray::from_iter_values(
4096                    (0..100).map(|i| format!("large_category_{}", i % 5)),
4097                )),
4098                Arc::new(BooleanArray::from_iter((0..100).map(|i| Some(i % 2 == 0)))),
4099                Arc::new(BinaryArray::from_iter_values(
4100                    (0_u32..100).map(|i| i.to_le_bytes()),
4101                )),
4102                Arc::new(LargeBinaryArray::from_iter_values(
4103                    (0_u32..100).map(|i| i.to_le_bytes()),
4104                )),
4105            ],
4106        )
4107        .unwrap();
4108
4109        let table = conn
4110            .create_table(
4111                "test_bitmap",
4112                RecordBatchIterator::new(vec![Ok(batch.clone())], batch.schema()),
4113            )
4114            .execute()
4115            .await
4116            .unwrap();
4117
4118        // Create bitmap index on the "category" column
4119        table
4120            .create_index(&["category"], Index::Bitmap(Default::default()))
4121            .execute()
4122            .await
4123            .unwrap();
4124
4125        // Create bitmap index on the "is_active" column
4126        table
4127            .create_index(&["is_active"], Index::Bitmap(Default::default()))
4128            .execute()
4129            .await
4130            .unwrap();
4131
4132        // Create bitmap index on the "data" column
4133        table
4134            .create_index(&["data"], Index::Bitmap(Default::default()))
4135            .execute()
4136            .await
4137            .unwrap();
4138
4139        // Create bitmap index on the "large_data" column
4140        table
4141            .create_index(&["large_data"], Index::Bitmap(Default::default()))
4142            .execute()
4143            .await
4144            .unwrap();
4145
4146        // Create bitmap index on the "large_category" column
4147        table
4148            .create_index(&["large_category"], Index::Bitmap(Default::default()))
4149            .execute()
4150            .await
4151            .unwrap();
4152
4153        // Verify the index was created
4154        let index_configs = table.list_indices().await.unwrap();
4155        assert_eq!(index_configs.len(), 5);
4156
4157        let mut configs_iter = index_configs.into_iter();
4158        let index = configs_iter.next().unwrap();
4159        assert_eq!(index.index_type, crate::index::IndexType::Bitmap);
4160        assert_eq!(index.columns, vec!["category".to_string()]);
4161
4162        let index = configs_iter.next().unwrap();
4163        assert_eq!(index.index_type, crate::index::IndexType::Bitmap);
4164        assert_eq!(index.columns, vec!["is_active".to_string()]);
4165
4166        let index = configs_iter.next().unwrap();
4167        assert_eq!(index.index_type, crate::index::IndexType::Bitmap);
4168        assert_eq!(index.columns, vec!["data".to_string()]);
4169
4170        let index = configs_iter.next().unwrap();
4171        assert_eq!(index.index_type, crate::index::IndexType::Bitmap);
4172        assert_eq!(index.columns, vec!["large_data".to_string()]);
4173
4174        let index = configs_iter.next().unwrap();
4175        assert_eq!(index.index_type, crate::index::IndexType::Bitmap);
4176        assert_eq!(index.columns, vec!["large_category".to_string()]);
4177    }
4178
4179    #[tokio::test]
4180    async fn test_create_label_list_index() {
4181        let tmp_dir = tempdir().unwrap();
4182        let uri = tmp_dir.path().to_str().unwrap();
4183
4184        let conn = ConnectBuilder::new(uri).execute().await.unwrap();
4185
4186        let schema = Arc::new(Schema::new(vec![
4187            Field::new("id", DataType::Int32, false),
4188            Field::new(
4189                "tags",
4190                DataType::List(Field::new("item", DataType::Utf8, true).into()),
4191                true,
4192            ),
4193        ]));
4194
4195        const TAGS: [&str; 3] = ["cat", "dog", "fish"];
4196
4197        let values_builder = StringBuilder::new();
4198        let mut builder = ListBuilder::new(values_builder);
4199        for i in 0..120 {
4200            builder.values().append_value(TAGS[i % 3]);
4201            if i % 3 == 0 {
4202                builder.append(true)
4203            }
4204        }
4205        let tags = Arc::new(builder.finish());
4206
4207        let batch = RecordBatch::try_new(
4208            schema.clone(),
4209            vec![Arc::new(Int32Array::from_iter_values(0..40)), tags],
4210        )
4211        .unwrap();
4212
4213        let table = conn
4214            .create_table(
4215                "test_bitmap",
4216                RecordBatchIterator::new(vec![Ok(batch.clone())], batch.schema()),
4217            )
4218            .execute()
4219            .await
4220            .unwrap();
4221
4222        // Can not create btree or bitmap index on list column
4223        assert!(table
4224            .create_index(&["tags"], Index::BTree(Default::default()))
4225            .execute()
4226            .await
4227            .is_err());
4228        assert!(table
4229            .create_index(&["tags"], Index::Bitmap(Default::default()))
4230            .execute()
4231            .await
4232            .is_err());
4233
4234        // Create bitmap index on the "category" column
4235        table
4236            .create_index(&["tags"], Index::LabelList(Default::default()))
4237            .execute()
4238            .await
4239            .unwrap();
4240
4241        // Verify the index was created
4242        let index_configs = table.list_indices().await.unwrap();
4243        assert_eq!(index_configs.len(), 1);
4244        let index = index_configs.into_iter().next().unwrap();
4245        assert_eq!(index.index_type, crate::index::IndexType::LabelList);
4246        assert_eq!(index.columns, vec!["tags".to_string()]);
4247    }
4248
4249    #[tokio::test]
4250    async fn test_create_inverted_index() {
4251        let tmp_dir = tempdir().unwrap();
4252        let uri = tmp_dir.path().to_str().unwrap();
4253
4254        let conn = ConnectBuilder::new(uri).execute().await.unwrap();
4255        const WORDS: [&str; 3] = ["cat", "dog", "fish"];
4256        let mut text_builder = StringBuilder::new();
4257        let num_rows = 120;
4258        for i in 0..num_rows {
4259            text_builder.append_value(WORDS[i % 3]);
4260        }
4261        let text = Arc::new(text_builder.finish());
4262
4263        let schema = Arc::new(Schema::new(vec![
4264            Field::new("id", DataType::Int32, false),
4265            Field::new("text", DataType::Utf8, true),
4266        ]));
4267        let batch = RecordBatch::try_new(
4268            schema.clone(),
4269            vec![
4270                Arc::new(Int32Array::from_iter_values(0..num_rows as i32)),
4271                text,
4272            ],
4273        )
4274        .unwrap();
4275
4276        let table = conn
4277            .create_table(
4278                "test_bitmap",
4279                RecordBatchIterator::new(vec![Ok(batch.clone())], batch.schema()),
4280            )
4281            .execute()
4282            .await
4283            .unwrap();
4284
4285        table
4286            .create_index(&["text"], Index::FTS(Default::default()))
4287            .execute()
4288            .await
4289            .unwrap();
4290        let index_configs = table.list_indices().await.unwrap();
4291        assert_eq!(index_configs.len(), 1);
4292        let index = index_configs.into_iter().next().unwrap();
4293        assert_eq!(index.index_type, crate::index::IndexType::FTS);
4294        assert_eq!(index.columns, vec!["text".to_string()]);
4295        assert_eq!(index.name, "text_idx");
4296
4297        let stats = table.index_stats("text_idx").await.unwrap().unwrap();
4298        assert_eq!(stats.num_indexed_rows, num_rows);
4299        assert_eq!(stats.num_unindexed_rows, 0);
4300        assert_eq!(stats.index_type, crate::index::IndexType::FTS);
4301        assert_eq!(stats.distance_type, None);
4302
4303        // Make sure we can call prewarm without error
4304        table.prewarm_index("text_idx").await.unwrap();
4305    }
4306
    // Windows does not support precise sleep durations due to timer resolution limitations.
    #[cfg(not(target_os = "windows"))]
    #[tokio::test]
    async fn test_read_consistency_interval() {
        // Exercises the three read-consistency modes of a second connection:
        //   None      -> manual: reads stay pinned until checkout_latest
        //   Some(0)   -> strong: every read sees the latest commit
        //   Some(100) -> eventual: reads converge after the interval elapses
        let intervals = vec![
            None,
            Some(0),
            Some(100), // 100 ms
        ];

        for interval in intervals {
            let data = some_sample_data();

            let tmp_dir = tempdir().unwrap();
            let uri = tmp_dir.path().to_str().unwrap();

            // Writer connection.
            let conn1 = ConnectBuilder::new(uri).execute().await.unwrap();
            let table1 = conn1
                .create_empty_table("my_table", data.schema())
                .execute()
                .await
                .unwrap();

            // Reader connection, configured with the interval under test.
            let mut conn2 = ConnectBuilder::new(uri);
            if let Some(interval) = interval {
                conn2 = conn2.read_consistency_interval(std::time::Duration::from_millis(interval));
            }
            let conn2 = conn2.execute().await.unwrap();
            let table2 = conn2.open_table("my_table").execute().await.unwrap();

            // Both connections see the empty table before the write.
            assert_eq!(table1.count_rows(None).await.unwrap(), 0);
            assert_eq!(table2.count_rows(None).await.unwrap(), 0);

            table1.add(data).execute().await.unwrap();
            assert_eq!(table1.count_rows(None).await.unwrap(), 1);

            match interval {
                None => {
                    // No interval: the reader stays pinned until it
                    // explicitly checks out the latest version.
                    assert_eq!(table2.count_rows(None).await.unwrap(), 0);
                    table2.checkout_latest().await.unwrap();
                    assert_eq!(table2.count_rows(None).await.unwrap(), 1);
                }
                Some(0) => {
                    // Zero interval: reads are always up to date.
                    assert_eq!(table2.count_rows(None).await.unwrap(), 1);
                }
                Some(100) => {
                    // The write stays invisible until the interval elapses.
                    assert_eq!(table2.count_rows(None).await.unwrap(), 0);
                    tokio::time::sleep(Duration::from_millis(100)).await;
                    assert_eq!(table2.count_rows(None).await.unwrap(), 1);
                }
                _ => unreachable!(),
            }
        }
    }
4361
4362    #[tokio::test]
4363    async fn test_time_travel_write() {
4364        let tmp_dir = tempdir().unwrap();
4365        let uri = tmp_dir.path().to_str().unwrap();
4366
4367        let conn = ConnectBuilder::new(uri)
4368            .read_consistency_interval(Duration::from_secs(0))
4369            .execute()
4370            .await
4371            .unwrap();
4372        let table = conn
4373            .create_table("my_table", some_sample_data())
4374            .execute()
4375            .await
4376            .unwrap();
4377        let version = table.version().await.unwrap();
4378        table.add(some_sample_data()).execute().await.unwrap();
4379        table.checkout(version).await.unwrap();
4380        assert!(table.add(some_sample_data()).execute().await.is_err())
4381    }
4382
    /// Dataset config keys can be inserted, updated in place, and deleted via
    /// the native table, and unrelated keys survive each operation.
    #[tokio::test]
    async fn test_update_dataset_config() {
        let tmp_dir = tempdir().unwrap();
        let uri = tmp_dir.path().to_str().unwrap();

        let conn = ConnectBuilder::new(uri)
            .read_consistency_interval(Duration::from_secs(0))
            .execute()
            .await
            .unwrap();

        let table = conn
            .create_table("my_table", some_sample_data())
            .execute()
            .await
            .unwrap();
        let native_tbl = table.as_native().unwrap();

        // The manifest may already carry implementation-defined config keys;
        // every length check below is relative to this baseline.
        let manifest = native_tbl.manifest().await.unwrap();
        let base_config_len = manifest.config.len();

        // Insert a new key.
        native_tbl
            .update_config(vec![("test_key1".to_string(), "test_val1".to_string())])
            .await
            .unwrap();

        let manifest = native_tbl.manifest().await.unwrap();
        assert_eq!(manifest.config.len(), 1 + base_config_len);
        assert_eq!(
            manifest.config.get("test_key1"),
            Some(&"test_val1".to_string())
        );

        // Insert a second key; the first must be preserved.
        native_tbl
            .update_config(vec![("test_key2".to_string(), "test_val2".to_string())])
            .await
            .unwrap();
        let manifest = native_tbl.manifest().await.unwrap();
        assert_eq!(manifest.config.len(), 2 + base_config_len);
        assert_eq!(
            manifest.config.get("test_key1"),
            Some(&"test_val1".to_string())
        );
        assert_eq!(
            manifest.config.get("test_key2"),
            Some(&"test_val2".to_string())
        );

        // Update an existing key in place; the key count must not change.
        native_tbl
            .update_config(vec![(
                "test_key2".to_string(),
                "test_val2_update".to_string(),
            )])
            .await
            .unwrap();
        let manifest = native_tbl.manifest().await.unwrap();
        assert_eq!(manifest.config.len(), 2 + base_config_len);
        assert_eq!(
            manifest.config.get("test_key1"),
            Some(&"test_val1".to_string())
        );
        assert_eq!(
            manifest.config.get("test_key2"),
            Some(&"test_val2_update".to_string())
        );

        // Delete one key; the other must be untouched.
        native_tbl.delete_config_keys(&["test_key1"]).await.unwrap();
        let manifest = native_tbl.manifest().await.unwrap();
        assert_eq!(manifest.config.len(), 1 + base_config_len);
        assert_eq!(
            manifest.config.get("test_key2"),
            Some(&"test_val2_update".to_string())
        );
    }
4457
    /// `replace_schema_metadata` sets the schema-level metadata map on the
    /// native table.
    #[tokio::test]
    async fn test_schema_metadata_config() {
        let tmp_dir = tempdir().unwrap();
        let uri = tmp_dir.path().to_str().unwrap();

        let conn = ConnectBuilder::new(uri)
            .read_consistency_interval(Duration::from_secs(0))
            .execute()
            .await
            .unwrap();
        let table = conn
            .create_table("my_table", some_sample_data())
            .execute()
            .await
            .unwrap();

        // A freshly created table starts with no schema metadata.
        let native_tbl = table.as_native().unwrap();
        let schema = native_tbl.schema().await.unwrap();
        let metadata = schema.metadata();
        assert_eq!(metadata.len(), 0);

        // Set a single entry.
        native_tbl
            .replace_schema_metadata(vec![("test_key1".to_string(), "test_val1".to_string())])
            .await
            .unwrap();

        let schema = native_tbl.schema().await.unwrap();
        let metadata = schema.metadata();
        assert_eq!(metadata.len(), 1);
        assert_eq!(metadata.get("test_key1"), Some(&"test_val1".to_string()));

        // Replace with two entries, one of which updates the existing key.
        native_tbl
            .replace_schema_metadata(vec![
                ("test_key1".to_string(), "test_val1_update".to_string()),
                ("test_key2".to_string(), "test_val2".to_string()),
            ])
            .await
            .unwrap();
        let schema = native_tbl.schema().await.unwrap();
        let metadata = schema.metadata();
        assert_eq!(metadata.len(), 2);
        assert_eq!(
            metadata.get("test_key1"),
            Some(&"test_val1_update".to_string())
        );
        assert_eq!(metadata.get("test_key2"), Some(&"test_val2".to_string()));

        // Replace again with a single entry. Only the surviving key's value
        // is asserted; NOTE(review): whether test_key1 is dropped by this
        // call (replace vs. merge semantics) is not checked here — confirm
        // before relying on it.
        native_tbl
            .replace_schema_metadata(vec![(
                "test_key2".to_string(),
                "test_val2_update".to_string(),
            )])
            .await
            .unwrap();
        let schema = native_tbl.schema().await.unwrap();
        let metadata = schema.metadata();
        assert_eq!(
            metadata.get("test_key2"),
            Some(&"test_val2_update".to_string())
        );
    }
4519
    /// Field-level metadata can be replaced by field id via
    /// `replace_field_metadata`, independently of schema-level metadata.
    #[tokio::test]
    pub async fn test_field_metadata_update() {
        let tmp_dir = tempdir().unwrap();
        let uri = tmp_dir.path().to_str().unwrap();

        let conn = ConnectBuilder::new(uri)
            .read_consistency_interval(Duration::from_secs(0))
            .execute()
            .await
            .unwrap();
        let table = conn
            .create_table("my_table", some_sample_data())
            .execute()
            .await
            .unwrap();

        let native_tbl = table.as_native().unwrap();
        // Use the manifest's schema, which exposes numeric field ids.
        let schema = native_tbl.manifest().await.unwrap().schema;

        // Field "i" starts without metadata.
        let field = schema.field("i").unwrap();
        assert_eq!(field.metadata.len(), 0);

        // Set schema-level metadata first, to confirm the two metadata
        // scopes do not interfere with each other.
        native_tbl
            .replace_schema_metadata(vec![(
                "test_key2".to_string(),
                "test_val2_update".to_string(),
            )])
            .await
            .unwrap();

        let schema = native_tbl.schema().await.unwrap();
        let metadata = schema.metadata();
        assert_eq!(metadata.len(), 1);
        assert_eq!(
            metadata.get("test_key2"),
            Some(&"test_val2_update".to_string())
        );

        // Now attach metadata to field "i", addressed by its field id.
        let mut new_field_metadata = HashMap::<String, String>::new();
        new_field_metadata.insert("test_field_key1".into(), "test_field_val1".into());
        native_tbl
            .replace_field_metadata(vec![(field.id as u32, new_field_metadata)])
            .await
            .unwrap();

        // Re-read the manifest schema and verify the field metadata landed.
        let schema = native_tbl.manifest().await.unwrap().schema;
        let field = schema.field("i").unwrap();
        assert_eq!(field.metadata.len(), 1);
        assert_eq!(
            field.metadata.get("test_field_key1"),
            Some(&"test_field_val1".to_string())
        );
    }
4573
    /// `Table::stats` reports row count, total size, and per-fragment length
    /// statistics; an empty table reports all zeros.
    #[tokio::test]
    pub async fn test_stats() {
        let tmp_dir = tempdir().unwrap();
        let uri = tmp_dir.path().to_str().unwrap();

        let conn = ConnectBuilder::new(uri).execute().await.unwrap();

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("foo", DataType::Int32, true),
        ]));
        // Initial fragment: 100 rows.
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int32Array::from_iter_values(0..100)),
                Arc::new(Int32Array::from_iter_values(0..100)),
            ],
        )
        .unwrap();

        let table = conn
            .create_table(
                "test_stats",
                RecordBatchIterator::new(vec![Ok(batch.clone())], batch.schema()),
            )
            .execute()
            .await
            .unwrap();
        // Ten more appends of 15 rows each -> 11 fragments, 250 rows total.
        for _ in 0..10 {
            let batch = RecordBatch::try_new(
                schema.clone(),
                vec![
                    Arc::new(Int32Array::from_iter_values(0..15)),
                    Arc::new(Int32Array::from_iter_values(0..15)),
                ],
            )
            .unwrap();
            table
                .add(RecordBatchIterator::new(
                    vec![Ok(batch.clone())],
                    batch.schema(),
                ))
                .execute()
                .await
                .unwrap();
        }

        // Second, completely empty table with the same schema.
        let empty_table = conn
            .create_table(
                "test_stats_empty",
                RecordBatchIterator::new(vec![], batch.schema()),
            )
            .execute()
            .await
            .unwrap();

        let res = table.stats().await.unwrap();
        println!("{:#?}", res);
        // 250 rows * 2 Int32 columns * 4 bytes = 2000 bytes. The length
        // percentiles describe the distribution of per-fragment row counts:
        // one fragment of 100 rows and ten of 15.
        assert_eq!(
            res,
            TableStatistics {
                num_rows: 250,
                num_indices: 0,
                total_bytes: 2000,
                fragment_stats: FragmentStatistics {
                    num_fragments: 11,
                    num_small_fragments: 11,
                    lengths: FragmentSummaryStats {
                        min: 15,
                        max: 100,
                        mean: 22,
                        p25: 15,
                        p50: 15,
                        p75: 15,
                        p99: 100,
                    },
                },
            }
        );
        // The empty table reports zeroed statistics across the board.
        let res = empty_table.stats().await.unwrap();
        println!("{:#?}", res);
        assert_eq!(
            res,
            TableStatistics {
                num_rows: 0,
                num_indices: 0,
                total_bytes: 0,
                fragment_stats: FragmentStatistics {
                    num_fragments: 0,
                    num_small_fragments: 0,
                    lengths: FragmentSummaryStats {
                        min: 0,
                        max: 0,
                        mean: 0,
                        p25: 0,
                        p50: 0,
                        p75: 0,
                        p99: 0,
                    },
                },
            }
        )
    }
4677
4678    #[tokio::test]
4679    pub async fn test_list_indices_skip_frag_reuse() {
4680        let tmp_dir = tempdir().unwrap();
4681        let uri = tmp_dir.path().to_str().unwrap();
4682
4683        let conn = ConnectBuilder::new(uri).execute().await.unwrap();
4684
4685        let schema = Arc::new(Schema::new(vec![
4686            Field::new("id", DataType::Int32, false),
4687            Field::new("foo", DataType::Int32, true),
4688        ]));
4689        let batch = RecordBatch::try_new(
4690            schema.clone(),
4691            vec![
4692                Arc::new(Int32Array::from_iter_values(0..100)),
4693                Arc::new(Int32Array::from_iter_values(0..100)),
4694            ],
4695        )
4696        .unwrap();
4697
4698        let table = conn
4699            .create_table(
4700                "test_list_indices_skip_frag_reuse",
4701                RecordBatchIterator::new(vec![Ok(batch.clone())], batch.schema()),
4702            )
4703            .execute()
4704            .await
4705            .unwrap();
4706
4707        table
4708            .add(RecordBatchIterator::new(
4709                vec![Ok(batch.clone())],
4710                batch.schema(),
4711            ))
4712            .execute()
4713            .await
4714            .unwrap();
4715
4716        table
4717            .create_index(&["id"], Index::Bitmap(BitmapIndexBuilder {}))
4718            .execute()
4719            .await
4720            .unwrap();
4721
4722        table
4723            .optimize(OptimizeAction::Compact {
4724                options: CompactionOptions {
4725                    target_rows_per_fragment: 2_000,
4726                    defer_index_remap: true,
4727                    ..Default::default()
4728                },
4729                remap_options: None,
4730            })
4731            .await
4732            .unwrap();
4733
4734        let result = table.list_indices().await.unwrap();
4735        assert_eq!(result.len(), 1);
4736        assert_eq!(result[0].index_type, crate::index::IndexType::Bitmap);
4737    }
4738
4739    #[tokio::test]
4740    async fn test_convert_to_namespace_query_vector() {
4741        let tmp_dir = tempdir().unwrap();
4742        let dataset_path = tmp_dir.path().join("test_ns_query.lance");
4743
4744        let batches = make_test_batches();
4745        Dataset::write(batches, dataset_path.to_str().unwrap(), None)
4746            .await
4747            .unwrap();
4748
4749        let table = NativeTable::open(dataset_path.to_str().unwrap())
4750            .await
4751            .unwrap();
4752
4753        // Create a vector query
4754        let query_vector = Arc::new(Float32Array::from(vec![1.0, 2.0, 3.0, 4.0]));
4755        let vq = VectorQueryRequest {
4756            base: QueryRequest {
4757                limit: Some(10),
4758                offset: Some(5),
4759                filter: Some(QueryFilter::Sql("id > 0".to_string())),
4760                select: Select::Columns(vec!["id".to_string()]),
4761                ..Default::default()
4762            },
4763            column: Some("vector".to_string()),
4764            query_vector: vec![query_vector as Arc<dyn Array>],
4765            minimum_nprobes: 20,
4766            distance_type: Some(crate::DistanceType::L2),
4767            ..Default::default()
4768        };
4769
4770        let any_query = AnyQuery::VectorQuery(vq);
4771        let ns_request = table.convert_to_namespace_query(&any_query).unwrap();
4772
4773        assert_eq!(ns_request.k, 10);
4774        assert_eq!(ns_request.offset, Some(5));
4775        assert_eq!(ns_request.filter, Some("id > 0".to_string()));
4776        assert_eq!(
4777            ns_request
4778                .columns
4779                .as_ref()
4780                .and_then(|c| c.column_names.as_ref()),
4781            Some(&vec!["id".to_string()])
4782        );
4783        assert_eq!(ns_request.vector_column, Some("vector".to_string()));
4784        assert_eq!(ns_request.distance_type, Some("l2".to_string()));
4785        assert!(ns_request.vector.single_vector.is_some());
4786        assert_eq!(
4787            ns_request.vector.single_vector.as_ref().unwrap(),
4788            &vec![1.0, 2.0, 3.0, 4.0]
4789        );
4790    }
4791
4792    #[tokio::test]
4793    async fn test_convert_to_namespace_query_plain_query() {
4794        let tmp_dir = tempdir().unwrap();
4795        let dataset_path = tmp_dir.path().join("test_ns_plain.lance");
4796
4797        let batches = make_test_batches();
4798        Dataset::write(batches, dataset_path.to_str().unwrap(), None)
4799            .await
4800            .unwrap();
4801
4802        let table = NativeTable::open(dataset_path.to_str().unwrap())
4803            .await
4804            .unwrap();
4805
4806        // Create a plain (non-vector) query with filter and select
4807        let q = QueryRequest {
4808            limit: Some(20),
4809            offset: Some(5),
4810            filter: Some(QueryFilter::Sql("id > 5".to_string())),
4811            select: Select::Columns(vec!["id".to_string()]),
4812            with_row_id: true,
4813            ..Default::default()
4814        };
4815
4816        let any_query = AnyQuery::Query(q);
4817        let ns_request = table.convert_to_namespace_query(&any_query).unwrap();
4818
4819        // Plain queries should pass an empty vector
4820        assert_eq!(ns_request.k, 20);
4821        assert_eq!(ns_request.offset, Some(5));
4822        assert_eq!(ns_request.filter, Some("id > 5".to_string()));
4823        assert_eq!(
4824            ns_request
4825                .columns
4826                .as_ref()
4827                .and_then(|c| c.column_names.as_ref()),
4828            Some(&vec!["id".to_string()])
4829        );
4830        assert_eq!(ns_request.with_row_id, Some(true));
4831        assert_eq!(ns_request.bypass_vector_index, Some(true));
4832        assert!(ns_request.vector_column.is_none()); // No vector column for plain queries
4833
4834        // Should have an empty vector
4835        assert!(ns_request.vector.single_vector.as_ref().unwrap().is_empty());
4836    }
4837}