Skip to main content

lancedb/
table.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The LanceDB Authors
3
4//! LanceDB Table APIs
5
6use arrow_array::{RecordBatch, RecordBatchReader};
7use arrow_schema::{DataType, Field, Schema, SchemaRef};
8use async_trait::async_trait;
9use datafusion_execution::TaskContext;
10use datafusion_expr::Expr;
11use datafusion_physical_plan::ExecutionPlan;
12use datafusion_physical_plan::display::DisplayableExecutionPlan;
13use futures::StreamExt;
14use futures::stream::FuturesUnordered;
15pub use lance::dataset::ColumnAlteration;
16pub use lance::dataset::NewColumnTransform;
17pub use lance::dataset::ReadParams;
18pub use lance::dataset::Version;
19use lance::dataset::WriteMode;
20use lance::dataset::builder::DatasetBuilder;
21use lance::dataset::{InsertBuilder, WriteParams};
22use lance::index::DatasetIndexExt;
23use lance::index::vector::VectorIndexParams;
24use lance::index::vector::utils::infer_vector_dim;
25use lance::io::{ObjectStoreParams, WrappingObjectStore};
26use lance_datafusion::utils::StreamingWriteSource;
27use lance_index::IndexType;
28use lance_index::scalar::{BuiltinIndexType, ScalarIndexParams};
29use lance_index::vector::bq::RQBuildParams;
30use lance_index::vector::hnsw::builder::HnswBuildParams;
31use lance_index::vector::ivf::IvfBuildParams;
32use lance_index::vector::pq::PQBuildParams;
33use lance_index::vector::sq::builder::SQBuildParams;
34use lance_io::object_store::{LanceNamespaceStorageOptionsProvider, StorageOptionsAccessor};
35pub use query::AnyQuery;
36
37use lance::io::commit::namespace_manifest::LanceNamespaceExternalManifestStore;
38use lance_namespace::LanceNamespace;
39use lance_namespace::error::NamespaceError;
40use lance_namespace::models::DescribeTableRequest;
41use lance_table::format::Manifest;
42use lance_table::io::commit::CommitHandler;
43use lance_table::io::commit::ManifestNamingScheme;
44use lance_table::io::commit::external_manifest::ExternalManifestCommitHandler;
45use serde::{Deserialize, Serialize};
46use std::collections::{HashMap, HashSet};
47use std::format;
48use std::path::Path;
49use std::sync::Arc;
50
51use crate::connection::NamespaceClientPushdownOperation;
52
53use crate::data::scannable::{PeekedScannable, Scannable, estimate_write_partitions};
54use crate::database::Database;
55use crate::embeddings::{EmbeddingDefinition, EmbeddingRegistry, MemoryRegistry};
56use crate::error::{Error, Result};
57use crate::index::IndexStatistics;
58use crate::index::vector::VectorIndex;
59use crate::index::{Index, IndexBuilder, vector::suggested_num_sub_vectors};
60use crate::index::{IndexConfig, IndexStatisticsImpl};
61use crate::query::{IntoQueryVector, Query, QueryExecutionOptions, TakeQuery, VectorQuery};
62use crate::table::datafusion::insert::InsertExec;
63use crate::utils::{
64    PatchReadParam, PatchWriteParam, supported_bitmap_data_type, supported_btree_data_type,
65    supported_fts_data_type, supported_label_list_data_type, supported_vector_data_type,
66};
67
68use self::dataset::DatasetConsistencyWrapper;
69use self::merge::MergeInsertBuilder;
70
71mod add_data;
72pub mod datafusion;
73pub(crate) mod dataset;
74pub mod delete;
75pub mod merge;
76pub mod optimize;
77mod primary_key;
78pub mod query;
79pub mod schema_evolution;
80pub mod update;
81pub mod write_progress;
82use crate::index::waiter::wait_for_index;
83#[cfg(feature = "remote")]
84pub(crate) use add_data::PreprocessingOutput;
85pub use add_data::{AddDataBuilder, AddDataMode, AddResult, NaNVectorBehavior};
86pub use chrono::Duration;
87pub use delete::DeleteResult;
88use futures::future::join_all;
89pub use lance::dataset::refs::{TagContents, Tags as LanceTags};
90pub use lance::dataset::scanner::DatasetRecordBatchStream;
91use lance::dataset::statistics::DatasetStatisticsExt;
92use lance_index::frag_reuse::FRAG_REUSE_INDEX_NAME;
93pub use lance_index::optimize::OptimizeOptions;
94pub use optimize::{CompactionOptions, OptimizeAction, OptimizeStats};
95pub use schema_evolution::{AddColumnsResult, AlterColumnsResult, DropColumnsResult};
96use serde_with::skip_serializing_none;
97pub use update::{UpdateBuilder, UpdateResult};
98
99/// Walk a boxed error chain to find the innermost `NamespaceError`.
100///
101/// Callers like `DatasetBuilder::from_namespace` re-wrap their inner namespace error
102/// inside a fresh `lance::Error::Namespace`, so a single downcast at the top level
103/// won't find it. This walks `.source()` to unwrap arbitrarily nested layers.
104fn find_namespace_error<'a>(
105    err: &'a (dyn std::error::Error + 'static),
106) -> Option<&'a NamespaceError> {
107    let mut current: Option<&(dyn std::error::Error + 'static)> = Some(err);
108    while let Some(e) = current {
109        if let Some(ns_err) = e.downcast_ref::<NamespaceError>() {
110            return Some(ns_err);
111        }
112        current = e.source();
113    }
114    None
115}
116
117/// Map a `lance::Error` coming from a `lance-namespace` call into a `lancedb::Error`,
118/// preserving the fine-grained namespace error code (e.g. `TableNotFound`,
119/// `TableAlreadyExists`). Errors that aren't recognized namespace error variants fall
120/// through to a generic runtime error rather than `TableNotFound`/`TableAlreadyExists`.
121pub(crate) fn map_namespace_lance_error(err: lance::Error, table_name: &str) -> Error {
122    if let Some(code) = find_namespace_error(&err).map(NamespaceError::code) {
123        match code {
124            lance_namespace::error::ErrorCode::TableNotFound => {
125                return Error::TableNotFound {
126                    name: table_name.to_string(),
127                    source: Box::new(err),
128                };
129            }
130            lance_namespace::error::ErrorCode::TableAlreadyExists => {
131                return Error::TableAlreadyExists {
132                    name: table_name.to_string(),
133                };
134            }
135            _ => {}
136        }
137    }
138    match err {
139        lance::Error::Namespace { source, .. } => Error::Runtime {
140            message: format!("Namespace error: {}", source),
141        },
142        other => other.into(),
143    }
144}
145
/// Describes how the values of a column are produced.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ColumnKind {
    /// The column is populated by data supplied by the user (the most common case)
    Physical,
    /// The column is populated by applying an embedding function to the input;
    /// the [`EmbeddingDefinition`] payload describes that embedding.
    Embedding(EmbeddingDefinition),
}
154
/// Defines a single column in a table.
///
/// Serializable so that it can be stored alongside the schema (see
/// [`TableDefinition::into_rich_schema`]).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ColumnDefinition {
    /// The source of the column data
    pub kind: ColumnKind,
}
161
/// An Arrow schema paired with LanceDB-specific column definitions.
#[derive(Debug, Clone)]
pub struct TableDefinition {
    /// How each column is populated.
    // NOTE(review): presumably one entry per field of `schema`, in field order
    // (that is how `new_from_schema` builds it) — nothing here enforces it.
    pub column_definitions: Vec<ColumnDefinition>,
    /// The Arrow schema of the table.
    pub schema: SchemaRef,
}
167
168impl TableDefinition {
169    pub fn new(schema: SchemaRef, column_definitions: Vec<ColumnDefinition>) -> Self {
170        Self {
171            column_definitions,
172            schema,
173        }
174    }
175
176    pub fn new_from_schema(schema: SchemaRef) -> Self {
177        let column_definitions = schema
178            .fields()
179            .iter()
180            .map(|_| ColumnDefinition {
181                kind: ColumnKind::Physical,
182            })
183            .collect();
184        Self::new(schema, column_definitions)
185    }
186
187    pub fn try_from_rich_schema(schema: SchemaRef) -> Result<Self> {
188        let column_definitions = schema.metadata.get("lancedb::column_definitions");
189        if let Some(column_definitions) = column_definitions {
190            let column_definitions: Vec<ColumnDefinition> =
191                serde_json::from_str(column_definitions).map_err(|e| Error::Runtime {
192                    message: format!("Failed to deserialize column definitions: {}", e),
193                })?;
194            Ok(Self::new(schema, column_definitions))
195        } else {
196            let column_definitions = schema
197                .fields()
198                .iter()
199                .map(|_| ColumnDefinition {
200                    kind: ColumnKind::Physical,
201                })
202                .collect();
203            Ok(Self::new(schema, column_definitions))
204        }
205    }
206
207    pub fn into_rich_schema(self) -> SchemaRef {
208        // We have full control over the structure of column definitions.  This should
209        // not fail, except for a bug
210        let lancedb_metadata = serde_json::to_string(&self.column_definitions).unwrap();
211        let mut schema_with_metadata = (*self.schema).clone();
212        schema_with_metadata
213            .metadata
214            .insert("lancedb::column_definitions".to_string(), lancedb_metadata);
215        Arc::new(schema_with_metadata)
216    }
217}
218
/// Describes what happens when a vector either contains NaN or
/// does not have enough values
///
/// Not used yet (hence `allow(dead_code)`); see the linked issue.
#[derive(Clone, Debug, Default)]
#[allow(dead_code)] // https://github.com/lancedb/lancedb/issues/992
enum BadVectorHandling {
    /// An error is returned (this is the default)
    #[default]
    Error,
    /// The offending row is dropped
    Drop,
    /// The invalid/missing items are replaced by the given fill value
    Fill(f32),
    /// The invalid items are replaced by NULL
    None,
}
234
/// Options to use when writing data
#[derive(Clone, Debug, Default)]
pub struct WriteOptions {
    // Coming soon: https://github.com/lancedb/lancedb/issues/992
    // /// What behavior to take if the data contains invalid vectors
    // pub on_bad_vectors: BadVectorHandling,
    /// Advanced parameters that can be used to customize table creation
    ///
    /// Overlapping builder options (e.g. [AddDataBuilder::mode]) will take
    /// precedence over their counterparts in `WriteOptions` (e.g. [WriteParams::mode]).
    pub lance_write_params: Option<WriteParams>,
}
247
/// Filters that can be used to limit the rows considered by an operation
/// (for example [`BaseTable::count_rows`])
pub enum Filter {
    /// A SQL filter string
    Sql(String),
    /// A DataFusion logical expression
    Datafusion(Expr),
}
255
/// A predicate for filtering rows in delete operations.
///
/// Accepts either a SQL string or a DataFusion [`Expr`]. Both variants borrow
/// their payload, so building a predicate does not copy it. Use the [`From`]
/// implementations to convert from `&str`, `&String` or `&Expr` automatically.
/// See [`Table::delete`] for usage examples.
pub enum Predicate<'a> {
    /// A SQL predicate string
    String(&'a str),
    /// A DataFusion logical expression
    Expr(&'a Expr),
}
267
268impl<'a> From<&'a str> for Predicate<'a> {
269    fn from(s: &'a str) -> Self {
270        Predicate::String(s)
271    }
272}
273
274impl<'a> From<&'a String> for Predicate<'a> {
275    fn from(s: &'a String) -> Self {
276        Predicate::String(s.as_str())
277    }
278}
279
280impl<'a> From<&'a Expr> for Predicate<'a> {
281    fn from(e: &'a Expr) -> Self {
282        Predicate::Expr(e)
283    }
284}
285
/// Operations for managing the tags of a table.
///
/// A tag is a name that refers to a specific version of the table.
#[async_trait]
pub trait Tags: Send + Sync {
    /// List the tags of the table, keyed by tag name.
    async fn list(&self) -> Result<HashMap<String, TagContents>>;

    /// Get the version of the table referenced by a tag.
    async fn get_version(&self, tag: &str) -> Result<u64>;

    /// Create a new tag for the given version of the table.
    async fn create(&mut self, tag: &str, version: u64) -> Result<()>;

    /// Delete a tag from the table.
    async fn delete(&mut self, tag: &str) -> Result<()>;

