1use arrow_array::{RecordBatch, RecordBatchReader};
7use arrow_schema::{DataType, Field, Schema, SchemaRef};
8use async_trait::async_trait;
9use datafusion_execution::TaskContext;
10use datafusion_expr::Expr;
11use datafusion_physical_plan::ExecutionPlan;
12use datafusion_physical_plan::display::DisplayableExecutionPlan;
13use futures::StreamExt;
14use futures::stream::FuturesUnordered;
15pub use lance::dataset::ColumnAlteration;
16pub use lance::dataset::NewColumnTransform;
17pub use lance::dataset::ReadParams;
18pub use lance::dataset::Version;
19use lance::dataset::WriteMode;
20use lance::dataset::builder::DatasetBuilder;
21use lance::dataset::{InsertBuilder, WriteParams};
22use lance::index::DatasetIndexExt;
23use lance::index::vector::VectorIndexParams;
24use lance::index::vector::utils::infer_vector_dim;
25use lance::io::{ObjectStoreParams, WrappingObjectStore};
26use lance_datafusion::utils::StreamingWriteSource;
27use lance_index::IndexType;
28use lance_index::scalar::{BuiltinIndexType, ScalarIndexParams};
29use lance_index::vector::bq::RQBuildParams;
30use lance_index::vector::hnsw::builder::HnswBuildParams;
31use lance_index::vector::ivf::IvfBuildParams;
32use lance_index::vector::pq::PQBuildParams;
33use lance_index::vector::sq::builder::SQBuildParams;
34use lance_io::object_store::{LanceNamespaceStorageOptionsProvider, StorageOptionsAccessor};
35pub use query::AnyQuery;
36
37use lance::io::commit::namespace_manifest::LanceNamespaceExternalManifestStore;
38use lance_namespace::LanceNamespace;
39use lance_namespace::error::NamespaceError;
40use lance_namespace::models::DescribeTableRequest;
41use lance_table::format::Manifest;
42use lance_table::io::commit::CommitHandler;
43use lance_table::io::commit::ManifestNamingScheme;
44use lance_table::io::commit::external_manifest::ExternalManifestCommitHandler;
45use serde::{Deserialize, Serialize};
46use std::collections::{HashMap, HashSet};
47use std::format;
48use std::path::Path;
49use std::sync::Arc;
50
51use crate::connection::NamespaceClientPushdownOperation;
52
53use crate::data::scannable::{PeekedScannable, Scannable, estimate_write_partitions};
54use crate::database::Database;
55use crate::embeddings::{EmbeddingDefinition, EmbeddingRegistry, MemoryRegistry};
56use crate::error::{Error, Result};
57use crate::index::IndexStatistics;
58use crate::index::vector::VectorIndex;
59use crate::index::{Index, IndexBuilder, vector::suggested_num_sub_vectors};
60use crate::index::{IndexConfig, IndexStatisticsImpl};
61use crate::query::{IntoQueryVector, Query, QueryExecutionOptions, TakeQuery, VectorQuery};
62use crate::table::datafusion::insert::InsertExec;
63use crate::utils::{
64 PatchReadParam, PatchWriteParam, supported_bitmap_data_type, supported_btree_data_type,
65 supported_fts_data_type, supported_label_list_data_type, supported_vector_data_type,
66};
67
68use self::dataset::DatasetConsistencyWrapper;
69use self::merge::MergeInsertBuilder;
70
71mod add_data;
72pub mod datafusion;
73pub(crate) mod dataset;
74pub mod delete;
75pub mod merge;
76pub mod optimize;
77mod primary_key;
78pub mod query;
79pub mod schema_evolution;
80pub mod update;
81pub mod write_progress;
82use crate::index::waiter::wait_for_index;
83#[cfg(feature = "remote")]
84pub(crate) use add_data::PreprocessingOutput;
85pub use add_data::{AddDataBuilder, AddDataMode, AddResult, NaNVectorBehavior};
86pub use chrono::Duration;
87pub use delete::DeleteResult;
88use futures::future::join_all;
89pub use lance::dataset::refs::{TagContents, Tags as LanceTags};
90pub use lance::dataset::scanner::DatasetRecordBatchStream;
91use lance::dataset::statistics::DatasetStatisticsExt;
92use lance_index::frag_reuse::FRAG_REUSE_INDEX_NAME;
93pub use lance_index::optimize::OptimizeOptions;
94pub use optimize::{CompactionOptions, OptimizeAction, OptimizeStats};
95pub use schema_evolution::{AddColumnsResult, AlterColumnsResult, DropColumnsResult};
96use serde_with::skip_serializing_none;
97pub use update::{UpdateBuilder, UpdateResult};
98
99fn find_namespace_error<'a>(
105 err: &'a (dyn std::error::Error + 'static),
106) -> Option<&'a NamespaceError> {
107 let mut current: Option<&(dyn std::error::Error + 'static)> = Some(err);
108 while let Some(e) = current {
109 if let Some(ns_err) = e.downcast_ref::<NamespaceError>() {
110 return Some(ns_err);
111 }
112 current = e.source();
113 }
114 None
115}
116
117pub(crate) fn map_namespace_lance_error(err: lance::Error, table_name: &str) -> Error {
122 if let Some(code) = find_namespace_error(&err).map(NamespaceError::code) {
123 match code {
124 lance_namespace::error::ErrorCode::TableNotFound => {
125 return Error::TableNotFound {
126 name: table_name.to_string(),
127 source: Box::new(err),
128 };
129 }
130 lance_namespace::error::ErrorCode::TableAlreadyExists => {
131 return Error::TableAlreadyExists {
132 name: table_name.to_string(),
133 };
134 }
135 _ => {}
136 }
137 }
138 match err {
139 lance::Error::Namespace { source, .. } => Error::Runtime {
140 message: format!("Namespace error: {}", source),
141 },
142 other => other.into(),
143 }
144}
145
146#[derive(Debug, Clone, Serialize, Deserialize)]
148pub enum ColumnKind {
149 Physical,
151 Embedding(EmbeddingDefinition),
153}
154
155#[derive(Debug, Clone, Serialize, Deserialize)]
157pub struct ColumnDefinition {
158 pub kind: ColumnKind,
160}
161
162#[derive(Debug, Clone)]
163pub struct TableDefinition {
164 pub column_definitions: Vec<ColumnDefinition>,
165 pub schema: SchemaRef,
166}
167
168impl TableDefinition {
169 pub fn new(schema: SchemaRef, column_definitions: Vec<ColumnDefinition>) -> Self {
170 Self {
171 column_definitions,
172 schema,
173 }
174 }
175
176 pub fn new_from_schema(schema: SchemaRef) -> Self {
177 let column_definitions = schema
178 .fields()
179 .iter()
180 .map(|_| ColumnDefinition {
181 kind: ColumnKind::Physical,
182 })
183 .collect();
184 Self::new(schema, column_definitions)
185 }
186
187 pub fn try_from_rich_schema(schema: SchemaRef) -> Result<Self> {
188 let column_definitions = schema.metadata.get("lancedb::column_definitions");
189 if let Some(column_definitions) = column_definitions {
190 let column_definitions: Vec<ColumnDefinition> =
191 serde_json::from_str(column_definitions).map_err(|e| Error::Runtime {
192 message: format!("Failed to deserialize column definitions: {}", e),
193 })?;
194 Ok(Self::new(schema, column_definitions))
195 } else {
196 let column_definitions = schema
197 .fields()
198 .iter()
199 .map(|_| ColumnDefinition {
200 kind: ColumnKind::Physical,
201 })
202 .collect();
203 Ok(Self::new(schema, column_definitions))
204 }
205 }
206
207 pub fn into_rich_schema(self) -> SchemaRef {
208 let lancedb_metadata = serde_json::to_string(&self.column_definitions).unwrap();
211 let mut schema_with_metadata = (*self.schema).clone();
212 schema_with_metadata
213 .metadata
214 .insert("lancedb::column_definitions".to_string(), lancedb_metadata);
215 Arc::new(schema_with_metadata)
216 }
217}
218
219#[derive(Clone, Debug, Default)]
222#[allow(dead_code)] enum BadVectorHandling {
224 #[default]
226 Error,
227 Drop,
229 Fill(f32),
231 None,
233}
234
235#[derive(Clone, Debug, Default)]
237pub struct WriteOptions {
238 pub lance_write_params: Option<WriteParams>,
246}
247
248pub enum Filter {
250 Sql(String),
252 Datafusion(Expr),
254}
255
256pub enum Predicate<'a> {
262 String(&'a str),
264 Expr(&'a Expr),
266}
267
268impl<'a> From<&'a str> for Predicate<'a> {
269 fn from(s: &'a str) -> Self {
270 Predicate::String(s)
271 }
272}
273
274impl<'a> From<&'a String> for Predicate<'a> {
275 fn from(s: &'a String) -> Self {
276 Predicate::String(s.as_str())
277 }
278}
279
280impl<'a> From<&'a Expr> for Predicate<'a> {
281 fn from(e: &'a Expr) -> Self {
282 Predicate::Expr(e)
283 }
284}
285
286#[async_trait]
287pub trait Tags: Send + Sync {
288 async fn list(&self) -> Result<HashMap<String, TagContents>>;
290
291 async fn get_version(&self, tag: &str) -> Result<u64>;
293
294 async fn create(&mut self, tag: &str, version: u64) -> Result<()>;
296
297 async fn delete(&mut self, tag: &str) -> Result<()>;
299
300 async fn update(&mut self, tag: &str, version: u64) -> Result<()>;
302}
303
304pub use self::merge::MergeResult;
305
306#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
319pub enum LsmWriteSpec {
320 Bucket {
327 column: String,
328 num_buckets: u32,
329 maintained_indexes: Vec<String>,
332 writer_config_defaults: HashMap<String, String>,
334 },
335 Identity {
340 column: String,
341 maintained_indexes: Vec<String>,
344 writer_config_defaults: HashMap<String, String>,
346 },
347 Unsharded {
349 maintained_indexes: Vec<String>,
352 writer_config_defaults: HashMap<String, String>,
354 },
355}
356
357impl LsmWriteSpec {
358 pub fn bucket(column: impl Into<String>, num_buckets: u32) -> Self {
360 Self::Bucket {
361 column: column.into(),
362 num_buckets,
363 maintained_indexes: Vec::new(),
364 writer_config_defaults: HashMap::new(),
365 }
366 }
367
368 pub fn identity(column: impl Into<String>) -> Self {
371 Self::Identity {
372 column: column.into(),
373 maintained_indexes: Vec::new(),
374 writer_config_defaults: HashMap::new(),
375 }
376 }
377
378 pub fn unsharded() -> Self {
380 Self::Unsharded {
381 maintained_indexes: Vec::new(),
382 writer_config_defaults: HashMap::new(),
383 }
384 }
385
386 pub fn with_maintained_indexes<I, S>(mut self, indexes: I) -> Self
390 where
391 I: IntoIterator<Item = S>,
392 S: Into<String>,
393 {
394 let v: Vec<String> = indexes.into_iter().map(Into::into).collect();
395 match &mut self {
396 Self::Bucket {
397 maintained_indexes, ..
398 }
399 | Self::Identity {
400 maintained_indexes, ..
401 }
402 | Self::Unsharded {
403 maintained_indexes, ..
404 } => *maintained_indexes = v,
405 }
406 self
407 }
408
409 pub fn with_writer_config_defaults<I, K, V>(mut self, defaults: I) -> Self
414 where
415 I: IntoIterator<Item = (K, V)>,
416 K: Into<String>,
417 V: Into<String>,
418 {
419 let m: HashMap<String, String> = defaults
420 .into_iter()
421 .map(|(k, v)| (k.into(), v.into()))
422 .collect();
423 match &mut self {
424 Self::Bucket {
425 writer_config_defaults,
426 ..
427 }
428 | Self::Identity {
429 writer_config_defaults,
430 ..
431 }
432 | Self::Unsharded {
433 writer_config_defaults,
434 ..
435 } => *writer_config_defaults = m,
436 }
437 self
438 }
439
440 pub fn maintained_indexes(&self) -> &[String] {
442 match self {
443 Self::Bucket {
444 maintained_indexes, ..
445 }
446 | Self::Identity {
447 maintained_indexes, ..
448 }
449 | Self::Unsharded {
450 maintained_indexes, ..
451 } => maintained_indexes,
452 }
453 }
454
455 pub fn writer_config_defaults(&self) -> &HashMap<String, String> {
457 match self {
458 Self::Bucket {
459 writer_config_defaults,
460 ..
461 }
462 | Self::Identity {
463 writer_config_defaults,
464 ..
465 }
466 | Self::Unsharded {
467 writer_config_defaults,
468 ..
469 } => writer_config_defaults,
470 }
471 }
472}
473
474#[async_trait]
479pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync {
480 fn as_any(&self) -> &dyn std::any::Any;
482 fn name(&self) -> &str;
484 fn namespace(&self) -> &[String];
486 fn id(&self) -> &str;
491 async fn schema(&self) -> Result<SchemaRef>;
493 async fn count_rows(&self, filter: Option<Filter>) -> Result<usize>;
495 async fn create_plan(
497 &self,
498 query: &AnyQuery,
499 options: QueryExecutionOptions,
500 ) -> Result<Arc<dyn ExecutionPlan>>;
501 async fn query(
503 &self,
504 query: &AnyQuery,
505 options: QueryExecutionOptions,
506 ) -> Result<DatasetRecordBatchStream>;
507 async fn explain_plan(&self, query: &AnyQuery, verbose: bool) -> Result<String> {
509 let plan = self.create_plan(query, Default::default()).await?;
510 let display = DisplayableExecutionPlan::new(plan.as_ref());
511
512 Ok(format!("{}", display.indent(verbose)))
513 }
514 async fn analyze_plan(
515 &self,
516 query: &AnyQuery,
517 options: QueryExecutionOptions,
518 ) -> Result<String>;
519
520 async fn add(&self, add: AddDataBuilder) -> Result<AddResult>;
522 async fn delete(&self, predicate: Predicate<'_>) -> Result<DeleteResult>;
524 async fn update(&self, update: UpdateBuilder) -> Result<UpdateResult>;
526 async fn create_index(&self, index: IndexBuilder) -> Result<()>;
528 async fn list_indices(&self) -> Result<Vec<IndexConfig>>;
530 async fn drop_index(&self, name: &str) -> Result<()>;
532 async fn prewarm_index(&self, name: &str) -> Result<()>;
534 async fn prewarm_data(&self, columns: Option<Vec<String>>) -> Result<()>;
539 async fn index_stats(&self, index_name: &str) -> Result<Option<IndexStatistics>>;
541 async fn merge_insert(
543 &self,
544 params: MergeInsertBuilder,
545 new_data: Box<dyn RecordBatchReader + Send>,
546 ) -> Result<MergeResult>;
547 async fn set_unenforced_primary_key(&self, _columns: &[&str]) -> Result<()> {
557 Err(Error::NotSupported {
558 message: "set_unenforced_primary_key is not supported on this table type".into(),
559 })
560 }
561 async fn set_lsm_write_spec(&self, _spec: LsmWriteSpec) -> Result<()> {
569 Err(Error::NotSupported {
570 message: "set_lsm_write_spec is not supported on this table type".into(),
571 })
572 }
573 async fn unset_lsm_write_spec(&self) -> Result<()> {
580 Err(Error::NotSupported {
581 message: "unset_lsm_write_spec is not supported on this table type".into(),
582 })
583 }
584 async fn tags(&self) -> Result<Box<dyn Tags + '_>>;
586 async fn optimize(&self, action: OptimizeAction) -> Result<OptimizeStats>;
588 async fn add_columns(
590 &self,
591 transforms: NewColumnTransform,
592 read_columns: Option<Vec<String>>,
593 ) -> Result<AddColumnsResult>;
594 async fn alter_columns(&self, alterations: &[ColumnAlteration]) -> Result<AlterColumnsResult>;
596 async fn drop_columns(&self, columns: &[&str]) -> Result<DropColumnsResult>;
598 async fn version(&self) -> Result<u64>;
600 async fn checkout(&self, version: u64) -> Result<()>;
602 async fn checkout_tag(&self, tag: &str) -> Result<()>;
605 async fn checkout_latest(&self) -> Result<()>;
607 async fn restore(&self) -> Result<()>;
609 async fn list_versions(&self) -> Result<Vec<Version>>;
611 async fn table_definition(&self) -> Result<TableDefinition>;
613 async fn uri(&self) -> Result<String>;
615 #[deprecated(since = "0.25.0", note = "Use initial_storage_options() instead")]
617 async fn storage_options(&self) -> Option<HashMap<String, String>>;
618 async fn initial_storage_options(&self) -> Option<HashMap<String, String>>;
622 async fn latest_storage_options(&self) -> Result<Option<HashMap<String, String>>>;
627 async fn wait_for_index(
630 &self,
631 index_names: &[&str],
632 timeout: std::time::Duration,
633 ) -> Result<()>;
634 async fn stats(&self) -> Result<TableStatistics>;
636 async fn create_insert_exec(
641 &self,
642 _input: Arc<dyn datafusion_physical_plan::ExecutionPlan>,
643 _write_params: WriteParams,
644 ) -> Result<Arc<dyn datafusion_physical_plan::ExecutionPlan>> {
645 Err(Error::NotSupported {
646 message: "create_insert_exec not implemented".to_string(),
647 })
648 }
649}
650
651#[derive(Clone, Debug)]
655pub struct Table {
656 inner: Arc<dyn BaseTable>,
657 database: Option<Arc<dyn Database>>,
658 embedding_registry: Arc<dyn EmbeddingRegistry>,
659}
660
661#[cfg(all(test, feature = "remote"))]
662mod test_utils {
663 use super::*;
664
665 impl Table {
666 pub fn new_with_handler<T>(
667 name: impl Into<String>,
668 handler: impl Fn(reqwest::Request) -> http::Response<T> + Clone + Send + Sync + 'static,
669 ) -> Self
670 where
671 T: Into<reqwest::Body>,
672 {
673 let inner = Arc::new(crate::remote::table::RemoteTable::new_mock(
674 name.into(),
675 handler.clone(),
676 None,
677 ));
678 let database = Arc::new(crate::remote::db::RemoteDatabase::new_mock(handler));
679 Self {
680 inner,
681 database: Some(database),
682 embedding_registry: Arc::new(MemoryRegistry::new()),
684 }
685 }
686
687 pub fn new_with_handler_and_interval<T>(
688 name: impl Into<String>,
689 handler: impl Fn(reqwest::Request) -> http::Response<T> + Clone + Send + Sync + 'static,
690 read_consistency_interval: Option<std::time::Duration>,
691 ) -> Self
692 where
693 T: Into<reqwest::Body>,
694 {
695 let inner = Arc::new(
696 crate::remote::table::RemoteTable::new_mock_with_consistency_interval(
697 name.into(),
698 handler.clone(),
699 read_consistency_interval,
700 ),
701 );
702 let database = Arc::new(crate::remote::db::RemoteDatabase::new_mock(handler));
703 Self {
704 inner,
705 database: Some(database),
706 embedding_registry: Arc::new(MemoryRegistry::new()),
708 }
709 }
710
711 pub fn new_with_handler_version<T>(
712 name: impl Into<String>,
713 version: semver::Version,
714 handler: impl Fn(reqwest::Request) -> http::Response<T> + Clone + Send + Sync + 'static,
715 ) -> Self
716 where
717 T: Into<reqwest::Body>,
718 {
719 let inner = Arc::new(crate::remote::table::RemoteTable::new_mock(
720 name.into(),
721 handler.clone(),
722 Some(version),
723 ));
724 let database = Arc::new(crate::remote::db::RemoteDatabase::new_mock(handler));
725 Self {
726 inner,
727 database: Some(database),
728 embedding_registry: Arc::new(MemoryRegistry::new()),
730 }
731 }
732
733 pub fn new_with_handler_and_config<T>(
734 name: impl Into<String>,
735 handler: impl Fn(reqwest::Request) -> http::Response<T> + Clone + Send + Sync + 'static,
736 config: crate::remote::ClientConfig,
737 ) -> Self
738 where
739 T: Into<reqwest::Body>,
740 {
741 let inner = Arc::new(crate::remote::table::RemoteTable::new_mock_with_config(
742 name.into(),
743 handler.clone(),
744 config.clone(),
745 ));
746 let database = Arc::new(crate::remote::db::RemoteDatabase::new_mock_with_config(
747 handler, config,
748 ));
749 Self {
750 inner,
751 database: Some(database),
752 embedding_registry: Arc::new(MemoryRegistry::new()),
754 }
755 }
756
757 pub fn new_with_handler_version_and_config<T>(
758 name: impl Into<String>,
759 version: semver::Version,
760 handler: impl Fn(reqwest::Request) -> http::Response<T> + Clone + Send + Sync + 'static,
761 config: crate::remote::ClientConfig,
762 ) -> Self
763 where
764 T: Into<reqwest::Body>,
765 {
766 let inner = Arc::new(
767 crate::remote::table::RemoteTable::new_mock_with_version_and_config(
768 name.into(),
769 handler.clone(),
770 Some(version),
771 config.clone(),
772 ),
773 );
774 let database = Arc::new(crate::remote::db::RemoteDatabase::new_mock_with_config(
775 handler, config,
776 ));
777 Self {
778 inner,
779 database: Some(database),
780 embedding_registry: Arc::new(MemoryRegistry::new()),
782 }
783 }
784 }
785}
786
787impl std::fmt::Display for Table {
788 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
789 write!(f, "{}", self.inner)
790 }
791}
792
793impl From<Arc<dyn BaseTable>> for Table {
794 fn from(inner: Arc<dyn BaseTable>) -> Self {
795 Self {
796 inner,
797 database: None,
798 embedding_registry: Arc::new(MemoryRegistry::new()),
799 }
800 }
801}
802
803impl Table {
804 pub fn new(inner: Arc<dyn BaseTable>, database: Arc<dyn Database>) -> Self {
805 Self {
806 inner,
807 database: Some(database),
808 embedding_registry: Arc::new(MemoryRegistry::new()),
809 }
810 }
811
812 pub fn base_table(&self) -> &Arc<dyn BaseTable> {
813 &self.inner
814 }
815
816 pub fn database(&self) -> &Arc<dyn Database> {
817 self.database.as_ref().unwrap()
818 }
819
820 pub fn embedding_registry(&self) -> &Arc<dyn EmbeddingRegistry> {
821 &self.embedding_registry
822 }
823
824 pub(crate) fn new_with_embedding_registry(
825 inner: Arc<dyn BaseTable>,
826 database: Arc<dyn Database>,
827 embedding_registry: Arc<dyn EmbeddingRegistry>,
828 ) -> Self {
829 Self {
830 inner,
831 database: Some(database),
832 embedding_registry,
833 }
834 }
835
836 pub fn as_native(&self) -> Option<&NativeTable> {
841 self.inner.as_native()
842 }
843
844 pub fn name(&self) -> &str {
846 self.inner.name()
847 }
848
849 pub fn namespace(&self) -> &[String] {
851 self.inner.namespace()
852 }
853
854 pub fn id(&self) -> &str {
856 self.inner.id()
857 }
858
859 pub fn dataset(&self) -> Option<&dataset::DatasetConsistencyWrapper> {
863 self.inner.as_native().map(|t| &t.dataset)
864 }
865
866 pub async fn schema(&self) -> Result<SchemaRef> {
868 self.inner.schema().await
869 }
870
871 pub async fn count_rows(&self, filter: Option<String>) -> Result<usize> {
877 self.inner.count_rows(filter.map(Filter::Sql)).await
878 }
879
880 pub fn add<T: Scannable + 'static>(&self, data: T) -> AddDataBuilder {
887 AddDataBuilder::new(
888 self.inner.clone(),
889 Box::new(data),
890 Some(self.embedding_registry.clone()),
891 )
892 }
893
894 pub fn update(&self) -> UpdateBuilder {
909 UpdateBuilder::new(self.inner.clone())
910 }
911
912 pub async fn delete(&self, predicate: impl Into<Predicate<'_>>) -> Result<DeleteResult> {
965 self.inner.delete(predicate.into()).await
966 }
967
968 pub fn create_index(&self, columns: &[impl AsRef<str>], index: Index) -> IndexBuilder {
1030 IndexBuilder::new(
1031 self.inner.clone(),
1032 columns
1033 .iter()
1034 .map(|val| val.as_ref().to_string())
1035 .collect::<Vec<_>>(),
1036 index,
1037 )
1038 }
1039
1040 pub fn create_index_with_timeout(
1043 &self,
1044 columns: &[impl AsRef<str>],
1045 index: Index,
1046 wait_timeout: Option<std::time::Duration>,
1047 ) -> IndexBuilder {
1048 let mut builder = IndexBuilder::new(
1049 self.inner.clone(),
1050 columns
1051 .iter()
1052 .map(|val| val.as_ref().to_string())
1053 .collect::<Vec<_>>(),
1054 index,
1055 );
1056 if let Some(timeout) = wait_timeout {
1057 builder = builder.wait_timeout(timeout);
1058 }
1059 builder
1060 }
1061
1062 pub fn merge_insert(&self, on: &[&str]) -> MergeInsertBuilder {
1138 MergeInsertBuilder::new(
1139 self.inner.clone(),
1140 on.iter().map(|s| s.to_string()).collect(),
1141 )
1142 }
1143
1144 pub fn query(&self) -> Query {
1232 Query::new(self.inner.clone())
1233 }
1234
1235 pub fn take_offsets(&self, offsets: Vec<u64>) -> TakeQuery {
1258 TakeQuery::from_offsets(self.inner.clone(), offsets)
1259 }
1260
1261 pub fn take_row_ids(&self, row_ids: Vec<u64>) -> TakeQuery {
1280 TakeQuery::from_row_ids(self.inner.clone(), row_ids)
1281 }
1282
1283 pub fn vector_search(&self, query: impl IntoQueryVector) -> Result<VectorQuery> {
1289 self.query().nearest_to(query)
1290 }
1291
1292 pub async fn optimize(&self, action: OptimizeAction) -> Result<OptimizeStats> {
1309 self.inner.optimize(action).await
1310 }
1311
1312 pub async fn add_columns(
1314 &self,
1315 transforms: NewColumnTransform,
1316 read_columns: Option<Vec<String>>,
1317 ) -> Result<AddColumnsResult> {
1318 self.inner.add_columns(transforms, read_columns).await
1319 }
1320
1321 pub async fn alter_columns(
1323 &self,
1324 alterations: &[ColumnAlteration],
1325 ) -> Result<AlterColumnsResult> {
1326 self.inner.alter_columns(alterations).await
1327 }
1328
1329 pub async fn drop_columns(&self, columns: &[&str]) -> Result<DropColumnsResult> {
1331 self.inner.drop_columns(columns).await
1332 }
1333
1334 pub async fn set_unenforced_primary_key<I, S>(&self, columns: I) -> Result<()>
1347 where
1348 I: IntoIterator<Item = S>,
1349 S: Into<String>,
1350 {
1351 let owned: Vec<String> = columns.into_iter().map(Into::into).collect();
1352 let borrowed: Vec<&str> = owned.iter().map(String::as_str).collect();
1353 self.inner.set_unenforced_primary_key(&borrowed).await
1354 }
1355
1356 pub async fn set_lsm_write_spec(&self, spec: LsmWriteSpec) -> Result<()> {
1379 self.inner.set_lsm_write_spec(spec).await
1380 }
1381
1382 pub async fn unset_lsm_write_spec(&self) -> Result<()> {
1387 self.inner.unset_lsm_write_spec().await
1388 }
1389
1390 pub async fn version(&self) -> Result<u64> {
1397 self.inner.version().await
1398 }
1399
1400 pub async fn checkout(&self, version: u64) -> Result<()> {
1415 self.inner.checkout(version).await
1416 }
1417
1418 pub async fn checkout_tag(&self, tag: &str) -> Result<()> {
1433 self.inner.checkout_tag(tag).await
1434 }
1435
1436 pub async fn checkout_latest(&self) -> Result<()> {
1441 self.inner.checkout_latest().await
1442 }
1443
1444 pub async fn restore(&self) -> Result<()> {
1455 self.inner.restore().await
1456 }
1457
1458 pub async fn list_versions(&self) -> Result<Vec<Version>> {
1460 self.inner.list_versions().await
1461 }
1462
1463 pub async fn list_indices(&self) -> Result<Vec<IndexConfig>> {
1465 self.inner.list_indices().await
1466 }
1467
1468 pub async fn uri(&self) -> Result<String> {
1473 self.inner.uri().await
1474 }
1475
1476 #[deprecated(since = "0.25.0", note = "Use initial_storage_options() instead")]
1480 pub async fn storage_options(&self) -> Option<HashMap<String, String>> {
1481 #[allow(deprecated)]
1482 self.inner.storage_options().await
1483 }
1484
1485 pub async fn initial_storage_options(&self) -> Option<HashMap<String, String>> {
1491 self.inner.initial_storage_options().await
1492 }
1493
1494 pub async fn latest_storage_options(&self) -> Result<Option<HashMap<String, String>>> {
1502 self.inner.latest_storage_options().await
1503 }
1504
1505 pub async fn index_stats(
1508 &self,
1509 index_name: impl AsRef<str>,
1510 ) -> Result<Option<IndexStatistics>> {
1511 self.inner.index_stats(index_name.as_ref()).await
1512 }
1513
1514 pub async fn drop_index(&self, name: &str) -> Result<()> {
1523 self.inner.drop_index(name).await
1524 }
1525
1526 pub async fn prewarm_index(&self, name: &str) -> Result<()> {
1541 self.inner.prewarm_index(name).await
1542 }
1543
1544 pub async fn prewarm_data(&self, columns: Option<Vec<String>>) -> Result<()> {
1562 self.inner.prewarm_data(columns).await
1563 }
1564
1565 pub async fn wait_for_index(
1568 &self,
1569 index_names: &[&str],
1570 timeout: std::time::Duration,
1571 ) -> Result<()> {
1572 self.inner.wait_for_index(index_names, timeout).await
1573 }
1574
1575 pub async fn tags(&self) -> Result<Box<dyn Tags + '_>> {
1577 self.inner.tags().await
1578 }
1579
1580 pub async fn stats(&self) -> Result<TableStatistics> {
1582 self.inner.stats().await
1583 }
1584}
1585
1586pub struct NativeTags {
1587 dataset: dataset::DatasetConsistencyWrapper,
1588}
1589#[async_trait]
1590impl Tags for NativeTags {
1591 async fn list(&self) -> Result<HashMap<String, TagContents>> {
1592 let dataset = self.dataset.get().await?;
1593 Ok(dataset.tags().list().await?)
1594 }
1595
1596 async fn get_version(&self, tag: &str) -> Result<u64> {
1597 let dataset = self.dataset.get().await?;
1598 Ok(dataset.tags().get_version(tag).await?)
1599 }
1600
1601 async fn create(&mut self, tag: &str, version: u64) -> Result<()> {
1602 let dataset = self.dataset.get().await?;
1603 dataset.tags().create(tag, version).await?;
1604 Ok(())
1605 }
1606
1607 async fn delete(&mut self, tag: &str) -> Result<()> {
1608 let dataset = self.dataset.get().await?;
1609 dataset.tags().delete(tag).await?;
1610 Ok(())
1611 }
1612
1613 async fn update(&mut self, tag: &str, version: u64) -> Result<()> {
1614 let dataset = self.dataset.get().await?;
1615 dataset.tags().update(tag, version).await?;
1616 Ok(())
1617 }
1618}
1619
1620pub trait NativeTableExt {
1621 fn as_native(&self) -> Option<&NativeTable>;
1623}
1624
1625impl NativeTableExt for Arc<dyn BaseTable> {
1626 fn as_native(&self) -> Option<&NativeTable> {
1627 self.as_any().downcast_ref::<NativeTable>()
1628 }
1629}
1630
1631#[derive(Clone)]
1633pub struct NativeTable {
1634 name: String,
1635 namespace: Vec<String>,
1636 id: String,
1637 uri: String,
1638 pub(crate) dataset: dataset::DatasetConsistencyWrapper,
1639 read_consistency_interval: Option<std::time::Duration>,
1642 pub(crate) namespace_client: Option<Arc<dyn LanceNamespace>>,
1645 pub(crate) pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
1648}
1649
1650impl std::fmt::Debug for NativeTable {
1651 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1652 f.debug_struct("NativeTable")
1653 .field("name", &self.name)
1654 .field("namespace", &self.namespace)
1655 .field("id", &self.id)
1656 .field("uri", &self.uri)
1657 .field("read_consistency_interval", &self.read_consistency_interval)
1658 .field("namespace_client", &self.namespace_client)
1659 .field("pushdown_operations", &self.pushdown_operations)
1660 .finish()
1661 }
1662}
1663
1664impl std::fmt::Display for NativeTable {
1665 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1666 write!(
1667 f,
1668 "NativeTable({}, uri={}, read_consistency_interval={})",
1669 self.name,
1670 self.uri,
1671 match self.read_consistency_interval {
1672 None => {
1673 "None".to_string()
1674 }
1675 Some(duration) => {
1676 format!("{}s", duration.as_secs_f64())
1677 }
1678 }
1679 )
1680 }
1681}
1682
1683impl NativeTable {
1684 pub async fn open(uri: &str) -> Result<Self> {
1695 let name = Self::get_table_name(uri)?;
1696 Self::open_with_params(
1697 uri,
1698 &name,
1699 vec![],
1700 None,
1701 None,
1702 None,
1703 None,
1704 HashSet::new(),
1705 None,
1706 )
1707 .await
1708 }
1709
1710 #[allow(clippy::too_many_arguments)]
1726 pub async fn open_with_params(
1727 uri: &str,
1728 name: &str,
1729 namespace: Vec<String>,
1730 write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
1731 params: Option<ReadParams>,
1732 read_consistency_interval: Option<std::time::Duration>,
1733 namespace_client: Option<Arc<dyn LanceNamespace>>,
1734 pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
1735 managed_versioning: Option<bool>,
1736 ) -> Result<Self> {
1737 let params = params.unwrap_or_default();
1738 let params = match write_store_wrapper.clone() {
1740 Some(wrapper) => params.patch_with_store_wrapper(wrapper)?,
1741 None => params,
1742 };
1743
1744 let mut table_id = namespace.clone();
1746 table_id.push(name.to_string());
1747
1748 let managed_versioning = match managed_versioning {
1751 Some(value) => value,
1752 None if namespace_client.is_some() => {
1753 let ns_client = namespace_client.as_ref().unwrap();
1754 let describe_request = DescribeTableRequest {
1755 id: Some(table_id.clone()),
1756 ..Default::default()
1757 };
1758 let response = ns_client
1759 .describe_table(describe_request)
1760 .await
1761 .map_err(|e| Error::Runtime {
1762 message: format!(
1763 "Failed to describe table via namespace client: {}. \
1764 If you don't need managed versioning, don't pass namespace_client.",
1765 e
1766 ),
1767 })?;
1768 response.managed_versioning == Some(true)
1769 }
1770 None => false,
1771 };
1772
1773 let mut builder = DatasetBuilder::from_uri(uri).with_read_params(params);
1774
1775 if managed_versioning && let Some(ref ns_client) = namespace_client {
1777 let external_store =
1778 LanceNamespaceExternalManifestStore::new(ns_client.clone(), table_id.clone());
1779 let commit_handler: Arc<dyn CommitHandler> = Arc::new(ExternalManifestCommitHandler {
1780 external_manifest_store: Arc::new(external_store),
1781 });
1782 builder = builder.with_commit_handler(commit_handler);
1783 }
1784
1785 let dataset = builder.load().await.map_err(|e| match e {
1786 lance::Error::DatasetNotFound { .. } => Error::TableNotFound {
1787 name: name.to_string(),
1788 source: Box::new(e),
1789 },
1790 e => e.into(),
1791 })?;
1792
1793 let dataset = DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval);
1794 let id = Self::build_id(&namespace, name);
1795
1796 Ok(Self {
1797 name: name.to_string(),
1798 namespace,
1799 id,
1800 uri: uri.to_string(),
1801 dataset,
1802 read_consistency_interval,
1803 namespace_client,
1804 pushdown_operations,
1805 })
1806 }
1807
1808 pub fn with_namespace_client(mut self, namespace_client: Arc<dyn LanceNamespace>) -> Self {
1812 self.namespace_client = Some(namespace_client);
1813 self
1814 }
1815
1816 #[allow(clippy::too_many_arguments)]
1838 pub async fn open_from_namespace(
1839 namespace_client: Arc<dyn LanceNamespace>,
1840 name: &str,
1841 namespace: Vec<String>,
1842 write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
1843 params: Option<ReadParams>,
1844 read_consistency_interval: Option<std::time::Duration>,
1845 pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
1846 session: Option<Arc<lance::session::Session>>,
1847 ) -> Result<Self> {
1848 let mut params = params.unwrap_or_default();
1849
1850 if let Some(sess) = session {
1852 params.session(sess);
1853 }
1854
1855 let params = match write_store_wrapper.clone() {
1857 Some(wrapper) => params.patch_with_store_wrapper(wrapper)?,
1858 None => params,
1859 };
1860
1861 let mut table_id = namespace.clone();
1863 table_id.push(name.to_string());
1864
1865 let builder = DatasetBuilder::from_namespace(namespace_client.clone(), table_id)
1868 .await
1869 .map_err(|e| map_namespace_lance_error(e, name))?;
1870
1871 let dataset = builder
1872 .with_read_params(params)
1873 .load()
1874 .await
1875 .map_err(|e| match e {
1876 lance::Error::DatasetNotFound { .. } => Error::TableNotFound {
1877 name: name.to_string(),
1878 source: Box::new(e),
1879 },
1880 e => e.into(),
1881 })?;
1882
1883 let uri = dataset.uri().to_string();
1884 let dataset = DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval);
1885 let id = Self::build_id(&namespace, name);
1886
1887 let stored_namespace_client =
1888 if pushdown_operations.contains(&NamespaceClientPushdownOperation::QueryTable) {
1889 Some(namespace_client)
1890 } else {
1891 None
1892 };
1893
1894 Ok(Self {
1895 name: name.to_string(),
1896 namespace,
1897 id,
1898 uri,
1899 dataset,
1900 read_consistency_interval,
1901 namespace_client: stored_namespace_client,
1902 pushdown_operations,
1903 })
1904 }
1905
1906 fn get_table_name(uri: &str) -> Result<String> {
1907 let path = Path::new(uri);
1908 let name = path
1909 .file_stem()
1910 .ok_or(Error::TableNotFound {
1911 name: uri.to_string(),
1912 source: format!("Could not extract table name from URI: '{}'", uri).into(),
1913 })?
1914 .to_str()
1915 .ok_or(Error::InvalidTableName {
1916 name: uri.to_string(),
1917 reason: "Table name is not valid URL".to_string(),
1918 })?;
1919 Ok(name.to_string())
1920 }
1921
1922 fn build_id(namespace: &[String], name: &str) -> String {
1923 if namespace.is_empty() {
1924 name.to_string()
1925 } else {
1926 let mut parts = namespace.to_vec();
1927 parts.push(name.to_string());
1928 parts.join("$")
1929 }
1930 }
1931
1932 #[allow(clippy::too_many_arguments)]
1949 pub async fn create(
1950 uri: &str,
1951 name: &str,
1952 namespace: Vec<String>,
1953 batches: impl StreamingWriteSource,
1954 write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
1955 params: Option<WriteParams>,
1956 read_consistency_interval: Option<std::time::Duration>,
1957 namespace_client: Option<Arc<dyn LanceNamespace>>,
1958 pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
1959 ) -> Result<Self> {
1960 let params = params.unwrap_or(WriteParams {
1962 ..Default::default()
1963 });
1964 let params = match write_store_wrapper.clone() {
1966 Some(wrapper) => params.patch_with_store_wrapper(wrapper)?,
1967 None => params,
1968 };
1969
1970 let insert_builder = InsertBuilder::new(uri).with_params(¶ms);
1971 let dataset = insert_builder
1972 .execute_stream(batches)
1973 .await
1974 .map_err(|e| match e {
1975 lance::Error::DatasetAlreadyExists { .. } => Error::TableAlreadyExists {
1976 name: name.to_string(),
1977 },
1978 e => e.into(),
1979 })?;
1980
1981 let id = Self::build_id(&namespace, name);
1982
1983 Ok(Self {
1984 name: name.to_string(),
1985 namespace,
1986 id,
1987 uri: uri.to_string(),
1988 dataset: DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval),
1989 read_consistency_interval,
1990 namespace_client,
1991 pushdown_operations,
1992 })
1993 }
1994
1995 #[allow(clippy::too_many_arguments)]
1996 pub async fn create_empty(
1997 uri: &str,
1998 name: &str,
1999 namespace: Vec<String>,
2000 schema: SchemaRef,
2001 write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
2002 params: Option<WriteParams>,
2003 read_consistency_interval: Option<std::time::Duration>,
2004 namespace_client: Option<Arc<dyn LanceNamespace>>,
2005 pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
2006 ) -> Result<Self> {
2007 let data: Box<dyn Scannable> = Box::new(RecordBatch::new_empty(schema));
2008 Self::create(
2009 uri,
2010 name,
2011 namespace,
2012 data,
2013 write_store_wrapper,
2014 params,
2015 read_consistency_interval,
2016 namespace_client,
2017 pushdown_operations,
2018 )
2019 .await
2020 }
2021
2022 #[allow(clippy::too_many_arguments)]
2046 pub async fn create_from_namespace(
2047 namespace_client: Arc<dyn LanceNamespace>,
2048 uri: &str,
2049 name: &str,
2050 namespace: Vec<String>,
2051 batches: impl StreamingWriteSource,
2052 write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
2053 params: Option<WriteParams>,
2054 read_consistency_interval: Option<std::time::Duration>,
2055 pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
2056 session: Option<Arc<lance::session::Session>>,
2057 ) -> Result<Self> {
2058 let mut table_id = namespace.clone();
2060 table_id.push(name.to_string());
2061
2062 let storage_options_provider = Arc::new(LanceNamespaceStorageOptionsProvider::new(
2064 namespace_client.clone(),
2065 table_id,
2066 ));
2067
2068 let mut params = params.unwrap_or_default();
2070
2071 if let Some(sess) = session {
2073 params.session = Some(sess);
2074 }
2075
2076 let store_params = params
2078 .store_params
2079 .get_or_insert_with(ObjectStoreParams::default);
2080 let accessor = match store_params.storage_options().cloned() {
2081 Some(options) => {
2082 StorageOptionsAccessor::with_initial_and_provider(options, storage_options_provider)
2083 }
2084 None => StorageOptionsAccessor::with_provider(storage_options_provider),
2085 };
2086 store_params.storage_options_accessor = Some(Arc::new(accessor));
2087
2088 let params = match write_store_wrapper.clone() {
2090 Some(wrapper) => params.patch_with_store_wrapper(wrapper)?,
2091 None => params,
2092 };
2093
2094 let insert_builder = InsertBuilder::new(uri).with_params(¶ms);
2095 let dataset = insert_builder
2096 .execute_stream(batches)
2097 .await
2098 .map_err(|e| match e {
2099 lance::Error::DatasetAlreadyExists { .. } => Error::TableAlreadyExists {
2100 name: name.to_string(),
2101 },
2102 e => e.into(),
2103 })?;
2104
2105 let id = Self::build_id(&namespace, name);
2106
2107 let stored_namespace_client =
2108 if pushdown_operations.contains(&NamespaceClientPushdownOperation::QueryTable) {
2109 Some(namespace_client)
2110 } else {
2111 None
2112 };
2113
2114 Ok(Self {
2115 name: name.to_string(),
2116 namespace,
2117 id,
2118 uri: uri.to_string(),
2119 dataset: DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval),
2120 read_consistency_interval,
2121 namespace_client: stored_namespace_client,
2122 pushdown_operations,
2123 })
2124 }
2125
2126 pub async fn merge(
2128 &mut self,
2129 batches: impl RecordBatchReader + Send + 'static,
2130 left_on: &str,
2131 right_on: &str,
2132 ) -> Result<()> {
2133 self.dataset.ensure_mutable()?;
2134 let mut dataset = (*self.dataset.get().await?).clone();
2135 dataset.merge(batches, left_on, right_on).await?;
2136 self.dataset.update(dataset);
2137 Ok(())
2138 }
2139
2140 pub async fn count_fragments(&self) -> Result<usize> {
2142 Ok(self.dataset.get().await?.count_fragments())
2143 }
2144
2145 pub async fn count_deleted_rows(&self) -> Result<usize> {
2146 Ok(self.dataset.get().await?.count_deleted_rows().await?)
2147 }
2148
2149 pub async fn num_small_files(&self, max_rows_per_group: usize) -> Result<usize> {
2150 Ok(self
2151 .dataset
2152 .get()
2153 .await?
2154 .num_small_files(max_rows_per_group)
2155 .await)
2156 }
2157
2158 pub async fn load_indices(&self) -> Result<Vec<VectorIndex>> {
2159 let dataset = self.dataset.get().await?;
2160 let mf = dataset.manifest();
2161 let indices = dataset.load_indices().await?;
2162 Ok(indices
2163 .iter()
2164 .map(|i| VectorIndex::new_from_format(mf, i))
2165 .collect())
2166 }
2167
2168 fn validate_index_type(
2170 field: &Field,
2171 index_name: &str,
2172 supported_fn: impl Fn(&DataType) -> bool,
2173 ) -> Result<()> {
2174 if !supported_fn(field.data_type()) {
2175 return Err(Error::Schema {
2176 message: format!(
2177 "A {} index cannot be created on the field `{}` which has data type {}",
2178 index_name,
2179 field.name(),
2180 field.data_type()
2181 ),
2182 });
2183 }
2184 Ok(())
2185 }
2186
2187 fn build_ivf_params(
2189 num_partitions: Option<u32>,
2190 target_partition_size: Option<u32>,
2191 sample_rate: u32,
2192 max_iterations: u32,
2193 ) -> IvfBuildParams {
2194 let mut ivf_params = match (num_partitions, target_partition_size) {
2195 (Some(num_partitions), _) => IvfBuildParams::new(num_partitions as usize),
2196 (None, Some(target_partition_size)) => {
2197 IvfBuildParams::with_target_partition_size(target_partition_size as usize)
2198 }
2199 (None, None) => IvfBuildParams::default(),
2200 };
2201 ivf_params.sample_rate = sample_rate as usize;
2202 ivf_params.max_iters = max_iterations as usize;
2203 ivf_params
2204 }
2205
2206 fn get_num_sub_vectors(provided: Option<u32>, dim: u32, num_bits: Option<u32>) -> u32 {
2208 if let Some(provided) = provided {
2209 return provided;
2210 }
2211 let suggested = suggested_num_sub_vectors(dim);
2212 if num_bits.is_some_and(|num_bits| num_bits == 4) && !suggested.is_multiple_of(2) {
2213 suggested + 1
2215 } else {
2216 suggested
2217 }
2218 }
2219
2220 fn get_vector_dimension(field: &Field) -> Result<u32> {
2222 match field.data_type() {
2223 arrow_schema::DataType::FixedSizeList(_, n) => Ok(*n as u32),
2224 _ => Ok(infer_vector_dim(field.data_type())? as u32),
2225 }
2226 }
2227
2228 fn resolve_index_field(
2229 schema: &lance_core::datatypes::Schema,
2230 column: &str,
2231 ) -> Result<(String, Field)> {
2232 lance_core::datatypes::parse_field_path(column).map_err(|e| Error::InvalidInput {
2233 message: format!("Invalid field path `{}`: {}", column, e),
2234 })?;
2235
2236 let field_path = schema
2237 .resolve_case_insensitive(column)
2238 .ok_or_else(|| Error::Schema {
2239 message: format!(
2240 "Field path `{}` not found in schema. Available field paths: {}",
2241 column,
2242 schema.field_paths().join(", ")
2243 ),
2244 })?;
2245 let field = field_path.last().expect("field path should be non-empty");
2246 let path_segments = field_path
2247 .iter()
2248 .map(|field| field.name.as_str())
2249 .collect::<Vec<_>>();
2250 let canonical_path = lance_core::datatypes::format_field_path(&path_segments);
2251
2252 Ok((canonical_path, Field::from(*field)))
2253 }
2254
2255 async fn make_index_params(
2257 &self,
2258 field: &Field,
2259 index_opts: Index,
2260 ) -> Result<Box<dyn lance::index::IndexParams>> {
2261 match index_opts {
2262 Index::Auto => {
2263 if supported_vector_data_type(field.data_type()) {
2264 let dim = Self::get_vector_dimension(field)?;
2266 let ivf_params = lance_index::vector::ivf::IvfBuildParams::default();
2267 let num_sub_vectors = Self::get_num_sub_vectors(None, dim, None);
2268 let pq_params =
2269 lance_index::vector::pq::PQBuildParams::new(num_sub_vectors as usize, 8);
2270 let lance_idx_params =
2271 lance::index::vector::VectorIndexParams::with_ivf_pq_params(
2272 lance_linalg::distance::MetricType::L2,
2273 ivf_params,
2274 pq_params,
2275 );
2276 Ok(Box::new(lance_idx_params))
2277 } else if supported_btree_data_type(field.data_type()) {
2278 Ok(Box::new(ScalarIndexParams::for_builtin(
2279 BuiltinIndexType::BTree,
2280 )))
2281 } else {
2282 Err(Error::InvalidInput {
2283 message: format!(
2284 "there are no indices supported for the field `{}` with the data type {}",
2285 field.name(),
2286 field.data_type()
2287 ),
2288 })?
2289 }
2290 }
2291 Index::BTree(_) => {
2292 Self::validate_index_type(field, "BTree", supported_btree_data_type)?;
2293 Ok(Box::new(ScalarIndexParams::for_builtin(
2294 BuiltinIndexType::BTree,
2295 )))
2296 }
2297 Index::Bitmap(_) => {
2298 Self::validate_index_type(field, "Bitmap", supported_bitmap_data_type)?;
2299 Ok(Box::new(ScalarIndexParams::for_builtin(
2300 BuiltinIndexType::Bitmap,
2301 )))
2302 }
2303 Index::LabelList(_) => {
2304 Self::validate_index_type(field, "LabelList", supported_label_list_data_type)?;
2305 Ok(Box::new(ScalarIndexParams::for_builtin(
2306 BuiltinIndexType::LabelList,
2307 )))
2308 }
2309 Index::FTS(fts_opts) => {
2310 Self::validate_index_type(field, "FTS", supported_fts_data_type)?;
2311 Ok(Box::new(fts_opts))
2312 }
2313 Index::IvfFlat(index) => {
2314 Self::validate_index_type(field, "IVF Flat", supported_vector_data_type)?;
2315 let ivf_params = Self::build_ivf_params(
2316 index.num_partitions,
2317 index.target_partition_size,
2318 index.sample_rate,
2319 index.max_iterations,
2320 );
2321 let lance_idx_params =
2322 VectorIndexParams::with_ivf_flat_params(index.distance_type.into(), ivf_params);
2323 Ok(Box::new(lance_idx_params))
2324 }
2325 Index::IvfSq(index) => {
2326 Self::validate_index_type(field, "IVF SQ", supported_vector_data_type)?;
2327 let ivf_params = Self::build_ivf_params(
2328 index.num_partitions,
2329 index.target_partition_size,
2330 index.sample_rate,
2331 index.max_iterations,
2332 );
2333 let sq_params = SQBuildParams {
2334 sample_rate: index.sample_rate as usize,
2335 ..Default::default()
2336 };
2337 let lance_idx_params = VectorIndexParams::with_ivf_sq_params(
2338 index.distance_type.into(),
2339 ivf_params,
2340 sq_params,
2341 );
2342 Ok(Box::new(lance_idx_params))
2343 }
2344 Index::IvfPq(index) => {
2345 Self::validate_index_type(field, "IVF PQ", supported_vector_data_type)?;
2346 let dim = Self::get_vector_dimension(field)?;
2347 let ivf_params = Self::build_ivf_params(
2348 index.num_partitions,
2349 index.target_partition_size,
2350 index.sample_rate,
2351 index.max_iterations,
2352 );
2353 let num_sub_vectors =
2354 Self::get_num_sub_vectors(index.num_sub_vectors, dim, index.num_bits);
2355 let num_bits = index.num_bits.unwrap_or(8) as usize;
2356 let mut pq_params = PQBuildParams::new(num_sub_vectors as usize, num_bits);
2357 pq_params.max_iters = index.max_iterations as usize;
2358 let lance_idx_params = VectorIndexParams::with_ivf_pq_params(
2359 index.distance_type.into(),
2360 ivf_params,
2361 pq_params,
2362 );
2363 Ok(Box::new(lance_idx_params))
2364 }
2365 Index::IvfRq(index) => {
2366 Self::validate_index_type(field, "IVF RQ", supported_vector_data_type)?;
2367 let ivf_params = Self::build_ivf_params(
2368 index.num_partitions,
2369 index.target_partition_size,
2370 index.sample_rate,
2371 index.max_iterations,
2372 );
2373 let rq_params = RQBuildParams::new(index.num_bits.unwrap_or(1) as u8);
2374 let lance_idx_params = VectorIndexParams::with_ivf_rq_params(
2375 index.distance_type.into(),
2376 ivf_params,
2377 rq_params,
2378 );
2379 Ok(Box::new(lance_idx_params))
2380 }
2381 Index::IvfHnswPq(index) => {
2382 Self::validate_index_type(field, "IVF HNSW PQ", supported_vector_data_type)?;
2383 let dim = Self::get_vector_dimension(field)?;
2384 let ivf_params = Self::build_ivf_params(
2385 index.num_partitions,
2386 index.target_partition_size,
2387 index.sample_rate,
2388 index.max_iterations,
2389 );
2390 let num_sub_vectors =
2391 Self::get_num_sub_vectors(index.num_sub_vectors, dim, index.num_bits);
2392 let hnsw_params = HnswBuildParams::default()
2393 .num_edges(index.m as usize)
2394 .ef_construction(index.ef_construction as usize);
2395 let pq_params = PQBuildParams::new(
2396 num_sub_vectors as usize,
2397 index.num_bits.unwrap_or(8) as usize,
2398 );
2399 let lance_idx_params = VectorIndexParams::with_ivf_hnsw_pq_params(
2400 index.distance_type.into(),
2401 ivf_params,
2402 hnsw_params,
2403 pq_params,
2404 );
2405 Ok(Box::new(lance_idx_params))
2406 }
2407 Index::IvfHnswSq(index) => {
2408 Self::validate_index_type(field, "IVF HNSW SQ", supported_vector_data_type)?;
2409 let ivf_params = Self::build_ivf_params(
2410 index.num_partitions,
2411 index.target_partition_size,
2412 index.sample_rate,
2413 index.max_iterations,
2414 );
2415 let hnsw_params = HnswBuildParams::default()
2416 .num_edges(index.m as usize)
2417 .ef_construction(index.ef_construction as usize);
2418 let sq_params = SQBuildParams {
2419 sample_rate: index.sample_rate as usize,
2420 ..Default::default()
2421 };
2422 let lance_idx_params = VectorIndexParams::with_ivf_hnsw_sq_params(
2423 index.distance_type.into(),
2424 ivf_params,
2425 hnsw_params,
2426 sq_params,
2427 );
2428 Ok(Box::new(lance_idx_params))
2429 }
2430 Index::IvfHnswFlat(index) => {
2431 Self::validate_index_type(field, "IVF HNSW FLAT", supported_vector_data_type)?;
2432 let ivf_params = Self::build_ivf_params(
2433 index.num_partitions,
2434 index.target_partition_size,
2435 index.sample_rate,
2436 index.max_iterations,
2437 );
2438 let hnsw_params = HnswBuildParams::default()
2439 .num_edges(index.m as usize)
2440 .ef_construction(index.ef_construction as usize);
2441 let lance_idx_params = VectorIndexParams::ivf_hnsw(
2442 index.distance_type.into(),
2443 ivf_params,
2444 hnsw_params,
2445 );
2446 Ok(Box::new(lance_idx_params))
2447 }
2448 }
2449 }
2450
2451 fn get_index_type_for_field(&self, field: &Field, index: &Index) -> IndexType {
2453 match index {
2454 Index::Auto => {
2455 if supported_vector_data_type(field.data_type()) {
2456 IndexType::Vector
2457 } else if supported_btree_data_type(field.data_type()) {
2458 IndexType::BTree
2459 } else {
2460 IndexType::BTree
2462 }
2463 }
2464 Index::BTree(_) => IndexType::BTree,
2465 Index::Bitmap(_) => IndexType::Bitmap,
2466 Index::LabelList(_) => IndexType::LabelList,
2467 Index::FTS(_) => IndexType::Inverted,
2468 Index::IvfFlat(_)
2469 | Index::IvfSq(_)
2470 | Index::IvfPq(_)
2471 | Index::IvfRq(_)
2472 | Index::IvfHnswPq(_)
2473 | Index::IvfHnswSq(_)
2474 | Index::IvfHnswFlat(_) => IndexType::Vector,
2475 }
2476 }
2477
2478 pub async fn uses_v2_manifest_paths(&self) -> Result<bool> {
2483 let dataset = self.dataset.get().await?;
2484 Ok(dataset.manifest_location().naming_scheme == ManifestNamingScheme::V2)
2485 }
2486
2487 pub async fn migrate_manifest_paths_v2(&self) -> Result<()> {
2502 self.dataset.ensure_mutable()?;
2503 let mut dataset = (*self.dataset.get().await?).clone();
2504 dataset.migrate_manifest_paths_v2().await?;
2505 self.dataset.update(dataset);
2506 Ok(())
2507 }
2508
2509 pub async fn manifest(&self) -> Result<Manifest> {
2511 let dataset = self.dataset.get().await?;
2512 Ok(dataset.manifest().clone())
2513 }
2514
2515 pub async fn update_config(
2517 &self,
2518 upsert_values: impl IntoIterator<Item = (String, String)>,
2519 ) -> Result<()> {
2520 self.dataset.ensure_mutable()?;
2521 let mut dataset = (*self.dataset.get().await?).clone();
2522 dataset.update_config(upsert_values).await?;
2523 self.dataset.update(dataset);
2524 Ok(())
2525 }
2526
2527 pub async fn delete_config_keys(&self, delete_keys: &[&str]) -> Result<()> {
2529 self.dataset.ensure_mutable()?;
2530 let mut dataset = (*self.dataset.get().await?).clone();
2531 #[allow(deprecated)]
2533 dataset.delete_config_keys(delete_keys).await?;
2534 self.dataset.update(dataset);
2535 Ok(())
2536 }
2537
2538 pub async fn replace_schema_metadata(
2540 &self,
2541 upsert_values: impl IntoIterator<Item = (String, String)>,
2542 ) -> Result<()> {
2543 self.dataset.ensure_mutable()?;
2544 let mut dataset = (*self.dataset.get().await?).clone();
2545 #[allow(deprecated)]
2547 dataset.replace_schema_metadata(upsert_values).await?;
2548 self.dataset.update(dataset);
2549 Ok(())
2550 }
2551
2552 pub async fn replace_field_metadata(
2560 &self,
2561 new_values: impl IntoIterator<Item = (u32, HashMap<String, String>)>,
2562 ) -> Result<()> {
2563 self.dataset.ensure_mutable()?;
2564 let mut dataset = (*self.dataset.get().await?).clone();
2565 dataset.replace_field_metadata(new_values).await?;
2566 self.dataset.update(dataset);
2567 Ok(())
2568 }
2569}
2570
2571#[async_trait::async_trait]
2572impl BaseTable for NativeTable {
2573 fn as_any(&self) -> &dyn std::any::Any {
2574 self
2575 }
2576
2577 fn name(&self) -> &str {
2578 self.name.as_str()
2579 }
2580
2581 fn namespace(&self) -> &[String] {
2582 &self.namespace
2583 }
2584
2585 fn id(&self) -> &str {
2586 &self.id
2587 }
2588
2589 async fn version(&self) -> Result<u64> {
2590 Ok(self.dataset.get().await?.version().version)
2591 }
2592
2593 async fn checkout(&self, version: u64) -> Result<()> {
2594 self.dataset.as_time_travel(version).await
2595 }
2596
2597 async fn checkout_tag(&self, tag: &str) -> Result<()> {
2598 self.dataset.as_time_travel(tag).await
2599 }
2600
2601 async fn checkout_latest(&self) -> Result<()> {
2602 self.dataset.as_latest().await?;
2603 self.dataset.reload().await
2604 }
2605
2606 async fn list_versions(&self) -> Result<Vec<Version>> {
2607 Ok(self.dataset.get().await?.versions().await?)
2608 }
2609
2610 async fn restore(&self) -> Result<()> {
2611 let version = self
2612 .dataset
2613 .time_travel_version()
2614 .ok_or_else(|| Error::InvalidInput {
2615 message: "you must run checkout before running restore".to_string(),
2616 })?;
2617 {
2618 let mut dataset = (*self.dataset.get().await?).clone();
2620 debug_assert_eq!(dataset.version().version, version);
2621 dataset.restore().await?;
2622 }
2623 self.dataset.as_latest().await?;
2624 Ok(())
2625 }
2626
2627 async fn schema(&self) -> Result<SchemaRef> {
2628 let lance_schema = self.dataset.get().await?.schema().clone();
2629 Ok(Arc::new(Schema::from(&lance_schema)))
2630 }
2631
2632 async fn table_definition(&self) -> Result<TableDefinition> {
2633 let schema = self.schema().await?;
2634 TableDefinition::try_from_rich_schema(schema)
2635 }
2636
2637 async fn count_rows(&self, filter: Option<Filter>) -> Result<usize> {
2638 let dataset = self.dataset.get().await?;
2639 match filter {
2640 None => Ok(dataset.count_rows(None).await?),
2641 Some(Filter::Sql(sql)) => Ok(dataset.count_rows(Some(sql)).await?),
2642 Some(Filter::Datafusion(_)) => Err(Error::NotSupported {
2643 message: "Datafusion filters are not yet supported".to_string(),
2644 }),
2645 }
2646 }
2647
2648 async fn add(&self, mut add: AddDataBuilder) -> Result<AddResult> {
2649 let table_def = self.table_definition().await?;
2650
2651 self.dataset.ensure_mutable()?;
2652 let ds_wrapper = self.dataset.clone();
2653 let ds = self.dataset.get().await?;
2654
2655 let table_schema = Schema::from(&ds.schema().clone());
2656
2657 let num_partitions = if let Some(parallelism) = add.write_parallelism {
2658 parallelism
2659 } else {
2660 let mut peeked = PeekedScannable::new(add.data);
2663 let n = if let Some(first_batch) = peeked.peek().await {
2664 let max_partitions = lance_core::utils::tokio::get_num_compute_intensive_cpus();
2665 estimate_write_partitions(
2666 first_batch.get_array_memory_size(),
2667 first_batch.num_rows(),
2668 peeked.num_rows(),
2669 max_partitions,
2670 )
2671 } else {
2672 1
2673 };
2674 add.data = Box::new(peeked);
2675 n
2676 };
2677
2678 let output = add.into_plan(&table_schema, &table_def)?;
2679
2680 let lance_params = output
2681 .write_options
2682 .lance_write_params
2683 .unwrap_or(WriteParams {
2684 mode: match output.mode {
2685 AddDataMode::Append => WriteMode::Append,
2686 AddDataMode::Overwrite => WriteMode::Overwrite,
2687 },
2688 ..Default::default()
2689 });
2690
2691 let plan = if num_partitions > 1 {
2693 Arc::new(
2694 datafusion_physical_plan::repartition::RepartitionExec::try_new(
2695 output.plan,
2696 datafusion_physical_plan::Partitioning::RoundRobinBatch(num_partitions),
2697 )?,
2698 ) as Arc<dyn ExecutionPlan>
2699 } else {
2700 output.plan
2701 };
2702
2703 let insert_exec = Arc::new(InsertExec::new(ds_wrapper.clone(), ds, plan, lance_params));
2704
2705 let tracker_for_tasks = output.tracker.clone();
2706 if let Some(ref t) = tracker_for_tasks {
2707 t.set_total_tasks(num_partitions);
2708 }
2709 let _finish = write_progress::FinishOnDrop(output.tracker);
2710
2711 let task_ctx = Arc::new(TaskContext::default());
2713 let handles = FuturesUnordered::new();
2714 for partition in 0..num_partitions {
2715 let exec = insert_exec.clone();
2716 let ctx = task_ctx.clone();
2717 let tracker = tracker_for_tasks.clone();
2718 handles.push(tokio::spawn(async move {
2719 let _guard = tracker.as_ref().map(|t| t.track_task());
2720 let mut stream = exec
2721 .execute(partition, ctx)
2722 .map_err(|e| -> Error { e.into() })?;
2723 while let Some(batch) = stream.next().await {
2724 batch.map_err(|e| -> Error { e.into() })?;
2725 }
2726 Ok::<_, Error>(())
2727 }));
2728 }
2729 for handle in handles {
2730 handle.await.map_err(|e| Error::Runtime {
2731 message: format!("Insert task panicked: {}", e),
2732 })??;
2733 }
2734
2735 let version = ds_wrapper.get().await?.manifest().version;
2736 Ok(AddResult { version })
2737 }
2738
2739 async fn create_index(&self, opts: IndexBuilder) -> Result<()> {
2740 if opts.columns.len() != 1 {
2741 return Err(Error::Schema {
2742 message: "Multi-column (composite) indices are not yet supported".to_string(),
2743 });
2744 }
2745 self.dataset.ensure_mutable()?;
2746 let mut dataset = (*self.dataset.get().await?).clone();
2747 let (column, field) = Self::resolve_index_field(dataset.schema(), &opts.columns[0])?;
2748
2749 let lance_idx_params = self.make_index_params(&field, opts.index.clone()).await?;
2750 let index_type = self.get_index_type_for_field(&field, &opts.index);
2751 let columns = [column.as_str()];
2752 let mut builder = dataset
2753 .create_index_builder(&columns, index_type, lance_idx_params.as_ref())
2754 .train(opts.train)
2755 .replace(opts.replace);
2756
2757 if let Some(name) = opts.name {
2758 builder = builder.name(name);
2759 }
2760 builder.await?;
2761 self.dataset.update(dataset);
2762 Ok(())
2763 }
2764
2765 async fn drop_index(&self, index_name: &str) -> Result<()> {
2766 self.dataset.ensure_mutable()?;
2767 let mut dataset = (*self.dataset.get().await?).clone();
2768 dataset.drop_index(index_name).await?;
2769 self.dataset.update(dataset);
2770 Ok(())
2771 }
2772
2773 async fn prewarm_index(&self, index_name: &str) -> Result<()> {
2774 let dataset = self.dataset.get().await?;
2775 Ok(dataset.prewarm_index(index_name).await?)
2776 }
2777
2778 async fn prewarm_data(&self, _columns: Option<Vec<String>>) -> Result<()> {
2779 Err(Error::NotSupported {
2780 message: "prewarm_data is currently only supported on remote tables.".into(),
2781 })
2782 }
2783
2784 async fn update(&self, update: UpdateBuilder) -> Result<UpdateResult> {
2785 update::execute_update(self, update).await
2787 }
2788
2789 async fn create_plan(
2790 &self,
2791 query: &AnyQuery,
2792 options: QueryExecutionOptions,
2793 ) -> Result<Arc<dyn ExecutionPlan>> {
2794 query::create_plan(self, query, options).await
2795 }
2796
2797 async fn query(
2798 &self,
2799 query: &AnyQuery,
2800 options: QueryExecutionOptions,
2801 ) -> Result<DatasetRecordBatchStream> {
2802 query::execute_query(self, query, options).await
2803 }
2804
2805 async fn analyze_plan(
2806 &self,
2807 query: &AnyQuery,
2808 options: QueryExecutionOptions,
2809 ) -> Result<String> {
2810 query::analyze_query_plan(self, query, options).await
2811 }
2812
2813 async fn merge_insert(
2814 &self,
2815 params: MergeInsertBuilder,
2816 new_data: Box<dyn RecordBatchReader + Send>,
2817 ) -> Result<MergeResult> {
2818 merge::execute_merge_insert(self, params, new_data).await
2819 }
2820
2821 async fn set_unenforced_primary_key(&self, columns: &[&str]) -> Result<()> {
2822 primary_key::set_unenforced_primary_key(self, columns).await
2823 }
2824
2825 async fn set_lsm_write_spec(&self, spec: LsmWriteSpec) -> Result<()> {
2826 merge::lsm::set_lsm_write_spec(self, spec).await
2827 }
2828
2829 async fn unset_lsm_write_spec(&self) -> Result<()> {
2830 merge::lsm::unset_lsm_write_spec(self).await
2831 }
2832
2833 async fn delete(&self, predicate: Predicate<'_>) -> Result<DeleteResult> {
2835 delete::execute_delete(self, predicate).await
2836 }
2837
2838 async fn tags(&self) -> Result<Box<dyn Tags + '_>> {
2839 Ok(Box::new(NativeTags {
2840 dataset: self.dataset.clone(),
2841 }))
2842 }
2843
2844 async fn optimize(&self, action: OptimizeAction) -> Result<OptimizeStats> {
2845 optimize::execute_optimize(self, action).await
2847 }
2848
2849 async fn add_columns(
2850 &self,
2851 transforms: NewColumnTransform,
2852 read_columns: Option<Vec<String>>,
2853 ) -> Result<AddColumnsResult> {
2854 schema_evolution::execute_add_columns(self, transforms, read_columns).await
2855 }
2856
2857 async fn alter_columns(&self, alterations: &[ColumnAlteration]) -> Result<AlterColumnsResult> {
2858 schema_evolution::execute_alter_columns(self, alterations).await
2859 }
2860
2861 async fn drop_columns(&self, columns: &[&str]) -> Result<DropColumnsResult> {
2862 schema_evolution::execute_drop_columns(self, columns).await
2863 }
2864
2865 async fn list_indices(&self) -> Result<Vec<IndexConfig>> {
2866 let dataset = self.dataset.get().await?;
2867 let indices = dataset.load_indices().await?;
2868 let results = futures::stream::iter(indices.as_slice())
2869 .then(|idx| async {
2870 if idx.name == FRAG_REUSE_INDEX_NAME {
2872 return None;
2873 }
2874
2875 let stats = match dataset.index_statistics(idx.name.as_str()).await {
2876 Ok(stats) => stats,
2877 Err(e) => {
2878 log::warn!(
2879 "Failed to get statistics for index {} ({}): {}",
2880 idx.name,
2881 idx.uuid,
2882 e
2883 );
2884 return None;
2885 }
2886 };
2887
2888 let stats: serde_json::Value = match serde_json::from_str(&stats) {
2889 Ok(stats) => stats,
2890 Err(e) => {
2891 log::warn!(
2892 "Failed to deserialize index statistics for index {} ({}): {}",
2893 idx.name,
2894 idx.uuid,
2895 e
2896 );
2897 return None;
2898 }
2899 };
2900
2901 let Some(index_type) = stats.get("index_type").and_then(|v| v.as_str()) else {
2902 log::warn!(
2903 "Index statistics was missing 'index_type' field for index {} ({})",
2904 idx.name,
2905 idx.uuid
2906 );
2907 return None;
2908 };
2909
2910 let index_type: crate::index::IndexType = match index_type.parse() {
2911 Ok(index_type) => index_type,
2912 Err(e) => {
2913 log::warn!(
2914 "Failed to parse index type for index {} ({}): {}",
2915 idx.name,
2916 idx.uuid,
2917 e
2918 );
2919 return None;
2920 }
2921 };
2922
2923 let mut columns = Vec::with_capacity(idx.fields.len());
2924 for field_id in &idx.fields {
2925 let field_path = match dataset.schema().field_path(*field_id) {
2926 Ok(field_path) => field_path,
2927 Err(e) => {
2928 log::warn!(
2929 "Failed to resolve field path for index {} ({}) field id {}: {}",
2930 idx.name,
2931 idx.uuid,
2932 field_id,
2933 e
2934 );
2935 return None;
2936 }
2937 };
2938 columns.push(field_path);
2939 }
2940
2941 let name = idx.name.clone();
2942 Some(IndexConfig {
2943 index_type,
2944 columns,
2945 name,
2946 })
2947 })
2948 .collect::<Vec<_>>()
2949 .await;
2950
2951 Ok(results.into_iter().flatten().collect())
2952 }
2953
2954 async fn uri(&self) -> Result<String> {
2955 Ok(self.uri.clone())
2956 }
2957
2958 async fn storage_options(&self) -> Option<HashMap<String, String>> {
2959 self.initial_storage_options().await
2960 }
2961
2962 async fn initial_storage_options(&self) -> Option<HashMap<String, String>> {
2963 self.dataset
2964 .get()
2965 .await
2966 .ok()
2967 .and_then(|dataset| dataset.initial_storage_options().cloned())
2968 }
2969
2970 async fn latest_storage_options(&self) -> Result<Option<HashMap<String, String>>> {
2971 let dataset = self.dataset.get().await?;
2972 Ok(dataset.latest_storage_options().await?.map(|o| o.0))
2973 }
2974
2975 async fn index_stats(&self, index_name: &str) -> Result<Option<IndexStatistics>> {
2976 let stats = match self
2977 .dataset
2978 .get()
2979 .await?
2980 .index_statistics(index_name.as_ref())
2981 .await
2982 {
2983 Ok(stats) => stats,
2984 Err(lance_core::Error::IndexNotFound { .. }) => return Ok(None),
2985 Err(e) => return Err(Error::from(e)),
2986 };
2987
2988 let mut stats: IndexStatisticsImpl =
2989 serde_json::from_str(&stats).map_err(|e| Error::InvalidInput {
2990 message: format!("error deserializing index statistics: {}", e),
2991 })?;
2992
2993 let first_index = stats.indices.pop().ok_or_else(|| Error::InvalidInput {
2994 message: "index statistics is empty".to_string(),
2995 })?;
2996 let index_type =
2998 stats
2999 .index_type
3000 .or(first_index.index_type)
3001 .ok_or_else(|| Error::InvalidInput {
3002 message: "index statistics was missing index type".to_string(),
3003 })?;
3004 let loss = stats
3005 .indices
3006 .iter()
3007 .map(|index| index.loss.unwrap_or_default())
3008 .sum::<f64>();
3009
3010 let loss = first_index.loss.map(|first_loss| first_loss + loss);
3011 Ok(Some(IndexStatistics {
3012 num_indexed_rows: stats.num_indexed_rows,
3013 num_unindexed_rows: stats.num_unindexed_rows,
3014 index_type,
3015 distance_type: first_index.metric_type,
3016 num_indices: stats.num_indices,
3017 loss,
3018 }))
3019 }
3020
3021 async fn wait_for_index(
3024 &self,
3025 index_names: &[&str],
3026 timeout: std::time::Duration,
3027 ) -> Result<()> {
3028 wait_for_index(self, index_names, timeout).await
3029 }
3030
3031 async fn stats(&self) -> Result<TableStatistics> {
3032 let num_rows = self.count_rows(None).await?;
3033 let num_indices = self.list_indices().await?.len();
3034 let ds = self.dataset.get().await?;
3035 let ds_clone = (*ds).clone();
3036 let ds_stats = Arc::new(ds_clone).calculate_data_stats().await?;
3037 let total_bytes = ds_stats.fields.iter().map(|f| f.bytes_on_disk).sum::<u64>() as usize;
3038
3039 let frags = ds.get_fragments();
3040 let mut sorted_sizes = join_all(
3041 frags
3042 .iter()
3043 .map(|frag| async move { frag.physical_rows().await.unwrap_or(0) }),
3044 )
3045 .await;
3046 sorted_sizes.sort();
3047
3048 let small_frag_threshold = 100000;
3049 let num_fragments = sorted_sizes.len();
3050 let num_small_fragments = sorted_sizes
3051 .iter()
3052 .filter(|&&size| size < small_frag_threshold)
3053 .count();
3054
3055 let p25 = *sorted_sizes.get(num_fragments / 4).unwrap_or(&0);
3056 let p50 = *sorted_sizes.get(num_fragments / 2).unwrap_or(&0);
3057 let p75 = *sorted_sizes.get(num_fragments * 3 / 4).unwrap_or(&0);
3058 let p99 = *sorted_sizes.get(num_fragments * 99 / 100).unwrap_or(&0);
3059 let min = sorted_sizes.first().copied().unwrap_or(0);
3060 let max = sorted_sizes.last().copied().unwrap_or(0);
3061 let mean = if num_fragments == 0 {
3062 0
3063 } else {
3064 sorted_sizes.iter().copied().sum::<usize>() / num_fragments
3065 };
3066
3067 let frag_stats = FragmentStatistics {
3068 num_fragments,
3069 num_small_fragments,
3070 lengths: FragmentSummaryStats {
3071 min,
3072 max,
3073 mean,
3074 p25,
3075 p50,
3076 p75,
3077 p99,
3078 },
3079 };
3080 let stats = TableStatistics {
3081 total_bytes,
3082 num_rows,
3083 num_indices,
3084 fragment_stats: frag_stats,
3085 };
3086 Ok(stats)
3087 }
3088
3089 async fn create_insert_exec(
3090 &self,
3091 input: Arc<dyn datafusion_physical_plan::ExecutionPlan>,
3092 write_params: WriteParams,
3093 ) -> Result<Arc<dyn datafusion_physical_plan::ExecutionPlan>> {
3094 let ds = self.dataset.get().await?;
3095 let dataset = Arc::new((*ds).clone());
3096 Ok(Arc::new(datafusion::insert::InsertExec::new(
3097 self.dataset.clone(),
3098 dataset,
3099 input,
3100 write_params,
3101 )))
3102 }
3103}
3104
3105#[skip_serializing_none]
3106#[derive(Debug, Deserialize, PartialEq)]
3107pub struct TableStatistics {
3108 pub total_bytes: usize,
3110
3111 pub num_rows: usize,
3113
3114 pub num_indices: usize,
3116
3117 pub fragment_stats: FragmentStatistics,
3119}
3120
3121#[skip_serializing_none]
3122#[derive(Debug, Deserialize, PartialEq)]
3123pub struct FragmentStatistics {
3124 pub num_fragments: usize,
3126
3127 pub num_small_fragments: usize,
3129
3130 pub lengths: FragmentSummaryStats,
3132 }
3136
3137#[skip_serializing_none]
3138#[derive(Debug, Deserialize, PartialEq)]
3139pub struct FragmentSummaryStats {
3140 pub min: usize,
3141 pub max: usize,
3142 pub mean: usize,
3143 pub p25: usize,
3144 pub p50: usize,
3145 pub p75: usize,
3146 pub p99: usize,
3147}
3148
3149#[cfg(test)]
3150#[allow(deprecated)]
3151mod tests {
3152 use std::collections::HashMap;
3153 use std::sync::Arc;
3154 use std::sync::atomic::{AtomicBool, Ordering};
3155 use std::time::Duration;
3156
3157 use arrow_array::{
3158 Array, ArrayRef, BooleanArray, FixedSizeListArray, Int32Array, LargeStringArray,
3159 RecordBatch, RecordBatchIterator, RecordBatchReader, StringArray, StructArray,
3160 builder::{ListBuilder, StringBuilder},
3161 };
3162 use arrow_array::{BinaryArray, LargeBinaryArray};
3163 use arrow_data::ArrayDataBuilder;
3164 use arrow_schema::{DataType, Field, Schema};
3165 use futures::TryStreamExt;
3166 use lance::Dataset;
3167 use lance::io::{ObjectStoreParams, WrappingObjectStore};
3168 use lance_core::datatypes::LANCE_UNENFORCED_PRIMARY_KEY_POSITION;
3169 use tempfile::tempdir;
3170
3171 use super::*;
3172 use crate::connect;
3173 use crate::connection::ConnectBuilder;
3174 use crate::index::scalar::{BTreeIndexBuilder, BitmapIndexBuilder};
3175 use crate::index::vector::{IvfHnswPqIndexBuilder, IvfHnswSqIndexBuilder};
3176 use crate::query::Select;
3177 use crate::query::{ExecutableQuery, QueryBase};
3178 use crate::test_utils::connection::new_test_connection;
3179 use lance_index::scalar::FullTextSearchQuery;
3180 #[tokio::test]
3181 async fn test_open() {
3182 let tmp_dir = tempdir().unwrap();
3183 let dataset_path = tmp_dir.path().join("test.lance");
3184
3185 let batch = make_test_batches();
3186 let reader = RecordBatchIterator::new(vec![Ok(batch.clone())], batch.schema());
3187 Dataset::write(reader, dataset_path.to_str().unwrap(), None)
3188 .await
3189 .unwrap();
3190
3191 let table = NativeTable::open(dataset_path.to_str().unwrap())
3192 .await
3193 .unwrap();
3194
3195 assert_eq!(table.name, "test")
3196 }
3197
3198 #[tokio::test]
3199 async fn test_open_not_found() {
3200 let tmp_dir = tempdir().unwrap();
3201 let uri = tmp_dir.path().to_str().unwrap();
3202 let table = NativeTable::open(uri).await;
3203 assert!(matches!(table.unwrap_err(), Error::TableNotFound { .. }));
3204 }
3205
3206 #[test]
3207 #[cfg(not(windows))]
3208 fn test_object_store_path() {
3209 use std::path::Path as StdPath;
3210 let p = StdPath::new("s3://bucket/path/to/file");
3211 let c = p.join("subfile");
3212 assert_eq!(c.to_str().unwrap(), "s3://bucket/path/to/file/subfile");
3213 }
3214
3215 #[tokio::test]
3216 async fn test_count_rows() {
3217 let tmp_dir = tempdir().unwrap();
3218 let uri = tmp_dir.path().to_str().unwrap();
3219
3220 let batch = make_test_batches();
3221 let reader: Box<dyn RecordBatchReader + Send> = Box::new(RecordBatchIterator::new(
3222 vec![Ok(batch.clone())],
3223 batch.schema(),
3224 ));
3225 let table = NativeTable::create(
3226 uri,
3227 "test",
3228 vec![],
3229 reader,
3230 None,
3231 None,
3232 None,
3233 None,
3234 HashSet::new(),
3235 )
3236 .await
3237 .unwrap();
3238
3239 assert_eq!(table.count_rows(None).await.unwrap(), 10);
3240 assert_eq!(
3241 table
3242 .count_rows(Some(Filter::Sql("i >= 5".to_string())))
3243 .await
3244 .unwrap(),
3245 5
3246 );
3247 }
3248
3249 #[derive(Default, Debug)]
3250 struct NoOpCacheWrapper {
3251 called: AtomicBool,
3252 }
3253
3254 impl NoOpCacheWrapper {
3255 fn called(&self) -> bool {
3256 self.called.load(Ordering::Relaxed)
3257 }
3258 }
3259
3260 impl WrappingObjectStore for NoOpCacheWrapper {
3261 fn wrap(
3262 &self,
3263 _store_prefix: &str,
3264 original: Arc<dyn object_store::ObjectStore>,
3265 ) -> Arc<dyn object_store::ObjectStore> {
3266 self.called.store(true, Ordering::Relaxed);
3267 original
3268 }
3269 }
3270
3271 #[tokio::test]
3272 async fn test_open_table_options() {
3273 let tmp_dir = tempdir().unwrap();
3274 let dataset_path = tmp_dir.path().join("test.lance");
3275 let uri = dataset_path.to_str().unwrap();
3276 let conn = connect(uri).execute().await.unwrap();
3277
3278 let batches = make_test_batches();
3279
3280 conn.create_table("my_table", batches)
3281 .execute()
3282 .await
3283 .unwrap();
3284
3285 let wrapper = Arc::new(NoOpCacheWrapper::default());
3286
3287 let object_store_params = ObjectStoreParams {
3288 object_store_wrapper: Some(wrapper.clone()),
3289 ..Default::default()
3290 };
3291 let param = ReadParams {
3292 store_options: Some(object_store_params),
3293 ..Default::default()
3294 };
3295 assert!(!wrapper.called());
3296 conn.open_table("my_table")
3297 .lance_read_params(param)
3298 .execute()
3299 .await
3300 .unwrap();
3301 assert!(wrapper.called());
3302 }
3303
3304 fn make_test_batches() -> RecordBatch {
3305 let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)]));
3306 RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from_iter_values(0..10))]).unwrap()
3307 }
3308
3309 #[tokio::test]
3310 async fn test_tags() {
3311 let tmp_dir = tempdir().unwrap();
3312 let uri = tmp_dir.path().to_str().unwrap();
3313
3314 let conn = ConnectBuilder::new(uri)
3315 .read_consistency_interval(Duration::from_secs(0))
3316 .execute()
3317 .await
3318 .unwrap();
3319 let table = conn
3320 .create_table("my_table", some_sample_data())
3321 .execute()
3322 .await
3323 .unwrap();
3324 assert_eq!(table.version().await.unwrap(), 1);
3325 table.add(some_sample_data()).execute().await.unwrap();
3326 assert_eq!(table.version().await.unwrap(), 2);
3327 let mut tags_manager = table.tags().await.unwrap();
3328 let tags = tags_manager.list().await.unwrap();
3329 assert!(tags.is_empty(), "Tags should be empty initially");
3330 let tag1 = "tag1";
3331 tags_manager.create(tag1, 1).await.unwrap();
3332 assert_eq!(tags_manager.get_version(tag1).await.unwrap(), 1);
3333 let tags = tags_manager.list().await.unwrap();
3334 assert_eq!(tags.len(), 1);
3335 assert!(tags.contains_key(tag1));
3336 assert_eq!(tags.get(tag1).unwrap().version, 1);
3337 tags_manager.create("tag2", 2).await.unwrap();
3338 assert_eq!(tags_manager.get_version("tag2").await.unwrap(), 2);
3339 let tags = tags_manager.list().await.unwrap();
3340 assert_eq!(tags.len(), 2);
3341 assert!(tags.contains_key(tag1));
3342 assert_eq!(tags.get(tag1).unwrap().version, 1);
3343 assert!(tags.contains_key("tag2"));
3344 assert_eq!(tags.get("tag2").unwrap().version, 2);
3345 table.add(some_sample_data()).execute().await.unwrap();
3347 tags_manager.update(tag1, 3).await.unwrap();
3348 assert_eq!(tags_manager.get_version(tag1).await.unwrap(), 3);
3349 tags_manager.delete("tag2").await.unwrap();
3350 let tags = tags_manager.list().await.unwrap();
3351 assert_eq!(tags.len(), 1);
3352 assert!(tags.contains_key(tag1));
3353 assert_eq!(tags.get(tag1).unwrap().version, 3);
3354 table.add(some_sample_data()).execute().await.unwrap();
3356 assert_eq!(table.version().await.unwrap(), 4);
3357 table.checkout_tag(tag1).await.unwrap();
3358 assert_eq!(table.version().await.unwrap(), 3);
3359 table.checkout_latest().await.unwrap();
3360 assert_eq!(table.version().await.unwrap(), 4);
3361 }
3362
3363 #[tokio::test]
3364 async fn test_create_index() {
3365 use arrow_array::RecordBatch;
3366 use arrow_schema::{DataType, Field, Schema as ArrowSchema};
3367 use rand;
3368 use std::iter::repeat_with;
3369
3370 use arrow_array::Float32Array;
3371
3372 let tmp_dir = tempdir().unwrap();
3373 let uri = tmp_dir.path().to_str().unwrap();
3374 let conn = connect(uri).execute().await.unwrap();
3375
3376 let dimension = 16;
3377 let schema = Arc::new(ArrowSchema::new(vec![Field::new(
3378 "embeddings",
3379 DataType::FixedSizeList(
3380 Arc::new(Field::new("item", DataType::Float32, true)),
3381 dimension,
3382 ),
3383 false,
3384 )]));
3385
3386 let float_arr = Float32Array::from(
3387 repeat_with(rand::random::<f32>)
3388 .take(512 * dimension as usize)
3389 .collect::<Vec<f32>>(),
3390 );
3391
3392 let vectors = Arc::new(create_fixed_size_list(float_arr, dimension).unwrap());
3393 let batch = RecordBatch::try_new(schema.clone(), vec![vectors.clone()]).unwrap();
3394
3395 let table = conn.create_table("test", batch).execute().await.unwrap();
3396
3397 assert_eq!(table.index_stats("my_index").await.unwrap(), None);
3398
3399 table
3400 .create_index(&["embeddings"], Index::Auto)
3401 .execute()
3402 .await
3403 .unwrap();
3404
3405 let index_configs = table.list_indices().await.unwrap();
3406 assert_eq!(index_configs.len(), 1);
3407 let index = index_configs.into_iter().next().unwrap();
3408 assert_eq!(index.index_type, crate::index::IndexType::IvfPq);
3409 assert_eq!(index.columns, vec!["embeddings".to_string()]);
3410 assert_eq!(table.count_rows(None).await.unwrap(), 512);
3411 assert_eq!(table.name(), "test");
3412
3413 let indices = table.as_native().unwrap().load_indices().await.unwrap();
3414 let index_name = &indices[0].index_name;
3415 let stats = table.index_stats(index_name).await.unwrap().unwrap();
3416 assert_eq!(stats.num_indexed_rows, 512);
3417 assert_eq!(stats.num_unindexed_rows, 0);
3418 assert_eq!(stats.index_type, crate::index::IndexType::IvfPq);
3419 assert_eq!(stats.distance_type, Some(crate::DistanceType::L2));
3420 assert!(stats.loss.is_some());
3421
3422 table.drop_index(index_name).await.unwrap();
3423 assert_eq!(table.list_indices().await.unwrap().len(), 0);
3424 }
3425
3426 #[tokio::test]
3427 async fn test_dynamic_select() {
3428 let tc = new_test_connection().await.unwrap();
3429 let db = tc.connection;
3430
3431 let table = db
3432 .create_table("test", some_sample_data())
3433 .execute()
3434 .await
3435 .unwrap();
3436
3437 let query = table.query().select(Select::dynamic(&[("i_alias", "i")]));
3438
3439 let result = query.execute().await;
3440 let batches = result
3441 .expect("should have result")
3442 .try_collect::<Vec<_>>()
3443 .await
3444 .unwrap();
3445
3446 for batch in batches {
3447 assert!(batch.column_by_name("i_alias").is_some());
3448 }
3449 }
3450
3451 #[tokio::test]
3452 async fn test_ivf_pq_uses_default_partition_size_for_num_partitions() {
3453 use arrow_array::{Float32Array, RecordBatch};
3454 use arrow_schema::{DataType, Field, Schema as ArrowSchema};
3455
3456 use crate::index::vector::IvfPqIndexBuilder;
3457
3458 let tmp_dir = tempdir().unwrap();
3459 let uri = tmp_dir.path().to_str().unwrap();
3460 let conn = connect(uri).execute().await.unwrap();
3461
3462 const PARTITION_SIZE: usize = 8192;
3463 let num_rows = PARTITION_SIZE * 2;
3464 let dimension = 8usize;
3465 let schema = Arc::new(ArrowSchema::new(vec![Field::new(
3466 "embeddings",
3467 DataType::FixedSizeList(
3468 Arc::new(Field::new("item", DataType::Float32, true)),
3469 dimension as i32,
3470 ),
3471 false,
3472 )]));
3473
3474 let float_arr =
3475 Float32Array::from_iter_values((0..(num_rows * dimension)).map(|v| v as f32));
3476 let vectors = Arc::new(create_fixed_size_list(float_arr, dimension as i32).unwrap());
3477 let batch = RecordBatch::try_new(schema.clone(), vec![vectors]).unwrap();
3478
3479 let table = conn.create_table("test", batch).execute().await.unwrap();
3480 let native_table = table.as_native().unwrap();
3481 let builder = IvfPqIndexBuilder::default();
3482 table
3483 .create_index(&["embeddings"], Index::IvfPq(builder))
3484 .execute()
3485 .await
3486 .unwrap();
3487 table
3488 .wait_for_index(&["embeddings_idx"], std::time::Duration::from_secs(30))
3489 .await
3490 .unwrap();
3491
3492 use lance::index::DatasetIndexInternalExt;
3493 use lance::index::vector::ivf::v2::IvfPq as LanceIvfPq;
3494 use lance_index::metrics::NoOpMetricsCollector;
3495 use lance_index::vector::VectorIndex as LanceVectorIndex;
3496
3497 let indices = native_table.load_indices().await.unwrap();
3498 let index_uuid = indices[0].index_uuid.clone();
3499
3500 let dataset_guard = native_table.dataset.get().await.unwrap();
3501 let dataset = (*dataset_guard).clone();
3502 drop(dataset_guard);
3503
3504 let lance_index = dataset
3505 .open_vector_index("embeddings", &index_uuid, &NoOpMetricsCollector)
3506 .await
3507 .unwrap();
3508 let ivf_index = lance_index
3509 .as_any()
3510 .downcast_ref::<LanceIvfPq>()
3511 .expect("expected IvfPq index");
3512 let partition_count = ivf_index.ivf_model().num_partitions();
3513
3514 let expected_partitions = num_rows / PARTITION_SIZE;
3515 assert_eq!(partition_count, expected_partitions);
3516 }
3517
3518 #[tokio::test]
3519 async fn test_create_index_ivf_hnsw_sq() {
3520 use arrow_array::RecordBatch;
3521 use arrow_schema::{DataType, Field, Schema as ArrowSchema};
3522 use rand;
3523 use std::iter::repeat_with;
3524
3525 use arrow_array::Float32Array;
3526
3527 let tmp_dir = tempdir().unwrap();
3528 let uri = tmp_dir.path().to_str().unwrap();
3529 let conn = connect(uri).execute().await.unwrap();
3530
3531 let dimension = 16;
3532 let schema = Arc::new(ArrowSchema::new(vec![Field::new(
3533 "embeddings",
3534 DataType::FixedSizeList(
3535 Arc::new(Field::new("item", DataType::Float32, true)),
3536 dimension,
3537 ),
3538 false,
3539 )]));
3540
3541 let float_arr = Float32Array::from(
3542 repeat_with(rand::random::<f32>)
3543 .take(512 * dimension as usize)
3544 .collect::<Vec<f32>>(),
3545 );
3546
3547 let vectors = Arc::new(create_fixed_size_list(float_arr, dimension).unwrap());
3548 let batch = RecordBatch::try_new(schema.clone(), vec![vectors.clone()]).unwrap();
3549
3550 let table = conn.create_table("test", batch).execute().await.unwrap();
3551
3552 let stats = table.index_stats("my_index").await.unwrap();
3553 assert!(stats.is_none());
3554
3555 let index = IvfHnswSqIndexBuilder::default();
3556 table
3557 .create_index(&["embeddings"], Index::IvfHnswSq(index))
3558 .execute()
3559 .await
3560 .unwrap();
3561
3562 let index_configs = table.list_indices().await.unwrap();
3563 assert_eq!(index_configs.len(), 1);
3564 let index = index_configs.into_iter().next().unwrap();
3565 assert_eq!(index.index_type, crate::index::IndexType::IvfHnswSq);
3566 assert_eq!(index.columns, vec!["embeddings".to_string()]);
3567 assert_eq!(table.count_rows(None).await.unwrap(), 512);
3568 assert_eq!(table.name(), "test");
3569
3570 let indices = table.as_native().unwrap().load_indices().await.unwrap();
3571 let index_name = &indices[0].index_name;
3572 let stats = table.index_stats(index_name).await.unwrap().unwrap();
3573 assert_eq!(stats.num_indexed_rows, 512);
3574 assert_eq!(stats.num_unindexed_rows, 0);
3575 }
3576
3577 #[tokio::test]
3578 async fn test_create_index_ivf_hnsw_pq() {
3579 use arrow_array::RecordBatch;
3580 use arrow_schema::{DataType, Field, Schema as ArrowSchema};
3581 use rand;
3582 use std::iter::repeat_with;
3583
3584 use arrow_array::Float32Array;
3585
3586 let tmp_dir = tempdir().unwrap();
3587 let uri = tmp_dir.path().to_str().unwrap();
3588 let conn = connect(uri).execute().await.unwrap();
3589
3590 let dimension = 16;
3591 let schema = Arc::new(ArrowSchema::new(vec![Field::new(
3592 "embeddings",
3593 DataType::FixedSizeList(
3594 Arc::new(Field::new("item", DataType::Float32, true)),
3595 dimension,
3596 ),
3597 false,
3598 )]));
3599
3600 let float_arr = Float32Array::from(
3601 repeat_with(rand::random::<f32>)
3602 .take(512 * dimension as usize)
3603 .collect::<Vec<f32>>(),
3604 );
3605
3606 let vectors = Arc::new(create_fixed_size_list(float_arr, dimension).unwrap());
3607 let batch = RecordBatch::try_new(schema.clone(), vec![vectors.clone()]).unwrap();
3608
3609 let table = conn.create_table("test", batch).execute().await.unwrap();
3610 let stats = table.index_stats("my_index").await.unwrap();
3611 assert!(stats.is_none());
3612
3613 let index = IvfHnswPqIndexBuilder::default();
3614 table
3615 .create_index(&["embeddings"], Index::IvfHnswPq(index))
3616 .execute()
3617 .await
3618 .unwrap();
3619 table
3620 .wait_for_index(&["embeddings_idx"], Duration::from_millis(10))
3621 .await
3622 .unwrap();
3623 let index_configs = table.list_indices().await.unwrap();
3624 assert_eq!(index_configs.len(), 1);
3625 let index = index_configs.into_iter().next().unwrap();
3626 assert_eq!(index.index_type, crate::index::IndexType::IvfHnswPq);
3627 assert_eq!(index.columns, vec!["embeddings".to_string()]);
3628 assert_eq!(table.count_rows(None).await.unwrap(), 512);
3629 assert_eq!(table.name(), "test");
3630
3631 let indices: Vec<VectorIndex> = table.as_native().unwrap().load_indices().await.unwrap();
3632 let index_name = &indices[0].index_name;
3633 let stats = table.index_stats(index_name).await.unwrap().unwrap();
3634 assert_eq!(stats.num_indexed_rows, 512);
3635 assert_eq!(stats.num_unindexed_rows, 0);
3636 }
3637
3638 #[tokio::test]
3639 async fn test_create_index_ivf_hnsw_flat() {
3640 use arrow_array::RecordBatch;
3641 use arrow_schema::{DataType, Field, Schema as ArrowSchema};
3642 use rand;
3643 use std::iter::repeat_with;
3644
3645 use crate::index::vector::IvfHnswFlatIndexBuilder;
3646 use arrow_array::Float32Array;
3647
3648 let tmp_dir = tempdir().unwrap();
3649 let uri = tmp_dir.path().to_str().unwrap();
3650 let conn = connect(uri).execute().await.unwrap();
3651
3652 let dimension = 16;
3653 let schema = Arc::new(ArrowSchema::new(vec![Field::new(
3654 "embeddings",
3655 DataType::FixedSizeList(
3656 Arc::new(Field::new("item", DataType::Float32, true)),
3657 dimension,
3658 ),
3659 false,
3660 )]));
3661
3662 let float_arr = Float32Array::from(
3663 repeat_with(rand::random::<f32>)
3664 .take(512 * dimension as usize)
3665 .collect::<Vec<f32>>(),
3666 );
3667
3668 let vectors = Arc::new(create_fixed_size_list(float_arr, dimension).unwrap());
3669 let batch = RecordBatch::try_new(schema.clone(), vec![vectors.clone()]).unwrap();
3670
3671 let table = conn.create_table("test", batch).execute().await.unwrap();
3672
3673 let index = IvfHnswFlatIndexBuilder::default();
3674 table
3675 .create_index(&["embeddings"], Index::IvfHnswFlat(index))
3676 .execute()
3677 .await
3678 .unwrap();
3679
3680 let index_configs = table.list_indices().await.unwrap();
3681 assert_eq!(index_configs.len(), 1);
3682 let index = index_configs.into_iter().next().unwrap();
3683 assert_eq!(index.index_type, crate::index::IndexType::IvfHnswFlat);
3684 assert_eq!(index.columns, vec!["embeddings".to_string()]);
3685 assert_eq!(table.count_rows(None).await.unwrap(), 512);
3686 }
3687
3688 fn create_fixed_size_list<T: Array>(values: T, list_size: i32) -> Result<FixedSizeListArray> {
3689 let list_type = DataType::FixedSizeList(
3690 Arc::new(Field::new("item", values.data_type().clone(), true)),
3691 list_size,
3692 );
3693 let data = ArrayDataBuilder::new(list_type)
3694 .len(values.len() / list_size as usize)
3695 .add_child_data(values.into_data())
3696 .build()
3697 .unwrap();
3698
3699 Ok(FixedSizeListArray::from(data))
3700 }
3701
3702 fn some_sample_data() -> Box<dyn arrow_array::RecordBatchReader + Send> {
3703 let batch = RecordBatch::try_new(
3704 Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)])),
3705 vec![Arc::new(Int32Array::from(vec![1]))],
3706 )
3707 .unwrap();
3708 let schema = batch.schema().clone();
3709 let batch = Ok(batch);
3710
3711 Box::new(RecordBatchIterator::new(vec![batch], schema))
3712 }
3713
3714 #[tokio::test]
3715 async fn test_create_scalar_index() {
3716 let tmp_dir = tempdir().unwrap();
3717 let uri = tmp_dir.path().to_str().unwrap();
3718
3719 let batch = RecordBatch::try_new(
3720 Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)])),
3721 vec![Arc::new(Int32Array::from(vec![1]))],
3722 )
3723 .unwrap();
3724 let conn = ConnectBuilder::new(uri).execute().await.unwrap();
3725 let table = conn
3726 .create_table("my_table", batch.clone())
3727 .execute()
3728 .await
3729 .unwrap();
3730
3731 table
3733 .create_index(&["i"], Index::Auto)
3734 .execute()
3735 .await
3736 .unwrap();
3737 table
3738 .wait_for_index(&["i_idx"], Duration::from_millis(10))
3739 .await
3740 .unwrap();
3741 let index_configs = table.list_indices().await.unwrap();
3742 assert_eq!(index_configs.len(), 1);
3743 let index = index_configs.into_iter().next().unwrap();
3744 assert_eq!(index.index_type, crate::index::IndexType::BTree);
3745 assert_eq!(index.columns, vec!["i".to_string()]);
3746
3747 table
3749 .create_index(&["i"], Index::BTree(BTreeIndexBuilder::default()))
3750 .execute()
3751 .await
3752 .unwrap();
3753
3754 let index_configs = table.list_indices().await.unwrap();
3755 assert_eq!(index_configs.len(), 1);
3756 let index = index_configs.into_iter().next().unwrap();
3757 assert_eq!(index.index_type, crate::index::IndexType::BTree);
3758 assert_eq!(index.columns, vec!["i".to_string()]);
3759
3760 let indices = table.as_native().unwrap().load_indices().await.unwrap();
3761 let index_name = &indices[0].index_name;
3762 let stats = table.index_stats(index_name).await.unwrap().unwrap();
3763 assert_eq!(stats.num_indexed_rows, 1);
3764 assert_eq!(stats.num_unindexed_rows, 0);
3765 }
3766
3767 #[tokio::test]
3768 async fn test_create_index_nested_field_paths() {
3769 let tmp_dir = tempdir().unwrap();
3770 let uri = tmp_dir.path().to_str().unwrap();
3771 let conn = ConnectBuilder::new(uri).execute().await.unwrap();
3772
3773 let num_rows = 512;
3774 let dimension = 8;
3775
3776 let metadata = Arc::new(StructArray::from(vec![(
3777 Arc::new(Field::new("user_id", DataType::Int32, false)),
3778 Arc::new(Int32Array::from_iter_values(0..num_rows)) as ArrayRef,
3779 )]));
3780
3781 let vector_values = arrow_array::Float32Array::from_iter_values(
3782 (0..num_rows * dimension).map(|v| v as f32),
3783 );
3784 let embeddings =
3785 Arc::new(create_fixed_size_list(vector_values, dimension).unwrap()) as ArrayRef;
3786 let image = Arc::new(StructArray::from(vec![(
3787 Arc::new(Field::new(
3788 "embedding",
3789 embeddings.data_type().clone(),
3790 false,
3791 )),
3792 embeddings,
3793 )]));
3794
3795 let payload = Arc::new(StructArray::from(vec![(
3796 Arc::new(Field::new("text", DataType::Utf8, false)),
3797 Arc::new(StringArray::from_iter_values(
3798 (0..num_rows).map(|i| format!("document {}", i)),
3799 )) as ArrayRef,
3800 )]));
3801
3802 let meta_data = Arc::new(StructArray::from(vec![(
3803 Arc::new(Field::new("user-id", DataType::Int32, false)),
3804 Arc::new(Int32Array::from_iter_values(0..num_rows)) as ArrayRef,
3805 )]));
3806
3807 let literal = Arc::new(StructArray::from(vec![(
3808 Arc::new(Field::new("a.b", DataType::Int32, false)),
3809 Arc::new(Int32Array::from_iter_values(0..num_rows)) as ArrayRef,
3810 )]));
3811
3812 let schema = Arc::new(Schema::new(vec![
3813 Field::new("metadata", metadata.data_type().clone(), false),
3814 Field::new("image", image.data_type().clone(), false),
3815 Field::new("payload", payload.data_type().clone(), false),
3816 Field::new("meta-data", meta_data.data_type().clone(), false),
3817 Field::new("literal", literal.data_type().clone(), false),
3818 ]));
3819 let batch =
3820 RecordBatch::try_new(schema, vec![metadata, image, payload, meta_data, literal])
3821 .unwrap();
3822
3823 let table = conn
3824 .create_table("nested_index_paths", batch)
3825 .execute()
3826 .await
3827 .unwrap();
3828
3829 table
3830 .create_index(
3831 &["metadata.user_id"],
3832 Index::BTree(BTreeIndexBuilder::default()),
3833 )
3834 .name("metadata_user_id_idx".to_string())
3835 .execute()
3836 .await
3837 .unwrap();
3838 table
3839 .create_index(&["image.embedding"], Index::Auto)
3840 .name("image_embedding_idx".to_string())
3841 .execute()
3842 .await
3843 .unwrap();
3844 table
3845 .create_index(&["payload.text"], Index::FTS(Default::default()))
3846 .name("payload_text_idx".to_string())
3847 .execute()
3848 .await
3849 .unwrap();
3850 table
3851 .create_index(
3852 &["`meta-data`.`user-id`"],
3853 Index::BTree(BTreeIndexBuilder::default()),
3854 )
3855 .name("escaped_names_idx".to_string())
3856 .execute()
3857 .await
3858 .unwrap();
3859 table
3860 .create_index(
3861 &["literal.`a.b`"],
3862 Index::BTree(BTreeIndexBuilder::default()),
3863 )
3864 .name("literal_dot_idx".to_string())
3865 .execute()
3866 .await
3867 .unwrap();
3868
3869 let mut index_configs = table.list_indices().await.unwrap();
3870 index_configs.sort_by(|left, right| left.name.cmp(&right.name));
3871
3872 let indexed_columns = index_configs
3873 .iter()
3874 .map(|index| {
3875 (
3876 index.name.as_str(),
3877 index.columns.as_slice(),
3878 index.index_type.clone(),
3879 )
3880 })
3881 .collect::<Vec<_>>();
3882 assert_eq!(
3883 indexed_columns,
3884 vec![
3885 (
3886 "escaped_names_idx",
3887 &["`meta-data`.`user-id`".to_string()][..],
3888 crate::index::IndexType::BTree,
3889 ),
3890 (
3891 "image_embedding_idx",
3892 &["image.embedding".to_string()][..],
3893 crate::index::IndexType::IvfPq,
3894 ),
3895 (
3896 "literal_dot_idx",
3897 &["literal.`a.b`".to_string()][..],
3898 crate::index::IndexType::BTree,
3899 ),
3900 (
3901 "metadata_user_id_idx",
3902 &["metadata.user_id".to_string()][..],
3903 crate::index::IndexType::BTree,
3904 ),
3905 (
3906 "payload_text_idx",
3907 &["payload.text".to_string()][..],
3908 crate::index::IndexType::FTS,
3909 ),
3910 ]
3911 );
3912
3913 let vector_results = table
3914 .query()
3915 .nearest_to(&[0.0; 8])
3916 .unwrap()
3917 .column("image.embedding")
3918 .limit(1)
3919 .execute()
3920 .await
3921 .unwrap()
3922 .try_collect::<Vec<_>>()
3923 .await
3924 .unwrap();
3925 assert_eq!(
3926 vector_results
3927 .iter()
3928 .map(|batch| batch.num_rows())
3929 .sum::<usize>(),
3930 1
3931 );
3932
3933 let default_vector_results = table
3934 .query()
3935 .nearest_to(&[0.0; 8])
3936 .unwrap()
3937 .limit(1)
3938 .execute()
3939 .await
3940 .unwrap()
3941 .try_collect::<Vec<_>>()
3942 .await
3943 .unwrap();
3944 assert_eq!(
3945 default_vector_results
3946 .iter()
3947 .map(|batch| batch.num_rows())
3948 .sum::<usize>(),
3949 1
3950 );
3951
3952 let fts_results = table
3953 .query()
3954 .full_text_search(FullTextSearchQuery::new("document".to_string()))
3955 .limit(5)
3956 .execute()
3957 .await
3958 .unwrap()
3959 .try_collect::<Vec<_>>()
3960 .await
3961 .unwrap();
3962 assert!(!fts_results.is_empty());
3963
3964 let filtered_results = table
3965 .query()
3966 .only_if("metadata.user_id = 42")
3967 .limit(1)
3968 .execute()
3969 .await
3970 .unwrap()
3971 .try_collect::<Vec<_>>()
3972 .await
3973 .unwrap();
3974 assert_eq!(
3975 filtered_results
3976 .iter()
3977 .map(|batch| batch.num_rows())
3978 .sum::<usize>(),
3979 1
3980 );
3981 }
3982
3983 #[tokio::test]
3984 async fn test_create_bitmap_index() {
3985 let tmp_dir = tempdir().unwrap();
3986 let uri = tmp_dir.path().to_str().unwrap();
3987
3988 let conn = ConnectBuilder::new(uri).execute().await.unwrap();
3989
3990 let schema = Arc::new(Schema::new(vec![
3991 Field::new("id", DataType::Int32, false),
3992 Field::new("category", DataType::Utf8, true),
3993 Field::new("large_category", DataType::LargeUtf8, true),
3994 Field::new("is_active", DataType::Boolean, true),
3995 Field::new("data", DataType::Binary, true),
3996 Field::new("large_data", DataType::LargeBinary, true),
3997 ]));
3998
3999 let batch = RecordBatch::try_new(
4000 schema.clone(),
4001 vec![
4002 Arc::new(Int32Array::from_iter_values(0..100)),
4003 Arc::new(StringArray::from_iter_values(
4004 (0..100).map(|i| format!("category_{}", i % 5)),
4005 )),
4006 Arc::new(LargeStringArray::from_iter_values(
4007 (0..100).map(|i| format!("large_category_{}", i % 5)),
4008 )),
4009 Arc::new(BooleanArray::from_iter((0..100).map(|i| Some(i % 2 == 0)))),
4010 Arc::new(BinaryArray::from_iter_values(
4011 (0_u32..100).map(|i| i.to_le_bytes()),
4012 )),
4013 Arc::new(LargeBinaryArray::from_iter_values(
4014 (0_u32..100).map(|i| i.to_le_bytes()),
4015 )),
4016 ],
4017 )
4018 .unwrap();
4019
4020 let table = conn
4021 .create_table("test_bitmap", batch.clone())
4022 .execute()
4023 .await
4024 .unwrap();
4025
4026 table
4028 .create_index(&["category"], Index::Bitmap(Default::default()))
4029 .execute()
4030 .await
4031 .unwrap();
4032
4033 table
4035 .create_index(&["is_active"], Index::Bitmap(Default::default()))
4036 .execute()
4037 .await
4038 .unwrap();
4039
4040 table
4042 .create_index(&["data"], Index::Bitmap(Default::default()))
4043 .execute()
4044 .await
4045 .unwrap();
4046
4047 table
4049 .create_index(&["large_data"], Index::Bitmap(Default::default()))
4050 .execute()
4051 .await
4052 .unwrap();
4053
4054 table
4056 .create_index(&["large_category"], Index::Bitmap(Default::default()))
4057 .execute()
4058 .await
4059 .unwrap();
4060
4061 let index_configs = table.list_indices().await.unwrap();
4063 assert_eq!(index_configs.len(), 5);
4064
4065 let mut configs_iter = index_configs.into_iter();
4066 let index = configs_iter.next().unwrap();
4067 assert_eq!(index.index_type, crate::index::IndexType::Bitmap);
4068 assert_eq!(index.columns, vec!["category".to_string()]);
4069
4070 let index = configs_iter.next().unwrap();
4071 assert_eq!(index.index_type, crate::index::IndexType::Bitmap);
4072 assert_eq!(index.columns, vec!["is_active".to_string()]);
4073
4074 let index = configs_iter.next().unwrap();
4075 assert_eq!(index.index_type, crate::index::IndexType::Bitmap);
4076 assert_eq!(index.columns, vec!["data".to_string()]);
4077
4078 let index = configs_iter.next().unwrap();
4079 assert_eq!(index.index_type, crate::index::IndexType::Bitmap);
4080 assert_eq!(index.columns, vec!["large_data".to_string()]);
4081
4082 let index = configs_iter.next().unwrap();
4083 assert_eq!(index.index_type, crate::index::IndexType::Bitmap);
4084 assert_eq!(index.columns, vec!["large_category".to_string()]);
4085 }
4086
4087 #[tokio::test]
4088 async fn test_create_label_list_index() {
4089 let tmp_dir = tempdir().unwrap();
4090 let uri = tmp_dir.path().to_str().unwrap();
4091
4092 let conn = ConnectBuilder::new(uri).execute().await.unwrap();
4093
4094 let schema = Arc::new(Schema::new(vec![
4095 Field::new("id", DataType::Int32, false),
4096 Field::new(
4097 "tags",
4098 DataType::List(Field::new("item", DataType::Utf8, true).into()),
4099 true,
4100 ),
4101 ]));
4102
4103 const TAGS: [&str; 3] = ["cat", "dog", "fish"];
4104
4105 let values_builder = StringBuilder::new();
4106 let mut builder = ListBuilder::new(values_builder);
4107 for i in 0..120 {
4108 builder.values().append_value(TAGS[i % 3]);
4109 if i % 3 == 0 {
4110 builder.append(true)
4111 }
4112 }
4113 let tags = Arc::new(builder.finish());
4114
4115 let batch = RecordBatch::try_new(
4116 schema.clone(),
4117 vec![Arc::new(Int32Array::from_iter_values(0..40)), tags],
4118 )
4119 .unwrap();
4120
4121 let table = conn
4122 .create_table("test_bitmap", batch.clone())
4123 .execute()
4124 .await
4125 .unwrap();
4126
4127 assert!(
4129 table
4130 .create_index(&["tags"], Index::BTree(Default::default()))
4131 .execute()
4132 .await
4133 .is_err()
4134 );
4135 assert!(
4136 table
4137 .create_index(&["tags"], Index::Bitmap(Default::default()))
4138 .execute()
4139 .await
4140 .is_err()
4141 );
4142
4143 table
4145 .create_index(&["tags"], Index::LabelList(Default::default()))
4146 .execute()
4147 .await
4148 .unwrap();
4149
4150 let index_configs = table.list_indices().await.unwrap();
4152 assert_eq!(index_configs.len(), 1);
4153 let index = index_configs.into_iter().next().unwrap();
4154 assert_eq!(index.index_type, crate::index::IndexType::LabelList);
4155 assert_eq!(index.columns, vec!["tags".to_string()]);
4156 }
4157
4158 #[tokio::test]
4159 async fn test_create_inverted_index() {
4160 let tmp_dir = tempdir().unwrap();
4161 let uri = tmp_dir.path().to_str().unwrap();
4162
4163 let conn = ConnectBuilder::new(uri).execute().await.unwrap();
4164 const WORDS: [&str; 3] = ["cat", "dog", "fish"];
4165 let mut text_builder = StringBuilder::new();
4166 let num_rows = 120;
4167 for i in 0..num_rows {
4168 text_builder.append_value(WORDS[i % 3]);
4169 }
4170 let text = Arc::new(text_builder.finish());
4171
4172 let schema = Arc::new(Schema::new(vec![
4173 Field::new("id", DataType::Int32, false),
4174 Field::new("text", DataType::Utf8, true),
4175 ]));
4176 let batch = RecordBatch::try_new(
4177 schema.clone(),
4178 vec![
4179 Arc::new(Int32Array::from_iter_values(0..num_rows as i32)),
4180 text,
4181 ],
4182 )
4183 .unwrap();
4184
4185 let table = conn
4186 .create_table("test_bitmap", batch.clone())
4187 .execute()
4188 .await
4189 .unwrap();
4190
4191 table
4192 .create_index(&["text"], Index::FTS(Default::default()))
4193 .execute()
4194 .await
4195 .unwrap();
4196 let index_configs = table.list_indices().await.unwrap();
4197 assert_eq!(index_configs.len(), 1);
4198 let index = index_configs.into_iter().next().unwrap();
4199 assert_eq!(index.index_type, crate::index::IndexType::FTS);
4200 assert_eq!(index.columns, vec!["text".to_string()]);
4201 assert_eq!(index.name, "text_idx");
4202
4203 let stats = table.index_stats("text_idx").await.unwrap().unwrap();
4204 assert_eq!(stats.num_indexed_rows, num_rows);
4205 assert_eq!(stats.num_unindexed_rows, 0);
4206 assert_eq!(stats.index_type, crate::index::IndexType::FTS);
4207 assert_eq!(stats.distance_type, None);
4208
4209 table.prewarm_index("text_idx").await.unwrap();
4211 }
4212
4213 #[cfg(not(target_os = "windows"))]
4215 #[tokio::test]
4216 async fn test_read_consistency_interval() {
4217 let intervals = vec![
4218 None,
4219 Some(0),
4220 Some(100), ];
4222
4223 for interval in intervals {
4224 let data = some_sample_data();
4225
4226 let tmp_dir = tempdir().unwrap();
4227 let uri = tmp_dir.path().to_str().unwrap();
4228
4229 let conn1 = ConnectBuilder::new(uri).execute().await.unwrap();
4230 let table1 = conn1
4231 .create_empty_table("my_table", RecordBatchReader::schema(&data))
4232 .execute()
4233 .await
4234 .unwrap();
4235
4236 let mut conn2 = ConnectBuilder::new(uri);
4237 if let Some(interval) = interval {
4238 conn2 = conn2.read_consistency_interval(std::time::Duration::from_millis(interval));
4239 }
4240 let conn2 = conn2.execute().await.unwrap();
4241 let table2 = conn2.open_table("my_table").execute().await.unwrap();
4242
4243 assert_eq!(table1.count_rows(None).await.unwrap(), 0);
4244 assert_eq!(table2.count_rows(None).await.unwrap(), 0);
4245
4246 table1.add(data).execute().await.unwrap();
4247 assert_eq!(table1.count_rows(None).await.unwrap(), 1);
4248
4249 match interval {
4250 None => {
4251 assert_eq!(table2.count_rows(None).await.unwrap(), 0);
4252 table2.checkout_latest().await.unwrap();
4253 assert_eq!(table2.count_rows(None).await.unwrap(), 1);
4254 }
4255 Some(0) => {
4256 assert_eq!(table2.count_rows(None).await.unwrap(), 1);
4257 }
4258 Some(100) => {
4259 assert_eq!(table2.count_rows(None).await.unwrap(), 0);
4260 tokio::time::sleep(Duration::from_millis(100)).await;
4261 assert_eq!(table2.count_rows(None).await.unwrap(), 1);
4262 }
4263 _ => unreachable!(),
4264 }
4265 }
4266 }
4267
4268 #[tokio::test]
4269 async fn test_time_travel_write() {
4270 let tmp_dir = tempdir().unwrap();
4271 let uri = tmp_dir.path().to_str().unwrap();
4272
4273 let conn = ConnectBuilder::new(uri)
4274 .read_consistency_interval(Duration::from_secs(0))
4275 .execute()
4276 .await
4277 .unwrap();
4278 let table = conn
4279 .create_table("my_table", some_sample_data())
4280 .execute()
4281 .await
4282 .unwrap();
4283 let version = table.version().await.unwrap();
4284 table.add(some_sample_data()).execute().await.unwrap();
4285 table.checkout(version).await.unwrap();
4286 assert!(table.add(some_sample_data()).execute().await.is_err())
4287 }
4288
4289 #[tokio::test]
4290 async fn test_update_dataset_config() {
4291 let tmp_dir = tempdir().unwrap();
4292 let uri = tmp_dir.path().to_str().unwrap();
4293
4294 let conn = ConnectBuilder::new(uri)
4295 .read_consistency_interval(Duration::from_secs(0))
4296 .execute()
4297 .await
4298 .unwrap();
4299
4300 let table = conn
4301 .create_table("my_table", some_sample_data())
4302 .execute()
4303 .await
4304 .unwrap();
4305 let native_tbl = table.as_native().unwrap();
4306
4307 let manifest = native_tbl.manifest().await.unwrap();
4308 let base_config_len = manifest.config.len();
4309
4310 native_tbl
4311 .update_config(vec![("test_key1".to_string(), "test_val1".to_string())])
4312 .await
4313 .unwrap();
4314
4315 let manifest = native_tbl.manifest().await.unwrap();
4316 assert_eq!(manifest.config.len(), 1 + base_config_len);
4317 assert_eq!(
4318 manifest.config.get("test_key1"),
4319 Some(&"test_val1".to_string())
4320 );
4321
4322 native_tbl
4323 .update_config(vec![("test_key2".to_string(), "test_val2".to_string())])
4324 .await
4325 .unwrap();
4326 let manifest = native_tbl.manifest().await.unwrap();
4327 assert_eq!(manifest.config.len(), 2 + base_config_len);
4328 assert_eq!(
4329 manifest.config.get("test_key1"),
4330 Some(&"test_val1".to_string())
4331 );
4332 assert_eq!(
4333 manifest.config.get("test_key2"),
4334 Some(&"test_val2".to_string())
4335 );
4336
4337 native_tbl
4338 .update_config(vec![(
4339 "test_key2".to_string(),
4340 "test_val2_update".to_string(),
4341 )])
4342 .await
4343 .unwrap();
4344 let manifest = native_tbl.manifest().await.unwrap();
4345 assert_eq!(manifest.config.len(), 2 + base_config_len);
4346 assert_eq!(
4347 manifest.config.get("test_key1"),
4348 Some(&"test_val1".to_string())
4349 );
4350 assert_eq!(
4351 manifest.config.get("test_key2"),
4352 Some(&"test_val2_update".to_string())
4353 );
4354
4355 native_tbl.delete_config_keys(&["test_key1"]).await.unwrap();
4356 let manifest = native_tbl.manifest().await.unwrap();
4357 assert_eq!(manifest.config.len(), 1 + base_config_len);
4358 assert_eq!(
4359 manifest.config.get("test_key2"),
4360 Some(&"test_val2_update".to_string())
4361 );
4362 }
4363
4364 #[tokio::test]
4365 async fn test_schema_metadata_config() {
4366 let tmp_dir = tempdir().unwrap();
4367 let uri = tmp_dir.path().to_str().unwrap();
4368
4369 let conn = ConnectBuilder::new(uri)
4370 .read_consistency_interval(Duration::from_secs(0))
4371 .execute()
4372 .await
4373 .unwrap();
4374 let table = conn
4375 .create_table("my_table", some_sample_data())
4376 .execute()
4377 .await
4378 .unwrap();
4379
4380 let native_tbl = table.as_native().unwrap();
4381 let schema = native_tbl.schema().await.unwrap();
4382 let metadata = schema.metadata();
4383 assert_eq!(metadata.len(), 0);
4384
4385 native_tbl
4386 .replace_schema_metadata(vec![("test_key1".to_string(), "test_val1".to_string())])
4387 .await
4388 .unwrap();
4389
4390 let schema = native_tbl.schema().await.unwrap();
4391 let metadata = schema.metadata();
4392 assert_eq!(metadata.len(), 1);
4393 assert_eq!(metadata.get("test_key1"), Some(&"test_val1".to_string()));
4394
4395 native_tbl
4396 .replace_schema_metadata(vec![
4397 ("test_key1".to_string(), "test_val1_update".to_string()),
4398 ("test_key2".to_string(), "test_val2".to_string()),
4399 ])
4400 .await
4401 .unwrap();
4402 let schema = native_tbl.schema().await.unwrap();
4403 let metadata = schema.metadata();
4404 assert_eq!(metadata.len(), 2);
4405 assert_eq!(
4406 metadata.get("test_key1"),
4407 Some(&"test_val1_update".to_string())
4408 );
4409 assert_eq!(metadata.get("test_key2"), Some(&"test_val2".to_string()));
4410
4411 native_tbl
4412 .replace_schema_metadata(vec![(
4413 "test_key2".to_string(),
4414 "test_val2_update".to_string(),
4415 )])
4416 .await
4417 .unwrap();
4418 let schema = native_tbl.schema().await.unwrap();
4419 let metadata = schema.metadata();
4420 assert_eq!(
4421 metadata.get("test_key2"),
4422 Some(&"test_val2_update".to_string())
4423 );
4424 }
4425
4426 #[tokio::test]
4427 pub async fn test_field_metadata_update() {
4428 let tmp_dir = tempdir().unwrap();
4429 let uri = tmp_dir.path().to_str().unwrap();
4430
4431 let conn = ConnectBuilder::new(uri)
4432 .read_consistency_interval(Duration::from_secs(0))
4433 .execute()
4434 .await
4435 .unwrap();
4436 let table = conn
4437 .create_table("my_table", some_sample_data())
4438 .execute()
4439 .await
4440 .unwrap();
4441
4442 let native_tbl = table.as_native().unwrap();
4443 let schema = native_tbl.manifest().await.unwrap().schema;
4444
4445 let field = schema.field("i").unwrap();
4446 assert_eq!(field.metadata.len(), 0);
4447
4448 native_tbl
4449 .replace_schema_metadata(vec![(
4450 "test_key2".to_string(),
4451 "test_val2_update".to_string(),
4452 )])
4453 .await
4454 .unwrap();
4455
4456 let schema = native_tbl.schema().await.unwrap();
4457 let metadata = schema.metadata();
4458 assert_eq!(metadata.len(), 1);
4459 assert_eq!(
4460 metadata.get("test_key2"),
4461 Some(&"test_val2_update".to_string())
4462 );
4463
4464 let mut new_field_metadata = HashMap::<String, String>::new();
4465 new_field_metadata.insert("test_field_key1".into(), "test_field_val1".into());
4466 native_tbl
4467 .replace_field_metadata(vec![(field.id as u32, new_field_metadata)])
4468 .await
4469 .unwrap();
4470
4471 let schema = native_tbl.manifest().await.unwrap().schema;
4472 let field = schema.field("i").unwrap();
4473 assert_eq!(field.metadata.len(), 1);
4474 assert_eq!(
4475 field.metadata.get("test_field_key1"),
4476 Some(&"test_field_val1".to_string())
4477 );
4478 }
4479
4480 #[tokio::test]
4481 async fn test_set_unenforced_primary_key() {
4482 let tmp_dir = tempdir().unwrap();
4483 let uri = tmp_dir.path().to_str().unwrap();
4484
4485 let schema = Arc::new(Schema::new(vec![
4486 Field::new("id", DataType::Int64, false),
4487 Field::new("name", DataType::Utf8, true),
4488 Field::new("score", DataType::Float64, true),
4489 ]));
4490 let batch = RecordBatch::try_new(
4491 schema.clone(),
4492 vec![
4493 Arc::new(arrow_array::Int64Array::from(vec![1, 2, 3])),
4494 Arc::new(StringArray::from(vec!["a", "b", "c"])),
4495 Arc::new(arrow_array::Float64Array::from(vec![1.0, 2.0, 3.0])),
4496 ],
4497 )
4498 .unwrap();
4499 let reader: Box<dyn arrow_array::RecordBatchReader + Send> =
4500 Box::new(RecordBatchIterator::new(vec![Ok(batch)], schema.clone()));
4501
4502 let conn = ConnectBuilder::new(uri)
4503 .read_consistency_interval(Duration::from_secs(0))
4504 .execute()
4505 .await
4506 .unwrap();
4507 let table = conn.create_table("t", reader).execute().await.unwrap();
4508
4509 let err = table
4511 .set_unenforced_primary_key(Vec::<&str>::new())
4512 .await
4513 .expect_err("empty input should be rejected");
4514 assert!(matches!(err, Error::InvalidInput { .. }), "got {:?}", err);
4515
4516 let err = table
4518 .set_unenforced_primary_key(["id", "name"])
4519 .await
4520 .expect_err("compound primary key should be rejected");
4521 assert!(matches!(err, Error::InvalidInput { .. }), "got {:?}", err);
4522
4523 let err = table
4525 .set_unenforced_primary_key(["nonexistent"])
4526 .await
4527 .expect_err("nonexistent column should be rejected");
4528 assert!(matches!(err, Error::InvalidInput { .. }), "got {:?}", err);
4529
4530 let err = table
4532 .set_unenforced_primary_key(["score"])
4533 .await
4534 .expect_err("Float64 should be rejected");
4535 assert!(matches!(err, Error::InvalidInput { .. }), "got {:?}", err);
4536
4537 let lance_schema = table.as_native().unwrap().manifest().await.unwrap().schema;
4539 assert!(lance_schema.unenforced_primary_key().is_empty());
4540
4541 table.set_unenforced_primary_key(["id"]).await.unwrap();
4543 let lance_schema = table.as_native().unwrap().manifest().await.unwrap().schema;
4544 let pk = lance_schema.unenforced_primary_key();
4545 assert_eq!(pk.len(), 1);
4546 assert_eq!(pk[0].name, "id");
4547 assert_eq!(
4549 pk[0].metadata.get(LANCE_UNENFORCED_PRIMARY_KEY_POSITION),
4550 Some(&"1".to_string())
4551 );
4552
4553 let err = table
4556 .set_unenforced_primary_key(["id"])
4557 .await
4558 .expect_err("re-setting the same primary key should be rejected");
4559 assert!(matches!(err, Error::InvalidInput { .. }), "got {:?}", err);
4560 let err = table
4561 .set_unenforced_primary_key(["name"])
4562 .await
4563 .expect_err("changing the primary key should be rejected");
4564 assert!(matches!(err, Error::InvalidInput { .. }), "got {:?}", err);
4565
4566 let lance_schema = table.as_native().unwrap().manifest().await.unwrap().schema;
4568 let pk = lance_schema.unenforced_primary_key();
4569 assert_eq!(pk.len(), 1);
4570 assert_eq!(pk[0].name, "id");
4571 }
4572
4573 #[tokio::test]
4574 async fn test_set_unenforced_primary_key_concurrent() {
4575 let tmp_dir = tempdir().unwrap();
4576 let uri = tmp_dir.path().to_str().unwrap();
4577
4578 let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
4579 let batch = RecordBatch::try_new(
4580 schema.clone(),
4581 vec![Arc::new(arrow_array::Int64Array::from(vec![1, 2, 3]))],
4582 )
4583 .unwrap();
4584 let reader: Box<dyn arrow_array::RecordBatchReader + Send> =
4585 Box::new(RecordBatchIterator::new(vec![Ok(batch)], schema.clone()));
4586
4587 let conn = ConnectBuilder::new(uri)
4591 .read_consistency_interval(Duration::from_secs(3600))
4592 .execute()
4593 .await
4594 .unwrap();
4595 conn.create_table("t", reader).execute().await.unwrap();
4596
4597 let table_a = conn.open_table("t").execute().await.unwrap();
4598 let table_b = conn.open_table("t").execute().await.unwrap();
4599
4600 table_a.set_unenforced_primary_key(["id"]).await.unwrap();
4602
4603 let err = table_b
4609 .set_unenforced_primary_key(["id"])
4610 .await
4611 .expect_err("concurrent primary key commit on a stale base should fail");
4612 assert!(
4613 !matches!(err, Error::InvalidInput { .. }),
4614 "expected a commit-time conflict, not an up-front input error: {:?}",
4615 err
4616 );
4617
4618 let fresh = conn.open_table("t").execute().await.unwrap();
4620 let lance_schema = fresh.as_native().unwrap().manifest().await.unwrap().schema;
4621 let pk = lance_schema.unenforced_primary_key();
4622 assert_eq!(pk.len(), 1);
4623 assert_eq!(pk[0].name, "id");
4624 }
4625
4626 #[tokio::test]
4627 async fn test_set_lsm_write_spec() {
4628 use arrow_array::StringArray;
4629 use lance::dataset::mem_wal::DatasetMemWalExt;
4630
4631 let tmp_dir = tempdir().unwrap();
4632 let uri = tmp_dir.path().to_str().unwrap();
4633
4634 let schema = Arc::new(Schema::new(vec![
4635 Field::new("id", DataType::Int64, false),
4636 Field::new("name", DataType::Utf8, true),
4637 ]));
4638 let batch = RecordBatch::try_new(
4639 schema.clone(),
4640 vec![
4641 Arc::new(arrow_array::Int64Array::from(vec![1, 2, 3])),
4642 Arc::new(StringArray::from(vec!["a", "b", "c"])),
4643 ],
4644 )
4645 .unwrap();
4646 let reader: Box<dyn arrow_array::RecordBatchReader + Send> =
4647 Box::new(RecordBatchIterator::new(vec![Ok(batch)], schema.clone()));
4648
4649 let conn = ConnectBuilder::new(uri)
4650 .read_consistency_interval(Duration::from_secs(0))
4651 .execute()
4652 .await
4653 .unwrap();
4654 let table = conn.create_table("t", reader).execute().await.unwrap();
4655
4656 for bad in [0u32, 1025] {
4658 let err = table
4659 .set_lsm_write_spec(LsmWriteSpec::bucket("id", bad))
4660 .await
4661 .expect_err("should reject");
4662 assert!(matches!(err, Error::Lance { .. }), "got {:?}", err);
4663 }
4664
4665 table
4667 .set_lsm_write_spec(LsmWriteSpec::bucket("id", 4))
4668 .await
4669 .unwrap();
4670
4671 let native_tbl = table.as_native().unwrap();
4672 let dataset = native_tbl.dataset.get().await.unwrap();
4673 let details = dataset
4674 .mem_wal_index_details()
4675 .await
4676 .unwrap()
4677 .expect("MemWAL index should be initialized");
4678 assert_eq!(details.num_shards, 4);
4679 assert_eq!(details.sharding_specs.len(), 1);
4680 let installed = &details.sharding_specs[0];
4681 assert_eq!(installed.fields.len(), 1);
4682 let f = &installed.fields[0];
4683 assert_eq!(f.transform.as_deref(), Some("bucket"));
4684 assert_eq!(
4685 f.parameters.get("num_buckets").map(String::as_str),
4686 Some("4")
4687 );
4688 assert_eq!(f.parameters.len(), 1);
4690
4691 let err = table
4693 .set_lsm_write_spec(LsmWriteSpec::bucket("id", 8))
4694 .await
4695 .expect_err("mutation should be rejected");
4696 assert!(matches!(err, Error::InvalidInput { .. }), "got {:?}", err);
4697 }
4698
4699 #[tokio::test]
4700 async fn test_set_lsm_write_spec_unsharded() {
4701 use lance::dataset::mem_wal::DatasetMemWalExt;
4702
4703 let tmp_dir = tempdir().unwrap();
4704 let uri = tmp_dir.path().to_str().unwrap();
4705
4706 let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
4707 let batch = RecordBatch::try_new(
4708 schema.clone(),
4709 vec![Arc::new(arrow_array::Int64Array::from(vec![1]))],
4710 )
4711 .unwrap();
4712 let reader: Box<dyn arrow_array::RecordBatchReader + Send> =
4713 Box::new(RecordBatchIterator::new(vec![Ok(batch)], schema.clone()));
4714 let conn = ConnectBuilder::new(uri)
4715 .read_consistency_interval(Duration::from_secs(0))
4716 .execute()
4717 .await
4718 .unwrap();
4719 let table = conn.create_table("t", reader).execute().await.unwrap();
4720
4721 table
4722 .set_lsm_write_spec(LsmWriteSpec::unsharded())
4723 .await
4724 .unwrap();
4725
4726 let dataset = table.as_native().unwrap().dataset.get().await.unwrap();
4727 let details = dataset
4728 .mem_wal_index_details()
4729 .await
4730 .unwrap()
4731 .expect("MemWAL index should be initialized");
4732 assert_eq!(details.num_shards, 1);
4733 assert_eq!(details.sharding_specs.len(), 1);
4734 let f = &details.sharding_specs[0].fields[0];
4735 assert_eq!(f.transform.as_deref(), Some("unsharded"));
4736 assert!(f.source_ids.is_empty());
4737 }
4738
4739 #[tokio::test]
4740 async fn test_set_lsm_write_spec_identity() {
4741 use lance::dataset::mem_wal::DatasetMemWalExt;
4742
4743 let tmp_dir = tempdir().unwrap();
4744 let uri = tmp_dir.path().to_str().unwrap();
4745
4746 let schema = Arc::new(Schema::new(vec![
4747 Field::new("id", DataType::Int64, false),
4748 Field::new("region", DataType::Utf8, true),
4749 ]));
4750 let batch = RecordBatch::try_new(
4751 schema.clone(),
4752 vec![
4753 Arc::new(arrow_array::Int64Array::from(vec![1, 2, 3])),
4754 Arc::new(StringArray::from(vec!["a", "b", "c"])),
4755 ],
4756 )
4757 .unwrap();
4758 let reader: Box<dyn arrow_array::RecordBatchReader + Send> =
4759 Box::new(RecordBatchIterator::new(vec![Ok(batch)], schema.clone()));
4760 let conn = ConnectBuilder::new(uri)
4761 .read_consistency_interval(Duration::from_secs(0))
4762 .execute()
4763 .await
4764 .unwrap();
4765 let table = conn.create_table("t", reader).execute().await.unwrap();
4766
4767 table
4768 .set_lsm_write_spec(
4769 LsmWriteSpec::identity("region")
4770 .with_writer_config_defaults([("durable_write", "false")]),
4771 )
4772 .await
4773 .unwrap();
4774
4775 let dataset = table.as_native().unwrap().dataset.get().await.unwrap();
4776 let details = dataset
4777 .mem_wal_index_details()
4778 .await
4779 .unwrap()
4780 .expect("MemWAL index should be initialized");
4781 assert_eq!(details.num_shards, 0);
4783 assert_eq!(details.sharding_specs.len(), 1);
4784 let f = &details.sharding_specs[0].fields[0];
4785 assert_eq!(f.transform.as_deref(), Some("identity"));
4786 assert_eq!(
4788 details
4789 .writer_config_defaults
4790 .get("durable_write")
4791 .map(String::as_str),
4792 Some("false")
4793 );
4794 }
4795
4796 #[tokio::test]
4797 async fn test_unset_lsm_write_spec() {
4798 use lance::dataset::mem_wal::DatasetMemWalExt;
4799
4800 let tmp_dir = tempdir().unwrap();
4801 let uri = tmp_dir.path().to_str().unwrap();
4802
4803 let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
4804 let batch = RecordBatch::try_new(
4805 schema.clone(),
4806 vec![Arc::new(arrow_array::Int64Array::from(vec![1]))],
4807 )
4808 .unwrap();
4809 let reader: Box<dyn arrow_array::RecordBatchReader + Send> =
4810 Box::new(RecordBatchIterator::new(vec![Ok(batch)], schema.clone()));
4811 let conn = ConnectBuilder::new(uri)
4812 .read_consistency_interval(Duration::from_secs(0))
4813 .execute()
4814 .await
4815 .unwrap();
4816 let table = conn.create_table("t", reader).execute().await.unwrap();
4817
4818 table.unset_lsm_write_spec().await.unwrap_err();
4820
4821 table
4823 .set_lsm_write_spec(LsmWriteSpec::bucket("id", 4))
4824 .await
4825 .unwrap();
4826 {
4827 let dataset = table.as_native().unwrap().dataset.get().await.unwrap();
4828 assert!(dataset.mem_wal_index_details().await.unwrap().is_some());
4829 }
4830
4831 table.unset_lsm_write_spec().await.unwrap();
4832 {
4833 let dataset = table.as_native().unwrap().dataset.get().await.unwrap();
4834 assert!(dataset.mem_wal_index_details().await.unwrap().is_none());
4835 }
4836
4837 table.unset_lsm_write_spec().await.unwrap_err();
4839 table
4840 .set_lsm_write_spec(LsmWriteSpec::bucket("id", 8))
4841 .await
4842 .unwrap();
4843 {
4844 let dataset = table.as_native().unwrap().dataset.get().await.unwrap();
4845 assert!(dataset.mem_wal_index_details().await.unwrap().is_some());
4846 }
4847 }
4848
4849 #[tokio::test]
4850 pub async fn test_stats() {
4851 let tmp_dir = tempdir().unwrap();
4852 let uri = tmp_dir.path().to_str().unwrap();
4853
4854 let conn = ConnectBuilder::new(uri).execute().await.unwrap();
4855
4856 let schema = Arc::new(Schema::new(vec![
4857 Field::new("id", DataType::Int32, false),
4858 Field::new("foo", DataType::Int32, true),
4859 ]));
4860 let batch = RecordBatch::try_new(
4861 schema.clone(),
4862 vec![
4863 Arc::new(Int32Array::from_iter_values(0..100)),
4864 Arc::new(Int32Array::from_iter_values(0..100)),
4865 ],
4866 )
4867 .unwrap();
4868
4869 let table = conn
4870 .create_table("test_stats", batch.clone())
4871 .execute()
4872 .await
4873 .unwrap();
4874 for _ in 0..10 {
4875 let batch = RecordBatch::try_new(
4876 schema.clone(),
4877 vec![
4878 Arc::new(Int32Array::from_iter_values(0..15)),
4879 Arc::new(Int32Array::from_iter_values(0..15)),
4880 ],
4881 )
4882 .unwrap();
4883 table.add(batch.clone()).execute().await.unwrap();
4884 }
4885
4886 let empty_table = conn
4887 .create_table("test_stats_empty", RecordBatch::new_empty(batch.schema()))
4888 .execute()
4889 .await
4890 .unwrap();
4891
4892 let res = table.stats().await.unwrap();
4893 println!("{:#?}", res);
4894 assert_eq!(
4895 res,
4896 TableStatistics {
4897 num_rows: 250,
4898 num_indices: 0,
4899 total_bytes: 2300,
4900 fragment_stats: FragmentStatistics {
4901 num_fragments: 11,
4902 num_small_fragments: 11,
4903 lengths: FragmentSummaryStats {
4904 min: 15,
4905 max: 100,
4906 mean: 22,
4907 p25: 15,
4908 p50: 15,
4909 p75: 15,
4910 p99: 100,
4911 },
4912 },
4913 }
4914 );
4915 let res = empty_table.stats().await.unwrap();
4916 println!("{:#?}", res);
4917 assert_eq!(
4918 res,
4919 TableStatistics {
4920 num_rows: 0,
4921 num_indices: 0,
4922 total_bytes: 0,
4923 fragment_stats: FragmentStatistics {
4924 num_fragments: 0,
4925 num_small_fragments: 0,
4926 lengths: FragmentSummaryStats {
4927 min: 0,
4928 max: 0,
4929 mean: 0,
4930 p25: 0,
4931 p50: 0,
4932 p75: 0,
4933 p99: 0,
4934 },
4935 },
4936 }
4937 )
4938 }
4939
4940 #[tokio::test]
4941 pub async fn test_list_indices_skip_frag_reuse() {
4942 let tmp_dir = tempdir().unwrap();
4943 let uri = tmp_dir.path().to_str().unwrap();
4944
4945 let conn = ConnectBuilder::new(uri).execute().await.unwrap();
4946
4947 let schema = Arc::new(Schema::new(vec![
4948 Field::new("id", DataType::Int32, false),
4949 Field::new("foo", DataType::Int32, true),
4950 ]));
4951 let batch = RecordBatch::try_new(
4952 schema.clone(),
4953 vec![
4954 Arc::new(Int32Array::from_iter_values(0..100)),
4955 Arc::new(Int32Array::from_iter_values(0..100)),
4956 ],
4957 )
4958 .unwrap();
4959
4960 let table = conn
4961 .create_table("test_list_indices_skip_frag_reuse", batch.clone())
4962 .execute()
4963 .await
4964 .unwrap();
4965
4966 table.add(batch.clone()).execute().await.unwrap();
4967
4968 table
4969 .create_index(&["id"], Index::Bitmap(BitmapIndexBuilder {}))
4970 .execute()
4971 .await
4972 .unwrap();
4973
4974 table
4975 .optimize(OptimizeAction::Compact {
4976 options: CompactionOptions {
4977 target_rows_per_fragment: 2_000,
4978 defer_index_remap: true,
4979 ..Default::default()
4980 },
4981 remap_options: None,
4982 })
4983 .await
4984 .unwrap();
4985
4986 let result = table.list_indices().await.unwrap();
4987 assert_eq!(result.len(), 1);
4988 assert_eq!(result[0].index_type, crate::index::IndexType::Bitmap);
4989 }
4990}