    /// Update an existing tag to point to a new version of the table.
    async fn update(&mut self, tag: &str, version: u64) -> Result<()>;
}
303
304pub use self::merge::MergeResult;
305
/// Specification selecting Lance's MemWAL LSM-style write path for
/// `merge_insert`.
///
/// Construct via [`LsmWriteSpec::bucket`], [`LsmWriteSpec::identity`], or
/// [`LsmWriteSpec::unsharded`], then optionally chain
/// [`LsmWriteSpec::with_maintained_indexes`] (indexes the MemWAL keeps up to
/// date) and [`LsmWriteSpec::with_writer_config_defaults`] (default
/// `ShardWriter` configuration recorded in the MemWAL index).
///
/// Install a spec with [`Table::set_lsm_write_spec`] and remove it with
/// [`Table::unset_lsm_write_spec`]. The actual `merge_insert` dispatch
/// onto the MemWAL writer is a follow-up.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum LsmWriteSpec {
    /// Hash-bucket sharding by a scalar column.
    ///
    /// `column` must be a non-nested column with a supported scalar type.
    /// `num_buckets` must be in `[1, 1024]`.
    /// Iceberg-compatible Murmur3-x86-32 (seed 0) is used so each row's
    /// `bucket(column, num_buckets)` value is stable across processes.
    Bucket {
        /// Name of the column whose hash selects the bucket.
        column: String,
        /// Number of hash buckets to shard into.
        num_buckets: u32,
        /// Names of indexes (already created on the table) that the
        /// MemWAL should maintain in-memory as rows are appended.
        maintained_indexes: Vec<String>,
        /// Default `ShardWriter` configuration recorded in the MemWAL index.
        writer_config_defaults: HashMap<String, String>,
    },
    /// Identity sharding — shard by the raw value of `column`.
    ///
    /// Use this when the data is already partitioned by `column`; each
    /// distinct value of `column` becomes its own shard.
    Identity {
        /// Name of the column whose raw value selects the shard.
        column: String,
        /// Names of indexes (already created on the table) that the
        /// MemWAL should maintain in-memory as rows are appended.
        maintained_indexes: Vec<String>,
        /// Default `ShardWriter` configuration recorded in the MemWAL index.
        writer_config_defaults: HashMap<String, String>,
    },
    /// No sharding — every `merge_insert` call writes to a single MemWAL shard.
    Unsharded {
        /// Names of indexes (already created on the table) that the
        /// MemWAL should maintain in-memory as rows are appended.
        maintained_indexes: Vec<String>,
        /// Default `ShardWriter` configuration recorded in the MemWAL index.
        writer_config_defaults: HashMap<String, String>,
    },
}
356
357impl LsmWriteSpec {
358    /// Construct a hash-bucket sharding spec with no maintained indexes.
359    pub fn bucket(column: impl Into<String>, num_buckets: u32) -> Self {
360        Self::Bucket {
361            column: column.into(),
362            num_buckets,
363            maintained_indexes: Vec::new(),
364            writer_config_defaults: HashMap::new(),
365        }
366    }
367
368    /// Construct an identity-sharding spec (shard by the raw value of
369    /// `column`) with no maintained indexes.
370    pub fn identity(column: impl Into<String>) -> Self {
371        Self::Identity {
372            column: column.into(),
373            maintained_indexes: Vec::new(),
374            writer_config_defaults: HashMap::new(),
375        }
376    }
377
378    /// Construct an unsharded spec with no maintained indexes.
379    pub fn unsharded() -> Self {
380        Self::Unsharded {
381            maintained_indexes: Vec::new(),
382            writer_config_defaults: HashMap::new(),
383        }
384    }
385
386    /// Replace the list of indexes the MemWAL should keep up to date as
387    /// rows are appended. Each name must reference an index that already
388    /// exists on the table at the time `set_lsm_write_spec` is called.
389    pub fn with_maintained_indexes<I, S>(mut self, indexes: I) -> Self
390    where
391        I: IntoIterator<Item = S>,
392        S: Into<String>,
393    {
394        let v: Vec<String> = indexes.into_iter().map(Into::into).collect();
395        match &mut self {
396            Self::Bucket {
397                maintained_indexes, ..
398            }
399            | Self::Identity {
400                maintained_indexes, ..
401            }
402            | Self::Unsharded {
403                maintained_indexes, ..
404            } => *maintained_indexes = v,
405        }
406        self
407    }
408
409    /// Replace the default `ShardWriter` configuration recorded in the MemWAL
410    /// index, so every writer starts from the same defaults. Keys are
411    /// `ShardWriter` config field names (`Duration` knobs use a `_ms` suffix);
412    /// values are their string encodings.
413    pub fn with_writer_config_defaults<I, K, V>(mut self, defaults: I) -> Self
414    where
415        I: IntoIterator<Item = (K, V)>,
416        K: Into<String>,
417        V: Into<String>,
418    {
419        let m: HashMap<String, String> = defaults
420            .into_iter()
421            .map(|(k, v)| (k.into(), v.into()))
422            .collect();
423        match &mut self {
424            Self::Bucket {
425                writer_config_defaults,
426                ..
427            }
428            | Self::Identity {
429                writer_config_defaults,
430                ..
431            }
432            | Self::Unsharded {
433                writer_config_defaults,
434                ..
435            } => *writer_config_defaults = m,
436        }
437        self
438    }
439
440    /// Borrow the list of index names this spec asks MemWAL to maintain.
441    pub fn maintained_indexes(&self) -> &[String] {
442        match self {
443            Self::Bucket {
444                maintained_indexes, ..
445            }
446            | Self::Identity {
447                maintained_indexes, ..
448            }
449            | Self::Unsharded {
450                maintained_indexes, ..
451            } => maintained_indexes,
452        }
453    }
454
455    /// Borrow the default `ShardWriter` configuration recorded by this spec.
456    pub fn writer_config_defaults(&self) -> &HashMap<String, String> {
457        match self {
458            Self::Bucket {
459                writer_config_defaults,
460                ..
461            }
462            | Self::Identity {
463                writer_config_defaults,
464                ..
465            }
466            | Self::Unsharded {
467                writer_config_defaults,
468                ..
469            } => writer_config_defaults,
470        }
471    }
472}
473
474/// A trait for anything "table-like".  This is used for both native tables (which target
475/// Lance datasets) and remote tables (which target LanceDB cloud)
476///
477/// This trait is still EXPERIMENTAL and subject to change in the future
478#[async_trait]
479pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync {
480    /// Get a reference to std::any::Any
481    fn as_any(&self) -> &dyn std::any::Any;
482    /// Get the name of the table.
483    fn name(&self) -> &str;
484    /// Get the namespace of the table.
485    fn namespace(&self) -> &[String];
486    /// Get the id of the table
487    ///
488    /// This is the namespace of the table concatenated with the name
489    /// separated by $
490    fn id(&self) -> &str;
491    /// Get the arrow [Schema] of the table.
492    async fn schema(&self) -> Result<SchemaRef>;
493    /// Count the number of rows in this table.
494    async fn count_rows(&self, filter: Option<Filter>) -> Result<usize>;
495    /// Create a physical plan for the query.
496    async fn create_plan(
497        &self,
498        query: &AnyQuery,
499        options: QueryExecutionOptions,
500    ) -> Result<Arc<dyn ExecutionPlan>>;
501    /// Execute a query and return the results as a stream of RecordBatches.
502    async fn query(
503        &self,
504        query: &AnyQuery,
505        options: QueryExecutionOptions,
506    ) -> Result<DatasetRecordBatchStream>;
507    /// Explain the plan for a query.
508    async fn explain_plan(&self, query: &AnyQuery, verbose: bool) -> Result<String> {
509        let plan = self.create_plan(query, Default::default()).await?;
510        let display = DisplayableExecutionPlan::new(plan.as_ref());
511
512        Ok(format!("{}", display.indent(verbose)))
513    }
514    async fn analyze_plan(
515        &self,
516        query: &AnyQuery,
517        options: QueryExecutionOptions,
518    ) -> Result<String>;
519
520    /// Add new records to the table.
521    async fn add(&self, add: AddDataBuilder) -> Result<AddResult>;
522    /// Delete rows from the table matching the given [`Predicate`].
523    async fn delete(&self, predicate: Predicate<'_>) -> Result<DeleteResult>;
524    /// Update rows in the table.
525    async fn update(&self, update: UpdateBuilder) -> Result<UpdateResult>;
526    /// Create an index on the provided column(s).
527    async fn create_index(&self, index: IndexBuilder) -> Result<()>;
528    /// List the indices on the table.
529    async fn list_indices(&self) -> Result<Vec<IndexConfig>>;
530    /// Drop an index from the table.
531    async fn drop_index(&self, name: &str) -> Result<()>;
532    /// Prewarm an index in the table.
533    async fn prewarm_index(&self, name: &str) -> Result<()>;
534    /// Prewarm data for the table.
535    ///
536    /// Currently only supported on remote tables.
537    /// If `columns` is `None`, all columns are prewarmed.
538    async fn prewarm_data(&self, columns: Option<Vec<String>>) -> Result<()>;
539    /// Get statistics about the index.
540    async fn index_stats(&self, index_name: &str) -> Result<Option<IndexStatistics>>;
541    /// Merge insert new records into the table.
542    async fn merge_insert(
543        &self,
544        params: MergeInsertBuilder,
545        new_data: Box<dyn RecordBatchReader + Send>,
546    ) -> Result<MergeResult>;
547    /// Set the unenforced primary key for the table to a single column.
548    ///
549    /// "Unenforced" means LanceDB does not check uniqueness on writes; the
550    /// column is recorded in the schema as the primary key for use by
551    /// features such as `merge_insert`. Only single-column primary keys are
552    /// supported, and the key cannot be changed once set.
553    ///
554    /// The default implementation returns `NotSupported`; table types
555    /// backed by a Lance dataset override it.
556    async fn set_unenforced_primary_key(&self, _columns: &[&str]) -> Result<()> {
557        Err(Error::NotSupported {
558            message: "set_unenforced_primary_key is not supported on this table type".into(),
559        })
560    }
561    /// Install an [`LsmWriteSpec`] on this table.
562    ///
563    /// The spec selects Lance's MemWAL LSM-style write path for future
564    /// `merge_insert` calls.
565    ///
566    /// The default implementation returns `NotSupported`. Implementations
567    /// that support the MemWAL LSM write path must override this.
568    async fn set_lsm_write_spec(&self, _spec: LsmWriteSpec) -> Result<()> {
569        Err(Error::NotSupported {
570            message: "set_lsm_write_spec is not supported on this table type".into(),
571        })
572    }
573    /// Remove the [`LsmWriteSpec`] from this table.
574    ///
575    /// This is a no-op if no spec is currently set.
576    ///
577    /// The default implementation returns `NotSupported`. Implementations
578    /// that support the MemWAL LSM write path must override this.
579    async fn unset_lsm_write_spec(&self) -> Result<()> {
580        Err(Error::NotSupported {
581            message: "unset_lsm_write_spec is not supported on this table type".into(),
582        })
583    }
584    /// Gets the table tag manager.
585    async fn tags(&self) -> Result<Box<dyn Tags + '_>>;
586    /// Optimize the dataset.
587    async fn optimize(&self, action: OptimizeAction) -> Result<OptimizeStats>;
588    /// Add columns to the table.
589    async fn add_columns(
590        &self,
591        transforms: NewColumnTransform,
592        read_columns: Option<Vec<String>>,
593    ) -> Result<AddColumnsResult>;
594    /// Alter columns in the table.
595    async fn alter_columns(&self, alterations: &[ColumnAlteration]) -> Result<AlterColumnsResult>;
596    /// Drop columns from the table.
597    async fn drop_columns(&self, columns: &[&str]) -> Result<DropColumnsResult>;
598    /// Get the version of the table.
599    async fn version(&self) -> Result<u64>;
600    /// Checkout a specific version of the table.
601    async fn checkout(&self, version: u64) -> Result<()>;
602    /// Checkout a table version referenced by a tag.
603    /// Tags provide a human-readable way to reference specific versions of the table.
604    async fn checkout_tag(&self, tag: &str) -> Result<()>;
605    /// Checkout the latest version of the table.
606    async fn checkout_latest(&self) -> Result<()>;
607    /// Restore the table to the currently checked out version.
608    async fn restore(&self) -> Result<()>;
609    /// List the versions of the table.
610    async fn list_versions(&self) -> Result<Vec<Version>>;
611    /// Get the table definition.
612    async fn table_definition(&self) -> Result<TableDefinition>;
613    /// Get the table URI (storage location)
614    async fn uri(&self) -> Result<String>;
615    /// Get the storage options used when opening this table, if any.
616    #[deprecated(since = "0.25.0", note = "Use initial_storage_options() instead")]
617    async fn storage_options(&self) -> Option<HashMap<String, String>>;
618    /// Get the initial storage options that were passed in when opening this table.
619    ///
620    /// For dynamically refreshed options (e.g., credential vending), use [`Self::latest_storage_options`].
621    async fn initial_storage_options(&self) -> Option<HashMap<String, String>>;
622    /// Get the latest storage options, refreshing from provider if configured.
623    ///
624    /// Returns `Ok(Some(options))` if storage options are available (static or refreshed),
625    /// `Ok(None)` if no storage options were configured, or `Err(...)` if refresh failed.
626    async fn latest_storage_options(&self) -> Result<Option<HashMap<String, String>>>;
627    /// Poll until the columns are fully indexed. Will return Error::Timeout if the columns
628    /// are not fully indexed within the timeout.
629    async fn wait_for_index(
630        &self,
631        index_names: &[&str],
632        timeout: std::time::Duration,
633    ) -> Result<()>;
634    /// Get statistics on the table
635    async fn stats(&self) -> Result<TableStatistics>;
636    /// Create an ExecutionPlan for inserting data into the table.
637    ///
638    /// This is used by the DataFusion TableProvider implementation to support
639    /// INSERT INTO statements.
640    async fn create_insert_exec(
641        &self,
642        _input: Arc<dyn datafusion_physical_plan::ExecutionPlan>,
643        _write_params: WriteParams,
644    ) -> Result<Arc<dyn datafusion_physical_plan::ExecutionPlan>> {
645        Err(Error::NotSupported {
646            message: "create_insert_exec not implemented".to_string(),
647        })
648    }
649}
650
/// A Table is a collection of strongly typed rows.
///
/// The type of each row is defined by an Apache Arrow [Schema].
///
/// Cloning is cheap: the fields are reference-counted handles.
#[derive(Clone, Debug)]
pub struct Table {
    // The underlying table implementation that operations are forwarded to.
    inner: Arc<dyn BaseTable>,
    // The database this table is associated with; `None` when the table was built
    // directly from a `BaseTable` (see the `From<Arc<dyn BaseTable>>` impl).
    database: Option<Arc<dyn Database>>,
    // Registry of embedding functions available to this table.
    embedding_registry: Arc<dyn EmbeddingRegistry>,
}
660
/// Test-only constructors that build a [`Table`] on top of mocked remote
/// table/database clients driven by a request handler closure.
#[cfg(all(test, feature = "remote"))]
mod test_utils {
    use super::*;

    impl Table {
        /// Build a mock remote table named `name` whose requests are answered by `handler`.
        pub fn new_with_handler<T>(
            name: impl Into<String>,
            handler: impl Fn(reqwest::Request) -> http::Response<T> + Clone + Send + Sync + 'static,
        ) -> Self
        where
            T: Into<reqwest::Body>,
        {
            let inner = Arc::new(crate::remote::table::RemoteTable::new_mock(
                name.into(),
                handler.clone(),
                None,
            ));
            let database = Arc::new(crate::remote::db::RemoteDatabase::new_mock(handler));
            // `Table::new` supplies a fresh `MemoryRegistry`; the registry is unused.
            Self::new(inner, database)
        }

        /// Like [`Table::new_with_handler`], but the mock table is created with the
        /// given read consistency interval.
        pub fn new_with_handler_and_interval<T>(
            name: impl Into<String>,
            handler: impl Fn(reqwest::Request) -> http::Response<T> + Clone + Send + Sync + 'static,
            read_consistency_interval: Option<std::time::Duration>,
        ) -> Self
        where
            T: Into<reqwest::Body>,
        {
            let inner = Arc::new(
                crate::remote::table::RemoteTable::new_mock_with_consistency_interval(
                    name.into(),
                    handler.clone(),
                    read_consistency_interval,
                ),
            );
            let database = Arc::new(crate::remote::db::RemoteDatabase::new_mock(handler));
            // `Table::new` supplies a fresh `MemoryRegistry`; the registry is unused.
            Self::new(inner, database)
        }

        /// Like [`Table::new_with_handler`], but the mock table is created with the
        /// given version.
        pub fn new_with_handler_version<T>(
            name: impl Into<String>,
            version: semver::Version,
            handler: impl Fn(reqwest::Request) -> http::Response<T> + Clone + Send + Sync + 'static,
        ) -> Self
        where
            T: Into<reqwest::Body>,
        {
            let inner = Arc::new(crate::remote::table::RemoteTable::new_mock(
                name.into(),
                handler.clone(),
                Some(version),
            ));
            let database = Arc::new(crate::remote::db::RemoteDatabase::new_mock(handler));
            // `Table::new` supplies a fresh `MemoryRegistry`; the registry is unused.
            Self::new(inner, database)
        }

        /// Like [`Table::new_with_handler`], but both the mock table and the mock
        /// database are created with the given client configuration.
        pub fn new_with_handler_and_config<T>(
            name: impl Into<String>,
            handler: impl Fn(reqwest::Request) -> http::Response<T> + Clone + Send + Sync + 'static,
            config: crate::remote::ClientConfig,
        ) -> Self
        where
            T: Into<reqwest::Body>,
        {
            let inner = Arc::new(crate::remote::table::RemoteTable::new_mock_with_config(
                name.into(),
                handler.clone(),
                config.clone(),
            ));
            let database = Arc::new(crate::remote::db::RemoteDatabase::new_mock_with_config(
                handler, config,
            ));
            // `Table::new` supplies a fresh `MemoryRegistry`; the registry is unused.
            Self::new(inner, database)
        }

        /// Like [`Table::new_with_handler_and_config`], but the mock table is also
        /// created with the given version.
        pub fn new_with_handler_version_and_config<T>(
            name: impl Into<String>,
            version: semver::Version,
            handler: impl Fn(reqwest::Request) -> http::Response<T> + Clone + Send + Sync + 'static,
            config: crate::remote::ClientConfig,
        ) -> Self
        where
            T: Into<reqwest::Body>,
        {
            let inner = Arc::new(
                crate::remote::table::RemoteTable::new_mock_with_version_and_config(
                    name.into(),
                    handler.clone(),
                    Some(version),
                    config.clone(),
                ),
            );
            let database = Arc::new(crate::remote::db::RemoteDatabase::new_mock_with_config(
                handler, config,
            ));
            // `Table::new` supplies a fresh `MemoryRegistry`; the registry is unused.
            Self::new(inner, database)
        }
    }
}
786
787impl std::fmt::Display for Table {
788    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
789        write!(f, "{}", self.inner)
790    }
791}
792
793impl From<Arc<dyn BaseTable>> for Table {
794    fn from(inner: Arc<dyn BaseTable>) -> Self {
795        Self {
796            inner,
797            database: None,
798            embedding_registry: Arc::new(MemoryRegistry::new()),
799        }
800    }
801}
802
803impl Table {
804    pub fn new(inner: Arc<dyn BaseTable>, database: Arc<dyn Database>) -> Self {
805        Self {
806            inner,
807            database: Some(database),
808            embedding_registry: Arc::new(MemoryRegistry::new()),
809        }
810    }
811
812    pub fn base_table(&self) -> &Arc<dyn BaseTable> {
813        &self.inner
814    }
815
816    pub fn database(&self) -> &Arc<dyn Database> {
817        self.database.as_ref().unwrap()
818    }
819
820    pub fn embedding_registry(&self) -> &Arc<dyn EmbeddingRegistry> {
821        &self.embedding_registry
822    }
823
824    pub(crate) fn new_with_embedding_registry(
825        inner: Arc<dyn BaseTable>,
826        database: Arc<dyn Database>,
827        embedding_registry: Arc<dyn EmbeddingRegistry>,
828    ) -> Self {
829        Self {
830            inner,
831            database: Some(database),
832            embedding_registry,
833        }
834    }
835
836    /// Cast as [`NativeTable`], or return None it if is not a [`NativeTable`].
837    ///
838    /// Warning: This function will be removed soon (features exclusive to NativeTable
839    ///          will be added to Table)
840    pub fn as_native(&self) -> Option<&NativeTable> {
841        self.inner.as_native()
842    }
843
844    /// Get the name of the table.
845    pub fn name(&self) -> &str {
846        self.inner.name()
847    }
848
849    /// Get the namespace of the table.
850    pub fn namespace(&self) -> &[String] {
851        self.inner.namespace()
852    }
853
854    /// Get the ID of the table (namespace + name joined by '$').
855    pub fn id(&self) -> &str {
856        self.inner.id()
857    }
858
859    /// Get the dataset of the table if it is a native table
860    ///
861    /// Returns None otherwise
862    pub fn dataset(&self) -> Option<&dataset::DatasetConsistencyWrapper> {
863        self.inner.as_native().map(|t| &t.dataset)
864    }
865
866    /// Get the arrow [Schema] of the table.
867    pub async fn schema(&self) -> Result<SchemaRef> {
868        self.inner.schema().await
869    }
870
871    /// Count the number of rows in this dataset.
872    ///
873    /// # Arguments
874    ///
875    /// * `filter` if present, only count rows matching the filter
876    pub async fn count_rows(&self, filter: Option<String>) -> Result<usize> {
877        self.inner.count_rows(filter.map(Filter::Sql)).await
878    }
879
880    /// Insert new records into this Table
881    ///
882    /// # Arguments
883    ///
884    /// * `data` data to be added to the Table
885    /// * `options` options to control how data is added
886    pub fn add<T: Scannable + 'static>(&self, data: T) -> AddDataBuilder {
887        AddDataBuilder::new(
888            self.inner.clone(),
889            Box::new(data),
890            Some(self.embedding_registry.clone()),
891        )
892    }
893
894    /// Update existing records in the Table
895    ///
896    /// An update operation can be used to adjust existing values.  Use the
897    /// returned builder to specify which columns to update.  The new value
898    /// can be a literal value (e.g. replacing nulls with some default value)
899    /// or an expression applied to the old value (e.g. incrementing a value)
900    ///
901    /// An optional condition can be specified (e.g. "only update if the old
902    /// value is 0")
903    ///
904    /// Note: if your condition is something like "some_id_column == 7" and
905    /// you are updating many rows (with different ids) then you will get
906    /// better performance with a single [`merge_insert`] call instead of
907    /// repeatedly calilng this method.
908    pub fn update(&self) -> UpdateBuilder {
909        UpdateBuilder::new(self.inner.clone())
910    }
911
912    /// Delete the rows from table that match the predicate.
913    ///
914    /// # Arguments
915    /// - `predicate` - A SQL string (`&str`) or DataFusion expression (`&Expr`)
916    ///   that selects the rows to delete.
917    ///
918    /// # Example
919    ///
920    /// ```no_run
921    /// # use std::sync::Arc;
922    /// # use arrow_array::{FixedSizeListArray, types::Float32Type, RecordBatch,
923    /// #   RecordBatchIterator, Int32Array};
924    /// # use arrow_schema::{Schema, Field, DataType};
925    /// use datafusion_expr::{col, lit};
926    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
927    /// let tmpdir = tempfile::tempdir().unwrap();
928    /// let db = lancedb::connect(tmpdir.path().to_str().unwrap())
929    ///     .execute()
930    ///     .await
931    ///     .unwrap();
932    /// let schema = Arc::new(Schema::new(vec![
933    ///     Field::new("id", DataType::Int32, false),
934    ///     Field::new("vector", DataType::FixedSizeList(
935    ///         Arc::new(Field::new("item", DataType::Float32, true)), 128), true),
936    /// ]));
937    /// let data = RecordBatch::try_new(
938    ///     schema.clone(),
939    ///     vec![
940    ///         Arc::new(Int32Array::from_iter_values(0..10)),
941    ///         Arc::new(
942    ///             FixedSizeListArray::from_iter_primitive::<Float32Type, _, _>(
943    ///                 (0..10).map(|_| Some(vec![Some(1.0); 128])),
944    ///                 128,
945    ///             ),
946    ///         ),
947    ///     ],
948    /// )
949    /// .unwrap();
950    /// let tbl = db
951    ///     .create_table("delete_test", data)
952    ///     .execute()
953    ///     .await
954    ///     .unwrap();
955    ///
956    /// // Using a SQL string:
957    /// tbl.delete("id > 5").await.unwrap();
958    ///
959    /// // Using a DataFusion expression:
960    /// let expr = col("id").lt(lit(4));
961    /// tbl.delete(&expr).await.unwrap();
962    /// # });
963    /// ```
964    pub async fn delete(&self, predicate: impl Into<Predicate<'_>>) -> Result<DeleteResult> {
965        self.inner.delete(predicate.into()).await
966    }
967
968    /// Create an index on the provided column(s).
969    ///
970    /// Indices are used to speed up searches and are often needed when the size of the table
971    /// becomes large (the exact size depends on many factors but somewhere between 100K rows
972    /// and 1M rows is a good rule of thumb)
973    ///
974    /// There are a variety of indices available.  They are described more in
975    /// [`crate::index::Index`].  The simplest thing to do is to use `index::Index::Auto` which
976    /// will attempt to create the most useful index based on the column type and column
977    /// statistics. `BTree` index is created by default for numeric, temporal, and
978    /// string columns.
979    ///
980    /// Once an index is created it will remain until the data is overwritten (e.g. an
981    /// add operation with mode overwrite) or the indexed column is dropped.
982    ///
983    /// Indices are not automatically updated with new data.  If you add new data to the
984    /// table then the index will not include the new rows.  However, a table search will
985    /// still consider the unindexed rows.  Searches will issue both an indexed search (on
986    /// the data covered by the index) and a flat search (on the unindexed data) and the
987    /// results will be combined.
988    ///
989    /// If there is enough unindexed data then the flat search will become slow and the index
990    /// should be optimized.  Optimizing an index will add any unindexed data to the existing
991    /// index without rerunning the full index creation process.  For more details see
992    /// [Table::optimize].
993    ///
994    /// Note: Multi-column (composite) indices are not currently supported.  However, they will
995    /// be supported in the future and the API is designed to be compatible with them.
996    ///
997    /// # Examples
998    ///
999    /// ```no_run
1000    /// # use std::sync::Arc;
1001    /// # use arrow_array::{FixedSizeListArray, types::Float32Type, RecordBatch,
1002    /// #   RecordBatchIterator, Int32Array};
1003    /// # use arrow_schema::{Schema, Field, DataType};
1004    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
1005    /// use lancedb::index::Index;
1006    /// let tmpdir = tempfile::tempdir().unwrap();
1007    /// let db = lancedb::connect(tmpdir.path().to_str().unwrap())
1008    ///     .execute()
1009    ///     .await
1010    ///     .unwrap();
1011    /// # let tbl = db.open_table("idx_test").execute().await.unwrap();
1012    /// // Create IVF PQ index on the "vector" column by default.
1013    /// tbl.create_index(&["vector"], Index::Auto)
1014    ///    .execute()
1015    ///    .await
1016    ///    .unwrap();
1017    /// // Create a BTree index on the "id" column.
1018    /// tbl.create_index(&["id"], Index::Auto)
1019    ///     .execute()
1020    ///     .await
1021    ///     .unwrap();
1022    /// // Create a LabelList index on the "tags" column.
1023    /// tbl.create_index(&["tags"], Index::LabelList(Default::default()))
1024    ///     .execute()
1025    ///     .await
1026    ///     .unwrap();
1027    /// # });
1028    /// ```
1029    pub fn create_index(&self, columns: &[impl AsRef<str>], index: Index) -> IndexBuilder {
1030        IndexBuilder::new(
1031            self.inner.clone(),
1032            columns
1033                .iter()
1034                .map(|val| val.as_ref().to_string())
1035                .collect::<Vec<_>>(),
1036            index,
1037        )
1038    }
1039
1040    /// See [Table::create_index]
1041    /// For remote tables, this allows an optional wait_timeout to poll until asynchronous indexing is complete
1042    pub fn create_index_with_timeout(
1043        &self,
1044        columns: &[impl AsRef<str>],
1045        index: Index,
1046        wait_timeout: Option<std::time::Duration>,
1047    ) -> IndexBuilder {
1048        let mut builder = IndexBuilder::new(
1049            self.inner.clone(),
1050            columns
1051                .iter()
1052                .map(|val| val.as_ref().to_string())
1053                .collect::<Vec<_>>(),
1054            index,
1055        );
1056        if let Some(timeout) = wait_timeout {
1057            builder = builder.wait_timeout(timeout);
1058        }
1059        builder
1060    }
1061
1062    /// Create a builder for a merge insert operation
1063    ///
1064    /// This operation can add rows, update rows, and remove rows all in a single
1065    /// transaction. It is a very generic tool that can be used to create
1066    /// behaviors like "insert if not exists", "update or insert (i.e. upsert)",
1067    /// or even replace a portion of existing data with new data (e.g. replace
1068    /// all data where month="january")
1069    ///
1070    /// The merge insert operation works by combining new data from a
1071    /// **source table** with existing data in a **target table** by using a
1072    /// join.  There are three categories of records.
1073    ///
1074    /// "Matched" records are records that exist in both the source table and
1075    /// the target table. "Not matched" records exist only in the source table
1076    /// (e.g. these are new data) "Not matched by source" records exist only
1077    /// in the target table (this is old data)
1078    ///
1079    /// The builder returned by this method can be used to customize what
1080    /// should happen for each category of data.
1081    ///
1082    /// Please note that the data may appear to be reordered as part of this
1083    /// operation.  This is because updated rows will be deleted from the
1084    /// dataset and then reinserted at the end with the new values.
1085    ///
1086    /// # Arguments
1087    ///
1088    /// * `on` One or more columns to join on.  This is how records from the
1089    ///   source table and target table are matched.  Typically this is some
1090    ///   kind of key or id column.
1091    ///
1092    /// # Examples
1093    ///
1094    /// ```no_run
1095    /// # use std::sync::Arc;
1096    /// # use arrow_array::{FixedSizeListArray, types::Float32Type, RecordBatch,
1097    /// #   RecordBatchIterator, Int32Array};
1098    /// # use arrow_schema::{Schema, Field, DataType};
1099    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
1100    /// let tmpdir = tempfile::tempdir().unwrap();
1101    /// let db = lancedb::connect(tmpdir.path().to_str().unwrap())
1102    ///     .execute()
1103    ///     .await
1104    ///     .unwrap();
1105    /// # let tbl = db.open_table("idx_test").execute().await.unwrap();
1106    /// # let schema = Arc::new(Schema::new(vec![
1107    /// #  Field::new("id", DataType::Int32, false),
1108    /// #  Field::new("vector", DataType::FixedSizeList(
1109    /// #    Arc::new(Field::new("item", DataType::Float32, true)), 128), true),
1110    /// # ]));
1111    /// let new_data = RecordBatchIterator::new(
1112    ///     vec![RecordBatch::try_new(
1113    ///         schema.clone(),
1114    ///         vec![
1115    ///             Arc::new(Int32Array::from_iter_values(0..10)),
1116    ///             Arc::new(
1117    ///                 FixedSizeListArray::from_iter_primitive::<Float32Type, _, _>(
1118    ///                     (0..10).map(|_| Some(vec![Some(1.0); 128])),
1119    ///                     128,
1120    ///                 ),
1121    ///             ),
1122    ///         ],
1123    ///     )
1124    ///     .unwrap()]
1125    ///     .into_iter()
1126    ///     .map(Ok),
1127    ///     schema.clone(),
1128    /// );
1129    /// // Perform an upsert operation
1130    /// let mut merge_insert = tbl.merge_insert(&["id"]);
1131    /// merge_insert
1132    ///     .when_matched_update_all(None)
1133    ///     .when_not_matched_insert_all();
1134    /// merge_insert.execute(Box::new(new_data)).await.unwrap();
1135    /// # });
1136    /// ```
1137    pub fn merge_insert(&self, on: &[&str]) -> MergeInsertBuilder {
1138        MergeInsertBuilder::new(
1139            self.inner.clone(),
1140            on.iter().map(|s| s.to_string()).collect(),
1141        )
1142    }
1143
1144    /// Create a [`Query`] Builder.
1145    ///
1146    /// Queries allow you to search your existing data.  By default the query will
1147    /// return all the data in the table in no particular order.  The builder
1148    /// returned by this method can be used to control the query using filtering,
1149    /// vector similarity, sorting, and more.
1150    ///
1151    /// Note: By default, all columns are returned.  For best performance, you should
1152    /// only fetch the columns you need.  See [`Query::select_with_projection`] for
1153    /// more details.
1154    ///
1155    /// When appropriate, various indices and statistics will be used to accelerate
1156    /// the query.
1157    ///
1158    /// # Examples
1159    ///
1160    /// ## Vector search
1161    ///
1162    /// This example will find the 10 rows whose value in the "vector" column are
1163    /// closest to the query vector [1.0, 2.0, 3.0].  If an index has been created
1164    /// on the "vector" column then this will perform an ANN search.
1165    ///
1166    /// The [`Query::refine_factor`] and [`Query::nprobes`] methods are used to
1167    /// control the recall / latency tradeoff of the search.
1168    ///
1169    /// ```no_run
1170    /// # use arrow_array::RecordBatch;
1171    /// # use futures::TryStreamExt;
1172    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
1173    /// # let conn = lancedb::connect("/tmp").execute().await.unwrap();
1174    /// # let tbl = conn.open_table("tbl").execute().await.unwrap();
1175    /// use crate::lancedb::Table;
1176    /// use crate::lancedb::query::ExecutableQuery;
1177    /// let stream = tbl
1178    ///     .query()
1179    ///     .nearest_to(&[1.0, 2.0, 3.0])
1180    ///     .unwrap()
1181    ///     .refine_factor(5)
1182    ///     .nprobes(10)
1183    ///     .execute()
1184    ///     .await
1185    ///     .unwrap();
1186    /// let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
1187    /// # });
1188    /// ```
1189    ///
1190    /// ## SQL-style filter
1191    ///
1192    /// This query will return up to 1000 rows whose value in the `id` column
1193    /// is greater than 5.  LanceDb supports a broad set of filtering functions.
1194    ///
1195    /// ```no_run
1196    /// # use arrow_array::RecordBatch;
1197    /// # use futures::TryStreamExt;
1198    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
1199    /// # let conn = lancedb::connect("/tmp").execute().await.unwrap();
1200    /// # let tbl = conn.open_table("tbl").execute().await.unwrap();
1201    /// use crate::lancedb::Table;
1202    /// use crate::lancedb::query::{ExecutableQuery, QueryBase};
1203    /// let stream = tbl
1204    ///     .query()
1205    ///     .only_if("id > 5")
1206    ///     .limit(1000)
1207    ///     .execute()
1208    ///     .await
1209    ///     .unwrap();
1210    /// let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
1211    /// # });
1212    /// ```
1213    ///
1214    /// ## Full scan
1215    ///
1216    /// This query will return everything in the table in no particular
1217    /// order.
1218    ///
1219    /// ```no_run
1220    /// # use arrow_array::RecordBatch;
1221    /// # use futures::TryStreamExt;
1222    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
1223    /// # let conn = lancedb::connect("/tmp").execute().await.unwrap();
1224    /// # let tbl = conn.open_table("tbl").execute().await.unwrap();
1225    /// use crate::lancedb::Table;
1226    /// use crate::lancedb::query::ExecutableQuery;
1227    /// let stream = tbl.query().execute().await.unwrap();
1228    /// let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
1229    /// # });
1230    /// ```
1231    pub fn query(&self) -> Query {
1232        Query::new(self.inner.clone())
1233    }
1234
1235    /// Extract rows from the dataset using dataset offsets.
1236    ///
1237    /// Dataset offsets are 0-indexed and relative to the current version of the table.
1238    /// They are not stable.  A row with an offset of N may have a different offset in a
1239    /// different version of the table (e.g. if an earlier row is deleted).
1240    ///
1241    /// Offsets are useful for sampling as the set of all valid offsets is easily
1242    /// known in advance to be [0, len(table)).
1243    ///
1244    /// No guarantees are made regarding the order in which results are returned.  If you
1245    /// desire an output order that matches the order of the given offsets, you will need
1246    /// to add the row offset column to the output and align it yourself.
1247    ///
1248    /// Parameters
1249    /// ----------
1250    /// offsets: list[int]
1251    ///     The offsets to take.
1252    ///
1253    /// Returns
1254    /// -------
1255    /// pa.RecordBatch
1256    ///     A record batch containing the rows at the given offsets.
1257    pub fn take_offsets(&self, offsets: Vec<u64>) -> TakeQuery {
1258        TakeQuery::from_offsets(self.inner.clone(), offsets)
1259    }
1260
1261    /// Extract rows from the dataset using row ids.
1262    ///
1263    /// Row ids are not stable and are relative to the current version of the table.
1264    /// They can change due to compaction and updates.
1265    ///
1266    /// Even so, row ids are more stable than offsets and can be useful in some situations.
1267    ///
1268    /// There is an ongoing effort to make row ids stable which is tracked at
1269    /// https://github.com/lancedb/lancedb/issues/1120
1270    ///
1271    /// No guarantees are made regarding the order in which results are returned.  If you
1272    /// desire an output order that matches the order of the given ids, you will need
1273    /// to add the row id column to the output and align it yourself.
1274    /// Parameters
1275    /// ----------
1276    /// row_ids: list[int]
1277    ///     The row ids to take.
1278    ///
1279    pub fn take_row_ids(&self, row_ids: Vec<u64>) -> TakeQuery {
1280        TakeQuery::from_row_ids(self.inner.clone(), row_ids)
1281    }
1282
1283    /// Search the table with a given query vector.
1284    ///
1285    /// This is a convenience method for preparing a vector query and
1286    /// is the same thing as calling `nearest_to` on the builder returned
1287    /// by `query`.  See [`Query::nearest_to`] for more details.
1288    pub fn vector_search(&self, query: impl IntoQueryVector) -> Result<VectorQuery> {
1289        self.query().nearest_to(query)
1290    }
1291
1292    /// Optimize the on-disk data and indices for better performance.
1293    ///
1294    /// Modeled after ``VACUUM`` in PostgreSQL.
1295    ///
1296    /// Optimization is discussed in more detail in the [OptimizeAction] documentation
1297    /// and covers three operations:
1298    ///
1299    ///  * Compaction: Merges small files into larger ones
1300    ///  * Prune: Removes old versions of the dataset
1301    ///  * Index: Optimizes the indices, adding new data to existing indices
1302    ///
1303    /// The frequency an application should call optimize is based on the frequency of
1304    /// data modifications.  If data is frequently added, deleted, or updated then
1305    /// optimize should be run frequently.  A good rule of thumb is to run optimize if
1306    /// you have added or modified 100,000 or more records or run more than 20 data
1307    /// modification operations.
1308    pub async fn optimize(&self, action: OptimizeAction) -> Result<OptimizeStats> {
1309        self.inner.optimize(action).await
1310    }
1311
1312    /// Add new columns to the table, providing values to fill in.
1313    pub async fn add_columns(
1314        &self,
1315        transforms: NewColumnTransform,
1316        read_columns: Option<Vec<String>>,
1317    ) -> Result<AddColumnsResult> {
1318        self.inner.add_columns(transforms, read_columns).await
1319    }
1320
1321    /// Change a column's name or nullability.
1322    pub async fn alter_columns(
1323        &self,
1324        alterations: &[ColumnAlteration],
1325    ) -> Result<AlterColumnsResult> {
1326        self.inner.alter_columns(alterations).await
1327    }
1328
1329    /// Remove columns from the table.
1330    pub async fn drop_columns(&self, columns: &[&str]) -> Result<DropColumnsResult> {
1331        self.inner.drop_columns(columns).await
1332    }
1333
1334    /// Set the unenforced primary key for this table to a single column.
1335    ///
1336    /// "Unenforced" means LanceDB does not check uniqueness on writes; the
1337    /// column is recorded in the schema as the primary key so that features
1338    /// such as `merge_insert` can use it.
1339    ///
1340    /// Only single-column primary keys are supported, and the key cannot be
1341    /// changed once set — calling this on a table that already has an
1342    /// unenforced primary key fails. `columns` is an iterable for binding
1343    /// ergonomics but must yield exactly one column:
1344    ///
1345    /// - `table.set_unenforced_primary_key(["id"])`
1346    pub async fn set_unenforced_primary_key<I, S>(&self, columns: I) -> Result<()>
1347    where
1348        I: IntoIterator<Item = S>,
1349        S: Into<String>,
1350    {
1351        let owned: Vec<String> = columns.into_iter().map(Into::into).collect();
1352        let borrowed: Vec<&str> = owned.iter().map(String::as_str).collect();
1353        self.inner.set_unenforced_primary_key(&borrowed).await
1354    }
1355
1356    /// Install an [`LsmWriteSpec`] on this table, selecting Lance's MemWAL
1357    /// LSM-style write path for future `merge_insert` calls.
1358    ///
1359    /// [`LsmWriteSpec`] chooses one of three sharding strategies:
1360    ///
1361    /// - [`LsmWriteSpec::bucket`] — hash-bucket writes by a scalar column.
1362    /// - [`LsmWriteSpec::identity`] — shard by the raw value of a scalar column.
1363    /// - [`LsmWriteSpec::unsharded`] — route every write to a single shard.
1364    ///
1365    /// # Example
1366    ///
1367    /// ```
1368    /// # use lancedb::table::{LsmWriteSpec, Table};
1369    /// # async fn example(table: &Table) -> Result<(), Box<dyn std::error::Error>> {
1370    /// table
1371    ///     .set_lsm_write_spec(
1372    ///         LsmWriteSpec::bucket("id", 16).with_maintained_indexes(["id_idx"]),
1373    ///     )
1374    ///     .await?;
1375    /// # Ok(())
1376    /// # }
1377    /// ```
1378    pub async fn set_lsm_write_spec(&self, spec: LsmWriteSpec) -> Result<()> {
1379        self.inner.set_lsm_write_spec(spec).await
1380    }
1381
1382    /// Remove the [`LsmWriteSpec`] from this table, reverting to the standard
1383    /// `merge_insert` write path.
1384    ///
1385    /// Errors if no spec is currently set.
1386    pub async fn unset_lsm_write_spec(&self) -> Result<()> {
1387        self.inner.unset_lsm_write_spec().await
1388    }
1389
1390    /// Retrieve the version of the table
1391    ///
1392    /// LanceDb supports versioning.  Every operation that modifies the table increases
1393    /// version.  As long as a version hasn't been deleted you can `[Self::checkout]` that
1394    /// version to view the data at that point.  In addition, you can `[Self::restore]` the
1395    /// version to replace the current table with a previous version.
1396    pub async fn version(&self) -> Result<u64> {
1397        self.inner.version().await
1398    }
1399
1400    /// Checks out a specific version of the Table
1401    ///
1402    /// Any read operation on the table will now access the data at the checked out version.
1403    /// As a consequence, calling this method will disable any read consistency interval
1404    /// that was previously set.
1405    ///
1406    /// This is a read-only operation that turns the table into a sort of "view"
1407    /// or "detached head".  Other table instances will not be affected.  To make the change
1408    /// permanent you can use the `[Self::restore]` method.
1409    ///
1410    /// Any operation that modifies the table will fail while the table is in a checked
1411    /// out state.
1412    ///
1413    /// To return the table to a normal state use `[Self::checkout_latest]`
1414    pub async fn checkout(&self, version: u64) -> Result<()> {
1415        self.inner.checkout(version).await
1416    }
1417
1418    /// Checks out a specific version of the Table by tag
1419    ///
1420    /// Any read operation on the table will now access the data at the version referenced by the tag.
1421    /// As a consequence, calling this method will disable any read consistency interval
1422    /// that was previously set.
1423    ///
1424    /// This is a read-only operation that turns the table into a sort of "view"
1425    /// or "detached head".  Other table instances will not be affected.  To make the change
1426    /// permanent you can use the `[Self::restore]` method.
1427    ///
1428    /// Any operation that modifies the table will fail while the table is in a checked
1429    /// out state.
1430    ///
1431    /// To return the table to a normal state use `[Self::checkout_latest]`
1432    pub async fn checkout_tag(&self, tag: &str) -> Result<()> {
1433        self.inner.checkout_tag(tag).await
1434    }
1435
1436    /// Ensures the table is pointing at the latest version
1437    ///
1438    /// This can be used to manually update a table when the read_consistency_interval is None
1439    /// It can also be used to undo a `[Self::checkout]` operation
1440    pub async fn checkout_latest(&self) -> Result<()> {
1441        self.inner.checkout_latest().await
1442    }
1443
1444    /// Restore the table to the currently checked out version
1445    ///
1446    /// This operation will fail if checkout has not been called previously
1447    ///
1448    /// This operation will overwrite the latest version of the table with a
1449    /// previous version.  Any changes made since the checked out version will
1450    /// no longer be visible.
1451    ///
1452    /// Once the operation concludes the table will no longer be in a checked
1453    /// out state and the read_consistency_interval, if any, will apply.
1454    pub async fn restore(&self) -> Result<()> {
1455        self.inner.restore().await
1456    }
1457
1458    /// List all the versions of the table
1459    pub async fn list_versions(&self) -> Result<Vec<Version>> {
1460        self.inner.list_versions().await
1461    }
1462
1463    /// List all indices that have been created with [`Self::create_index`]
1464    pub async fn list_indices(&self) -> Result<Vec<IndexConfig>> {
1465        self.inner.list_indices().await
1466    }
1467
1468    /// Get the table URI (storage location)
1469    ///
1470    /// Returns the full storage location of the table (e.g., S3/GCS path).
1471    /// For remote tables, this fetches the location from the server via describe.
1472    pub async fn uri(&self) -> Result<String> {
1473        self.inner.uri().await
1474    }
1475
1476    /// Get the storage options used when opening this table, if any.
1477    ///
1478    /// Warning: This is an internal API and the return value is subject to change.
1479    #[deprecated(since = "0.25.0", note = "Use initial_storage_options() instead")]
1480    pub async fn storage_options(&self) -> Option<HashMap<String, String>> {
1481        #[allow(deprecated)]
1482        self.inner.storage_options().await
1483    }
1484
1485    /// Get the initial storage options that were passed in when opening this table.
1486    ///
1487    /// For dynamically refreshed options (e.g., credential vending), use [`Self::latest_storage_options`].
1488    ///
1489    /// Warning: This is an internal API and the return value is subject to change.
1490    pub async fn initial_storage_options(&self) -> Option<HashMap<String, String>> {
1491        self.inner.initial_storage_options().await
1492    }
1493
1494    /// Get the latest storage options, refreshing from provider if configured.
1495    ///
1496    /// This method is useful for credential vending scenarios where storage options
1497    /// may be refreshed dynamically. If no dynamic provider is configured, this
1498    /// returns the initial static options.
1499    ///
1500    /// Warning: This is an internal API and the return value is subject to change.
1501    pub async fn latest_storage_options(&self) -> Result<Option<HashMap<String, String>>> {
1502        self.inner.latest_storage_options().await
1503    }
1504
1505    /// Get statistics about an index.
1506    /// Returns None if the index does not exist.
1507    pub async fn index_stats(
1508        &self,
1509        index_name: impl AsRef<str>,
1510    ) -> Result<Option<IndexStatistics>> {
1511        self.inner.index_stats(index_name.as_ref()).await
1512    }
1513
1514    /// Drop an index from the table.
1515    ///
1516    /// Note: This is not yet available in LanceDB cloud.
1517    ///
1518    /// This does not delete the index from disk, it just removes it from the table.
1519    /// To delete the index, run [`Self::optimize()`] after dropping the index.
1520    ///
1521    /// Use [`Self::list_indices()`] to find the names of the indices.
1522    pub async fn drop_index(&self, name: &str) -> Result<()> {
1523        self.inner.drop_index(name).await
1524    }
1525
    /// Prewarm an index in the table.
    ///
    /// This is a hint to the database that the index will be accessed in the
    /// future and should be loaded into memory if possible.  This can reduce
    /// cold-start latency for subsequent queries.
    ///
    /// This call initiates prewarming and returns once the request is accepted.
    /// It is idempotent and safe to call from multiple clients concurrently.
    ///
    /// It is generally wasteful to call this if the index does not fit into the
    /// available cache.  Not all index types support prewarming; unsupported
    /// indices will silently ignore the request.
    ///
    /// Use [`Self::list_indices()`] to find the names of the indices.
    ///
    /// # Arguments
    ///
    /// * `name` - Name of the index to prewarm.
    ///
    /// # Errors
    ///
    /// Propagates any error reported by the underlying table implementation.
    pub async fn prewarm_index(&self, name: &str) -> Result<()> {
        self.inner.prewarm_index(name).await
    }
1543
    /// Prewarm data for the table.
    ///
    /// This is a hint to the database that the given columns will be accessed in
    /// the future and the database should prefetch the data if possible.  This
    /// can reduce cold-start latency for subsequent queries.  Currently only
    /// supported on remote tables.
    ///
    /// This call initiates prewarming and returns once the request is accepted.
    /// It is idempotent and safe to call from multiple clients concurrently —
    /// calling it on already-prewarmed columns is a no-op on the server.
    ///
    /// This operation has a large upfront cost but can speed up future queries
    /// that need to fetch the given columns.  Large columns such as embeddings
    /// or binary data may not be practical to prewarm.  This feature is intended
    /// for workloads that issue many queries against the same columns.
    ///
    /// # Arguments
    ///
    /// * `columns` - Names of the columns to prewarm. If `None`, all columns are
    ///   prewarmed.
    ///
    /// # Errors
    ///
    /// Propagates any error reported by the underlying table implementation.
    pub async fn prewarm_data(&self, columns: Option<Vec<String>>) -> Result<()> {
        self.inner.prewarm_data(columns).await
    }
1564
    /// Poll until the given indices are fully built.
    ///
    /// Will return `Error::Timeout` if the indices have not finished indexing
    /// their columns within the timeout.
    ///
    /// # Arguments
    ///
    /// * `index_names` - Names of the indices to wait for.
    /// * `timeout` - Maximum time to wait before giving up.
    pub async fn wait_for_index(
        &self,
        index_names: &[&str],
        timeout: std::time::Duration,
    ) -> Result<()> {
        self.inner.wait_for_index(index_names, timeout).await
    }
1574
    /// Get the tags manager.
    ///
    /// The returned [`Tags`] object is produced by the underlying table
    /// implementation and may borrow from this table for as long as it is alive.
    ///
    /// # Errors
    ///
    /// Propagates any error reported by the underlying table implementation.
    pub async fn tags(&self) -> Result<Box<dyn Tags + '_>> {
        self.inner.tags().await
    }
1579
    /// Retrieve statistics on the table.
    ///
    /// # Errors
    ///
    /// Propagates any error reported by the underlying table implementation.
    pub async fn stats(&self) -> Result<TableStatistics> {
        self.inner.stats().await
    }
1584}
1585
/// [`Tags`] implementation that operates directly on a Lance dataset.
///
/// Every operation first obtains the current dataset from the wrapped
/// `DatasetConsistencyWrapper` and then uses that dataset's tag API.
pub struct NativeTags {
    // Handle used to obtain the dataset on which tag operations are performed.
    dataset: dataset::DatasetConsistencyWrapper,
}
1589#[async_trait]
1590impl Tags for NativeTags {
1591    async fn list(&self) -> Result<HashMap<String, TagContents>> {
1592        let dataset = self.dataset.get().await?;
1593        Ok(dataset.tags().list().await?)
1594    }
1595
1596    async fn get_version(&self, tag: &str) -> Result<u64> {
1597        let dataset = self.dataset.get().await?;
1598        Ok(dataset.tags().get_version(tag).await?)
1599    }
1600
1601    async fn create(&mut self, tag: &str, version: u64) -> Result<()> {
1602        let dataset = self.dataset.get().await?;
1603        dataset.tags().create(tag, version).await?;
1604        Ok(())
1605    }
1606
1607    async fn delete(&mut self, tag: &str) -> Result<()> {
1608        let dataset = self.dataset.get().await?;
1609        dataset.tags().delete(tag).await?;
1610        Ok(())
1611    }
1612
1613    async fn update(&mut self, tag: &str, version: u64) -> Result<()> {
1614        let dataset = self.dataset.get().await?;
1615        dataset.tags().update(tag, version).await?;
1616        Ok(())
1617    }
1618}
1619
/// Extension trait for obtaining a [`NativeTable`] reference from a more general
/// table handle.
pub trait NativeTableExt {
    /// Cast as [`NativeTable`], or return `None` if it is not a [`NativeTable`].
    fn as_native(&self) -> Option<&NativeTable>;
}
1624
impl NativeTableExt for Arc<dyn BaseTable> {
    /// Downcasts via `as_any()`; yields `None` when the concrete type behind the
    /// trait object is not [`NativeTable`].
    fn as_native(&self) -> Option<&NativeTable> {
        self.as_any().downcast_ref::<NativeTable>()
    }
}
1630
/// A table in a LanceDB database.
///
/// Cloning is derived, so a clone copies every field, including the dataset
/// wrapper and the optional namespace client handle.
#[derive(Clone)]
pub struct NativeTable {
    // Table name, without the namespace path.
    name: String,
    // Namespace path the table lives under; empty when opened without a namespace.
    namespace: Vec<String>,
    // Identifier derived from `namespace` and `name` (see `build_id`).
    id: String,
    // URI of the underlying dataset.
    uri: String,
    // Wrapper around the underlying Lance dataset.
    pub(crate) dataset: dataset::DatasetConsistencyWrapper,
    // This comes from the connection options. We store here so we can pass down
    // to the dataset when we recreate it (for example, in checkout_latest).
    read_consistency_interval: Option<std::time::Duration>,
    // Optional namespace client for namespace operations (e.g., managed versioning).
    // pub(crate) so query.rs can access the field for server-side query execution.
    pub(crate) namespace_client: Option<Arc<dyn LanceNamespace>>,
    // Operations to push down to the namespace server.
    // pub(crate) so query.rs can access the field for server-side query execution.
    pub(crate) pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
}
1649
1650impl std::fmt::Debug for NativeTable {
1651    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1652        f.debug_struct("NativeTable")
1653            .field("name", &self.name)
1654            .field("namespace", &self.namespace)
1655            .field("id", &self.id)
1656            .field("uri", &self.uri)
1657            .field("read_consistency_interval", &self.read_consistency_interval)
1658            .field("namespace_client", &self.namespace_client)
1659            .field("pushdown_operations", &self.pushdown_operations)
1660            .finish()
1661    }
1662}
1663
1664impl std::fmt::Display for NativeTable {
1665    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1666        write!(
1667            f,
1668            "NativeTable({}, uri={}, read_consistency_interval={})",
1669            self.name,
1670            self.uri,
1671            match self.read_consistency_interval {
1672                None => {
1673                    "None".to_string()
1674                }
1675                Some(duration) => {
1676                    format!("{}s", duration.as_secs_f64())
1677                }
1678            }
1679        )
1680    }
1681}
1682
1683impl NativeTable {
    /// Opens an existing Table
    ///
    /// The table name is derived from `uri` (the file stem of its last path
    /// component). The table is opened with an empty namespace, default read
    /// parameters, no store wrapper, no read consistency interval, no namespace
    /// client and no pushdown operations.
    ///
    /// # Arguments
    ///
    /// * `uri` - The uri to a [NativeTable]
    ///
    /// # Returns
    ///
    /// * A [NativeTable] object.
    pub async fn open(uri: &str) -> Result<Self> {
        let name = Self::get_table_name(uri)?;
        Self::open_with_params(
            uri,
            &name,
            // namespace
            vec![],
            // write_store_wrapper
            None,
            // params
            None,
            // read_consistency_interval
            None,
            // namespace_client
            None,
            // pushdown_operations
            HashSet::new(),
            // managed_versioning
            None,
        )
        .await
    }
1709
1710    /// Opens an existing Table
1711    ///
1712    /// # Arguments
1713    ///
1714    /// * `base_path` - The base path where the table is located
1715    /// * `name` The Table name
1716    /// * `params` The [ReadParams] to use when opening the table
1717    /// * `namespace_client` - Optional namespace client for namespace operations
1718    /// * `pushdown_operations` - Operations to push down to the namespace server
1719    /// * `managed_versioning` - Whether managed versioning is enabled. If None and namespace_client
1720    ///   is provided, the value will be fetched via describe_table.
1721    ///
1722    /// # Returns
1723    ///
1724    /// * A [NativeTable] object.
1725    #[allow(clippy::too_many_arguments)]
1726    pub async fn open_with_params(
1727        uri: &str,
1728        name: &str,
1729        namespace: Vec<String>,
1730        write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
1731        params: Option<ReadParams>,
1732        read_consistency_interval: Option<std::time::Duration>,
1733        namespace_client: Option<Arc<dyn LanceNamespace>>,
1734        pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
1735        managed_versioning: Option<bool>,
1736    ) -> Result<Self> {
1737        let params = params.unwrap_or_default();
1738        // patch the params if we have a write store wrapper
1739        let params = match write_store_wrapper.clone() {
1740            Some(wrapper) => params.patch_with_store_wrapper(wrapper)?,
1741            None => params,
1742        };
1743
1744        // Build table_id from namespace + name
1745        let mut table_id = namespace.clone();
1746        table_id.push(name.to_string());
1747
1748        // Determine if managed_versioning is enabled
1749        // Use the provided value if available, otherwise query the namespace
1750        let managed_versioning = match managed_versioning {
1751            Some(value) => value,
1752            None if namespace_client.is_some() => {
1753                let ns_client = namespace_client.as_ref().unwrap();
1754                let describe_request = DescribeTableRequest {
1755                    id: Some(table_id.clone()),
1756                    ..Default::default()
1757                };
1758                let response = ns_client
1759                    .describe_table(describe_request)
1760                    .await
1761                    .map_err(|e| Error::Runtime {
1762                        message: format!(
1763                            "Failed to describe table via namespace client: {}. \
1764                             If you don't need managed versioning, don't pass namespace_client.",
1765                            e
1766                        ),
1767                    })?;
1768                response.managed_versioning == Some(true)
1769            }
1770            None => false,
1771        };
1772
1773        let mut builder = DatasetBuilder::from_uri(uri).with_read_params(params);
1774
1775        // Set up commit handler when managed_versioning is enabled
1776        if managed_versioning && let Some(ref ns_client) = namespace_client {
1777            let external_store =
1778                LanceNamespaceExternalManifestStore::new(ns_client.clone(), table_id.clone());
1779            let commit_handler: Arc<dyn CommitHandler> = Arc::new(ExternalManifestCommitHandler {
1780                external_manifest_store: Arc::new(external_store),
1781            });
1782            builder = builder.with_commit_handler(commit_handler);
1783        }
1784
1785        let dataset = builder.load().await.map_err(|e| match e {
1786            lance::Error::DatasetNotFound { .. } => Error::TableNotFound {
1787                name: name.to_string(),
1788                source: Box::new(e),
1789            },
1790            e => e.into(),
1791        })?;
1792
1793        let dataset = DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval);
1794        let id = Self::build_id(&namespace, name);
1795
1796        Ok(Self {
1797            name: name.to_string(),
1798            namespace,
1799            id,
1800            uri: uri.to_string(),
1801            dataset,
1802            read_consistency_interval,
1803            namespace_client,
1804            pushdown_operations,
1805        })
1806    }
1807
    /// Set the namespace client for server-side query execution.
    ///
    /// When set, queries will be executed on the namespace server instead of locally.
    ///
    /// Consumes the table and returns it with `namespace_client` replaced, so the call
    /// can be chained. Any previously stored client is overwritten.
    pub fn with_namespace_client(mut self, namespace_client: Arc<dyn LanceNamespace>) -> Self {
        self.namespace_client = Some(namespace_client);
        self
    }
1815
1816    /// Opens an existing Table using a namespace client.
1817    ///
1818    /// This method uses `DatasetBuilder::from_namespace` to open the table, which
1819    /// automatically fetches the table location and storage options from the namespace.
1820    /// This eliminates the need to pre-fetch and merge storage options before opening.
1821    ///
1822    /// # Arguments
1823    ///
1824    /// * `namespace_client` - The namespace client to use for fetching table metadata
1825    /// * `name` - The table name
1826    /// * `namespace` - The namespace path (e.g., vec!["parent", "child"])
1827    /// * `write_store_wrapper` - Optional wrapper for the object store on write path
1828    /// * `params` - Optional read parameters
1829    /// * `read_consistency_interval` - Optional interval for read consistency
1830    /// * `pushdown_operations` - Operations to push down to the namespace server.
1831    ///   When `QueryTable` is included, queries will be executed on the namespace server.
1832    /// * `session` - Optional session for object stores and caching
1833    ///
1834    /// # Returns
1835    ///
1836    /// * A [NativeTable] object.
1837    #[allow(clippy::too_many_arguments)]
1838    pub async fn open_from_namespace(
1839        namespace_client: Arc<dyn LanceNamespace>,
1840        name: &str,
1841        namespace: Vec<String>,
1842        write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
1843        params: Option<ReadParams>,
1844        read_consistency_interval: Option<std::time::Duration>,
1845        pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
1846        session: Option<Arc<lance::session::Session>>,
1847    ) -> Result<Self> {
1848        let mut params = params.unwrap_or_default();
1849
1850        // Set the session in read params
1851        if let Some(sess) = session {
1852            params.session(sess);
1853        }
1854
1855        // patch the params if we have a write store wrapper
1856        let params = match write_store_wrapper.clone() {
1857            Some(wrapper) => params.patch_with_store_wrapper(wrapper)?,
1858            None => params,
1859        };
1860
1861        // Build table_id from namespace + name
1862        let mut table_id = namespace.clone();
1863        table_id.push(name.to_string());
1864
1865        // Use DatasetBuilder::from_namespace which automatically fetches location
1866        // and storage options from the namespace
1867        let builder = DatasetBuilder::from_namespace(namespace_client.clone(), table_id)
1868            .await
1869            .map_err(|e| map_namespace_lance_error(e, name))?;
1870
1871        let dataset = builder
1872            .with_read_params(params)
1873            .load()
1874            .await
1875            .map_err(|e| match e {
1876                lance::Error::DatasetNotFound { .. } => Error::TableNotFound {
1877                    name: name.to_string(),
1878                    source: Box::new(e),
1879                },
1880                e => e.into(),
1881            })?;
1882
1883        let uri = dataset.uri().to_string();
1884        let dataset = DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval);
1885        let id = Self::build_id(&namespace, name);
1886
1887        let stored_namespace_client =
1888            if pushdown_operations.contains(&NamespaceClientPushdownOperation::QueryTable) {
1889                Some(namespace_client)
1890            } else {
1891                None
1892            };
1893
1894        Ok(Self {
1895            name: name.to_string(),
1896            namespace,
1897            id,
1898            uri,
1899            dataset,
1900            read_consistency_interval,
1901            namespace_client: stored_namespace_client,
1902            pushdown_operations,
1903        })
1904    }
1905
1906    fn get_table_name(uri: &str) -> Result<String> {
1907        let path = Path::new(uri);
1908        let name = path
1909            .file_stem()
1910            .ok_or(Error::TableNotFound {
1911                name: uri.to_string(),
1912                source: format!("Could not extract table name from URI: '{}'", uri).into(),
1913            })?
1914            .to_str()
1915            .ok_or(Error::InvalidTableName {
1916                name: uri.to_string(),
1917                reason: "Table name is not valid URL".to_string(),
1918            })?;
1919        Ok(name.to_string())
1920    }
1921
1922    fn build_id(namespace: &[String], name: &str) -> String {
1923        if namespace.is_empty() {
1924            name.to_string()
1925        } else {
1926            let mut parts = namespace.to_vec();
1927            parts.push(name.to_string());
1928            parts.join("$")
1929        }
1930    }
1931
1932    /// Creates a new Table
1933    ///
1934    /// # Arguments
1935    ///
1936    /// * `uri` - The URI to the table. When namespace is not empty, the caller must
1937    ///   provide an explicit URI (location) rather than deriving it from the table name.
1938    /// * `name` The Table name
1939    /// * `namespace` - The namespace path. When non-empty, an explicit URI must be provided.
1940    /// * `batches` RecordBatch to be saved in the database.
1941    /// * `params` - Write parameters.
1942    /// * `namespace_client` - Optional namespace client for namespace operations
1943    /// * `pushdown_operations` - Operations to push down to the namespace server
1944    ///
1945    /// # Returns
1946    ///
1947    /// * A [TableImpl] object.
1948    #[allow(clippy::too_many_arguments)]
1949    pub async fn create(
1950        uri: &str,
1951        name: &str,
1952        namespace: Vec<String>,
1953        batches: impl StreamingWriteSource,
1954        write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
1955        params: Option<WriteParams>,
1956        read_consistency_interval: Option<std::time::Duration>,
1957        namespace_client: Option<Arc<dyn LanceNamespace>>,
1958        pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
1959    ) -> Result<Self> {
1960        // Default params uses format v1.
1961        let params = params.unwrap_or(WriteParams {
1962            ..Default::default()
1963        });
1964        // patch the params if we have a write store wrapper
1965        let params = match write_store_wrapper.clone() {
1966            Some(wrapper) => params.patch_with_store_wrapper(wrapper)?,
1967            None => params,
1968        };
1969
1970        let insert_builder = InsertBuilder::new(uri).with_params(&params);
1971        let dataset = insert_builder
1972            .execute_stream(batches)
1973            .await
1974            .map_err(|e| match e {
1975                lance::Error::DatasetAlreadyExists { .. } => Error::TableAlreadyExists {
1976                    name: name.to_string(),
1977                },
1978                e => e.into(),
1979            })?;
1980
1981        let id = Self::build_id(&namespace, name);
1982
1983        Ok(Self {
1984            name: name.to_string(),
1985            namespace,
1986            id,
1987            uri: uri.to_string(),
1988            dataset: DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval),
1989            read_consistency_interval,
1990            namespace_client,
1991            pushdown_operations,
1992        })
1993    }
1994
    /// Creates a new Table with the given schema and no rows.
    ///
    /// Builds an empty [`RecordBatch`] for `schema` and forwards it, together with all
    /// other arguments unchanged, to [`Self::create`]; see that method for the meaning
    /// of the remaining arguments and for the errors that can be returned.
    #[allow(clippy::too_many_arguments)]
    pub async fn create_empty(
        uri: &str,
        name: &str,
        namespace: Vec<String>,
        schema: SchemaRef,
        write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
        params: Option<WriteParams>,
        read_consistency_interval: Option<std::time::Duration>,
        namespace_client: Option<Arc<dyn LanceNamespace>>,
        pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
    ) -> Result<Self> {
        // A zero-row batch carries only the schema.
        let data: Box<dyn Scannable> = Box::new(RecordBatch::new_empty(schema));
        Self::create(
            uri,
            name,
            namespace,
            data,
            write_store_wrapper,
            params,
            read_consistency_interval,
            namespace_client,
            pushdown_operations,
        )
        .await
    }
2021
2022    /// Creates a new Table using a namespace client for storage options.
2023    ///
2024    /// This method sets up a `StorageOptionsProvider` from the namespace client,
2025    /// enabling automatic credential refresh for cloud storage. The namespace
2026    /// is used for:
2027    /// 1. Setting up storage options provider for credential vending
2028    /// 2. Optionally enabling server-side query execution
2029    ///
2030    /// # Arguments
2031    ///
2032    /// * `namespace_client` - The namespace client to use for storage options
2033    /// * `uri` - The URI to the table (obtained from create_empty_table response)
2034    /// * `name` - The table name
2035    /// * `namespace` - The namespace path (e.g., vec!["parent", "child"])
2036    /// * `batches` - RecordBatch to be saved in the database
2037    /// * `write_store_wrapper` - Optional wrapper for the object store on write path
2038    /// * `params` - Optional write parameters
2039    /// * `read_consistency_interval` - Optional interval for read consistency
2040    /// * `pushdown_operations` - Operations to push down to the namespace server
2041    ///
2042    /// # Returns
2043    ///
2044    /// * A [NativeTable] object.
2045    #[allow(clippy::too_many_arguments)]
2046    pub async fn create_from_namespace(
2047        namespace_client: Arc<dyn LanceNamespace>,
2048        uri: &str,
2049        name: &str,
2050        namespace: Vec<String>,
2051        batches: impl StreamingWriteSource,
2052        write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
2053        params: Option<WriteParams>,
2054        read_consistency_interval: Option<std::time::Duration>,
2055        pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
2056        session: Option<Arc<lance::session::Session>>,
2057    ) -> Result<Self> {
2058        // Build table_id from namespace + name for the storage options provider
2059        let mut table_id = namespace.clone();
2060        table_id.push(name.to_string());
2061
2062        // Set up storage options provider from namespace
2063        let storage_options_provider = Arc::new(LanceNamespaceStorageOptionsProvider::new(
2064            namespace_client.clone(),
2065            table_id,
2066        ));
2067
2068        // Start with provided params or defaults
2069        let mut params = params.unwrap_or_default();
2070
2071        // Set the session in write params
2072        if let Some(sess) = session {
2073            params.session = Some(sess);
2074        }
2075
2076        // Ensure store_params exists and set the storage options provider
2077        let store_params = params
2078            .store_params
2079            .get_or_insert_with(ObjectStoreParams::default);
2080        let accessor = match store_params.storage_options().cloned() {
2081            Some(options) => {
2082                StorageOptionsAccessor::with_initial_and_provider(options, storage_options_provider)
2083            }
2084            None => StorageOptionsAccessor::with_provider(storage_options_provider),
2085        };
2086        store_params.storage_options_accessor = Some(Arc::new(accessor));
2087
2088        // Patch the params if we have a write store wrapper
2089        let params = match write_store_wrapper.clone() {
2090            Some(wrapper) => params.patch_with_store_wrapper(wrapper)?,
2091            None => params,
2092        };
2093
2094        let insert_builder = InsertBuilder::new(uri).with_params(&params);
2095        let dataset = insert_builder
2096            .execute_stream(batches)
2097            .await
2098            .map_err(|e| match e {
2099                lance::Error::DatasetAlreadyExists { .. } => Error::TableAlreadyExists {
2100                    name: name.to_string(),
2101                },
2102                e => e.into(),
2103            })?;
2104
2105        let id = Self::build_id(&namespace, name);
2106
2107        let stored_namespace_client =
2108            if pushdown_operations.contains(&NamespaceClientPushdownOperation::QueryTable) {
2109                Some(namespace_client)
2110            } else {
2111                None
2112            };
2113
2114        Ok(Self {
2115            name: name.to_string(),
2116            namespace,
2117            id,
2118            uri: uri.to_string(),
2119            dataset: DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval),
2120            read_consistency_interval,
2121            namespace_client: stored_namespace_client,
2122            pushdown_operations,
2123        })
2124    }
2125
    /// Merge new data into this table.
    ///
    /// The merge is performed on a clone of the current dataset; once it succeeds the
    /// clone is handed to the dataset wrapper via `update`.
    ///
    /// # Arguments
    ///
    /// * `batches` - Reader producing the data to merge in.
    /// * `left_on` - Join column name, forwarded to the dataset merge as its first key.
    /// * `right_on` - Join column name, forwarded to the dataset merge as its second key.
    ///   NOTE(review): presumably `left_on` names a column of this table and `right_on`
    ///   a column of `batches` — confirm against the Lance `Dataset::merge` docs.
    ///
    /// # Errors
    ///
    /// Returns early if `ensure_mutable` fails, if the dataset cannot be obtained, or
    /// if the merge itself fails; in those cases the wrapper is not updated.
    pub async fn merge(
        &mut self,
        batches: impl RecordBatchReader + Send + 'static,
        left_on: &str,
        right_on: &str,
    ) -> Result<()> {
        self.dataset.ensure_mutable()?;
        // Work on a private copy so the shared dataset is only replaced on success.
        let mut dataset = (*self.dataset.get().await?).clone();
        dataset.merge(batches, left_on, right_on).await?;
        self.dataset.update(dataset);
        Ok(())
    }
2139
    // TODO: why are these individual methods and not some single "get_stats" method?
    /// Returns the fragment count reported by the current dataset.
    ///
    /// # Errors
    ///
    /// Fails if the dataset cannot be obtained from the wrapper.
    pub async fn count_fragments(&self) -> Result<usize> {
        Ok(self.dataset.get().await?.count_fragments())
    }
2144
    /// Returns the deleted-row count reported by the current dataset.
    ///
    /// # Errors
    ///
    /// Fails if the dataset cannot be obtained from the wrapper or if the dataset's
    /// own count fails.
    pub async fn count_deleted_rows(&self) -> Result<usize> {
        Ok(self.dataset.get().await?.count_deleted_rows().await?)
    }
2148
2149    pub async fn num_small_files(&self, max_rows_per_group: usize) -> Result<usize> {
2150        Ok(self
2151            .dataset
2152            .get()
2153            .await?
2154            .num_small_files(max_rows_per_group)
2155            .await)
2156    }
2157
2158    pub async fn load_indices(&self) -> Result<Vec<VectorIndex>> {
2159        let dataset = self.dataset.get().await?;
2160        let mf = dataset.manifest();
2161        let indices = dataset.load_indices().await?;
2162        Ok(indices
2163            .iter()
2164            .map(|i| VectorIndex::new_from_format(mf, i))
2165            .collect())
2166    }
2167
2168    // Helper to validate index type compatibility with field data type
2169    fn validate_index_type(
2170        field: &Field,
2171        index_name: &str,
2172        supported_fn: impl Fn(&DataType) -> bool,
2173    ) -> Result<()> {
2174        if !supported_fn(field.data_type()) {
2175            return Err(Error::Schema {
2176                message: format!(
2177                    "A {} index cannot be created on the field `{}` which has data type {}",
2178                    index_name,
2179                    field.name(),
2180                    field.data_type()
2181                ),
2182            });
2183        }
2184        Ok(())
2185    }
2186
2187    // Helper to build IVF params honoring table options.
2188    fn build_ivf_params(
2189        num_partitions: Option<u32>,
2190        target_partition_size: Option<u32>,
2191        sample_rate: u32,
2192        max_iterations: u32,
2193    ) -> IvfBuildParams {
2194        let mut ivf_params = match (num_partitions, target_partition_size) {
2195            (Some(num_partitions), _) => IvfBuildParams::new(num_partitions as usize),
2196            (None, Some(target_partition_size)) => {
2197                IvfBuildParams::with_target_partition_size(target_partition_size as usize)
2198            }
2199            (None, None) => IvfBuildParams::default(),
2200        };
2201        ivf_params.sample_rate = sample_rate as usize;
2202        ivf_params.max_iters = max_iterations as usize;
2203        ivf_params
2204    }
2205
2206    // Helper to get num_sub_vectors with default calculation
2207    fn get_num_sub_vectors(provided: Option<u32>, dim: u32, num_bits: Option<u32>) -> u32 {
2208        if let Some(provided) = provided {
2209            return provided;
2210        }
2211        let suggested = suggested_num_sub_vectors(dim);
2212        if num_bits.is_some_and(|num_bits| num_bits == 4) && !suggested.is_multiple_of(2) {
2213            // num_sub_vectors must be even when 4 bits are used
2214            suggested + 1
2215        } else {
2216            suggested
2217        }
2218    }
2219
2220    // Helper to extract vector dimension from field
2221    fn get_vector_dimension(field: &Field) -> Result<u32> {
2222        match field.data_type() {
2223            arrow_schema::DataType::FixedSizeList(_, n) => Ok(*n as u32),
2224            _ => Ok(infer_vector_dim(field.data_type())? as u32),
2225        }
2226    }
2227
2228    fn resolve_index_field(
2229        schema: &lance_core::datatypes::Schema,
2230        column: &str,
2231    ) -> Result<(String, Field)> {
2232        lance_core::datatypes::parse_field_path(column).map_err(|e| Error::InvalidInput {
2233            message: format!("Invalid field path `{}`: {}", column, e),
2234        })?;
2235
2236        let field_path = schema
2237            .resolve_case_insensitive(column)
2238            .ok_or_else(|| Error::Schema {
2239                message: format!(
2240                    "Field path `{}` not found in schema. Available field paths: {}",
2241                    column,
2242                    schema.field_paths().join(", ")
2243                ),
2244            })?;
2245        let field = field_path.last().expect("field path should be non-empty");
2246        let path_segments = field_path
2247            .iter()
2248            .map(|field| field.name.as_str())
2249            .collect::<Vec<_>>();
2250        let canonical_path = lance_core::datatypes::format_field_path(&path_segments);
2251
2252        Ok((canonical_path, Field::from(*field)))
2253    }
2254
2255    // Convert LanceDB Index to Lance IndexParams
2256    async fn make_index_params(
2257        &self,
2258        field: &Field,
2259        index_opts: Index,
2260    ) -> Result<Box<dyn lance::index::IndexParams>> {
2261        match index_opts {
2262            Index::Auto => {
2263                if supported_vector_data_type(field.data_type()) {
2264                    // Use IvfPq as the default for auto vector indices
2265                    let dim = Self::get_vector_dimension(field)?;
2266                    let ivf_params = lance_index::vector::ivf::IvfBuildParams::default();
2267                    let num_sub_vectors = Self::get_num_sub_vectors(None, dim, None);
2268                    let pq_params =
2269                        lance_index::vector::pq::PQBuildParams::new(num_sub_vectors as usize, 8);
2270                    let lance_idx_params =
2271                        lance::index::vector::VectorIndexParams::with_ivf_pq_params(
2272                            lance_linalg::distance::MetricType::L2,
2273                            ivf_params,
2274                            pq_params,
2275                        );
2276                    Ok(Box::new(lance_idx_params))
2277                } else if supported_btree_data_type(field.data_type()) {
2278                    Ok(Box::new(ScalarIndexParams::for_builtin(
2279                        BuiltinIndexType::BTree,
2280                    )))
2281                } else {
2282                    Err(Error::InvalidInput {
2283                        message: format!(
2284                            "there are no indices supported for the field `{}` with the data type {}",
2285                            field.name(),
2286                            field.data_type()
2287                        ),
2288                    })?
2289                }
2290            }
2291            Index::BTree(_) => {
2292                Self::validate_index_type(field, "BTree", supported_btree_data_type)?;
2293                Ok(Box::new(ScalarIndexParams::for_builtin(
2294                    BuiltinIndexType::BTree,
2295                )))
2296            }
2297            Index::Bitmap(_) => {
2298                Self::validate_index_type(field, "Bitmap", supported_bitmap_data_type)?;
2299                Ok(Box::new(ScalarIndexParams::for_builtin(
2300                    BuiltinIndexType::Bitmap,
2301                )))
2302            }
2303            Index::LabelList(_) => {
2304                Self::validate_index_type(field, "LabelList", supported_label_list_data_type)?;
2305                Ok(Box::new(ScalarIndexParams::for_builtin(
2306                    BuiltinIndexType::LabelList,
2307                )))
2308            }
2309            Index::FTS(fts_opts) => {
2310                Self::validate_index_type(field, "FTS", supported_fts_data_type)?;
2311                Ok(Box::new(fts_opts))
2312            }
2313            Index::IvfFlat(index) => {
2314                Self::validate_index_type(field, "IVF Flat", supported_vector_data_type)?;
2315                let ivf_params = Self::build_ivf_params(
2316                    index.num_partitions,
2317                    index.target_partition_size,
2318                    index.sample_rate,
2319                    index.max_iterations,
2320                );
2321                let lance_idx_params =
2322                    VectorIndexParams::with_ivf_flat_params(index.distance_type.into(), ivf_params);
2323                Ok(Box::new(lance_idx_params))
2324            }
2325            Index::IvfSq(index) => {
2326                Self::validate_index_type(field, "IVF SQ", supported_vector_data_type)?;
2327                let ivf_params = Self::build_ivf_params(
2328                    index.num_partitions,
2329                    index.target_partition_size,
2330                    index.sample_rate,
2331                    index.max_iterations,
2332                );
2333                let sq_params = SQBuildParams {
2334                    sample_rate: index.sample_rate as usize,
2335                    ..Default::default()
2336                };
2337                let lance_idx_params = VectorIndexParams::with_ivf_sq_params(
2338                    index.distance_type.into(),
2339                    ivf_params,
2340                    sq_params,
2341                );
2342                Ok(Box::new(lance_idx_params))
2343            }
2344            Index::IvfPq(index) => {
2345                Self::validate_index_type(field, "IVF PQ", supported_vector_data_type)?;
2346                let dim = Self::get_vector_dimension(field)?;
2347                let ivf_params = Self::build_ivf_params(
2348                    index.num_partitions,
2349                    index.target_partition_size,
2350                    index.sample_rate,
2351                    index.max_iterations,
2352                );
2353                let num_sub_vectors =
2354                    Self::get_num_sub_vectors(index.num_sub_vectors, dim, index.num_bits);
2355                let num_bits = index.num_bits.unwrap_or(8) as usize;
2356                let mut pq_params = PQBuildParams::new(num_sub_vectors as usize, num_bits);
2357                pq_params.max_iters = index.max_iterations as usize;
2358                let lance_idx_params = VectorIndexParams::with_ivf_pq_params(
2359                    index.distance_type.into(),
2360                    ivf_params,
2361                    pq_params,
2362                );
2363                Ok(Box::new(lance_idx_params))
2364            }
2365            Index::IvfRq(index) => {
2366                Self::validate_index_type(field, "IVF RQ", supported_vector_data_type)?;
2367                let ivf_params = Self::build_ivf_params(
2368                    index.num_partitions,
2369                    index.target_partition_size,
2370                    index.sample_rate,
2371                    index.max_iterations,
2372                );
2373                let rq_params = RQBuildParams::new(index.num_bits.unwrap_or(1) as u8);
2374                let lance_idx_params = VectorIndexParams::with_ivf_rq_params(
2375                    index.distance_type.into(),
2376                    ivf_params,
2377                    rq_params,
2378                );
2379                Ok(Box::new(lance_idx_params))
2380            }
2381            Index::IvfHnswPq(index) => {
2382                Self::validate_index_type(field, "IVF HNSW PQ", supported_vector_data_type)?;
2383                let dim = Self::get_vector_dimension(field)?;
2384                let ivf_params = Self::build_ivf_params(
2385                    index.num_partitions,
2386                    index.target_partition_size,
2387                    index.sample_rate,
2388                    index.max_iterations,
2389                );
2390                let num_sub_vectors =
2391                    Self::get_num_sub_vectors(index.num_sub_vectors, dim, index.num_bits);
2392                let hnsw_params = HnswBuildParams::default()
2393                    .num_edges(index.m as usize)
2394                    .ef_construction(index.ef_construction as usize);
2395                let pq_params = PQBuildParams::new(
2396                    num_sub_vectors as usize,
2397                    index.num_bits.unwrap_or(8) as usize,
2398                );
2399                let lance_idx_params = VectorIndexParams::with_ivf_hnsw_pq_params(
2400                    index.distance_type.into(),
2401                    ivf_params,
2402                    hnsw_params,
2403                    pq_params,
2404                );
2405                Ok(Box::new(lance_idx_params))
2406            }
2407            Index::IvfHnswSq(index) => {
2408                Self::validate_index_type(field, "IVF HNSW SQ", supported_vector_data_type)?;
2409                let ivf_params = Self::build_ivf_params(
2410                    index.num_partitions,
2411                    index.target_partition_size,
2412                    index.sample_rate,
2413                    index.max_iterations,
2414                );
2415                let hnsw_params = HnswBuildParams::default()
2416                    .num_edges(index.m as usize)
2417                    .ef_construction(index.ef_construction as usize);
2418                let sq_params = SQBuildParams {
2419                    sample_rate: index.sample_rate as usize,
2420                    ..Default::default()
2421                };
2422                let lance_idx_params = VectorIndexParams::with_ivf_hnsw_sq_params(
2423                    index.distance_type.into(),
2424                    ivf_params,
2425                    hnsw_params,
2426                    sq_params,
2427                );
2428                Ok(Box::new(lance_idx_params))
2429            }
2430            Index::IvfHnswFlat(index) => {
2431                Self::validate_index_type(field, "IVF HNSW FLAT", supported_vector_data_type)?;
2432                let ivf_params = Self::build_ivf_params(
2433                    index.num_partitions,
2434                    index.target_partition_size,
2435                    index.sample_rate,
2436                    index.max_iterations,
2437                );
2438                let hnsw_params = HnswBuildParams::default()
2439                    .num_edges(index.m as usize)
2440                    .ef_construction(index.ef_construction as usize);
2441                let lance_idx_params = VectorIndexParams::ivf_hnsw(
2442                    index.distance_type.into(),
2443                    ivf_params,
2444                    hnsw_params,
2445                );
2446                Ok(Box::new(lance_idx_params))
2447            }
2448        }
2449    }
2450
2451    // Helper method to get the correct IndexType based on the Index variant and field data type
2452    fn get_index_type_for_field(&self, field: &Field, index: &Index) -> IndexType {
2453        match index {
2454            Index::Auto => {
2455                if supported_vector_data_type(field.data_type()) {
2456                    IndexType::Vector
2457                } else if supported_btree_data_type(field.data_type()) {
2458                    IndexType::BTree
2459                } else {
2460                    // This should not happen since make_index_params would have failed
2461                    IndexType::BTree
2462                }
2463            }
2464            Index::BTree(_) => IndexType::BTree,
2465            Index::Bitmap(_) => IndexType::Bitmap,
2466            Index::LabelList(_) => IndexType::LabelList,
2467            Index::FTS(_) => IndexType::Inverted,
2468            Index::IvfFlat(_)
2469            | Index::IvfSq(_)
2470            | Index::IvfPq(_)
2471            | Index::IvfRq(_)
2472            | Index::IvfHnswPq(_)
2473            | Index::IvfHnswSq(_)
2474            | Index::IvfHnswFlat(_) => IndexType::Vector,
2475        }
2476    }
2477
2478    /// Check whether the table uses V2 manifest paths.
2479    ///
2480    /// See [Self::migrate_manifest_paths_v2] and [ManifestNamingScheme] for
2481    /// more information.
2482    pub async fn uses_v2_manifest_paths(&self) -> Result<bool> {
2483        let dataset = self.dataset.get().await?;
2484        Ok(dataset.manifest_location().naming_scheme == ManifestNamingScheme::V2)
2485    }
2486
2487    /// Migrate the table to use the new manifest path scheme.
2488    ///
2489    /// This function will rename all V1 manifests to V2 manifest paths.
2490    /// These paths provide more efficient opening of datasets with many versions
2491    /// on object stores.
2492    ///
2493    /// This function is idempotent, and can be run multiple times without
2494    /// changing the state of the object store.
2495    ///
2496    /// However, it should not be run while other concurrent operations are happening.
2497    /// And it should also run until completion before resuming other operations.
2498    ///
2499    /// You can use [Self::uses_v2_manifest_paths] to check if the table is already
2500    /// using V2 manifest paths.
2501    pub async fn migrate_manifest_paths_v2(&self) -> Result<()> {
2502        self.dataset.ensure_mutable()?;
2503        let mut dataset = (*self.dataset.get().await?).clone();
2504        dataset.migrate_manifest_paths_v2().await?;
2505        self.dataset.update(dataset);
2506        Ok(())
2507    }
2508
2509    /// Get the table manifest
2510    pub async fn manifest(&self) -> Result<Manifest> {
2511        let dataset = self.dataset.get().await?;
2512        Ok(dataset.manifest().clone())
2513    }
2514
2515    /// Update key-value pairs in config.
2516    pub async fn update_config(
2517        &self,
2518        upsert_values: impl IntoIterator<Item = (String, String)>,
2519    ) -> Result<()> {
2520        self.dataset.ensure_mutable()?;
2521        let mut dataset = (*self.dataset.get().await?).clone();
2522        dataset.update_config(upsert_values).await?;
2523        self.dataset.update(dataset);
2524        Ok(())
2525    }
2526
2527    /// Delete keys from the config
2528    pub async fn delete_config_keys(&self, delete_keys: &[&str]) -> Result<()> {
2529        self.dataset.ensure_mutable()?;
2530        let mut dataset = (*self.dataset.get().await?).clone();
2531        // TODO: update this when we implement metadata APIs
2532        #[allow(deprecated)]
2533        dataset.delete_config_keys(delete_keys).await?;
2534        self.dataset.update(dataset);
2535        Ok(())
2536    }
2537
2538    /// Update schema metadata
2539    pub async fn replace_schema_metadata(
2540        &self,
2541        upsert_values: impl IntoIterator<Item = (String, String)>,
2542    ) -> Result<()> {
2543        self.dataset.ensure_mutable()?;
2544        let mut dataset = (*self.dataset.get().await?).clone();
2545        // TODO: update this when we implement metadata APIs
2546        #[allow(deprecated)]
2547        dataset.replace_schema_metadata(upsert_values).await?;
2548        self.dataset.update(dataset);
2549        Ok(())
2550    }
2551
2552    /// Update field metadata
2553    ///
2554    /// # Arguments:
2555    /// * `new_values` - An iterator of tuples where the first element is the
2556    ///   field id and the second element is a hashmap of metadata key-value
2557    ///   pairs.
2558    ///
2559    pub async fn replace_field_metadata(
2560        &self,
2561        new_values: impl IntoIterator<Item = (u32, HashMap<String, String>)>,
2562    ) -> Result<()> {
2563        self.dataset.ensure_mutable()?;
2564        let mut dataset = (*self.dataset.get().await?).clone();
2565        dataset.replace_field_metadata(new_values).await?;
2566        self.dataset.update(dataset);
2567        Ok(())
2568    }
2569}
2570
2571#[async_trait::async_trait]
2572impl BaseTable for NativeTable {
2573    fn as_any(&self) -> &dyn std::any::Any {
2574        self
2575    }
2576
2577    fn name(&self) -> &str {
2578        self.name.as_str()
2579    }
2580
2581    fn namespace(&self) -> &[String] {
2582        &self.namespace
2583    }
2584
2585    fn id(&self) -> &str {
2586        &self.id
2587    }
2588
2589    async fn version(&self) -> Result<u64> {
2590        Ok(self.dataset.get().await?.version().version)
2591    }
2592
2593    async fn checkout(&self, version: u64) -> Result<()> {
2594        self.dataset.as_time_travel(version).await
2595    }
2596
2597    async fn checkout_tag(&self, tag: &str) -> Result<()> {
2598        self.dataset.as_time_travel(tag).await
2599    }
2600
2601    async fn checkout_latest(&self) -> Result<()> {
2602        self.dataset.as_latest().await?;
2603        self.dataset.reload().await
2604    }
2605
2606    async fn list_versions(&self) -> Result<Vec<Version>> {
2607        Ok(self.dataset.get().await?.versions().await?)
2608    }
2609
2610    async fn restore(&self) -> Result<()> {
2611        let version = self
2612            .dataset
2613            .time_travel_version()
2614            .ok_or_else(|| Error::InvalidInput {
2615                message: "you must run checkout before running restore".to_string(),
2616            })?;
2617        {
2618            // restore is the only "write" operation allowed in time travel mode
2619            let mut dataset = (*self.dataset.get().await?).clone();
2620            debug_assert_eq!(dataset.version().version, version);
2621            dataset.restore().await?;
2622        }
2623        self.dataset.as_latest().await?;
2624        Ok(())
2625    }
2626
2627    async fn schema(&self) -> Result<SchemaRef> {
2628        let lance_schema = self.dataset.get().await?.schema().clone();
2629        Ok(Arc::new(Schema::from(&lance_schema)))
2630    }
2631
2632    async fn table_definition(&self) -> Result<TableDefinition> {
2633        let schema = self.schema().await?;
2634        TableDefinition::try_from_rich_schema(schema)
2635    }
2636
2637    async fn count_rows(&self, filter: Option<Filter>) -> Result<usize> {
2638        let dataset = self.dataset.get().await?;
2639        match filter {
2640            None => Ok(dataset.count_rows(None).await?),
2641            Some(Filter::Sql(sql)) => Ok(dataset.count_rows(Some(sql)).await?),
2642            Some(Filter::Datafusion(_)) => Err(Error::NotSupported {
2643                message: "Datafusion filters are not yet supported".to_string(),
2644            }),
2645        }
2646    }
2647
2648    async fn add(&self, mut add: AddDataBuilder) -> Result<AddResult> {
2649        let table_def = self.table_definition().await?;
2650
2651        self.dataset.ensure_mutable()?;
2652        let ds_wrapper = self.dataset.clone();
2653        let ds = self.dataset.get().await?;
2654
2655        let table_schema = Schema::from(&ds.schema().clone());
2656
2657        let num_partitions = if let Some(parallelism) = add.write_parallelism {
2658            parallelism
2659        } else {
2660            // Peek at the first batch to estimate a good partition count for
2661            // write parallelism.
2662            let mut peeked = PeekedScannable::new(add.data);
2663            let n = if let Some(first_batch) = peeked.peek().await {
2664                let max_partitions = lance_core::utils::tokio::get_num_compute_intensive_cpus();
2665                estimate_write_partitions(
2666                    first_batch.get_array_memory_size(),
2667                    first_batch.num_rows(),
2668                    peeked.num_rows(),
2669                    max_partitions,
2670                )
2671            } else {
2672                1
2673            };
2674            add.data = Box::new(peeked);
2675            n
2676        };
2677
2678        let output = add.into_plan(&table_schema, &table_def)?;
2679
2680        let lance_params = output
2681            .write_options
2682            .lance_write_params
2683            .unwrap_or(WriteParams {
2684                mode: match output.mode {
2685                    AddDataMode::Append => WriteMode::Append,
2686                    AddDataMode::Overwrite => WriteMode::Overwrite,
2687                },
2688                ..Default::default()
2689            });
2690
2691        // Repartition for write parallelism if beneficial.
2692        let plan = if num_partitions > 1 {
2693            Arc::new(
2694                datafusion_physical_plan::repartition::RepartitionExec::try_new(
2695                    output.plan,
2696                    datafusion_physical_plan::Partitioning::RoundRobinBatch(num_partitions),
2697                )?,
2698            ) as Arc<dyn ExecutionPlan>
2699        } else {
2700            output.plan
2701        };
2702
2703        let insert_exec = Arc::new(InsertExec::new(ds_wrapper.clone(), ds, plan, lance_params));
2704
2705        let tracker_for_tasks = output.tracker.clone();
2706        if let Some(ref t) = tracker_for_tasks {
2707            t.set_total_tasks(num_partitions);
2708        }
2709        let _finish = write_progress::FinishOnDrop(output.tracker);
2710
2711        // Execute all partitions in parallel.
2712        let task_ctx = Arc::new(TaskContext::default());
2713        let handles = FuturesUnordered::new();
2714        for partition in 0..num_partitions {
2715            let exec = insert_exec.clone();
2716            let ctx = task_ctx.clone();
2717            let tracker = tracker_for_tasks.clone();
2718            handles.push(tokio::spawn(async move {
2719                let _guard = tracker.as_ref().map(|t| t.track_task());
2720                let mut stream = exec
2721                    .execute(partition, ctx)
2722                    .map_err(|e| -> Error { e.into() })?;
2723                while let Some(batch) = stream.next().await {
2724                    batch.map_err(|e| -> Error { e.into() })?;
2725                }
2726                Ok::<_, Error>(())
2727            }));
2728        }
2729        for handle in handles {
2730            handle.await.map_err(|e| Error::Runtime {
2731                message: format!("Insert task panicked: {}", e),
2732            })??;
2733        }
2734
2735        let version = ds_wrapper.get().await?.manifest().version;
2736        Ok(AddResult { version })
2737    }
2738
2739    async fn create_index(&self, opts: IndexBuilder) -> Result<()> {
2740        if opts.columns.len() != 1 {
2741            return Err(Error::Schema {
2742                message: "Multi-column (composite) indices are not yet supported".to_string(),
2743            });
2744        }
2745        self.dataset.ensure_mutable()?;
2746        let mut dataset = (*self.dataset.get().await?).clone();
2747        let (column, field) = Self::resolve_index_field(dataset.schema(), &opts.columns[0])?;
2748
2749        let lance_idx_params = self.make_index_params(&field, opts.index.clone()).await?;
2750        let index_type = self.get_index_type_for_field(&field, &opts.index);
2751        let columns = [column.as_str()];
2752        let mut builder = dataset
2753            .create_index_builder(&columns, index_type, lance_idx_params.as_ref())
2754            .train(opts.train)
2755            .replace(opts.replace);
2756
2757        if let Some(name) = opts.name {
2758            builder = builder.name(name);
2759        }
2760        builder.await?;
2761        self.dataset.update(dataset);
2762        Ok(())
2763    }
2764
2765    async fn drop_index(&self, index_name: &str) -> Result<()> {
2766        self.dataset.ensure_mutable()?;
2767        let mut dataset = (*self.dataset.get().await?).clone();
2768        dataset.drop_index(index_name).await?;
2769        self.dataset.update(dataset);
2770        Ok(())
2771    }
2772
2773    async fn prewarm_index(&self, index_name: &str) -> Result<()> {
2774        let dataset = self.dataset.get().await?;
2775        Ok(dataset.prewarm_index(index_name).await?)
2776    }
2777
2778    async fn prewarm_data(&self, _columns: Option<Vec<String>>) -> Result<()> {
2779        Err(Error::NotSupported {
2780            message: "prewarm_data is currently only supported on remote tables.".into(),
2781        })
2782    }
2783
2784    async fn update(&self, update: UpdateBuilder) -> Result<UpdateResult> {
2785        // Delegate to the submodule implementation
2786        update::execute_update(self, update).await
2787    }
2788
2789    async fn create_plan(
2790        &self,
2791        query: &AnyQuery,
2792        options: QueryExecutionOptions,
2793    ) -> Result<Arc<dyn ExecutionPlan>> {
2794        query::create_plan(self, query, options).await
2795    }
2796
2797    async fn query(
2798        &self,
2799        query: &AnyQuery,
2800        options: QueryExecutionOptions,
2801    ) -> Result<DatasetRecordBatchStream> {
2802        query::execute_query(self, query, options).await
2803    }
2804
2805    async fn analyze_plan(
2806        &self,
2807        query: &AnyQuery,
2808        options: QueryExecutionOptions,
2809    ) -> Result<String> {
2810        query::analyze_query_plan(self, query, options).await
2811    }
2812
2813    async fn merge_insert(
2814        &self,
2815        params: MergeInsertBuilder,
2816        new_data: Box<dyn RecordBatchReader + Send>,
2817    ) -> Result<MergeResult> {
2818        merge::execute_merge_insert(self, params, new_data).await
2819    }
2820
2821    async fn set_unenforced_primary_key(&self, columns: &[&str]) -> Result<()> {
2822        primary_key::set_unenforced_primary_key(self, columns).await
2823    }
2824
2825    async fn set_lsm_write_spec(&self, spec: LsmWriteSpec) -> Result<()> {
2826        merge::lsm::set_lsm_write_spec(self, spec).await
2827    }
2828
2829    async fn unset_lsm_write_spec(&self) -> Result<()> {
2830        merge::lsm::unset_lsm_write_spec(self).await
2831    }
2832
2833    /// Delete rows from the table
2834    async fn delete(&self, predicate: Predicate<'_>) -> Result<DeleteResult> {
2835        delete::execute_delete(self, predicate).await
2836    }
2837
2838    async fn tags(&self) -> Result<Box<dyn Tags + '_>> {
2839        Ok(Box::new(NativeTags {
2840            dataset: self.dataset.clone(),
2841        }))
2842    }
2843
2844    async fn optimize(&self, action: OptimizeAction) -> Result<OptimizeStats> {
2845        // Delegate to the submodule implementation
2846        optimize::execute_optimize(self, action).await
2847    }
2848
2849    async fn add_columns(
2850        &self,
2851        transforms: NewColumnTransform,
2852        read_columns: Option<Vec<String>>,
2853    ) -> Result<AddColumnsResult> {
2854        schema_evolution::execute_add_columns(self, transforms, read_columns).await
2855    }
2856
2857    async fn alter_columns(&self, alterations: &[ColumnAlteration]) -> Result<AlterColumnsResult> {
2858        schema_evolution::execute_alter_columns(self, alterations).await
2859    }
2860
2861    async fn drop_columns(&self, columns: &[&str]) -> Result<DropColumnsResult> {
2862        schema_evolution::execute_drop_columns(self, columns).await
2863    }
2864
2865    async fn list_indices(&self) -> Result<Vec<IndexConfig>> {
2866        let dataset = self.dataset.get().await?;
2867        let indices = dataset.load_indices().await?;
2868        let results = futures::stream::iter(indices.as_slice())
2869            .then(|idx| async {
2870                // skip Lance internal indexes
2871                if idx.name == FRAG_REUSE_INDEX_NAME {
2872                    return None;
2873                }
2874
2875                let stats = match dataset.index_statistics(idx.name.as_str()).await {
2876                    Ok(stats) => stats,
2877                    Err(e) => {
2878                        log::warn!(
2879                            "Failed to get statistics for index {} ({}): {}",
2880                            idx.name,
2881                            idx.uuid,
2882                            e
2883                        );
2884                        return None;
2885                    }
2886                };
2887
2888                let stats: serde_json::Value = match serde_json::from_str(&stats) {
2889                    Ok(stats) => stats,
2890                    Err(e) => {
2891                        log::warn!(
2892                            "Failed to deserialize index statistics for index {} ({}): {}",
2893                            idx.name,
2894                            idx.uuid,
2895                            e
2896                        );
2897                        return None;
2898                    }
2899                };
2900
2901                let Some(index_type) = stats.get("index_type").and_then(|v| v.as_str()) else {
2902                    log::warn!(
2903                        "Index statistics was missing 'index_type' field for index {} ({})",
2904                        idx.name,
2905                        idx.uuid
2906                    );
2907                    return None;
2908                };
2909
2910                let index_type: crate::index::IndexType = match index_type.parse() {
2911                    Ok(index_type) => index_type,
2912                    Err(e) => {
2913                        log::warn!(
2914                            "Failed to parse index type for index {} ({}): {}",
2915                            idx.name,
2916                            idx.uuid,
2917                            e
2918                        );
2919                        return None;
2920                    }
2921                };
2922
2923                let mut columns = Vec::with_capacity(idx.fields.len());
2924                for field_id in &idx.fields {
2925                    let field_path = match dataset.schema().field_path(*field_id) {
2926                        Ok(field_path) => field_path,
2927                        Err(e) => {
2928                            log::warn!(
2929                                "Failed to resolve field path for index {} ({}) field id {}: {}",
2930                                idx.name,
2931                                idx.uuid,
2932                                field_id,
2933                                e
2934                            );
2935                            return None;
2936                        }
2937                    };
2938                    columns.push(field_path);
2939                }
2940
2941                let name = idx.name.clone();
2942                Some(IndexConfig {
2943                    index_type,
2944                    columns,
2945                    name,
2946                })
2947            })
2948            .collect::<Vec<_>>()
2949            .await;
2950
2951        Ok(results.into_iter().flatten().collect())
2952    }
2953
2954    async fn uri(&self) -> Result<String> {
2955        Ok(self.uri.clone())
2956    }
2957
2958    async fn storage_options(&self) -> Option<HashMap<String, String>> {
2959        self.initial_storage_options().await
2960    }
2961
2962    async fn initial_storage_options(&self) -> Option<HashMap<String, String>> {
2963        self.dataset
2964            .get()
2965            .await
2966            .ok()
2967            .and_then(|dataset| dataset.initial_storage_options().cloned())
2968    }
2969
2970    async fn latest_storage_options(&self) -> Result<Option<HashMap<String, String>>> {
2971        let dataset = self.dataset.get().await?;
2972        Ok(dataset.latest_storage_options().await?.map(|o| o.0))
2973    }
2974
2975    async fn index_stats(&self, index_name: &str) -> Result<Option<IndexStatistics>> {
2976        let stats = match self
2977            .dataset
2978            .get()
2979            .await?
2980            .index_statistics(index_name.as_ref())
2981            .await
2982        {
2983            Ok(stats) => stats,
2984            Err(lance_core::Error::IndexNotFound { .. }) => return Ok(None),
2985            Err(e) => return Err(Error::from(e)),
2986        };
2987
2988        let mut stats: IndexStatisticsImpl =
2989            serde_json::from_str(&stats).map_err(|e| Error::InvalidInput {
2990                message: format!("error deserializing index statistics: {}", e),
2991            })?;
2992
2993        let first_index = stats.indices.pop().ok_or_else(|| Error::InvalidInput {
2994            message: "index statistics is empty".to_string(),
2995        })?;
2996        // Index type should be present at one of the levels.
2997        let index_type =
2998            stats
2999                .index_type
3000                .or(first_index.index_type)
3001                .ok_or_else(|| Error::InvalidInput {
3002                    message: "index statistics was missing index type".to_string(),
3003                })?;
3004        let loss = stats
3005            .indices
3006            .iter()
3007            .map(|index| index.loss.unwrap_or_default())
3008            .sum::<f64>();
3009
3010        let loss = first_index.loss.map(|first_loss| first_loss + loss);
3011        Ok(Some(IndexStatistics {
3012            num_indexed_rows: stats.num_indexed_rows,
3013            num_unindexed_rows: stats.num_unindexed_rows,
3014            index_type,
3015            distance_type: first_index.metric_type,
3016            num_indices: stats.num_indices,
3017            loss,
3018        }))
3019    }
3020
3021    /// Poll until the columns are fully indexed. Will return Error::Timeout if the columns
3022    /// are not fully indexed within the timeout.
3023    async fn wait_for_index(
3024        &self,
3025        index_names: &[&str],
3026        timeout: std::time::Duration,
3027    ) -> Result<()> {
3028        wait_for_index(self, index_names, timeout).await
3029    }
3030
3031    async fn stats(&self) -> Result<TableStatistics> {
3032        let num_rows = self.count_rows(None).await?;
3033        let num_indices = self.list_indices().await?.len();
3034        let ds = self.dataset.get().await?;
3035        let ds_clone = (*ds).clone();
3036        let ds_stats = Arc::new(ds_clone).calculate_data_stats().await?;
3037        let total_bytes = ds_stats.fields.iter().map(|f| f.bytes_on_disk).sum::<u64>() as usize;
3038
3039        let frags = ds.get_fragments();
3040        let mut sorted_sizes = join_all(
3041            frags
3042                .iter()
3043                .map(|frag| async move { frag.physical_rows().await.unwrap_or(0) }),
3044        )
3045        .await;
3046        sorted_sizes.sort();
3047
3048        let small_frag_threshold = 100000;
3049        let num_fragments = sorted_sizes.len();
3050        let num_small_fragments = sorted_sizes
3051            .iter()
3052            .filter(|&&size| size < small_frag_threshold)
3053            .count();
3054
3055        let p25 = *sorted_sizes.get(num_fragments / 4).unwrap_or(&0);
3056        let p50 = *sorted_sizes.get(num_fragments / 2).unwrap_or(&0);
3057        let p75 = *sorted_sizes.get(num_fragments * 3 / 4).unwrap_or(&0);
3058        let p99 = *sorted_sizes.get(num_fragments * 99 / 100).unwrap_or(&0);
3059        let min = sorted_sizes.first().copied().unwrap_or(0);
3060        let max = sorted_sizes.last().copied().unwrap_or(0);
3061        let mean = if num_fragments == 0 {
3062            0
3063        } else {
3064            sorted_sizes.iter().copied().sum::<usize>() / num_fragments
3065        };
3066
3067        let frag_stats = FragmentStatistics {
3068            num_fragments,
3069            num_small_fragments,
3070            lengths: FragmentSummaryStats {
3071                min,
3072                max,
3073                mean,
3074                p25,
3075                p50,
3076                p75,
3077                p99,
3078            },
3079        };
3080        let stats = TableStatistics {
3081            total_bytes,
3082            num_rows,
3083            num_indices,
3084            fragment_stats: frag_stats,
3085        };
3086        Ok(stats)
3087    }
3088
3089    async fn create_insert_exec(
3090        &self,
3091        input: Arc<dyn datafusion_physical_plan::ExecutionPlan>,
3092        write_params: WriteParams,
3093    ) -> Result<Arc<dyn datafusion_physical_plan::ExecutionPlan>> {
3094        let ds = self.dataset.get().await?;
3095        let dataset = Arc::new((*ds).clone());
3096        Ok(Arc::new(datafusion::insert::InsertExec::new(
3097            self.dataset.clone(),
3098            dataset,
3099            input,
3100            write_params,
3101        )))
3102    }
3103}
3104
3105#[skip_serializing_none]
3106#[derive(Debug, Deserialize, PartialEq)]
3107pub struct TableStatistics {
3108    /// The total number of bytes in the table
3109    pub total_bytes: usize,
3110
3111    /// The number of rows in the table
3112    pub num_rows: usize,
3113
3114    /// The number of indices in the table
3115    pub num_indices: usize,
3116
3117    /// Statistics on table fragments
3118    pub fragment_stats: FragmentStatistics,
3119}
3120
3121#[skip_serializing_none]
3122#[derive(Debug, Deserialize, PartialEq)]
3123pub struct FragmentStatistics {
3124    /// The number of fragments in the table
3125    pub num_fragments: usize,
3126
3127    /// The number of uncompacted fragments in the table
3128    pub num_small_fragments: usize,
3129
3130    /// Statistics on the number of rows in the table fragments
3131    pub lengths: FragmentSummaryStats,
3132    // todo: add size statistics
3133    // /// Statistics on the number of bytes in the table fragments
3134    // sizes: FragmentStats,
3135}
3136
3137#[skip_serializing_none]
3138#[derive(Debug, Deserialize, PartialEq)]
3139pub struct FragmentSummaryStats {
3140    pub min: usize,
3141    pub max: usize,
3142    pub mean: usize,
3143    pub p25: usize,
3144    pub p50: usize,
3145    pub p75: usize,
3146    pub p99: usize,
3147}
3148
3149#[cfg(test)]
3150#[allow(deprecated)]
3151mod tests {
3152    use std::collections::HashMap;
3153    use std::sync::Arc;
3154    use std::sync::atomic::{AtomicBool, Ordering};
3155    use std::time::Duration;
3156
3157    use arrow_array::{
3158        Array, ArrayRef, BooleanArray, FixedSizeListArray, Int32Array, LargeStringArray,
3159        RecordBatch, RecordBatchIterator, RecordBatchReader, StringArray, StructArray,
3160        builder::{ListBuilder, StringBuilder},
3161    };
3162    use arrow_array::{BinaryArray, LargeBinaryArray};
3163    use arrow_data::ArrayDataBuilder;
3164    use arrow_schema::{DataType, Field, Schema};
3165    use futures::TryStreamExt;
3166    use lance::Dataset;
3167    use lance::io::{ObjectStoreParams, WrappingObjectStore};
3168    use lance_core::datatypes::LANCE_UNENFORCED_PRIMARY_KEY_POSITION;
3169    use tempfile::tempdir;
3170
3171    use super::*;
3172    use crate::connect;
3173    use crate::connection::ConnectBuilder;
3174    use crate::index::scalar::{BTreeIndexBuilder, BitmapIndexBuilder};
3175    use crate::index::vector::{IvfHnswPqIndexBuilder, IvfHnswSqIndexBuilder};
3176    use crate::query::Select;
3177    use crate::query::{ExecutableQuery, QueryBase};
3178    use crate::test_utils::connection::new_test_connection;
3179    use lance_index::scalar::FullTextSearchQuery;
3180    #[tokio::test]
3181    async fn test_open() {
3182        let tmp_dir = tempdir().unwrap();
3183        let dataset_path = tmp_dir.path().join("test.lance");
3184
3185        let batch = make_test_batches();
3186        let reader = RecordBatchIterator::new(vec![Ok(batch.clone())], batch.schema());
3187        Dataset::write(reader, dataset_path.to_str().unwrap(), None)
3188            .await
3189            .unwrap();
3190
3191        let table = NativeTable::open(dataset_path.to_str().unwrap())
3192            .await
3193            .unwrap();
3194
3195        assert_eq!(table.name, "test")
3196    }
3197
3198    #[tokio::test]
3199    async fn test_open_not_found() {
3200        let tmp_dir = tempdir().unwrap();
3201        let uri = tmp_dir.path().to_str().unwrap();
3202        let table = NativeTable::open(uri).await;
3203        assert!(matches!(table.unwrap_err(), Error::TableNotFound { .. }));
3204    }
3205
3206    #[test]
3207    #[cfg(not(windows))]
3208    fn test_object_store_path() {
3209        use std::path::Path as StdPath;
3210        let p = StdPath::new("s3://bucket/path/to/file");
3211        let c = p.join("subfile");
3212        assert_eq!(c.to_str().unwrap(), "s3://bucket/path/to/file/subfile");
3213    }
3214
3215    #[tokio::test]
3216    async fn test_count_rows() {
3217        let tmp_dir = tempdir().unwrap();
3218        let uri = tmp_dir.path().to_str().unwrap();
3219
3220        let batch = make_test_batches();
3221        let reader: Box<dyn RecordBatchReader + Send> = Box::new(RecordBatchIterator::new(
3222            vec![Ok(batch.clone())],
3223            batch.schema(),
3224        ));
3225        let table = NativeTable::create(
3226            uri,
3227            "test",
3228            vec![],
3229            reader,
3230            None,
3231            None,
3232            None,
3233            None,
3234            HashSet::new(),
3235        )
3236        .await
3237        .unwrap();
3238
3239        assert_eq!(table.count_rows(None).await.unwrap(), 10);
3240        assert_eq!(
3241            table
3242                .count_rows(Some(Filter::Sql("i >= 5".to_string())))
3243                .await
3244                .unwrap(),
3245            5
3246        );
3247    }
3248
3249    #[derive(Default, Debug)]
3250    struct NoOpCacheWrapper {
3251        called: AtomicBool,
3252    }
3253
3254    impl NoOpCacheWrapper {
3255        fn called(&self) -> bool {
3256            self.called.load(Ordering::Relaxed)
3257        }
3258    }
3259
3260    impl WrappingObjectStore for NoOpCacheWrapper {
3261        fn wrap(
3262            &self,
3263            _store_prefix: &str,
3264            original: Arc<dyn object_store::ObjectStore>,
3265        ) -> Arc<dyn object_store::ObjectStore> {
3266            self.called.store(true, Ordering::Relaxed);
3267            original
3268        }
3269    }
3270
3271    #[tokio::test]
3272    async fn test_open_table_options() {
3273        let tmp_dir = tempdir().unwrap();
3274        let dataset_path = tmp_dir.path().join("test.lance");
3275        let uri = dataset_path.to_str().unwrap();
3276        let conn = connect(uri).execute().await.unwrap();
3277
3278        let batches = make_test_batches();
3279
3280        conn.create_table("my_table", batches)
3281            .execute()
3282            .await
3283            .unwrap();
3284
3285        let wrapper = Arc::new(NoOpCacheWrapper::default());
3286
3287        let object_store_params = ObjectStoreParams {
3288            object_store_wrapper: Some(wrapper.clone()),
3289            ..Default::default()
3290        };
3291        let param = ReadParams {
3292            store_options: Some(object_store_params),
3293            ..Default::default()
3294        };
3295        assert!(!wrapper.called());
3296        conn.open_table("my_table")
3297            .lance_read_params(param)
3298            .execute()
3299            .await
3300            .unwrap();
3301        assert!(wrapper.called());
3302    }
3303
3304    fn make_test_batches() -> RecordBatch {
3305        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)]));
3306        RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from_iter_values(0..10))]).unwrap()
3307    }
3308
3309    #[tokio::test]
3310    async fn test_tags() {
3311        let tmp_dir = tempdir().unwrap();
3312        let uri = tmp_dir.path().to_str().unwrap();
3313
3314        let conn = ConnectBuilder::new(uri)
3315            .read_consistency_interval(Duration::from_secs(0))
3316            .execute()
3317            .await
3318            .unwrap();
3319        let table = conn
3320            .create_table("my_table", some_sample_data())
3321            .execute()
3322            .await
3323            .unwrap();
3324        assert_eq!(table.version().await.unwrap(), 1);
3325        table.add(some_sample_data()).execute().await.unwrap();
3326        assert_eq!(table.version().await.unwrap(), 2);
3327        let mut tags_manager = table.tags().await.unwrap();
3328        let tags = tags_manager.list().await.unwrap();
3329        assert!(tags.is_empty(), "Tags should be empty initially");
3330        let tag1 = "tag1";
3331        tags_manager.create(tag1, 1).await.unwrap();
3332        assert_eq!(tags_manager.get_version(tag1).await.unwrap(), 1);
3333        let tags = tags_manager.list().await.unwrap();
3334        assert_eq!(tags.len(), 1);
3335        assert!(tags.contains_key(tag1));
3336        assert_eq!(tags.get(tag1).unwrap().version, 1);
3337        tags_manager.create("tag2", 2).await.unwrap();
3338        assert_eq!(tags_manager.get_version("tag2").await.unwrap(), 2);
3339        let tags = tags_manager.list().await.unwrap();
3340        assert_eq!(tags.len(), 2);
3341        assert!(tags.contains_key(tag1));
3342        assert_eq!(tags.get(tag1).unwrap().version, 1);
3343        assert!(tags.contains_key("tag2"));
3344        assert_eq!(tags.get("tag2").unwrap().version, 2);
3345        // Test update and delete
3346        table.add(some_sample_data()).execute().await.unwrap();
3347        tags_manager.update(tag1, 3).await.unwrap();
3348        assert_eq!(tags_manager.get_version(tag1).await.unwrap(), 3);
3349        tags_manager.delete("tag2").await.unwrap();
3350        let tags = tags_manager.list().await.unwrap();
3351        assert_eq!(tags.len(), 1);
3352        assert!(tags.contains_key(tag1));
3353        assert_eq!(tags.get(tag1).unwrap().version, 3);
3354        // Test checkout tag
3355        table.add(some_sample_data()).execute().await.unwrap();
3356        assert_eq!(table.version().await.unwrap(), 4);
3357        table.checkout_tag(tag1).await.unwrap();
3358        assert_eq!(table.version().await.unwrap(), 3);
3359        table.checkout_latest().await.unwrap();
3360        assert_eq!(table.version().await.unwrap(), 4);
3361    }
3362
3363    #[tokio::test]
3364    async fn test_create_index() {
3365        use arrow_array::RecordBatch;
3366        use arrow_schema::{DataType, Field, Schema as ArrowSchema};
3367        use rand;
3368        use std::iter::repeat_with;
3369
3370        use arrow_array::Float32Array;
3371
3372        let tmp_dir = tempdir().unwrap();
3373        let uri = tmp_dir.path().to_str().unwrap();
3374        let conn = connect(uri).execute().await.unwrap();
3375
3376        let dimension = 16;
3377        let schema = Arc::new(ArrowSchema::new(vec![Field::new(
3378            "embeddings",
3379            DataType::FixedSizeList(
3380                Arc::new(Field::new("item", DataType::Float32, true)),
3381                dimension,
3382            ),
3383            false,
3384        )]));
3385
3386        let float_arr = Float32Array::from(
3387            repeat_with(rand::random::<f32>)
3388                .take(512 * dimension as usize)
3389                .collect::<Vec<f32>>(),
3390        );
3391
3392        let vectors = Arc::new(create_fixed_size_list(float_arr, dimension).unwrap());
3393        let batch = RecordBatch::try_new(schema.clone(), vec![vectors.clone()]).unwrap();
3394
3395        let table = conn.create_table("test", batch).execute().await.unwrap();
3396
3397        assert_eq!(table.index_stats("my_index").await.unwrap(), None);
3398
3399        table
3400            .create_index(&["embeddings"], Index::Auto)
3401            .execute()
3402            .await
3403            .unwrap();
3404
3405        let index_configs = table.list_indices().await.unwrap();
3406        assert_eq!(index_configs.len(), 1);
3407        let index = index_configs.into_iter().next().unwrap();
3408        assert_eq!(index.index_type, crate::index::IndexType::IvfPq);
3409        assert_eq!(index.columns, vec!["embeddings".to_string()]);
3410        assert_eq!(table.count_rows(None).await.unwrap(), 512);
3411        assert_eq!(table.name(), "test");
3412
3413        let indices = table.as_native().unwrap().load_indices().await.unwrap();
3414        let index_name = &indices[0].index_name;
3415        let stats = table.index_stats(index_name).await.unwrap().unwrap();
3416        assert_eq!(stats.num_indexed_rows, 512);
3417        assert_eq!(stats.num_unindexed_rows, 0);
3418        assert_eq!(stats.index_type, crate::index::IndexType::IvfPq);
3419        assert_eq!(stats.distance_type, Some(crate::DistanceType::L2));
3420        assert!(stats.loss.is_some());
3421
3422        table.drop_index(index_name).await.unwrap();
3423        assert_eq!(table.list_indices().await.unwrap().len(), 0);
3424    }
3425
3426    #[tokio::test]
3427    async fn test_dynamic_select() {
3428        let tc = new_test_connection().await.unwrap();
3429        let db = tc.connection;
3430
3431        let table = db
3432            .create_table("test", some_sample_data())
3433            .execute()
3434            .await
3435            .unwrap();
3436
3437        let query = table.query().select(Select::dynamic(&[("i_alias", "i")]));
3438
3439        let result = query.execute().await;
3440        let batches = result
3441            .expect("should have result")
3442            .try_collect::<Vec<_>>()
3443            .await
3444            .unwrap();
3445
3446        for batch in batches {
3447            assert!(batch.column_by_name("i_alias").is_some());
3448        }
3449    }
3450
3451    #[tokio::test]
3452    async fn test_ivf_pq_uses_default_partition_size_for_num_partitions() {
3453        use arrow_array::{Float32Array, RecordBatch};
3454        use arrow_schema::{DataType, Field, Schema as ArrowSchema};
3455
3456        use crate::index::vector::IvfPqIndexBuilder;
3457
3458        let tmp_dir = tempdir().unwrap();
3459        let uri = tmp_dir.path().to_str().unwrap();
3460        let conn = connect(uri).execute().await.unwrap();
3461
3462        const PARTITION_SIZE: usize = 8192;
3463        let num_rows = PARTITION_SIZE * 2;
3464        let dimension = 8usize;
3465        let schema = Arc::new(ArrowSchema::new(vec![Field::new(
3466            "embeddings",
3467            DataType::FixedSizeList(
3468                Arc::new(Field::new("item", DataType::Float32, true)),
3469                dimension as i32,
3470            ),
3471            false,
3472        )]));
3473
3474        let float_arr =
3475            Float32Array::from_iter_values((0..(num_rows * dimension)).map(|v| v as f32));
3476        let vectors = Arc::new(create_fixed_size_list(float_arr, dimension as i32).unwrap());
3477        let batch = RecordBatch::try_new(schema.clone(), vec![vectors]).unwrap();
3478
3479        let table = conn.create_table("test", batch).execute().await.unwrap();
3480        let native_table = table.as_native().unwrap();
3481        let builder = IvfPqIndexBuilder::default();
3482        table
3483            .create_index(&["embeddings"], Index::IvfPq(builder))
3484            .execute()
3485            .await
3486            .unwrap();
3487        table
3488            .wait_for_index(&["embeddings_idx"], std::time::Duration::from_secs(30))
3489            .await
3490            .unwrap();
3491
3492        use lance::index::DatasetIndexInternalExt;
3493        use lance::index::vector::ivf::v2::IvfPq as LanceIvfPq;
3494        use lance_index::metrics::NoOpMetricsCollector;
3495        use lance_index::vector::VectorIndex as LanceVectorIndex;
3496
3497        let indices = native_table.load_indices().await.unwrap();
3498        let index_uuid = indices[0].index_uuid.clone();
3499
3500        let dataset_guard = native_table.dataset.get().await.unwrap();
3501        let dataset = (*dataset_guard).clone();
3502        drop(dataset_guard);
3503
3504        let lance_index = dataset
3505            .open_vector_index("embeddings", &index_uuid, &NoOpMetricsCollector)
3506            .await
3507            .unwrap();
3508        let ivf_index = lance_index
3509            .as_any()
3510            .downcast_ref::<LanceIvfPq>()
3511            .expect("expected IvfPq index");
3512        let partition_count = ivf_index.ivf_model().num_partitions();
3513
3514        let expected_partitions = num_rows / PARTITION_SIZE;
3515        assert_eq!(partition_count, expected_partitions);
3516    }
3517
3518    #[tokio::test]
3519    async fn test_create_index_ivf_hnsw_sq() {
3520        use arrow_array::RecordBatch;
3521        use arrow_schema::{DataType, Field, Schema as ArrowSchema};
3522        use rand;
3523        use std::iter::repeat_with;
3524
3525        use arrow_array::Float32Array;
3526
3527        let tmp_dir = tempdir().unwrap();
3528        let uri = tmp_dir.path().to_str().unwrap();
3529        let conn = connect(uri).execute().await.unwrap();
3530
3531        let dimension = 16;
3532        let schema = Arc::new(ArrowSchema::new(vec![Field::new(
3533            "embeddings",
3534            DataType::FixedSizeList(
3535                Arc::new(Field::new("item", DataType::Float32, true)),
3536                dimension,
3537            ),
3538            false,
3539        )]));
3540
3541        let float_arr = Float32Array::from(
3542            repeat_with(rand::random::<f32>)
3543                .take(512 * dimension as usize)
3544                .collect::<Vec<f32>>(),
3545        );
3546
3547        let vectors = Arc::new(create_fixed_size_list(float_arr, dimension).unwrap());
3548        let batch = RecordBatch::try_new(schema.clone(), vec![vectors.clone()]).unwrap();
3549
3550        let table = conn.create_table("test", batch).execute().await.unwrap();
3551
3552        let stats = table.index_stats("my_index").await.unwrap();
3553        assert!(stats.is_none());
3554
3555        let index = IvfHnswSqIndexBuilder::default();
3556        table
3557            .create_index(&["embeddings"], Index::IvfHnswSq(index))
3558            .execute()
3559            .await
3560            .unwrap();
3561
3562        let index_configs = table.list_indices().await.unwrap();
3563        assert_eq!(index_configs.len(), 1);
3564        let index = index_configs.into_iter().next().unwrap();
3565        assert_eq!(index.index_type, crate::index::IndexType::IvfHnswSq);
3566        assert_eq!(index.columns, vec!["embeddings".to_string()]);
3567        assert_eq!(table.count_rows(None).await.unwrap(), 512);
3568        assert_eq!(table.name(), "test");
3569
3570        let indices = table.as_native().unwrap().load_indices().await.unwrap();
3571        let index_name = &indices[0].index_name;
3572        let stats = table.index_stats(index_name).await.unwrap().unwrap();
3573        assert_eq!(stats.num_indexed_rows, 512);
3574        assert_eq!(stats.num_unindexed_rows, 0);
3575    }
3576
3577    #[tokio::test]
3578    async fn test_create_index_ivf_hnsw_pq() {
3579        use arrow_array::RecordBatch;
3580        use arrow_schema::{DataType, Field, Schema as ArrowSchema};
3581        use rand;
3582        use std::iter::repeat_with;
3583
3584        use arrow_array::Float32Array;
3585
3586        let tmp_dir = tempdir().unwrap();
3587        let uri = tmp_dir.path().to_str().unwrap();
3588        let conn = connect(uri).execute().await.unwrap();
3589
3590        let dimension = 16;
3591        let schema = Arc::new(ArrowSchema::new(vec![Field::new(
3592            "embeddings",
3593            DataType::FixedSizeList(
3594                Arc::new(Field::new("item", DataType::Float32, true)),
3595                dimension,
3596            ),
3597            false,
3598        )]));
3599
3600        let float_arr = Float32Array::from(
3601            repeat_with(rand::random::<f32>)
3602                .take(512 * dimension as usize)
3603                .collect::<Vec<f32>>(),
3604        );
3605
3606        let vectors = Arc::new(create_fixed_size_list(float_arr, dimension).unwrap());
3607        let batch = RecordBatch::try_new(schema.clone(), vec![vectors.clone()]).unwrap();
3608
3609        let table = conn.create_table("test", batch).execute().await.unwrap();
3610        let stats = table.index_stats("my_index").await.unwrap();
3611        assert!(stats.is_none());
3612
3613        let index = IvfHnswPqIndexBuilder::default();
3614        table
3615            .create_index(&["embeddings"], Index::IvfHnswPq(index))
3616            .execute()
3617            .await
3618            .unwrap();
3619        table
3620            .wait_for_index(&["embeddings_idx"], Duration::from_millis(10))
3621            .await
3622            .unwrap();
3623        let index_configs = table.list_indices().await.unwrap();
3624        assert_eq!(index_configs.len(), 1);
3625        let index = index_configs.into_iter().next().unwrap();
3626        assert_eq!(index.index_type, crate::index::IndexType::IvfHnswPq);
3627        assert_eq!(index.columns, vec!["embeddings".to_string()]);
3628        assert_eq!(table.count_rows(None).await.unwrap(), 512);
3629        assert_eq!(table.name(), "test");
3630
3631        let indices: Vec<VectorIndex> = table.as_native().unwrap().load_indices().await.unwrap();
3632        let index_name = &indices[0].index_name;
3633        let stats = table.index_stats(index_name).await.unwrap().unwrap();
3634        assert_eq!(stats.num_indexed_rows, 512);
3635        assert_eq!(stats.num_unindexed_rows, 0);
3636    }
3637
3638    #[tokio::test]
3639    async fn test_create_index_ivf_hnsw_flat() {
3640        use arrow_array::RecordBatch;
3641        use arrow_schema::{DataType, Field, Schema as ArrowSchema};
3642        use rand;
3643        use std::iter::repeat_with;
3644
3645        use crate::index::vector::IvfHnswFlatIndexBuilder;
3646        use arrow_array::Float32Array;
3647
3648        let tmp_dir = tempdir().unwrap();
3649        let uri = tmp_dir.path().to_str().unwrap();
3650        let conn = connect(uri).execute().await.unwrap();
3651
3652        let dimension = 16;
3653        let schema = Arc::new(ArrowSchema::new(vec![Field::new(
3654            "embeddings",
3655            DataType::FixedSizeList(
3656                Arc::new(Field::new("item", DataType::Float32, true)),
3657                dimension,
3658            ),
3659            false,
3660        )]));
3661
3662        let float_arr = Float32Array::from(
3663            repeat_with(rand::random::<f32>)
3664                .take(512 * dimension as usize)
3665                .collect::<Vec<f32>>(),
3666        );
3667
3668        let vectors = Arc::new(create_fixed_size_list(float_arr, dimension).unwrap());
3669        let batch = RecordBatch::try_new(schema.clone(), vec![vectors.clone()]).unwrap();
3670
3671        let table = conn.create_table("test", batch).execute().await.unwrap();
3672
3673        let index = IvfHnswFlatIndexBuilder::default();
3674        table
3675            .create_index(&["embeddings"], Index::IvfHnswFlat(index))
3676            .execute()
3677            .await
3678            .unwrap();
3679
3680        let index_configs = table.list_indices().await.unwrap();
3681        assert_eq!(index_configs.len(), 1);
3682        let index = index_configs.into_iter().next().unwrap();
3683        assert_eq!(index.index_type, crate::index::IndexType::IvfHnswFlat);
3684        assert_eq!(index.columns, vec!["embeddings".to_string()]);
3685        assert_eq!(table.count_rows(None).await.unwrap(), 512);
3686    }
3687
3688    fn create_fixed_size_list<T: Array>(values: T, list_size: i32) -> Result<FixedSizeListArray> {
3689        let list_type = DataType::FixedSizeList(
3690            Arc::new(Field::new("item", values.data_type().clone(), true)),
3691            list_size,
3692        );
3693        let data = ArrayDataBuilder::new(list_type)
3694            .len(values.len() / list_size as usize)
3695            .add_child_data(values.into_data())
3696            .build()
3697            .unwrap();
3698
3699        Ok(FixedSizeListArray::from(data))
3700    }
3701
3702    fn some_sample_data() -> Box<dyn arrow_array::RecordBatchReader + Send> {
3703        let batch = RecordBatch::try_new(
3704            Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)])),
3705            vec![Arc::new(Int32Array::from(vec![1]))],
3706        )
3707        .unwrap();
3708        let schema = batch.schema().clone();
3709        let batch = Ok(batch);
3710
3711        Box::new(RecordBatchIterator::new(vec![batch], schema))
3712    }
3713
3714    #[tokio::test]
3715    async fn test_create_scalar_index() {
3716        let tmp_dir = tempdir().unwrap();
3717        let uri = tmp_dir.path().to_str().unwrap();
3718
3719        let batch = RecordBatch::try_new(
3720            Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)])),
3721            vec![Arc::new(Int32Array::from(vec![1]))],
3722        )
3723        .unwrap();
3724        let conn = ConnectBuilder::new(uri).execute().await.unwrap();
3725        let table = conn
3726            .create_table("my_table", batch.clone())
3727            .execute()
3728            .await
3729            .unwrap();
3730
3731        // Can create an index on a scalar column (will default to btree)
3732        table
3733            .create_index(&["i"], Index::Auto)
3734            .execute()
3735            .await
3736            .unwrap();
3737        table
3738            .wait_for_index(&["i_idx"], Duration::from_millis(10))
3739            .await
3740            .unwrap();
3741        let index_configs = table.list_indices().await.unwrap();
3742        assert_eq!(index_configs.len(), 1);
3743        let index = index_configs.into_iter().next().unwrap();
3744        assert_eq!(index.index_type, crate::index::IndexType::BTree);
3745        assert_eq!(index.columns, vec!["i".to_string()]);
3746
3747        // Can also specify btree
3748        table
3749            .create_index(&["i"], Index::BTree(BTreeIndexBuilder::default()))
3750            .execute()
3751            .await
3752            .unwrap();
3753
3754        let index_configs = table.list_indices().await.unwrap();
3755        assert_eq!(index_configs.len(), 1);
3756        let index = index_configs.into_iter().next().unwrap();
3757        assert_eq!(index.index_type, crate::index::IndexType::BTree);
3758        assert_eq!(index.columns, vec!["i".to_string()]);
3759
3760        let indices = table.as_native().unwrap().load_indices().await.unwrap();
3761        let index_name = &indices[0].index_name;
3762        let stats = table.index_stats(index_name).await.unwrap().unwrap();
3763        assert_eq!(stats.num_indexed_rows, 1);
3764        assert_eq!(stats.num_unindexed_rows, 0);
3765    }
3766
3767    #[tokio::test]
3768    async fn test_create_index_nested_field_paths() {
3769        let tmp_dir = tempdir().unwrap();
3770        let uri = tmp_dir.path().to_str().unwrap();
3771        let conn = ConnectBuilder::new(uri).execute().await.unwrap();
3772
3773        let num_rows = 512;
3774        let dimension = 8;
3775
3776        let metadata = Arc::new(StructArray::from(vec![(
3777            Arc::new(Field::new("user_id", DataType::Int32, false)),
3778            Arc::new(Int32Array::from_iter_values(0..num_rows)) as ArrayRef,
3779        )]));
3780
3781        let vector_values = arrow_array::Float32Array::from_iter_values(
3782            (0..num_rows * dimension).map(|v| v as f32),
3783        );
3784        let embeddings =
3785            Arc::new(create_fixed_size_list(vector_values, dimension).unwrap()) as ArrayRef;
3786        let image = Arc::new(StructArray::from(vec![(
3787            Arc::new(Field::new(
3788                "embedding",
3789                embeddings.data_type().clone(),
3790                false,
3791            )),
3792            embeddings,
3793        )]));
3794
3795        let payload = Arc::new(StructArray::from(vec![(
3796            Arc::new(Field::new("text", DataType::Utf8, false)),
3797            Arc::new(StringArray::from_iter_values(
3798                (0..num_rows).map(|i| format!("document {}", i)),
3799            )) as ArrayRef,
3800        )]));
3801
3802        let meta_data = Arc::new(StructArray::from(vec![(
3803            Arc::new(Field::new("user-id", DataType::Int32, false)),
3804            Arc::new(Int32Array::from_iter_values(0..num_rows)) as ArrayRef,
3805        )]));
3806
3807        let literal = Arc::new(StructArray::from(vec![(
3808            Arc::new(Field::new("a.b", DataType::Int32, false)),
3809            Arc::new(Int32Array::from_iter_values(0..num_rows)) as ArrayRef,
3810        )]));
3811
3812        let schema = Arc::new(Schema::new(vec![
3813            Field::new("metadata", metadata.data_type().clone(), false),
3814            Field::new("image", image.data_type().clone(), false),
3815            Field::new("payload", payload.data_type().clone(), false),
3816            Field::new("meta-data", meta_data.data_type().clone(), false),
3817            Field::new("literal", literal.data_type().clone(), false),
3818        ]));
3819        let batch =
3820            RecordBatch::try_new(schema, vec![metadata, image, payload, meta_data, literal])
3821                .unwrap();
3822
3823        let table = conn
3824            .create_table("nested_index_paths", batch)
3825            .execute()
3826            .await
3827            .unwrap();
3828
3829        table
3830            .create_index(
3831                &["metadata.user_id"],
3832                Index::BTree(BTreeIndexBuilder::default()),
3833            )
3834            .name("metadata_user_id_idx".to_string())
3835            .execute()
3836            .await
3837            .unwrap();
3838        table
3839            .create_index(&["image.embedding"], Index::Auto)
3840            .name("image_embedding_idx".to_string())
3841            .execute()
3842            .await
3843            .unwrap();
3844        table
3845            .create_index(&["payload.text"], Index::FTS(Default::default()))
3846            .name("payload_text_idx".to_string())
3847            .execute()
3848            .await
3849            .unwrap();
3850        table
3851            .create_index(
3852                &["`meta-data`.`user-id`"],
3853                Index::BTree(BTreeIndexBuilder::default()),
3854            )
3855            .name("escaped_names_idx".to_string())
3856            .execute()
3857            .await
3858            .unwrap();
3859        table
3860            .create_index(
3861                &["literal.`a.b`"],
3862                Index::BTree(BTreeIndexBuilder::default()),
3863            )
3864            .name("literal_dot_idx".to_string())
3865            .execute()
3866            .await
3867            .unwrap();
3868
3869        let mut index_configs = table.list_indices().await.unwrap();
3870        index_configs.sort_by(|left, right| left.name.cmp(&right.name));
3871
3872        let indexed_columns = index_configs
3873            .iter()
3874            .map(|index| {
3875                (
3876                    index.name.as_str(),
3877                    index.columns.as_slice(),
3878                    index.index_type.clone(),
3879                )
3880            })
3881            .collect::<Vec<_>>();
3882        assert_eq!(
3883            indexed_columns,
3884            vec![
3885                (
3886                    "escaped_names_idx",
3887                    &["`meta-data`.`user-id`".to_string()][..],
3888                    crate::index::IndexType::BTree,
3889                ),
3890                (
3891                    "image_embedding_idx",
3892                    &["image.embedding".to_string()][..],
3893                    crate::index::IndexType::IvfPq,
3894                ),
3895                (
3896                    "literal_dot_idx",
3897                    &["literal.`a.b`".to_string()][..],
3898                    crate::index::IndexType::BTree,
3899                ),
3900                (
3901                    "metadata_user_id_idx",
3902                    &["metadata.user_id".to_string()][..],
3903                    crate::index::IndexType::BTree,
3904                ),
3905                (
3906                    "payload_text_idx",
3907                    &["payload.text".to_string()][..],
3908                    crate::index::IndexType::FTS,
3909                ),
3910            ]
3911        );
3912
3913        let vector_results = table
3914            .query()
3915            .nearest_to(&[0.0; 8])
3916            .unwrap()
3917            .column("image.embedding")
3918            .limit(1)
3919            .execute()
3920            .await
3921            .unwrap()
3922            .try_collect::<Vec<_>>()
3923            .await
3924            .unwrap();
3925        assert_eq!(
3926            vector_results
3927                .iter()
3928                .map(|batch| batch.num_rows())
3929                .sum::<usize>(),
3930            1
3931        );
3932
3933        let default_vector_results = table
3934            .query()
3935            .nearest_to(&[0.0; 8])
3936            .unwrap()
3937            .limit(1)
3938            .execute()
3939            .await
3940            .unwrap()
3941            .try_collect::<Vec<_>>()
3942            .await
3943            .unwrap();
3944        assert_eq!(
3945            default_vector_results
3946                .iter()
3947                .map(|batch| batch.num_rows())
3948                .sum::<usize>(),
3949            1
3950        );
3951
3952        let fts_results = table
3953            .query()
3954            .full_text_search(FullTextSearchQuery::new("document".to_string()))
3955            .limit(5)
3956            .execute()
3957            .await
3958            .unwrap()
3959            .try_collect::<Vec<_>>()
3960            .await
3961            .unwrap();
3962        assert!(!fts_results.is_empty());
3963
3964        let filtered_results = table
3965            .query()
3966            .only_if("metadata.user_id = 42")
3967            .limit(1)
3968            .execute()
3969            .await
3970            .unwrap()
3971            .try_collect::<Vec<_>>()
3972            .await
3973            .unwrap();
3974        assert_eq!(
3975            filtered_results
3976                .iter()
3977                .map(|batch| batch.num_rows())
3978                .sum::<usize>(),
3979            1
3980        );
3981    }
3982
3983    #[tokio::test]
3984    async fn test_create_bitmap_index() {
3985        let tmp_dir = tempdir().unwrap();
3986        let uri = tmp_dir.path().to_str().unwrap();
3987
3988        let conn = ConnectBuilder::new(uri).execute().await.unwrap();
3989
3990        let schema = Arc::new(Schema::new(vec![
3991            Field::new("id", DataType::Int32, false),
3992            Field::new("category", DataType::Utf8, true),
3993            Field::new("large_category", DataType::LargeUtf8, true),
3994            Field::new("is_active", DataType::Boolean, true),
3995            Field::new("data", DataType::Binary, true),
3996            Field::new("large_data", DataType::LargeBinary, true),
3997        ]));
3998
3999        let batch = RecordBatch::try_new(
4000            schema.clone(),
4001            vec![
4002                Arc::new(Int32Array::from_iter_values(0..100)),
4003                Arc::new(StringArray::from_iter_values(
4004                    (0..100).map(|i| format!("category_{}", i % 5)),
4005                )),
4006                Arc::new(LargeStringArray::from_iter_values(
4007                    (0..100).map(|i| format!("large_category_{}", i % 5)),
4008                )),
4009                Arc::new(BooleanArray::from_iter((0..100).map(|i| Some(i % 2 == 0)))),
4010                Arc::new(BinaryArray::from_iter_values(
4011                    (0_u32..100).map(|i| i.to_le_bytes()),
4012                )),
4013                Arc::new(LargeBinaryArray::from_iter_values(
4014                    (0_u32..100).map(|i| i.to_le_bytes()),
4015                )),
4016            ],
4017        )
4018        .unwrap();
4019
4020        let table = conn
4021            .create_table("test_bitmap", batch.clone())
4022            .execute()
4023            .await
4024            .unwrap();
4025
4026        // Create bitmap index on the "category" column
4027        table
4028            .create_index(&["category"], Index::Bitmap(Default::default()))
4029            .execute()
4030            .await
4031            .unwrap();
4032
4033        // Create bitmap index on the "is_active" column
4034        table
4035            .create_index(&["is_active"], Index::Bitmap(Default::default()))
4036            .execute()
4037            .await
4038            .unwrap();
4039
4040        // Create bitmap index on the "data" column
4041        table
4042            .create_index(&["data"], Index::Bitmap(Default::default()))
4043            .execute()
4044            .await
4045            .unwrap();
4046
4047        // Create bitmap index on the "large_data" column
4048        table
4049            .create_index(&["large_data"], Index::Bitmap(Default::default()))
4050            .execute()
4051            .await
4052            .unwrap();
4053
4054        // Create bitmap index on the "large_category" column
4055        table
4056            .create_index(&["large_category"], Index::Bitmap(Default::default()))
4057            .execute()
4058            .await
4059            .unwrap();
4060
4061        // Verify the index was created
4062        let index_configs = table.list_indices().await.unwrap();
4063        assert_eq!(index_configs.len(), 5);
4064
4065        let mut configs_iter = index_configs.into_iter();
4066        let index = configs_iter.next().unwrap();
4067        assert_eq!(index.index_type, crate::index::IndexType::Bitmap);
4068        assert_eq!(index.columns, vec!["category".to_string()]);
4069
4070        let index = configs_iter.next().unwrap();
4071        assert_eq!(index.index_type, crate::index::IndexType::Bitmap);
4072        assert_eq!(index.columns, vec!["is_active".to_string()]);
4073
4074        let index = configs_iter.next().unwrap();
4075        assert_eq!(index.index_type, crate::index::IndexType::Bitmap);
4076        assert_eq!(index.columns, vec!["data".to_string()]);
4077
4078        let index = configs_iter.next().unwrap();
4079        assert_eq!(index.index_type, crate::index::IndexType::Bitmap);
4080        assert_eq!(index.columns, vec!["large_data".to_string()]);
4081
4082        let index = configs_iter.next().unwrap();
4083        assert_eq!(index.index_type, crate::index::IndexType::Bitmap);
4084        assert_eq!(index.columns, vec!["large_category".to_string()]);
4085    }
4086
4087    #[tokio::test]
4088    async fn test_create_label_list_index() {
4089        let tmp_dir = tempdir().unwrap();
4090        let uri = tmp_dir.path().to_str().unwrap();
4091
4092        let conn = ConnectBuilder::new(uri).execute().await.unwrap();
4093
4094        let schema = Arc::new(Schema::new(vec![
4095            Field::new("id", DataType::Int32, false),
4096            Field::new(
4097                "tags",
4098                DataType::List(Field::new("item", DataType::Utf8, true).into()),
4099                true,
4100            ),
4101        ]));
4102
4103        const TAGS: [&str; 3] = ["cat", "dog", "fish"];
4104
4105        let values_builder = StringBuilder::new();
4106        let mut builder = ListBuilder::new(values_builder);
4107        for i in 0..120 {
4108            builder.values().append_value(TAGS[i % 3]);
4109            if i % 3 == 0 {
4110                builder.append(true)
4111            }
4112        }
4113        let tags = Arc::new(builder.finish());
4114
4115        let batch = RecordBatch::try_new(
4116            schema.clone(),
4117            vec![Arc::new(Int32Array::from_iter_values(0..40)), tags],
4118        )
4119        .unwrap();
4120
4121        let table = conn
4122            .create_table("test_bitmap", batch.clone())
4123            .execute()
4124            .await
4125            .unwrap();
4126
4127        // Can not create btree or bitmap index on list column
4128        assert!(
4129            table
4130                .create_index(&["tags"], Index::BTree(Default::default()))
4131                .execute()
4132                .await
4133                .is_err()
4134        );
4135        assert!(
4136            table
4137                .create_index(&["tags"], Index::Bitmap(Default::default()))
4138                .execute()
4139                .await
4140                .is_err()
4141        );
4142
4143        // Create bitmap index on the "category" column
4144        table
4145            .create_index(&["tags"], Index::LabelList(Default::default()))
4146            .execute()
4147            .await
4148            .unwrap();
4149
4150        // Verify the index was created
4151        let index_configs = table.list_indices().await.unwrap();
4152        assert_eq!(index_configs.len(), 1);
4153        let index = index_configs.into_iter().next().unwrap();
4154        assert_eq!(index.index_type, crate::index::IndexType::LabelList);
4155        assert_eq!(index.columns, vec!["tags".to_string()]);
4156    }
4157
4158    #[tokio::test]
4159    async fn test_create_inverted_index() {
4160        let tmp_dir = tempdir().unwrap();
4161        let uri = tmp_dir.path().to_str().unwrap();
4162
4163        let conn = ConnectBuilder::new(uri).execute().await.unwrap();
4164        const WORDS: [&str; 3] = ["cat", "dog", "fish"];
4165        let mut text_builder = StringBuilder::new();
4166        let num_rows = 120;
4167        for i in 0..num_rows {
4168            text_builder.append_value(WORDS[i % 3]);
4169        }
4170        let text = Arc::new(text_builder.finish());
4171
4172        let schema = Arc::new(Schema::new(vec![
4173            Field::new("id", DataType::Int32, false),
4174            Field::new("text", DataType::Utf8, true),
4175        ]));
4176        let batch = RecordBatch::try_new(
4177            schema.clone(),
4178            vec![
4179                Arc::new(Int32Array::from_iter_values(0..num_rows as i32)),
4180                text,
4181            ],
4182        )
4183        .unwrap();
4184
4185        let table = conn
4186            .create_table("test_bitmap", batch.clone())
4187            .execute()
4188            .await
4189            .unwrap();
4190
4191        table
4192            .create_index(&["text"], Index::FTS(Default::default()))
4193            .execute()
4194            .await
4195            .unwrap();
4196        let index_configs = table.list_indices().await.unwrap();
4197        assert_eq!(index_configs.len(), 1);
4198        let index = index_configs.into_iter().next().unwrap();
4199        assert_eq!(index.index_type, crate::index::IndexType::FTS);
4200        assert_eq!(index.columns, vec!["text".to_string()]);
4201        assert_eq!(index.name, "text_idx");
4202
4203        let stats = table.index_stats("text_idx").await.unwrap().unwrap();
4204        assert_eq!(stats.num_indexed_rows, num_rows);
4205        assert_eq!(stats.num_unindexed_rows, 0);
4206        assert_eq!(stats.index_type, crate::index::IndexType::FTS);
4207        assert_eq!(stats.distance_type, None);
4208
4209        // Make sure we can call prewarm without error
4210        table.prewarm_index("text_idx").await.unwrap();
4211    }
4212
4213    // Windows does not support precise sleep durations due to timer resolution limitations.
4214    #[cfg(not(target_os = "windows"))]
4215    #[tokio::test]
4216    async fn test_read_consistency_interval() {
4217        let intervals = vec![
4218            None,
4219            Some(0),
4220            Some(100), // 100 ms
4221        ];
4222
4223        for interval in intervals {
4224            let data = some_sample_data();
4225
4226            let tmp_dir = tempdir().unwrap();
4227            let uri = tmp_dir.path().to_str().unwrap();
4228
4229            let conn1 = ConnectBuilder::new(uri).execute().await.unwrap();
4230            let table1 = conn1
4231                .create_empty_table("my_table", RecordBatchReader::schema(&data))
4232                .execute()
4233                .await
4234                .unwrap();
4235
4236            let mut conn2 = ConnectBuilder::new(uri);
4237            if let Some(interval) = interval {
4238                conn2 = conn2.read_consistency_interval(std::time::Duration::from_millis(interval));
4239            }
4240            let conn2 = conn2.execute().await.unwrap();
4241            let table2 = conn2.open_table("my_table").execute().await.unwrap();
4242
4243            assert_eq!(table1.count_rows(None).await.unwrap(), 0);
4244            assert_eq!(table2.count_rows(None).await.unwrap(), 0);
4245
4246            table1.add(data).execute().await.unwrap();
4247            assert_eq!(table1.count_rows(None).await.unwrap(), 1);
4248
4249            match interval {
4250                None => {
4251                    assert_eq!(table2.count_rows(None).await.unwrap(), 0);
4252                    table2.checkout_latest().await.unwrap();
4253                    assert_eq!(table2.count_rows(None).await.unwrap(), 1);
4254                }
4255                Some(0) => {
4256                    assert_eq!(table2.count_rows(None).await.unwrap(), 1);
4257                }
4258                Some(100) => {
4259                    assert_eq!(table2.count_rows(None).await.unwrap(), 0);
4260                    tokio::time::sleep(Duration::from_millis(100)).await;
4261                    assert_eq!(table2.count_rows(None).await.unwrap(), 1);
4262                }
4263                _ => unreachable!(),
4264            }
4265        }
4266    }
4267
4268    #[tokio::test]
4269    async fn test_time_travel_write() {
4270        let tmp_dir = tempdir().unwrap();
4271        let uri = tmp_dir.path().to_str().unwrap();
4272
4273        let conn = ConnectBuilder::new(uri)
4274            .read_consistency_interval(Duration::from_secs(0))
4275            .execute()
4276            .await
4277            .unwrap();
4278        let table = conn
4279            .create_table("my_table", some_sample_data())
4280            .execute()
4281            .await
4282            .unwrap();
4283        let version = table.version().await.unwrap();
4284        table.add(some_sample_data()).execute().await.unwrap();
4285        table.checkout(version).await.unwrap();
4286        assert!(table.add(some_sample_data()).execute().await.is_err())
4287    }
4288
4289    #[tokio::test]
4290    async fn test_update_dataset_config() {
4291        let tmp_dir = tempdir().unwrap();
4292        let uri = tmp_dir.path().to_str().unwrap();
4293
4294        let conn = ConnectBuilder::new(uri)
4295            .read_consistency_interval(Duration::from_secs(0))
4296            .execute()
4297            .await
4298            .unwrap();
4299
4300        let table = conn
4301            .create_table("my_table", some_sample_data())
4302            .execute()
4303            .await
4304            .unwrap();
4305        let native_tbl = table.as_native().unwrap();
4306
4307        let manifest = native_tbl.manifest().await.unwrap();
4308        let base_config_len = manifest.config.len();
4309
4310        native_tbl
4311            .update_config(vec![("test_key1".to_string(), "test_val1".to_string())])
4312            .await
4313            .unwrap();
4314
4315        let manifest = native_tbl.manifest().await.unwrap();
4316        assert_eq!(manifest.config.len(), 1 + base_config_len);
4317        assert_eq!(
4318            manifest.config.get("test_key1"),
4319            Some(&"test_val1".to_string())
4320        );
4321
4322        native_tbl
4323            .update_config(vec![("test_key2".to_string(), "test_val2".to_string())])
4324            .await
4325            .unwrap();
4326        let manifest = native_tbl.manifest().await.unwrap();
4327        assert_eq!(manifest.config.len(), 2 + base_config_len);
4328        assert_eq!(
4329            manifest.config.get("test_key1"),
4330            Some(&"test_val1".to_string())
4331        );
4332        assert_eq!(
4333            manifest.config.get("test_key2"),
4334            Some(&"test_val2".to_string())
4335        );
4336
4337        native_tbl
4338            .update_config(vec![(
4339                "test_key2".to_string(),
4340                "test_val2_update".to_string(),
4341            )])
4342            .await
4343            .unwrap();
4344        let manifest = native_tbl.manifest().await.unwrap();
4345        assert_eq!(manifest.config.len(), 2 + base_config_len);
4346        assert_eq!(
4347            manifest.config.get("test_key1"),
4348            Some(&"test_val1".to_string())
4349        );
4350        assert_eq!(
4351            manifest.config.get("test_key2"),
4352            Some(&"test_val2_update".to_string())
4353        );
4354
4355        native_tbl.delete_config_keys(&["test_key1"]).await.unwrap();
4356        let manifest = native_tbl.manifest().await.unwrap();
4357        assert_eq!(manifest.config.len(), 1 + base_config_len);
4358        assert_eq!(
4359            manifest.config.get("test_key2"),
4360            Some(&"test_val2_update".to_string())
4361        );
4362    }
4363
4364    #[tokio::test]
4365    async fn test_schema_metadata_config() {
4366        let tmp_dir = tempdir().unwrap();
4367        let uri = tmp_dir.path().to_str().unwrap();
4368
4369        let conn = ConnectBuilder::new(uri)
4370            .read_consistency_interval(Duration::from_secs(0))
4371            .execute()
4372            .await
4373            .unwrap();
4374        let table = conn
4375            .create_table("my_table", some_sample_data())
4376            .execute()
4377            .await
4378            .unwrap();
4379
4380        let native_tbl = table.as_native().unwrap();
4381        let schema = native_tbl.schema().await.unwrap();
4382        let metadata = schema.metadata();
4383        assert_eq!(metadata.len(), 0);
4384
4385        native_tbl
4386            .replace_schema_metadata(vec![("test_key1".to_string(), "test_val1".to_string())])
4387            .await
4388            .unwrap();
4389
4390        let schema = native_tbl.schema().await.unwrap();
4391        let metadata = schema.metadata();
4392        assert_eq!(metadata.len(), 1);
4393        assert_eq!(metadata.get("test_key1"), Some(&"test_val1".to_string()));
4394
4395        native_tbl
4396            .replace_schema_metadata(vec![
4397                ("test_key1".to_string(), "test_val1_update".to_string()),
4398                ("test_key2".to_string(), "test_val2".to_string()),
4399            ])
4400            .await
4401            .unwrap();
4402        let schema = native_tbl.schema().await.unwrap();
4403        let metadata = schema.metadata();
4404        assert_eq!(metadata.len(), 2);
4405        assert_eq!(
4406            metadata.get("test_key1"),
4407            Some(&"test_val1_update".to_string())
4408        );
4409        assert_eq!(metadata.get("test_key2"), Some(&"test_val2".to_string()));
4410
4411        native_tbl
4412            .replace_schema_metadata(vec![(
4413                "test_key2".to_string(),
4414                "test_val2_update".to_string(),
4415            )])
4416            .await
4417            .unwrap();
4418        let schema = native_tbl.schema().await.unwrap();
4419        let metadata = schema.metadata();
4420        assert_eq!(
4421            metadata.get("test_key2"),
4422            Some(&"test_val2_update".to_string())
4423        );
4424    }
4425
4426    #[tokio::test]
4427    pub async fn test_field_metadata_update() {
4428        let tmp_dir = tempdir().unwrap();
4429        let uri = tmp_dir.path().to_str().unwrap();
4430
4431        let conn = ConnectBuilder::new(uri)
4432            .read_consistency_interval(Duration::from_secs(0))
4433            .execute()
4434            .await
4435            .unwrap();
4436        let table = conn
4437            .create_table("my_table", some_sample_data())
4438            .execute()
4439            .await
4440            .unwrap();
4441
4442        let native_tbl = table.as_native().unwrap();
4443        let schema = native_tbl.manifest().await.unwrap().schema;
4444
4445        let field = schema.field("i").unwrap();
4446        assert_eq!(field.metadata.len(), 0);
4447
4448        native_tbl
4449            .replace_schema_metadata(vec![(
4450                "test_key2".to_string(),
4451                "test_val2_update".to_string(),
4452            )])
4453            .await
4454            .unwrap();
4455
4456        let schema = native_tbl.schema().await.unwrap();
4457        let metadata = schema.metadata();
4458        assert_eq!(metadata.len(), 1);
4459        assert_eq!(
4460            metadata.get("test_key2"),
4461            Some(&"test_val2_update".to_string())
4462        );
4463
4464        let mut new_field_metadata = HashMap::<String, String>::new();
4465        new_field_metadata.insert("test_field_key1".into(), "test_field_val1".into());
4466        native_tbl
4467            .replace_field_metadata(vec![(field.id as u32, new_field_metadata)])
4468            .await
4469            .unwrap();
4470
4471        let schema = native_tbl.manifest().await.unwrap().schema;
4472        let field = schema.field("i").unwrap();
4473        assert_eq!(field.metadata.len(), 1);
4474        assert_eq!(
4475            field.metadata.get("test_field_key1"),
4476            Some(&"test_field_val1".to_string())
4477        );
4478    }
4479
4480    #[tokio::test]
4481    async fn test_set_unenforced_primary_key() {
4482        let tmp_dir = tempdir().unwrap();
4483        let uri = tmp_dir.path().to_str().unwrap();
4484
4485        let schema = Arc::new(Schema::new(vec![
4486            Field::new("id", DataType::Int64, false),
4487            Field::new("name", DataType::Utf8, true),
4488            Field::new("score", DataType::Float64, true),
4489        ]));
4490        let batch = RecordBatch::try_new(
4491            schema.clone(),
4492            vec![
4493                Arc::new(arrow_array::Int64Array::from(vec![1, 2, 3])),
4494                Arc::new(StringArray::from(vec!["a", "b", "c"])),
4495                Arc::new(arrow_array::Float64Array::from(vec![1.0, 2.0, 3.0])),
4496            ],
4497        )
4498        .unwrap();
4499        let reader: Box<dyn arrow_array::RecordBatchReader + Send> =
4500            Box::new(RecordBatchIterator::new(vec![Ok(batch)], schema.clone()));
4501
4502        let conn = ConnectBuilder::new(uri)
4503            .read_consistency_interval(Duration::from_secs(0))
4504            .execute()
4505            .await
4506            .unwrap();
4507        let table = conn.create_table("t", reader).execute().await.unwrap();
4508
4509        // Reject empty input.
4510        let err = table
4511            .set_unenforced_primary_key(Vec::<&str>::new())
4512            .await
4513            .expect_err("empty input should be rejected");
4514        assert!(matches!(err, Error::InvalidInput { .. }), "got {:?}", err);
4515
4516        // Reject compound (multi-column) input.
4517        let err = table
4518            .set_unenforced_primary_key(["id", "name"])
4519            .await
4520            .expect_err("compound primary key should be rejected");
4521        assert!(matches!(err, Error::InvalidInput { .. }), "got {:?}", err);
4522
4523        // Reject unknown column.
4524        let err = table
4525            .set_unenforced_primary_key(["nonexistent"])
4526            .await
4527            .expect_err("nonexistent column should be rejected");
4528        assert!(matches!(err, Error::InvalidInput { .. }), "got {:?}", err);
4529
4530        // Reject unsupported dtype (Float64).
4531        let err = table
4532            .set_unenforced_primary_key(["score"])
4533            .await
4534            .expect_err("Float64 should be rejected");
4535        assert!(matches!(err, Error::InvalidInput { .. }), "got {:?}", err);
4536
4537        // None of the rejected calls set a primary key.
4538        let lance_schema = table.as_native().unwrap().manifest().await.unwrap().schema;
4539        assert!(lance_schema.unenforced_primary_key().is_empty());
4540
4541        // Happy path: set the primary key to "id".
4542        table.set_unenforced_primary_key(["id"]).await.unwrap();
4543        let lance_schema = table.as_native().unwrap().manifest().await.unwrap().schema;
4544        let pk = lance_schema.unenforced_primary_key();
4545        assert_eq!(pk.len(), 1);
4546        assert_eq!(pk[0].name, "id");
4547        // Position metadata is 1-indexed.
4548        assert_eq!(
4549            pk[0].metadata.get(LANCE_UNENFORCED_PRIMARY_KEY_POSITION),
4550            Some(&"1".to_string())
4551        );
4552
4553        // The primary key is immutable: re-setting it is rejected, whether to
4554        // the same column or a different one.
4555        let err = table
4556            .set_unenforced_primary_key(["id"])
4557            .await
4558            .expect_err("re-setting the same primary key should be rejected");
4559        assert!(matches!(err, Error::InvalidInput { .. }), "got {:?}", err);
4560        let err = table
4561            .set_unenforced_primary_key(["name"])
4562            .await
4563            .expect_err("changing the primary key should be rejected");
4564        assert!(matches!(err, Error::InvalidInput { .. }), "got {:?}", err);
4565
4566        // The primary key is unchanged after the rejected calls.
4567        let lance_schema = table.as_native().unwrap().manifest().await.unwrap().schema;
4568        let pk = lance_schema.unenforced_primary_key();
4569        assert_eq!(pk.len(), 1);
4570        assert_eq!(pk[0].name, "id");
4571    }
4572
4573    #[tokio::test]
4574    async fn test_set_unenforced_primary_key_concurrent() {
4575        let tmp_dir = tempdir().unwrap();
4576        let uri = tmp_dir.path().to_str().unwrap();
4577
4578        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
4579        let batch = RecordBatch::try_new(
4580            schema.clone(),
4581            vec![Arc::new(arrow_array::Int64Array::from(vec![1, 2, 3]))],
4582        )
4583        .unwrap();
4584        let reader: Box<dyn arrow_array::RecordBatchReader + Send> =
4585            Box::new(RecordBatchIterator::new(vec![Ok(batch)], schema.clone()));
4586
4587        // A long read-consistency interval keeps each handle pinned to the
4588        // version it opened, so the second handle commits against a stale
4589        // base — the same situation as two processes racing.
4590        let conn = ConnectBuilder::new(uri)
4591            .read_consistency_interval(Duration::from_secs(3600))
4592            .execute()
4593            .await
4594            .unwrap();
4595        conn.create_table("t", reader).execute().await.unwrap();
4596
4597        let table_a = conn.open_table("t").execute().await.unwrap();
4598        let table_b = conn.open_table("t").execute().await.unwrap();
4599
4600        // Handle A sets the primary key first.
4601        table_a.set_unenforced_primary_key(["id"]).await.unwrap();
4602
4603        // Handle B committed against a stale base that had no primary key, so
4604        // its own up-front check did not see A's key. The commit itself must
4605        // still fail rather than silently overriding A's primary key. (The
4606        // cross-process race on a *different* column is caught by the Lance
4607        // commit layer.)
4608        let err = table_b
4609            .set_unenforced_primary_key(["id"])
4610            .await
4611            .expect_err("concurrent primary key commit on a stale base should fail");
4612        assert!(
4613            !matches!(err, Error::InvalidInput { .. }),
4614            "expected a commit-time conflict, not an up-front input error: {:?}",
4615            err
4616        );
4617
4618        // The committed primary key is exactly what A set — no corruption.
4619        let fresh = conn.open_table("t").execute().await.unwrap();
4620        let lance_schema = fresh.as_native().unwrap().manifest().await.unwrap().schema;
4621        let pk = lance_schema.unenforced_primary_key();
4622        assert_eq!(pk.len(), 1);
4623        assert_eq!(pk[0].name, "id");
4624    }
4625
4626    #[tokio::test]
4627    async fn test_set_lsm_write_spec() {
4628        use arrow_array::StringArray;
4629        use lance::dataset::mem_wal::DatasetMemWalExt;
4630
4631        let tmp_dir = tempdir().unwrap();
4632        let uri = tmp_dir.path().to_str().unwrap();
4633
4634        let schema = Arc::new(Schema::new(vec![
4635            Field::new("id", DataType::Int64, false),
4636            Field::new("name", DataType::Utf8, true),
4637        ]));
4638        let batch = RecordBatch::try_new(
4639            schema.clone(),
4640            vec![
4641                Arc::new(arrow_array::Int64Array::from(vec![1, 2, 3])),
4642                Arc::new(StringArray::from(vec!["a", "b", "c"])),
4643            ],
4644        )
4645        .unwrap();
4646        let reader: Box<dyn arrow_array::RecordBatchReader + Send> =
4647            Box::new(RecordBatchIterator::new(vec![Ok(batch)], schema.clone()));
4648
4649        let conn = ConnectBuilder::new(uri)
4650            .read_consistency_interval(Duration::from_secs(0))
4651            .execute()
4652            .await
4653            .unwrap();
4654        let table = conn.create_table("t", reader).execute().await.unwrap();
4655
4656        // Reject num_buckets out of range.
4657        for bad in [0u32, 1025] {
4658            let err = table
4659                .set_lsm_write_spec(LsmWriteSpec::bucket("id", bad))
4660                .await
4661                .expect_err("should reject");
4662            assert!(matches!(err, Error::Lance { .. }), "got {:?}", err);
4663        }
4664
4665        // Happy path: install spec; verify MemWAL details record it.
4666        table
4667            .set_lsm_write_spec(LsmWriteSpec::bucket("id", 4))
4668            .await
4669            .unwrap();
4670
4671        let native_tbl = table.as_native().unwrap();
4672        let dataset = native_tbl.dataset.get().await.unwrap();
4673        let details = dataset
4674            .mem_wal_index_details()
4675            .await
4676            .unwrap()
4677            .expect("MemWAL index should be initialized");
4678        assert_eq!(details.num_shards, 4);
4679        assert_eq!(details.sharding_specs.len(), 1);
4680        let installed = &details.sharding_specs[0];
4681        assert_eq!(installed.fields.len(), 1);
4682        let f = &installed.fields[0];
4683        assert_eq!(f.transform.as_deref(), Some("bucket"));
4684        assert_eq!(
4685            f.parameters.get("num_buckets").map(String::as_str),
4686            Some("4")
4687        );
4688        // Bucket parameters must hold only `num_buckets`.
4689        assert_eq!(f.parameters.len(), 1);
4690
4691        // Mutation rejected.
4692        let err = table
4693            .set_lsm_write_spec(LsmWriteSpec::bucket("id", 8))
4694            .await
4695            .expect_err("mutation should be rejected");
4696        assert!(matches!(err, Error::InvalidInput { .. }), "got {:?}", err);
4697    }
4698
4699    #[tokio::test]
4700    async fn test_set_lsm_write_spec_unsharded() {
4701        use lance::dataset::mem_wal::DatasetMemWalExt;
4702
4703        let tmp_dir = tempdir().unwrap();
4704        let uri = tmp_dir.path().to_str().unwrap();
4705
4706        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
4707        let batch = RecordBatch::try_new(
4708            schema.clone(),
4709            vec![Arc::new(arrow_array::Int64Array::from(vec![1]))],
4710        )
4711        .unwrap();
4712        let reader: Box<dyn arrow_array::RecordBatchReader + Send> =
4713            Box::new(RecordBatchIterator::new(vec![Ok(batch)], schema.clone()));
4714        let conn = ConnectBuilder::new(uri)
4715            .read_consistency_interval(Duration::from_secs(0))
4716            .execute()
4717            .await
4718            .unwrap();
4719        let table = conn.create_table("t", reader).execute().await.unwrap();
4720
4721        table
4722            .set_lsm_write_spec(LsmWriteSpec::unsharded())
4723            .await
4724            .unwrap();
4725
4726        let dataset = table.as_native().unwrap().dataset.get().await.unwrap();
4727        let details = dataset
4728            .mem_wal_index_details()
4729            .await
4730            .unwrap()
4731            .expect("MemWAL index should be initialized");
4732        assert_eq!(details.num_shards, 1);
4733        assert_eq!(details.sharding_specs.len(), 1);
4734        let f = &details.sharding_specs[0].fields[0];
4735        assert_eq!(f.transform.as_deref(), Some("unsharded"));
4736        assert!(f.source_ids.is_empty());
4737    }
4738
4739    #[tokio::test]
4740    async fn test_set_lsm_write_spec_identity() {
4741        use lance::dataset::mem_wal::DatasetMemWalExt;
4742
4743        let tmp_dir = tempdir().unwrap();
4744        let uri = tmp_dir.path().to_str().unwrap();
4745
4746        let schema = Arc::new(Schema::new(vec![
4747            Field::new("id", DataType::Int64, false),
4748            Field::new("region", DataType::Utf8, true),
4749        ]));
4750        let batch = RecordBatch::try_new(
4751            schema.clone(),
4752            vec![
4753                Arc::new(arrow_array::Int64Array::from(vec![1, 2, 3])),
4754                Arc::new(StringArray::from(vec!["a", "b", "c"])),
4755            ],
4756        )
4757        .unwrap();
4758        let reader: Box<dyn arrow_array::RecordBatchReader + Send> =
4759            Box::new(RecordBatchIterator::new(vec![Ok(batch)], schema.clone()));
4760        let conn = ConnectBuilder::new(uri)
4761            .read_consistency_interval(Duration::from_secs(0))
4762            .execute()
4763            .await
4764            .unwrap();
4765        let table = conn.create_table("t", reader).execute().await.unwrap();
4766
4767        table
4768            .set_lsm_write_spec(
4769                LsmWriteSpec::identity("region")
4770                    .with_writer_config_defaults([("durable_write", "false")]),
4771            )
4772            .await
4773            .unwrap();
4774
4775        let dataset = table.as_native().unwrap().dataset.get().await.unwrap();
4776        let details = dataset
4777            .mem_wal_index_details()
4778            .await
4779            .unwrap()
4780            .expect("MemWAL index should be initialized");
4781        // Identity sharding records an open-ended shard count.
4782        assert_eq!(details.num_shards, 0);
4783        assert_eq!(details.sharding_specs.len(), 1);
4784        let f = &details.sharding_specs[0].fields[0];
4785        assert_eq!(f.transform.as_deref(), Some("identity"));
4786        // Writer config defaults round-trip into the MemWAL index.
4787        assert_eq!(
4788            details
4789                .writer_config_defaults
4790                .get("durable_write")
4791                .map(String::as_str),
4792            Some("false")
4793        );
4794    }
4795
4796    #[tokio::test]
4797    async fn test_unset_lsm_write_spec() {
4798        use lance::dataset::mem_wal::DatasetMemWalExt;
4799
4800        let tmp_dir = tempdir().unwrap();
4801        let uri = tmp_dir.path().to_str().unwrap();
4802
4803        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
4804        let batch = RecordBatch::try_new(
4805            schema.clone(),
4806            vec![Arc::new(arrow_array::Int64Array::from(vec![1]))],
4807        )
4808        .unwrap();
4809        let reader: Box<dyn arrow_array::RecordBatchReader + Send> =
4810            Box::new(RecordBatchIterator::new(vec![Ok(batch)], schema.clone()));
4811        let conn = ConnectBuilder::new(uri)
4812            .read_consistency_interval(Duration::from_secs(0))
4813            .execute()
4814            .await
4815            .unwrap();
4816        let table = conn.create_table("t", reader).execute().await.unwrap();
4817
4818        // unset errors when no spec is set.
4819        table.unset_lsm_write_spec().await.unwrap_err();
4820
4821        // Install a spec, then unset it.
4822        table
4823            .set_lsm_write_spec(LsmWriteSpec::bucket("id", 4))
4824            .await
4825            .unwrap();
4826        {
4827            let dataset = table.as_native().unwrap().dataset.get().await.unwrap();
4828            assert!(dataset.mem_wal_index_details().await.unwrap().is_some());
4829        }
4830
4831        table.unset_lsm_write_spec().await.unwrap();
4832        {
4833            let dataset = table.as_native().unwrap().dataset.get().await.unwrap();
4834            assert!(dataset.mem_wal_index_details().await.unwrap().is_none());
4835        }
4836
4837        // A second unset errors; a fresh spec can still be installed afterwards.
4838        table.unset_lsm_write_spec().await.unwrap_err();
4839        table
4840            .set_lsm_write_spec(LsmWriteSpec::bucket("id", 8))
4841            .await
4842            .unwrap();
4843        {
4844            let dataset = table.as_native().unwrap().dataset.get().await.unwrap();
4845            assert!(dataset.mem_wal_index_details().await.unwrap().is_some());
4846        }
4847    }
4848
4849    #[tokio::test]
4850    pub async fn test_stats() {
4851        let tmp_dir = tempdir().unwrap();
4852        let uri = tmp_dir.path().to_str().unwrap();
4853
4854        let conn = ConnectBuilder::new(uri).execute().await.unwrap();
4855
4856        let schema = Arc::new(Schema::new(vec![
4857            Field::new("id", DataType::Int32, false),
4858            Field::new("foo", DataType::Int32, true),
4859        ]));
4860        let batch = RecordBatch::try_new(
4861            schema.clone(),
4862            vec![
4863                Arc::new(Int32Array::from_iter_values(0..100)),
4864                Arc::new(Int32Array::from_iter_values(0..100)),
4865            ],
4866        )
4867        .unwrap();
4868
4869        let table = conn
4870            .create_table("test_stats", batch.clone())
4871            .execute()
4872            .await
4873            .unwrap();
4874        for _ in 0..10 {
4875            let batch = RecordBatch::try_new(
4876                schema.clone(),
4877                vec![
4878                    Arc::new(Int32Array::from_iter_values(0..15)),
4879                    Arc::new(Int32Array::from_iter_values(0..15)),
4880                ],
4881            )
4882            .unwrap();
4883            table.add(batch.clone()).execute().await.unwrap();
4884        }
4885
4886        let empty_table = conn
4887            .create_table("test_stats_empty", RecordBatch::new_empty(batch.schema()))
4888            .execute()
4889            .await
4890            .unwrap();
4891
4892        let res = table.stats().await.unwrap();
4893        println!("{:#?}", res);
4894        assert_eq!(
4895            res,
4896            TableStatistics {
4897                num_rows: 250,
4898                num_indices: 0,
4899                total_bytes: 2300,
4900                fragment_stats: FragmentStatistics {
4901                    num_fragments: 11,
4902                    num_small_fragments: 11,
4903                    lengths: FragmentSummaryStats {
4904                        min: 15,
4905                        max: 100,
4906                        mean: 22,
4907                        p25: 15,
4908                        p50: 15,
4909                        p75: 15,
4910                        p99: 100,
4911                    },
4912                },
4913            }
4914        );
4915        let res = empty_table.stats().await.unwrap();
4916        println!("{:#?}", res);
4917        assert_eq!(
4918            res,
4919            TableStatistics {
4920                num_rows: 0,
4921                num_indices: 0,
4922                total_bytes: 0,
4923                fragment_stats: FragmentStatistics {
4924                    num_fragments: 0,
4925                    num_small_fragments: 0,
4926                    lengths: FragmentSummaryStats {
4927                        min: 0,
4928                        max: 0,
4929                        mean: 0,
4930                        p25: 0,
4931                        p50: 0,
4932                        p75: 0,
4933                        p99: 0,
4934                    },
4935                },
4936            }
4937        )
4938    }
4939
4940    #[tokio::test]
4941    pub async fn test_list_indices_skip_frag_reuse() {
4942        let tmp_dir = tempdir().unwrap();
4943        let uri = tmp_dir.path().to_str().unwrap();
4944
4945        let conn = ConnectBuilder::new(uri).execute().await.unwrap();
4946
4947        let schema = Arc::new(Schema::new(vec![
4948            Field::new("id", DataType::Int32, false),
4949            Field::new("foo", DataType::Int32, true),
4950        ]));
4951        let batch = RecordBatch::try_new(
4952            schema.clone(),
4953            vec![
4954                Arc::new(Int32Array::from_iter_values(0..100)),
4955                Arc::new(Int32Array::from_iter_values(0..100)),
4956            ],
4957        )
4958        .unwrap();
4959
4960        let table = conn
4961            .create_table("test_list_indices_skip_frag_reuse", batch.clone())
4962            .execute()
4963            .await
4964            .unwrap();
4965
4966        table.add(batch.clone()).execute().await.unwrap();
4967
4968        table
4969            .create_index(&["id"], Index::Bitmap(BitmapIndexBuilder {}))
4970            .execute()
4971            .await
4972            .unwrap();
4973
4974        table
4975            .optimize(OptimizeAction::Compact {
4976                options: CompactionOptions {
4977                    target_rows_per_fragment: 2_000,
4978                    defer_index_remap: true,
4979                    ..Default::default()
4980                },
4981                remap_options: None,
4982            })
4983            .await
4984            .unwrap();
4985
4986        let result = table.list_indices().await.unwrap();
4987        assert_eq!(result.len(), 1);
4988        assert_eq!(result[0].index_type, crate::index::IndexType::Bitmap);
4989    }
4990}