1use arrow_array::{RecordBatch, RecordBatchReader};
7use arrow_schema::{DataType, Field, Schema, SchemaRef};
8use async_trait::async_trait;
9use datafusion_execution::TaskContext;
10use datafusion_expr::Expr;
11use datafusion_physical_plan::ExecutionPlan;
12use datafusion_physical_plan::display::DisplayableExecutionPlan;
13use futures::StreamExt;
14use futures::stream::FuturesUnordered;
15pub use lance::dataset::ColumnAlteration;
16pub use lance::dataset::NewColumnTransform;
17pub use lance::dataset::ReadParams;
18pub use lance::dataset::Version;
19use lance::dataset::WriteMode;
20use lance::dataset::builder::DatasetBuilder;
21use lance::dataset::{InsertBuilder, WriteParams};
22use lance::index::DatasetIndexExt;
23use lance::index::vector::VectorIndexParams;
24use lance::index::vector::utils::infer_vector_dim;
25use lance::io::{ObjectStoreParams, WrappingObjectStore};
26use lance_datafusion::utils::StreamingWriteSource;
27use lance_index::IndexType;
28use lance_index::scalar::{BuiltinIndexType, ScalarIndexParams};
29use lance_index::vector::bq::RQBuildParams;
30use lance_index::vector::hnsw::builder::HnswBuildParams;
31use lance_index::vector::ivf::IvfBuildParams;
32use lance_index::vector::pq::PQBuildParams;
33use lance_index::vector::sq::builder::SQBuildParams;
34use lance_io::object_store::{LanceNamespaceStorageOptionsProvider, StorageOptionsAccessor};
35pub use query::AnyQuery;
36
37use lance::io::commit::namespace_manifest::LanceNamespaceExternalManifestStore;
38use lance_namespace::LanceNamespace;
39use lance_namespace::models::DescribeTableRequest;
40use lance_table::format::Manifest;
41use lance_table::io::commit::CommitHandler;
42use lance_table::io::commit::ManifestNamingScheme;
43use lance_table::io::commit::external_manifest::ExternalManifestCommitHandler;
44use serde::{Deserialize, Serialize};
45use std::collections::{HashMap, HashSet};
46use std::format;
47use std::path::Path;
48use std::sync::Arc;
49
50use crate::connection::NamespaceClientPushdownOperation;
51
52use crate::data::scannable::{PeekedScannable, Scannable, estimate_write_partitions};
53use crate::database::Database;
54use crate::embeddings::{EmbeddingDefinition, EmbeddingRegistry, MemoryRegistry};
55use crate::error::{Error, Result};
56use crate::index::IndexStatistics;
57use crate::index::vector::VectorIndex;
58use crate::index::{Index, IndexBuilder, vector::suggested_num_sub_vectors};
59use crate::index::{IndexConfig, IndexStatisticsImpl};
60use crate::query::{IntoQueryVector, Query, QueryExecutionOptions, TakeQuery, VectorQuery};
61use crate::table::datafusion::insert::InsertExec;
62use crate::utils::{
63 PatchReadParam, PatchWriteParam, supported_bitmap_data_type, supported_btree_data_type,
64 supported_fts_data_type, supported_label_list_data_type, supported_vector_data_type,
65};
66
67use self::dataset::DatasetConsistencyWrapper;
68use self::merge::MergeInsertBuilder;
69
70mod add_data;
71pub mod datafusion;
72pub(crate) mod dataset;
73pub mod delete;
74pub mod merge;
75pub mod optimize;
76pub mod query;
77pub mod schema_evolution;
78pub mod update;
79pub mod write_progress;
80use crate::index::waiter::wait_for_index;
81#[cfg(feature = "remote")]
82pub(crate) use add_data::PreprocessingOutput;
83pub use add_data::{AddDataBuilder, AddDataMode, AddResult, NaNVectorBehavior};
84pub use chrono::Duration;
85pub use delete::DeleteResult;
86use futures::future::join_all;
87pub use lance::dataset::refs::{TagContents, Tags as LanceTags};
88pub use lance::dataset::scanner::DatasetRecordBatchStream;
89use lance::dataset::statistics::DatasetStatisticsExt;
90use lance_index::frag_reuse::FRAG_REUSE_INDEX_NAME;
91pub use lance_index::optimize::OptimizeOptions;
92pub use optimize::{CompactionOptions, OptimizeAction, OptimizeStats};
93pub use schema_evolution::{AddColumnsResult, AlterColumnsResult, DropColumnsResult};
94use serde_with::skip_serializing_none;
95pub use update::{UpdateBuilder, UpdateResult};
96
/// How the values of a column are produced.
///
/// Serialized with serde as part of the `lancedb::column_definitions` schema
/// metadata entry written by `TableDefinition::into_rich_schema`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ColumnKind {
    /// A regular column with no embedding definition attached.
    Physical,
    /// A column described by an [`EmbeddingDefinition`].
    Embedding(EmbeddingDefinition),
}
105
/// Per-column metadata tracked by a [`TableDefinition`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ColumnDefinition {
    /// Whether the column is a plain physical column or an embedding column.
    pub kind: ColumnKind,
}
112
/// An Arrow schema paired with LanceDB-specific per-column definitions.
///
/// NOTE(review): `new_from_schema` creates one `ColumnDefinition` per schema
/// field, presumably in field order; nothing in this type enforces that the
/// two collections stay the same length.
#[derive(Debug, Clone)]
pub struct TableDefinition {
    /// Column definitions, one per schema field when built by `new_from_schema`.
    pub column_definitions: Vec<ColumnDefinition>,
    /// The Arrow schema of the table.
    pub schema: SchemaRef,
}
118
119impl TableDefinition {
120 pub fn new(schema: SchemaRef, column_definitions: Vec<ColumnDefinition>) -> Self {
121 Self {
122 column_definitions,
123 schema,
124 }
125 }
126
127 pub fn new_from_schema(schema: SchemaRef) -> Self {
128 let column_definitions = schema
129 .fields()
130 .iter()
131 .map(|_| ColumnDefinition {
132 kind: ColumnKind::Physical,
133 })
134 .collect();
135 Self::new(schema, column_definitions)
136 }
137
138 pub fn try_from_rich_schema(schema: SchemaRef) -> Result<Self> {
139 let column_definitions = schema.metadata.get("lancedb::column_definitions");
140 if let Some(column_definitions) = column_definitions {
141 let column_definitions: Vec<ColumnDefinition> =
142 serde_json::from_str(column_definitions).map_err(|e| Error::Runtime {
143 message: format!("Failed to deserialize column definitions: {}", e),
144 })?;
145 Ok(Self::new(schema, column_definitions))
146 } else {
147 let column_definitions = schema
148 .fields()
149 .iter()
150 .map(|_| ColumnDefinition {
151 kind: ColumnKind::Physical,
152 })
153 .collect();
154 Ok(Self::new(schema, column_definitions))
155 }
156 }
157
158 pub fn into_rich_schema(self) -> SchemaRef {
159 let lancedb_metadata = serde_json::to_string(&self.column_definitions).unwrap();
162 let mut schema_with_metadata = (*self.schema).clone();
163 schema_with_metadata
164 .metadata
165 .insert("lancedb::column_definitions".to_string(), lancedb_metadata);
166 Arc::new(schema_with_metadata)
167 }
168}
169
/// Policy for handling "bad" vectors in written data.
///
/// NOTE(review): this enum is private and marked `#[allow(dead_code)]`, and
/// nothing in this part of the file constructs it; the variant descriptions
/// below are inferred from the names only — confirm before relying on them.
#[derive(Clone, Debug, Default)]
#[allow(dead_code)] enum BadVectorHandling {
    /// The default policy; presumably rejects the write with an error.
    #[default]
    Error,
    /// Presumably drops offending rows.
    Drop,
    /// Presumably replaces offending values with the given constant.
    Fill(f32),
    /// Presumably performs no special handling.
    None,
}
185
/// Options applied when writing data to a table.
#[derive(Clone, Debug, Default)]
pub struct WriteOptions {
    /// Lance-level write parameters; `None` leaves them unset (presumably
    /// meaning Lance defaults are used — confirm at the call sites).
    pub lance_write_params: Option<WriteParams>,
}
198
/// A row filter, expressed either as SQL text or as a DataFusion expression.
///
/// `Table::count_rows` wraps its string argument in [`Filter::Sql`].
pub enum Filter {
    /// A SQL predicate string.
    Sql(String),
    /// A DataFusion logical expression.
    Datafusion(Expr),
}
206
/// Manages named tags that refer to table versions.
#[async_trait]
pub trait Tags: Send + Sync {
    /// Lists all tags, keyed by tag name.
    async fn list(&self) -> Result<HashMap<String, TagContents>>;

    /// Returns the version number that `tag` refers to.
    async fn get_version(&self, tag: &str) -> Result<u64>;

    /// Creates `tag` referring to `version`.
    async fn create(&mut self, tag: &str, version: u64) -> Result<()>;

    /// Deletes `tag`.
    async fn delete(&mut self, tag: &str) -> Result<()>;

    /// Updates `tag` so that it refers to `version`.
    async fn update(&mut self, tag: &str, version: u64) -> Result<()>;
}
224
225pub use self::merge::MergeResult;
226
/// The operations a table backend must provide.
///
/// [`Table`] is a thin handle that forwards its methods to an implementation of
/// this trait (for example [`NativeTable`]).
#[async_trait]
pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync {
    /// Returns `self` as [`std::any::Any`] so callers can downcast to a concrete
    /// backend (used by `NativeTableExt::as_native`).
    fn as_any(&self) -> &dyn std::any::Any;
    /// Returns the table name.
    fn name(&self) -> &str;
    /// Returns the namespace components that contain the table.
    fn namespace(&self) -> &[String];
    /// Returns the table identifier.
    fn id(&self) -> &str;
    /// Returns the table's Arrow schema.
    async fn schema(&self) -> Result<SchemaRef>;
    /// Counts rows, optionally restricted by `filter`.
    async fn count_rows(&self, filter: Option<Filter>) -> Result<usize>;
    /// Builds a DataFusion physical plan for `query`.
    async fn create_plan(
        &self,
        query: &AnyQuery,
        options: QueryExecutionOptions,
    ) -> Result<Arc<dyn ExecutionPlan>>;
    /// Executes `query` and returns a stream of record batches.
    async fn query(
        &self,
        query: &AnyQuery,
        options: QueryExecutionOptions,
    ) -> Result<DatasetRecordBatchStream>;
    /// Returns the physical plan for `query` rendered as indented text.
    ///
    /// The default implementation calls [`Self::create_plan`] with default
    /// execution options and formats it with `DisplayableExecutionPlan::indent`.
    async fn explain_plan(&self, query: &AnyQuery, verbose: bool) -> Result<String> {
        let plan = self.create_plan(query, Default::default()).await?;
        let display = DisplayableExecutionPlan::new(plan.as_ref());

        Ok(format!("{}", display.indent(verbose)))
    }
    /// Returns a textual analysis of running `query`.
    /// NOTE(review): presumably includes runtime metrics — confirm per backend.
    async fn analyze_plan(
        &self,
        query: &AnyQuery,
        options: QueryExecutionOptions,
    ) -> Result<String>;

    /// Adds the data described by `add` to the table.
    async fn add(&self, add: AddDataBuilder) -> Result<AddResult>;
    /// Deletes rows matching `predicate`.
    async fn delete(&self, predicate: &str) -> Result<DeleteResult>;
    /// Applies the update described by `update`.
    async fn update(&self, update: UpdateBuilder) -> Result<UpdateResult>;
    /// Creates the index described by `index`.
    async fn create_index(&self, index: IndexBuilder) -> Result<()>;
    /// Lists the table's indices.
    async fn list_indices(&self) -> Result<Vec<IndexConfig>>;
    /// Drops the index called `name`.
    async fn drop_index(&self, name: &str) -> Result<()>;
    /// Prewarms the index called `name`.
    async fn prewarm_index(&self, name: &str) -> Result<()>;
    /// Prewarms data for `columns` (`None` presumably means all columns).
    async fn prewarm_data(&self, columns: Option<Vec<String>>) -> Result<()>;
    /// Returns statistics for `index_name`, or `None` if no such index exists.
    async fn index_stats(&self, index_name: &str) -> Result<Option<IndexStatistics>>;
    /// Runs the merge-insert described by `params` against `new_data`.
    async fn merge_insert(
        &self,
        params: MergeInsertBuilder,
        new_data: Box<dyn RecordBatchReader + Send>,
    ) -> Result<MergeResult>;
    /// Returns a tag manager for this table.
    async fn tags(&self) -> Result<Box<dyn Tags + '_>>;
    /// Runs the optimization `action`.
    async fn optimize(&self, action: OptimizeAction) -> Result<OptimizeStats>;
    /// Adds columns produced by `transforms`, reading `read_columns` if given.
    async fn add_columns(
        &self,
        transforms: NewColumnTransform,
        read_columns: Option<Vec<String>>,
    ) -> Result<AddColumnsResult>;
    /// Applies `alterations` to existing columns.
    async fn alter_columns(&self, alterations: &[ColumnAlteration]) -> Result<AlterColumnsResult>;
    /// Drops `columns` from the table.
    async fn drop_columns(&self, columns: &[&str]) -> Result<DropColumnsResult>;
    /// Returns the currently checked-out version number.
    async fn version(&self) -> Result<u64>;
    /// Checks out `version`.
    async fn checkout(&self, version: u64) -> Result<()>;
    /// Checks out the version referenced by `tag`.
    async fn checkout_tag(&self, tag: &str) -> Result<()>;
    /// Checks out the latest version.
    async fn checkout_latest(&self) -> Result<()>;
    /// Restores the checked-out version.
    /// NOTE(review): presumably makes it the latest version — confirm per backend.
    async fn restore(&self) -> Result<()>;
    /// Lists the table's versions.
    async fn list_versions(&self) -> Result<Vec<Version>>;
    /// Returns the table's [`TableDefinition`].
    async fn table_definition(&self) -> Result<TableDefinition>;
    /// Returns the table URI.
    async fn uri(&self) -> Result<String>;
    /// Deprecated alias kept for compatibility; use `initial_storage_options`.
    #[deprecated(since = "0.25.0", note = "Use initial_storage_options() instead")]
    async fn storage_options(&self) -> Option<HashMap<String, String>>;
    /// Returns the storage options the table was opened with, if any.
    async fn initial_storage_options(&self) -> Option<HashMap<String, String>>;
    /// Returns the most recent storage options, if any.
    async fn latest_storage_options(&self) -> Result<Option<HashMap<String, String>>>;
    /// Waits until the indices in `index_names` are ready, up to `timeout`.
    async fn wait_for_index(
        &self,
        index_names: &[&str],
        timeout: std::time::Duration,
    ) -> Result<()>;
    /// Returns table statistics.
    async fn stats(&self) -> Result<TableStatistics>;
    /// Creates a DataFusion execution plan that inserts `_input` into the table.
    ///
    /// The default implementation returns [`Error::NotSupported`]; backends that
    /// support DataFusion inserts override it.
    async fn create_insert_exec(
        &self,
        _input: Arc<dyn datafusion_physical_plan::ExecutionPlan>,
        _write_params: WriteParams,
    ) -> Result<Arc<dyn datafusion_physical_plan::ExecutionPlan>> {
        Err(Error::NotSupported {
            message: "create_insert_exec not implemented".to_string(),
        })
    }
}
366
/// A handle to a table that forwards operations to a [`BaseTable`] backend.
///
/// Every field is an `Arc` (or an `Option` of one), so cloning is cheap.
#[derive(Clone, Debug)]
pub struct Table {
    /// The backend that implements the table operations.
    inner: Arc<dyn BaseTable>,
    /// The owning database; `None` when built via `From<Arc<dyn BaseTable>>`.
    database: Option<Arc<dyn Database>>,
    /// Embedding registry handed to `AddDataBuilder` by `Table::add`.
    embedding_registry: Arc<dyn EmbeddingRegistry>,
}
376
#[cfg(all(test, feature = "remote"))]
mod test_utils {
    use super::*;

    impl Table {
        /// Test helper: a table whose remote table and database are mocks that
        /// answer every HTTP request with `handler`.
        pub fn new_with_handler<T>(
            name: impl Into<String>,
            handler: impl Fn(reqwest::Request) -> http::Response<T> + Clone + Send + Sync + 'static,
        ) -> Self
        where
            T: Into<reqwest::Body>,
        {
            let mock_table =
                crate::remote::table::RemoteTable::new_mock(name.into(), handler.clone(), None);
            let mock_db = crate::remote::db::RemoteDatabase::new_mock(handler);
            Self::new(Arc::new(mock_table), Arc::new(mock_db))
        }

        /// Like `new_with_handler`, but the mock table reports server `version`.
        pub fn new_with_handler_version<T>(
            name: impl Into<String>,
            version: semver::Version,
            handler: impl Fn(reqwest::Request) -> http::Response<T> + Clone + Send + Sync + 'static,
        ) -> Self
        where
            T: Into<reqwest::Body>,
        {
            let mock_table = crate::remote::table::RemoteTable::new_mock(
                name.into(),
                handler.clone(),
                Some(version),
            );
            let mock_db = crate::remote::db::RemoteDatabase::new_mock(handler);
            Self::new(Arc::new(mock_table), Arc::new(mock_db))
        }

        /// Like `new_with_handler`, but both mocks use the client `config`.
        pub fn new_with_handler_and_config<T>(
            name: impl Into<String>,
            handler: impl Fn(reqwest::Request) -> http::Response<T> + Clone + Send + Sync + 'static,
            config: crate::remote::ClientConfig,
        ) -> Self
        where
            T: Into<reqwest::Body>,
        {
            let mock_table = crate::remote::table::RemoteTable::new_mock_with_config(
                name.into(),
                handler.clone(),
                config.clone(),
            );
            let mock_db =
                crate::remote::db::RemoteDatabase::new_mock_with_config(handler, config);
            Self::new(Arc::new(mock_table), Arc::new(mock_db))
        }

        /// Combines `new_with_handler_version` and `new_with_handler_and_config`.
        pub fn new_with_handler_version_and_config<T>(
            name: impl Into<String>,
            version: semver::Version,
            handler: impl Fn(reqwest::Request) -> http::Response<T> + Clone + Send + Sync + 'static,
            config: crate::remote::ClientConfig,
        ) -> Self
        where
            T: Into<reqwest::Body>,
        {
            let mock_table =
                crate::remote::table::RemoteTable::new_mock_with_version_and_config(
                    name.into(),
                    handler.clone(),
                    Some(version),
                    config.clone(),
                );
            let mock_db =
                crate::remote::db::RemoteDatabase::new_mock_with_config(handler, config);
            Self::new(Arc::new(mock_table), Arc::new(mock_db))
        }
    }
}
478
479impl std::fmt::Display for Table {
480 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
481 write!(f, "{}", self.inner)
482 }
483}
484
485impl From<Arc<dyn BaseTable>> for Table {
486 fn from(inner: Arc<dyn BaseTable>) -> Self {
487 Self {
488 inner,
489 database: None,
490 embedding_registry: Arc::new(MemoryRegistry::new()),
491 }
492 }
493}
494
495impl Table {
496 pub fn new(inner: Arc<dyn BaseTable>, database: Arc<dyn Database>) -> Self {
497 Self {
498 inner,
499 database: Some(database),
500 embedding_registry: Arc::new(MemoryRegistry::new()),
501 }
502 }
503
504 pub fn base_table(&self) -> &Arc<dyn BaseTable> {
505 &self.inner
506 }
507
508 pub fn database(&self) -> &Arc<dyn Database> {
509 self.database.as_ref().unwrap()
510 }
511
512 pub fn embedding_registry(&self) -> &Arc<dyn EmbeddingRegistry> {
513 &self.embedding_registry
514 }
515
516 pub(crate) fn new_with_embedding_registry(
517 inner: Arc<dyn BaseTable>,
518 database: Arc<dyn Database>,
519 embedding_registry: Arc<dyn EmbeddingRegistry>,
520 ) -> Self {
521 Self {
522 inner,
523 database: Some(database),
524 embedding_registry,
525 }
526 }
527
528 pub fn as_native(&self) -> Option<&NativeTable> {
533 self.inner.as_native()
534 }
535
536 pub fn name(&self) -> &str {
538 self.inner.name()
539 }
540
541 pub fn namespace(&self) -> &[String] {
543 self.inner.namespace()
544 }
545
546 pub fn id(&self) -> &str {
548 self.inner.id()
549 }
550
551 pub fn dataset(&self) -> Option<&dataset::DatasetConsistencyWrapper> {
555 self.inner.as_native().map(|t| &t.dataset)
556 }
557
558 pub async fn schema(&self) -> Result<SchemaRef> {
560 self.inner.schema().await
561 }
562
563 pub async fn count_rows(&self, filter: Option<String>) -> Result<usize> {
569 self.inner.count_rows(filter.map(Filter::Sql)).await
570 }
571
572 pub fn add<T: Scannable + 'static>(&self, data: T) -> AddDataBuilder {
579 AddDataBuilder::new(
580 self.inner.clone(),
581 Box::new(data),
582 Some(self.embedding_registry.clone()),
583 )
584 }
585
586 pub fn update(&self) -> UpdateBuilder {
601 UpdateBuilder::new(self.inner.clone())
602 }
603
604 pub async fn delete(&self, predicate: &str) -> Result<DeleteResult> {
649 self.inner.delete(predicate).await
650 }
651
652 pub fn create_index(&self, columns: &[impl AsRef<str>], index: Index) -> IndexBuilder {
714 IndexBuilder::new(
715 self.inner.clone(),
716 columns
717 .iter()
718 .map(|val| val.as_ref().to_string())
719 .collect::<Vec<_>>(),
720 index,
721 )
722 }
723
724 pub fn create_index_with_timeout(
727 &self,
728 columns: &[impl AsRef<str>],
729 index: Index,
730 wait_timeout: Option<std::time::Duration>,
731 ) -> IndexBuilder {
732 let mut builder = IndexBuilder::new(
733 self.inner.clone(),
734 columns
735 .iter()
736 .map(|val| val.as_ref().to_string())
737 .collect::<Vec<_>>(),
738 index,
739 );
740 if let Some(timeout) = wait_timeout {
741 builder = builder.wait_timeout(timeout);
742 }
743 builder
744 }
745
746 pub fn merge_insert(&self, on: &[&str]) -> MergeInsertBuilder {
822 MergeInsertBuilder::new(
823 self.inner.clone(),
824 on.iter().map(|s| s.to_string()).collect(),
825 )
826 }
827
828 pub fn query(&self) -> Query {
916 Query::new(self.inner.clone())
917 }
918
919 pub fn take_offsets(&self, offsets: Vec<u64>) -> TakeQuery {
942 TakeQuery::from_offsets(self.inner.clone(), offsets)
943 }
944
945 pub fn take_row_ids(&self, row_ids: Vec<u64>) -> TakeQuery {
964 TakeQuery::from_row_ids(self.inner.clone(), row_ids)
965 }
966
967 pub fn vector_search(&self, query: impl IntoQueryVector) -> Result<VectorQuery> {
973 self.query().nearest_to(query)
974 }
975
976 pub async fn optimize(&self, action: OptimizeAction) -> Result<OptimizeStats> {
993 self.inner.optimize(action).await
994 }
995
996 pub async fn add_columns(
998 &self,
999 transforms: NewColumnTransform,
1000 read_columns: Option<Vec<String>>,
1001 ) -> Result<AddColumnsResult> {
1002 self.inner.add_columns(transforms, read_columns).await
1003 }
1004
1005 pub async fn alter_columns(
1007 &self,
1008 alterations: &[ColumnAlteration],
1009 ) -> Result<AlterColumnsResult> {
1010 self.inner.alter_columns(alterations).await
1011 }
1012
1013 pub async fn drop_columns(&self, columns: &[&str]) -> Result<DropColumnsResult> {
1015 self.inner.drop_columns(columns).await
1016 }
1017
1018 pub async fn version(&self) -> Result<u64> {
1025 self.inner.version().await
1026 }
1027
1028 pub async fn checkout(&self, version: u64) -> Result<()> {
1043 self.inner.checkout(version).await
1044 }
1045
1046 pub async fn checkout_tag(&self, tag: &str) -> Result<()> {
1061 self.inner.checkout_tag(tag).await
1062 }
1063
1064 pub async fn checkout_latest(&self) -> Result<()> {
1069 self.inner.checkout_latest().await
1070 }
1071
1072 pub async fn restore(&self) -> Result<()> {
1083 self.inner.restore().await
1084 }
1085
1086 pub async fn list_versions(&self) -> Result<Vec<Version>> {
1088 self.inner.list_versions().await
1089 }
1090
1091 pub async fn list_indices(&self) -> Result<Vec<IndexConfig>> {
1093 self.inner.list_indices().await
1094 }
1095
1096 pub async fn uri(&self) -> Result<String> {
1101 self.inner.uri().await
1102 }
1103
1104 #[deprecated(since = "0.25.0", note = "Use initial_storage_options() instead")]
1108 pub async fn storage_options(&self) -> Option<HashMap<String, String>> {
1109 #[allow(deprecated)]
1110 self.inner.storage_options().await
1111 }
1112
1113 pub async fn initial_storage_options(&self) -> Option<HashMap<String, String>> {
1119 self.inner.initial_storage_options().await
1120 }
1121
1122 pub async fn latest_storage_options(&self) -> Result<Option<HashMap<String, String>>> {
1130 self.inner.latest_storage_options().await
1131 }
1132
1133 pub async fn index_stats(
1136 &self,
1137 index_name: impl AsRef<str>,
1138 ) -> Result<Option<IndexStatistics>> {
1139 self.inner.index_stats(index_name.as_ref()).await
1140 }
1141
1142 pub async fn drop_index(&self, name: &str) -> Result<()> {
1151 self.inner.drop_index(name).await
1152 }
1153
1154 pub async fn prewarm_index(&self, name: &str) -> Result<()> {
1169 self.inner.prewarm_index(name).await
1170 }
1171
1172 pub async fn prewarm_data(&self, columns: Option<Vec<String>>) -> Result<()> {
1190 self.inner.prewarm_data(columns).await
1191 }
1192
1193 pub async fn wait_for_index(
1196 &self,
1197 index_names: &[&str],
1198 timeout: std::time::Duration,
1199 ) -> Result<()> {
1200 self.inner.wait_for_index(index_names, timeout).await
1201 }
1202
1203 pub async fn tags(&self) -> Result<Box<dyn Tags + '_>> {
1205 self.inner.tags().await
1206 }
1207
1208 pub async fn stats(&self) -> Result<TableStatistics> {
1210 self.inner.stats().await
1211 }
1212}
1213
/// [`Tags`] implementation that operates on a native table's dataset.
pub struct NativeTags {
    /// The dataset whose tags are read and modified.
    dataset: dataset::DatasetConsistencyWrapper,
}
1217#[async_trait]
1218impl Tags for NativeTags {
1219 async fn list(&self) -> Result<HashMap<String, TagContents>> {
1220 let dataset = self.dataset.get().await?;
1221 Ok(dataset.tags().list().await?)
1222 }
1223
1224 async fn get_version(&self, tag: &str) -> Result<u64> {
1225 let dataset = self.dataset.get().await?;
1226 Ok(dataset.tags().get_version(tag).await?)
1227 }
1228
1229 async fn create(&mut self, tag: &str, version: u64) -> Result<()> {
1230 let dataset = self.dataset.get().await?;
1231 dataset.tags().create(tag, version).await?;
1232 Ok(())
1233 }
1234
1235 async fn delete(&mut self, tag: &str) -> Result<()> {
1236 let dataset = self.dataset.get().await?;
1237 dataset.tags().delete(tag).await?;
1238 Ok(())
1239 }
1240
1241 async fn update(&mut self, tag: &str, version: u64) -> Result<()> {
1242 let dataset = self.dataset.get().await?;
1243 dataset.tags().update(tag, version).await?;
1244 Ok(())
1245 }
1246}
1247
/// Extension for downcasting a table backend to a [`NativeTable`].
pub trait NativeTableExt {
    /// Returns the backend as a [`NativeTable`], or `None` if it is another type.
    fn as_native(&self) -> Option<&NativeTable>;
}
1252
1253impl NativeTableExt for Arc<dyn BaseTable> {
1254 fn as_native(&self) -> Option<&NativeTable> {
1255 self.as_any().downcast_ref::<NativeTable>()
1256 }
1257}
1258
/// A [`BaseTable`] backend built directly on a Lance dataset.
#[derive(Clone)]
pub struct NativeTable {
    /// Table name.
    name: String,
    /// Namespace components containing the table.
    namespace: Vec<String>,
    /// Identifier produced by `build_id` (namespace and name joined with `$`).
    id: String,
    /// URI of the underlying dataset.
    uri: String,
    /// The loaded dataset, wrapped with the configured read-consistency interval.
    pub(crate) dataset: dataset::DatasetConsistencyWrapper,
    /// Interval passed to `DatasetConsistencyWrapper::new_latest`.
    read_consistency_interval: Option<std::time::Duration>,
    /// Optional namespace client; the `*_from_namespace` constructors keep it
    /// only when `QueryTable` pushdown is enabled.
    pub(crate) namespace_client: Option<Arc<dyn LanceNamespace>>,
    /// Operations to push down to the namespace client.
    pub(crate) pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
}
1277
1278impl std::fmt::Debug for NativeTable {
1279 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1280 f.debug_struct("NativeTable")
1281 .field("name", &self.name)
1282 .field("namespace", &self.namespace)
1283 .field("id", &self.id)
1284 .field("uri", &self.uri)
1285 .field("read_consistency_interval", &self.read_consistency_interval)
1286 .field("namespace_client", &self.namespace_client)
1287 .field("pushdown_operations", &self.pushdown_operations)
1288 .finish()
1289 }
1290}
1291
1292impl std::fmt::Display for NativeTable {
1293 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1294 write!(
1295 f,
1296 "NativeTable({}, uri={}, read_consistency_interval={})",
1297 self.name,
1298 self.uri,
1299 match self.read_consistency_interval {
1300 None => {
1301 "None".to_string()
1302 }
1303 Some(duration) => {
1304 format!("{}s", duration.as_secs_f64())
1305 }
1306 }
1307 )
1308 }
1309}
1310
1311impl NativeTable {
1312 pub async fn open(uri: &str) -> Result<Self> {
1323 let name = Self::get_table_name(uri)?;
1324 Self::open_with_params(
1325 uri,
1326 &name,
1327 vec![],
1328 None,
1329 None,
1330 None,
1331 None,
1332 HashSet::new(),
1333 None,
1334 )
1335 .await
1336 }
1337
1338 #[allow(clippy::too_many_arguments)]
1354 pub async fn open_with_params(
1355 uri: &str,
1356 name: &str,
1357 namespace: Vec<String>,
1358 write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
1359 params: Option<ReadParams>,
1360 read_consistency_interval: Option<std::time::Duration>,
1361 namespace_client: Option<Arc<dyn LanceNamespace>>,
1362 pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
1363 managed_versioning: Option<bool>,
1364 ) -> Result<Self> {
1365 let params = params.unwrap_or_default();
1366 let params = match write_store_wrapper.clone() {
1368 Some(wrapper) => params.patch_with_store_wrapper(wrapper)?,
1369 None => params,
1370 };
1371
1372 let mut table_id = namespace.clone();
1374 table_id.push(name.to_string());
1375
1376 let managed_versioning = match managed_versioning {
1379 Some(value) => value,
1380 None if namespace_client.is_some() => {
1381 let ns_client = namespace_client.as_ref().unwrap();
1382 let describe_request = DescribeTableRequest {
1383 id: Some(table_id.clone()),
1384 ..Default::default()
1385 };
1386 let response = ns_client
1387 .describe_table(describe_request)
1388 .await
1389 .map_err(|e| Error::Runtime {
1390 message: format!(
1391 "Failed to describe table via namespace client: {}. \
1392 If you don't need managed versioning, don't pass namespace_client.",
1393 e
1394 ),
1395 })?;
1396 response.managed_versioning == Some(true)
1397 }
1398 None => false,
1399 };
1400
1401 let mut builder = DatasetBuilder::from_uri(uri).with_read_params(params);
1402
1403 if managed_versioning && let Some(ref ns_client) = namespace_client {
1405 let external_store =
1406 LanceNamespaceExternalManifestStore::new(ns_client.clone(), table_id.clone());
1407 let commit_handler: Arc<dyn CommitHandler> = Arc::new(ExternalManifestCommitHandler {
1408 external_manifest_store: Arc::new(external_store),
1409 });
1410 builder = builder.with_commit_handler(commit_handler);
1411 }
1412
1413 let dataset = builder.load().await.map_err(|e| match e {
1414 lance::Error::DatasetNotFound { .. } => Error::TableNotFound {
1415 name: name.to_string(),
1416 source: Box::new(e),
1417 },
1418 e => e.into(),
1419 })?;
1420
1421 let dataset = DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval);
1422 let id = Self::build_id(&namespace, name);
1423
1424 Ok(Self {
1425 name: name.to_string(),
1426 namespace,
1427 id,
1428 uri: uri.to_string(),
1429 dataset,
1430 read_consistency_interval,
1431 namespace_client,
1432 pushdown_operations,
1433 })
1434 }
1435
1436 pub fn with_namespace_client(mut self, namespace_client: Arc<dyn LanceNamespace>) -> Self {
1440 self.namespace_client = Some(namespace_client);
1441 self
1442 }
1443
1444 #[allow(clippy::too_many_arguments)]
1466 pub async fn open_from_namespace(
1467 namespace_client: Arc<dyn LanceNamespace>,
1468 name: &str,
1469 namespace: Vec<String>,
1470 write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
1471 params: Option<ReadParams>,
1472 read_consistency_interval: Option<std::time::Duration>,
1473 pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
1474 session: Option<Arc<lance::session::Session>>,
1475 ) -> Result<Self> {
1476 let mut params = params.unwrap_or_default();
1477
1478 if let Some(sess) = session {
1480 params.session(sess);
1481 }
1482
1483 let params = match write_store_wrapper.clone() {
1485 Some(wrapper) => params.patch_with_store_wrapper(wrapper)?,
1486 None => params,
1487 };
1488
1489 let mut table_id = namespace.clone();
1491 table_id.push(name.to_string());
1492
1493 let builder = DatasetBuilder::from_namespace(namespace_client.clone(), table_id)
1496 .await
1497 .map_err(|e| match e {
1498 lance::Error::Namespace { source, .. } => Error::Runtime {
1499 message: format!("Failed to get table info from namespace: {:?}", source),
1500 },
1501 e => e.into(),
1502 })?;
1503
1504 let dataset = builder
1505 .with_read_params(params)
1506 .load()
1507 .await
1508 .map_err(|e| match e {
1509 lance::Error::DatasetNotFound { .. } => Error::TableNotFound {
1510 name: name.to_string(),
1511 source: Box::new(e),
1512 },
1513 e => e.into(),
1514 })?;
1515
1516 let uri = dataset.uri().to_string();
1517 let dataset = DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval);
1518 let id = Self::build_id(&namespace, name);
1519
1520 let stored_namespace_client =
1521 if pushdown_operations.contains(&NamespaceClientPushdownOperation::QueryTable) {
1522 Some(namespace_client)
1523 } else {
1524 None
1525 };
1526
1527 Ok(Self {
1528 name: name.to_string(),
1529 namespace,
1530 id,
1531 uri,
1532 dataset,
1533 read_consistency_interval,
1534 namespace_client: stored_namespace_client,
1535 pushdown_operations,
1536 })
1537 }
1538
1539 fn get_table_name(uri: &str) -> Result<String> {
1540 let path = Path::new(uri);
1541 let name = path
1542 .file_stem()
1543 .ok_or(Error::TableNotFound {
1544 name: uri.to_string(),
1545 source: format!("Could not extract table name from URI: '{}'", uri).into(),
1546 })?
1547 .to_str()
1548 .ok_or(Error::InvalidTableName {
1549 name: uri.to_string(),
1550 reason: "Table name is not valid URL".to_string(),
1551 })?;
1552 Ok(name.to_string())
1553 }
1554
1555 fn build_id(namespace: &[String], name: &str) -> String {
1556 if namespace.is_empty() {
1557 name.to_string()
1558 } else {
1559 let mut parts = namespace.to_vec();
1560 parts.push(name.to_string());
1561 parts.join("$")
1562 }
1563 }
1564
1565 #[allow(clippy::too_many_arguments)]
1582 pub async fn create(
1583 uri: &str,
1584 name: &str,
1585 namespace: Vec<String>,
1586 batches: impl StreamingWriteSource,
1587 write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
1588 params: Option<WriteParams>,
1589 read_consistency_interval: Option<std::time::Duration>,
1590 namespace_client: Option<Arc<dyn LanceNamespace>>,
1591 pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
1592 ) -> Result<Self> {
1593 let params = params.unwrap_or(WriteParams {
1595 ..Default::default()
1596 });
1597 let params = match write_store_wrapper.clone() {
1599 Some(wrapper) => params.patch_with_store_wrapper(wrapper)?,
1600 None => params,
1601 };
1602
1603 let insert_builder = InsertBuilder::new(uri).with_params(¶ms);
1604 let dataset = insert_builder
1605 .execute_stream(batches)
1606 .await
1607 .map_err(|e| match e {
1608 lance::Error::DatasetAlreadyExists { .. } => Error::TableAlreadyExists {
1609 name: name.to_string(),
1610 },
1611 e => e.into(),
1612 })?;
1613
1614 let id = Self::build_id(&namespace, name);
1615
1616 Ok(Self {
1617 name: name.to_string(),
1618 namespace,
1619 id,
1620 uri: uri.to_string(),
1621 dataset: DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval),
1622 read_consistency_interval,
1623 namespace_client,
1624 pushdown_operations,
1625 })
1626 }
1627
1628 #[allow(clippy::too_many_arguments)]
1629 pub async fn create_empty(
1630 uri: &str,
1631 name: &str,
1632 namespace: Vec<String>,
1633 schema: SchemaRef,
1634 write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
1635 params: Option<WriteParams>,
1636 read_consistency_interval: Option<std::time::Duration>,
1637 namespace_client: Option<Arc<dyn LanceNamespace>>,
1638 pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
1639 ) -> Result<Self> {
1640 let data: Box<dyn Scannable> = Box::new(RecordBatch::new_empty(schema));
1641 Self::create(
1642 uri,
1643 name,
1644 namespace,
1645 data,
1646 write_store_wrapper,
1647 params,
1648 read_consistency_interval,
1649 namespace_client,
1650 pushdown_operations,
1651 )
1652 .await
1653 }
1654
1655 #[allow(clippy::too_many_arguments)]
1679 pub async fn create_from_namespace(
1680 namespace_client: Arc<dyn LanceNamespace>,
1681 uri: &str,
1682 name: &str,
1683 namespace: Vec<String>,
1684 batches: impl StreamingWriteSource,
1685 write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
1686 params: Option<WriteParams>,
1687 read_consistency_interval: Option<std::time::Duration>,
1688 pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
1689 session: Option<Arc<lance::session::Session>>,
1690 ) -> Result<Self> {
1691 let mut table_id = namespace.clone();
1693 table_id.push(name.to_string());
1694
1695 let storage_options_provider = Arc::new(LanceNamespaceStorageOptionsProvider::new(
1697 namespace_client.clone(),
1698 table_id,
1699 ));
1700
1701 let mut params = params.unwrap_or_default();
1703
1704 if let Some(sess) = session {
1706 params.session = Some(sess);
1707 }
1708
1709 let store_params = params
1711 .store_params
1712 .get_or_insert_with(ObjectStoreParams::default);
1713 let accessor = match store_params.storage_options().cloned() {
1714 Some(options) => {
1715 StorageOptionsAccessor::with_initial_and_provider(options, storage_options_provider)
1716 }
1717 None => StorageOptionsAccessor::with_provider(storage_options_provider),
1718 };
1719 store_params.storage_options_accessor = Some(Arc::new(accessor));
1720
1721 let params = match write_store_wrapper.clone() {
1723 Some(wrapper) => params.patch_with_store_wrapper(wrapper)?,
1724 None => params,
1725 };
1726
1727 let insert_builder = InsertBuilder::new(uri).with_params(¶ms);
1728 let dataset = insert_builder
1729 .execute_stream(batches)
1730 .await
1731 .map_err(|e| match e {
1732 lance::Error::DatasetAlreadyExists { .. } => Error::TableAlreadyExists {
1733 name: name.to_string(),
1734 },
1735 e => e.into(),
1736 })?;
1737
1738 let id = Self::build_id(&namespace, name);
1739
1740 let stored_namespace_client =
1741 if pushdown_operations.contains(&NamespaceClientPushdownOperation::QueryTable) {
1742 Some(namespace_client)
1743 } else {
1744 None
1745 };
1746
1747 Ok(Self {
1748 name: name.to_string(),
1749 namespace,
1750 id,
1751 uri: uri.to_string(),
1752 dataset: DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval),
1753 read_consistency_interval,
1754 namespace_client: stored_namespace_client,
1755 pushdown_operations,
1756 })
1757 }
1758
1759 pub async fn merge(
1761 &mut self,
1762 batches: impl RecordBatchReader + Send + 'static,
1763 left_on: &str,
1764 right_on: &str,
1765 ) -> Result<()> {
1766 self.dataset.ensure_mutable()?;
1767 let mut dataset = (*self.dataset.get().await?).clone();
1768 dataset.merge(batches, left_on, right_on).await?;
1769 self.dataset.update(dataset);
1770 Ok(())
1771 }
1772
1773 pub async fn count_fragments(&self) -> Result<usize> {
1775 Ok(self.dataset.get().await?.count_fragments())
1776 }
1777
1778 pub async fn count_deleted_rows(&self) -> Result<usize> {
1779 Ok(self.dataset.get().await?.count_deleted_rows().await?)
1780 }
1781
1782 pub async fn num_small_files(&self, max_rows_per_group: usize) -> Result<usize> {
1783 Ok(self
1784 .dataset
1785 .get()
1786 .await?
1787 .num_small_files(max_rows_per_group)
1788 .await)
1789 }
1790
1791 pub async fn load_indices(&self) -> Result<Vec<VectorIndex>> {
1792 let dataset = self.dataset.get().await?;
1793 let mf = dataset.manifest();
1794 let indices = dataset.load_indices().await?;
1795 Ok(indices
1796 .iter()
1797 .map(|i| VectorIndex::new_from_format(mf, i))
1798 .collect())
1799 }
1800
1801 fn validate_index_type(
1803 field: &Field,
1804 index_name: &str,
1805 supported_fn: impl Fn(&DataType) -> bool,
1806 ) -> Result<()> {
1807 if !supported_fn(field.data_type()) {
1808 return Err(Error::Schema {
1809 message: format!(
1810 "A {} index cannot be created on the field `{}` which has data type {}",
1811 index_name,
1812 field.name(),
1813 field.data_type()
1814 ),
1815 });
1816 }
1817 Ok(())
1818 }
1819
1820 fn build_ivf_params(
1822 num_partitions: Option<u32>,
1823 target_partition_size: Option<u32>,
1824 sample_rate: u32,
1825 max_iterations: u32,
1826 ) -> IvfBuildParams {
1827 let mut ivf_params = match (num_partitions, target_partition_size) {
1828 (Some(num_partitions), _) => IvfBuildParams::new(num_partitions as usize),
1829 (None, Some(target_partition_size)) => {
1830 IvfBuildParams::with_target_partition_size(target_partition_size as usize)
1831 }
1832 (None, None) => IvfBuildParams::default(),
1833 };
1834 ivf_params.sample_rate = sample_rate as usize;
1835 ivf_params.max_iters = max_iterations as usize;
1836 ivf_params
1837 }
1838
1839 fn get_num_sub_vectors(provided: Option<u32>, dim: u32, num_bits: Option<u32>) -> u32 {
1841 if let Some(provided) = provided {
1842 return provided;
1843 }
1844 let suggested = suggested_num_sub_vectors(dim);
1845 if num_bits.is_some_and(|num_bits| num_bits == 4) && !suggested.is_multiple_of(2) {
1846 suggested + 1
1848 } else {
1849 suggested
1850 }
1851 }
1852
1853 fn get_vector_dimension(field: &Field) -> Result<u32> {
1855 match field.data_type() {
1856 arrow_schema::DataType::FixedSizeList(_, n) => Ok(*n as u32),
1857 _ => Ok(infer_vector_dim(field.data_type())? as u32),
1858 }
1859 }
1860
1861 async fn make_index_params(
1863 &self,
1864 field: &Field,
1865 index_opts: Index,
1866 ) -> Result<Box<dyn lance::index::IndexParams>> {
1867 match index_opts {
1868 Index::Auto => {
1869 if supported_vector_data_type(field.data_type()) {
1870 let dim = Self::get_vector_dimension(field)?;
1872 let ivf_params = lance_index::vector::ivf::IvfBuildParams::default();
1873 let num_sub_vectors = Self::get_num_sub_vectors(None, dim, None);
1874 let pq_params =
1875 lance_index::vector::pq::PQBuildParams::new(num_sub_vectors as usize, 8);
1876 let lance_idx_params =
1877 lance::index::vector::VectorIndexParams::with_ivf_pq_params(
1878 lance_linalg::distance::MetricType::L2,
1879 ivf_params,
1880 pq_params,
1881 );
1882 Ok(Box::new(lance_idx_params))
1883 } else if supported_btree_data_type(field.data_type()) {
1884 Ok(Box::new(ScalarIndexParams::for_builtin(
1885 BuiltinIndexType::BTree,
1886 )))
1887 } else {
1888 Err(Error::InvalidInput {
1889 message: format!(
1890 "there are no indices supported for the field `{}` with the data type {}",
1891 field.name(),
1892 field.data_type()
1893 ),
1894 })?
1895 }
1896 }
1897 Index::BTree(_) => {
1898 Self::validate_index_type(field, "BTree", supported_btree_data_type)?;
1899 Ok(Box::new(ScalarIndexParams::for_builtin(
1900 BuiltinIndexType::BTree,
1901 )))
1902 }
1903 Index::Bitmap(_) => {
1904 Self::validate_index_type(field, "Bitmap", supported_bitmap_data_type)?;
1905 Ok(Box::new(ScalarIndexParams::for_builtin(
1906 BuiltinIndexType::Bitmap,
1907 )))
1908 }
1909 Index::LabelList(_) => {
1910 Self::validate_index_type(field, "LabelList", supported_label_list_data_type)?;
1911 Ok(Box::new(ScalarIndexParams::for_builtin(
1912 BuiltinIndexType::LabelList,
1913 )))
1914 }
1915 Index::FTS(fts_opts) => {
1916 Self::validate_index_type(field, "FTS", supported_fts_data_type)?;
1917 Ok(Box::new(fts_opts))
1918 }
1919 Index::IvfFlat(index) => {
1920 Self::validate_index_type(field, "IVF Flat", supported_vector_data_type)?;
1921 let ivf_params = Self::build_ivf_params(
1922 index.num_partitions,
1923 index.target_partition_size,
1924 index.sample_rate,
1925 index.max_iterations,
1926 );
1927 let lance_idx_params =
1928 VectorIndexParams::with_ivf_flat_params(index.distance_type.into(), ivf_params);
1929 Ok(Box::new(lance_idx_params))
1930 }
1931 Index::IvfSq(index) => {
1932 Self::validate_index_type(field, "IVF SQ", supported_vector_data_type)?;
1933 let ivf_params = Self::build_ivf_params(
1934 index.num_partitions,
1935 index.target_partition_size,
1936 index.sample_rate,
1937 index.max_iterations,
1938 );
1939 let sq_params = SQBuildParams {
1940 sample_rate: index.sample_rate as usize,
1941 ..Default::default()
1942 };
1943 let lance_idx_params = VectorIndexParams::with_ivf_sq_params(
1944 index.distance_type.into(),
1945 ivf_params,
1946 sq_params,
1947 );
1948 Ok(Box::new(lance_idx_params))
1949 }
1950 Index::IvfPq(index) => {
1951 Self::validate_index_type(field, "IVF PQ", supported_vector_data_type)?;
1952 let dim = Self::get_vector_dimension(field)?;
1953 let ivf_params = Self::build_ivf_params(
1954 index.num_partitions,
1955 index.target_partition_size,
1956 index.sample_rate,
1957 index.max_iterations,
1958 );
1959 let num_sub_vectors =
1960 Self::get_num_sub_vectors(index.num_sub_vectors, dim, index.num_bits);
1961 let num_bits = index.num_bits.unwrap_or(8) as usize;
1962 let mut pq_params = PQBuildParams::new(num_sub_vectors as usize, num_bits);
1963 pq_params.max_iters = index.max_iterations as usize;
1964 let lance_idx_params = VectorIndexParams::with_ivf_pq_params(
1965 index.distance_type.into(),
1966 ivf_params,
1967 pq_params,
1968 );
1969 Ok(Box::new(lance_idx_params))
1970 }
1971 Index::IvfRq(index) => {
1972 Self::validate_index_type(field, "IVF RQ", supported_vector_data_type)?;
1973 let ivf_params = Self::build_ivf_params(
1974 index.num_partitions,
1975 index.target_partition_size,
1976 index.sample_rate,
1977 index.max_iterations,
1978 );
1979 let rq_params = RQBuildParams::new(index.num_bits.unwrap_or(1) as u8);
1980 let lance_idx_params = VectorIndexParams::with_ivf_rq_params(
1981 index.distance_type.into(),
1982 ivf_params,
1983 rq_params,
1984 );
1985 Ok(Box::new(lance_idx_params))
1986 }
1987 Index::IvfHnswPq(index) => {
1988 Self::validate_index_type(field, "IVF HNSW PQ", supported_vector_data_type)?;
1989 let dim = Self::get_vector_dimension(field)?;
1990 let ivf_params = Self::build_ivf_params(
1991 index.num_partitions,
1992 index.target_partition_size,
1993 index.sample_rate,
1994 index.max_iterations,
1995 );
1996 let num_sub_vectors =
1997 Self::get_num_sub_vectors(index.num_sub_vectors, dim, index.num_bits);
1998 let hnsw_params = HnswBuildParams::default()
1999 .num_edges(index.m as usize)
2000 .ef_construction(index.ef_construction as usize);
2001 let pq_params = PQBuildParams::new(
2002 num_sub_vectors as usize,
2003 index.num_bits.unwrap_or(8) as usize,
2004 );
2005 let lance_idx_params = VectorIndexParams::with_ivf_hnsw_pq_params(
2006 index.distance_type.into(),
2007 ivf_params,
2008 hnsw_params,
2009 pq_params,
2010 );
2011 Ok(Box::new(lance_idx_params))
2012 }
2013 Index::IvfHnswSq(index) => {
2014 Self::validate_index_type(field, "IVF HNSW SQ", supported_vector_data_type)?;
2015 let ivf_params = Self::build_ivf_params(
2016 index.num_partitions,
2017 index.target_partition_size,
2018 index.sample_rate,
2019 index.max_iterations,
2020 );
2021 let hnsw_params = HnswBuildParams::default()
2022 .num_edges(index.m as usize)
2023 .ef_construction(index.ef_construction as usize);
2024 let sq_params = SQBuildParams {
2025 sample_rate: index.sample_rate as usize,
2026 ..Default::default()
2027 };
2028 let lance_idx_params = VectorIndexParams::with_ivf_hnsw_sq_params(
2029 index.distance_type.into(),
2030 ivf_params,
2031 hnsw_params,
2032 sq_params,
2033 );
2034 Ok(Box::new(lance_idx_params))
2035 }
2036 Index::IvfHnswFlat(index) => {
2037 Self::validate_index_type(field, "IVF HNSW FLAT", supported_vector_data_type)?;
2038 let ivf_params = Self::build_ivf_params(
2039 index.num_partitions,
2040 index.target_partition_size,
2041 index.sample_rate,
2042 index.max_iterations,
2043 );
2044 let hnsw_params = HnswBuildParams::default()
2045 .num_edges(index.m as usize)
2046 .ef_construction(index.ef_construction as usize);
2047 let lance_idx_params = VectorIndexParams::ivf_hnsw(
2048 index.distance_type.into(),
2049 ivf_params,
2050 hnsw_params,
2051 );
2052 Ok(Box::new(lance_idx_params))
2053 }
2054 }
2055 }
2056
2057 fn get_index_type_for_field(&self, field: &Field, index: &Index) -> IndexType {
2059 match index {
2060 Index::Auto => {
2061 if supported_vector_data_type(field.data_type()) {
2062 IndexType::Vector
2063 } else if supported_btree_data_type(field.data_type()) {
2064 IndexType::BTree
2065 } else {
2066 IndexType::BTree
2068 }
2069 }
2070 Index::BTree(_) => IndexType::BTree,
2071 Index::Bitmap(_) => IndexType::Bitmap,
2072 Index::LabelList(_) => IndexType::LabelList,
2073 Index::FTS(_) => IndexType::Inverted,
2074 Index::IvfFlat(_)
2075 | Index::IvfSq(_)
2076 | Index::IvfPq(_)
2077 | Index::IvfRq(_)
2078 | Index::IvfHnswPq(_)
2079 | Index::IvfHnswSq(_)
2080 | Index::IvfHnswFlat(_) => IndexType::Vector,
2081 }
2082 }
2083
2084 pub async fn uses_v2_manifest_paths(&self) -> Result<bool> {
2089 let dataset = self.dataset.get().await?;
2090 Ok(dataset.manifest_location().naming_scheme == ManifestNamingScheme::V2)
2091 }
2092
2093 pub async fn migrate_manifest_paths_v2(&self) -> Result<()> {
2108 self.dataset.ensure_mutable()?;
2109 let mut dataset = (*self.dataset.get().await?).clone();
2110 dataset.migrate_manifest_paths_v2().await?;
2111 self.dataset.update(dataset);
2112 Ok(())
2113 }
2114
2115 pub async fn manifest(&self) -> Result<Manifest> {
2117 let dataset = self.dataset.get().await?;
2118 Ok(dataset.manifest().clone())
2119 }
2120
2121 pub async fn update_config(
2123 &self,
2124 upsert_values: impl IntoIterator<Item = (String, String)>,
2125 ) -> Result<()> {
2126 self.dataset.ensure_mutable()?;
2127 let mut dataset = (*self.dataset.get().await?).clone();
2128 dataset.update_config(upsert_values).await?;
2129 self.dataset.update(dataset);
2130 Ok(())
2131 }
2132
2133 pub async fn delete_config_keys(&self, delete_keys: &[&str]) -> Result<()> {
2135 self.dataset.ensure_mutable()?;
2136 let mut dataset = (*self.dataset.get().await?).clone();
2137 #[allow(deprecated)]
2139 dataset.delete_config_keys(delete_keys).await?;
2140 self.dataset.update(dataset);
2141 Ok(())
2142 }
2143
2144 pub async fn replace_schema_metadata(
2146 &self,
2147 upsert_values: impl IntoIterator<Item = (String, String)>,
2148 ) -> Result<()> {
2149 self.dataset.ensure_mutable()?;
2150 let mut dataset = (*self.dataset.get().await?).clone();
2151 #[allow(deprecated)]
2153 dataset.replace_schema_metadata(upsert_values).await?;
2154 self.dataset.update(dataset);
2155 Ok(())
2156 }
2157
2158 pub async fn replace_field_metadata(
2166 &self,
2167 new_values: impl IntoIterator<Item = (u32, HashMap<String, String>)>,
2168 ) -> Result<()> {
2169 self.dataset.ensure_mutable()?;
2170 let mut dataset = (*self.dataset.get().await?).clone();
2171 dataset.replace_field_metadata(new_values).await?;
2172 self.dataset.update(dataset);
2173 Ok(())
2174 }
2175}
2176
2177#[async_trait::async_trait]
2178impl BaseTable for NativeTable {
2179 fn as_any(&self) -> &dyn std::any::Any {
2180 self
2181 }
2182
2183 fn name(&self) -> &str {
2184 self.name.as_str()
2185 }
2186
2187 fn namespace(&self) -> &[String] {
2188 &self.namespace
2189 }
2190
2191 fn id(&self) -> &str {
2192 &self.id
2193 }
2194
2195 async fn version(&self) -> Result<u64> {
2196 Ok(self.dataset.get().await?.version().version)
2197 }
2198
2199 async fn checkout(&self, version: u64) -> Result<()> {
2200 self.dataset.as_time_travel(version).await
2201 }
2202
2203 async fn checkout_tag(&self, tag: &str) -> Result<()> {
2204 self.dataset.as_time_travel(tag).await
2205 }
2206
2207 async fn checkout_latest(&self) -> Result<()> {
2208 self.dataset.as_latest().await?;
2209 self.dataset.reload().await
2210 }
2211
2212 async fn list_versions(&self) -> Result<Vec<Version>> {
2213 Ok(self.dataset.get().await?.versions().await?)
2214 }
2215
2216 async fn restore(&self) -> Result<()> {
2217 let version = self
2218 .dataset
2219 .time_travel_version()
2220 .ok_or_else(|| Error::InvalidInput {
2221 message: "you must run checkout before running restore".to_string(),
2222 })?;
2223 {
2224 let mut dataset = (*self.dataset.get().await?).clone();
2226 debug_assert_eq!(dataset.version().version, version);
2227 dataset.restore().await?;
2228 }
2229 self.dataset.as_latest().await?;
2230 Ok(())
2231 }
2232
2233 async fn schema(&self) -> Result<SchemaRef> {
2234 let lance_schema = self.dataset.get().await?.schema().clone();
2235 Ok(Arc::new(Schema::from(&lance_schema)))
2236 }
2237
2238 async fn table_definition(&self) -> Result<TableDefinition> {
2239 let schema = self.schema().await?;
2240 TableDefinition::try_from_rich_schema(schema)
2241 }
2242
2243 async fn count_rows(&self, filter: Option<Filter>) -> Result<usize> {
2244 let dataset = self.dataset.get().await?;
2245 match filter {
2246 None => Ok(dataset.count_rows(None).await?),
2247 Some(Filter::Sql(sql)) => Ok(dataset.count_rows(Some(sql)).await?),
2248 Some(Filter::Datafusion(_)) => Err(Error::NotSupported {
2249 message: "Datafusion filters are not yet supported".to_string(),
2250 }),
2251 }
2252 }
2253
2254 async fn add(&self, mut add: AddDataBuilder) -> Result<AddResult> {
2255 let table_def = self.table_definition().await?;
2256
2257 self.dataset.ensure_mutable()?;
2258 let ds_wrapper = self.dataset.clone();
2259 let ds = self.dataset.get().await?;
2260
2261 let table_schema = Schema::from(&ds.schema().clone());
2262
2263 let num_partitions = if let Some(parallelism) = add.write_parallelism {
2264 parallelism
2265 } else {
2266 let mut peeked = PeekedScannable::new(add.data);
2269 let n = if let Some(first_batch) = peeked.peek().await {
2270 let max_partitions = lance_core::utils::tokio::get_num_compute_intensive_cpus();
2271 estimate_write_partitions(
2272 first_batch.get_array_memory_size(),
2273 first_batch.num_rows(),
2274 peeked.num_rows(),
2275 max_partitions,
2276 )
2277 } else {
2278 1
2279 };
2280 add.data = Box::new(peeked);
2281 n
2282 };
2283
2284 let output = add.into_plan(&table_schema, &table_def)?;
2285
2286 let lance_params = output
2287 .write_options
2288 .lance_write_params
2289 .unwrap_or(WriteParams {
2290 mode: match output.mode {
2291 AddDataMode::Append => WriteMode::Append,
2292 AddDataMode::Overwrite => WriteMode::Overwrite,
2293 },
2294 ..Default::default()
2295 });
2296
2297 let plan = if num_partitions > 1 {
2299 Arc::new(
2300 datafusion_physical_plan::repartition::RepartitionExec::try_new(
2301 output.plan,
2302 datafusion_physical_plan::Partitioning::RoundRobinBatch(num_partitions),
2303 )?,
2304 ) as Arc<dyn ExecutionPlan>
2305 } else {
2306 output.plan
2307 };
2308
2309 let insert_exec = Arc::new(InsertExec::new(ds_wrapper.clone(), ds, plan, lance_params));
2310
2311 let tracker_for_tasks = output.tracker.clone();
2312 if let Some(ref t) = tracker_for_tasks {
2313 t.set_total_tasks(num_partitions);
2314 }
2315 let _finish = write_progress::FinishOnDrop(output.tracker);
2316
2317 let task_ctx = Arc::new(TaskContext::default());
2319 let handles = FuturesUnordered::new();
2320 for partition in 0..num_partitions {
2321 let exec = insert_exec.clone();
2322 let ctx = task_ctx.clone();
2323 let tracker = tracker_for_tasks.clone();
2324 handles.push(tokio::spawn(async move {
2325 let _guard = tracker.as_ref().map(|t| t.track_task());
2326 let mut stream = exec
2327 .execute(partition, ctx)
2328 .map_err(|e| -> Error { e.into() })?;
2329 while let Some(batch) = stream.next().await {
2330 batch.map_err(|e| -> Error { e.into() })?;
2331 }
2332 Ok::<_, Error>(())
2333 }));
2334 }
2335 for handle in handles {
2336 handle.await.map_err(|e| Error::Runtime {
2337 message: format!("Insert task panicked: {}", e),
2338 })??;
2339 }
2340
2341 let version = ds_wrapper.get().await?.manifest().version;
2342 Ok(AddResult { version })
2343 }
2344
2345 async fn create_index(&self, opts: IndexBuilder) -> Result<()> {
2346 if opts.columns.len() != 1 {
2347 return Err(Error::Schema {
2348 message: "Multi-column (composite) indices are not yet supported".to_string(),
2349 });
2350 }
2351 let schema = self.schema().await?;
2352
2353 let field = schema.field_with_name(&opts.columns[0])?;
2354
2355 let lance_idx_params = self.make_index_params(field, opts.index.clone()).await?;
2356 let index_type = self.get_index_type_for_field(field, &opts.index);
2357 let columns = [field.name().as_str()];
2358 self.dataset.ensure_mutable()?;
2359 let mut dataset = (*self.dataset.get().await?).clone();
2360 let mut builder = dataset
2361 .create_index_builder(&columns, index_type, lance_idx_params.as_ref())
2362 .train(opts.train)
2363 .replace(opts.replace);
2364
2365 if let Some(name) = opts.name {
2366 builder = builder.name(name);
2367 }
2368 builder.await?;
2369 self.dataset.update(dataset);
2370 Ok(())
2371 }
2372
2373 async fn drop_index(&self, index_name: &str) -> Result<()> {
2374 self.dataset.ensure_mutable()?;
2375 let mut dataset = (*self.dataset.get().await?).clone();
2376 dataset.drop_index(index_name).await?;
2377 self.dataset.update(dataset);
2378 Ok(())
2379 }
2380
2381 async fn prewarm_index(&self, index_name: &str) -> Result<()> {
2382 let dataset = self.dataset.get().await?;
2383 Ok(dataset.prewarm_index(index_name).await?)
2384 }
2385
2386 async fn prewarm_data(&self, _columns: Option<Vec<String>>) -> Result<()> {
2387 Err(Error::NotSupported {
2388 message: "prewarm_data is currently only supported on remote tables.".into(),
2389 })
2390 }
2391
2392 async fn update(&self, update: UpdateBuilder) -> Result<UpdateResult> {
2393 update::execute_update(self, update).await
2395 }
2396
2397 async fn create_plan(
2398 &self,
2399 query: &AnyQuery,
2400 options: QueryExecutionOptions,
2401 ) -> Result<Arc<dyn ExecutionPlan>> {
2402 query::create_plan(self, query, options).await
2403 }
2404
2405 async fn query(
2406 &self,
2407 query: &AnyQuery,
2408 options: QueryExecutionOptions,
2409 ) -> Result<DatasetRecordBatchStream> {
2410 query::execute_query(self, query, options).await
2411 }
2412
2413 async fn analyze_plan(
2414 &self,
2415 query: &AnyQuery,
2416 options: QueryExecutionOptions,
2417 ) -> Result<String> {
2418 query::analyze_query_plan(self, query, options).await
2419 }
2420
2421 async fn merge_insert(
2422 &self,
2423 params: MergeInsertBuilder,
2424 new_data: Box<dyn RecordBatchReader + Send>,
2425 ) -> Result<MergeResult> {
2426 merge::execute_merge_insert(self, params, new_data).await
2427 }
2428
2429 async fn delete(&self, predicate: &str) -> Result<DeleteResult> {
2431 delete::execute_delete(self, predicate).await
2433 }
2434
2435 async fn tags(&self) -> Result<Box<dyn Tags + '_>> {
2436 Ok(Box::new(NativeTags {
2437 dataset: self.dataset.clone(),
2438 }))
2439 }
2440
2441 async fn optimize(&self, action: OptimizeAction) -> Result<OptimizeStats> {
2442 optimize::execute_optimize(self, action).await
2444 }
2445
2446 async fn add_columns(
2447 &self,
2448 transforms: NewColumnTransform,
2449 read_columns: Option<Vec<String>>,
2450 ) -> Result<AddColumnsResult> {
2451 schema_evolution::execute_add_columns(self, transforms, read_columns).await
2452 }
2453
2454 async fn alter_columns(&self, alterations: &[ColumnAlteration]) -> Result<AlterColumnsResult> {
2455 schema_evolution::execute_alter_columns(self, alterations).await
2456 }
2457
2458 async fn drop_columns(&self, columns: &[&str]) -> Result<DropColumnsResult> {
2459 schema_evolution::execute_drop_columns(self, columns).await
2460 }
2461
2462 async fn list_indices(&self) -> Result<Vec<IndexConfig>> {
2463 let dataset = self.dataset.get().await?;
2464 let indices = dataset.load_indices().await?;
2465 let results = futures::stream::iter(indices.as_slice()).then(|idx| async {
2466
2467 if idx.name == FRAG_REUSE_INDEX_NAME {
2469 return None;
2470 }
2471
2472 let stats = match dataset.index_statistics(idx.name.as_str()).await {
2473 Ok(stats) => stats,
2474 Err(e) => {
2475 log::warn!("Failed to get statistics for index {} ({}): {}", idx.name, idx.uuid, e);
2476 return None;
2477 }
2478 };
2479
2480 let stats: serde_json::Value = match serde_json::from_str(&stats) {
2481 Ok(stats) => stats,
2482 Err(e) => {
2483 log::warn!("Failed to deserialize index statistics for index {} ({}): {}", idx.name, idx.uuid, e);
2484 return None;
2485 }
2486 };
2487
2488 let Some(index_type) = stats.get("index_type").and_then(|v| v.as_str()) else {
2489 log::warn!("Index statistics was missing 'index_type' field for index {} ({})", idx.name, idx.uuid);
2490 return None;
2491 };
2492
2493 let index_type: crate::index::IndexType = match index_type.parse() {
2494 Ok(index_type) => index_type,
2495 Err(e) => {
2496 log::warn!("Failed to parse index type for index {} ({}): {}", idx.name, idx.uuid, e);
2497 return None;
2498 }
2499 };
2500
2501 let mut columns = Vec::with_capacity(idx.fields.len());
2502 for field_id in &idx.fields {
2503 let Some(field) = dataset.schema().field_by_id(*field_id) else {
2504 log::warn!("The index {} ({}) referenced a field with id {} which does not exist in the schema", idx.name, idx.uuid, field_id);
2505 return None;
2506 };
2507 columns.push(field.name.clone());
2508 }
2509
2510 let name = idx.name.clone();
2511 Some(IndexConfig { index_type, columns, name })
2512 }).collect::<Vec<_>>().await;
2513
2514 Ok(results.into_iter().flatten().collect())
2515 }
2516
2517 async fn uri(&self) -> Result<String> {
2518 Ok(self.uri.clone())
2519 }
2520
2521 async fn storage_options(&self) -> Option<HashMap<String, String>> {
2522 self.initial_storage_options().await
2523 }
2524
2525 async fn initial_storage_options(&self) -> Option<HashMap<String, String>> {
2526 self.dataset
2527 .get()
2528 .await
2529 .ok()
2530 .and_then(|dataset| dataset.initial_storage_options().cloned())
2531 }
2532
2533 async fn latest_storage_options(&self) -> Result<Option<HashMap<String, String>>> {
2534 let dataset = self.dataset.get().await?;
2535 Ok(dataset.latest_storage_options().await?.map(|o| o.0))
2536 }
2537
2538 async fn index_stats(&self, index_name: &str) -> Result<Option<IndexStatistics>> {
2539 let stats = match self
2540 .dataset
2541 .get()
2542 .await?
2543 .index_statistics(index_name.as_ref())
2544 .await
2545 {
2546 Ok(stats) => stats,
2547 Err(lance_core::Error::IndexNotFound { .. }) => return Ok(None),
2548 Err(e) => return Err(Error::from(e)),
2549 };
2550
2551 let mut stats: IndexStatisticsImpl =
2552 serde_json::from_str(&stats).map_err(|e| Error::InvalidInput {
2553 message: format!("error deserializing index statistics: {}", e),
2554 })?;
2555
2556 let first_index = stats.indices.pop().ok_or_else(|| Error::InvalidInput {
2557 message: "index statistics is empty".to_string(),
2558 })?;
2559 let index_type =
2561 stats
2562 .index_type
2563 .or(first_index.index_type)
2564 .ok_or_else(|| Error::InvalidInput {
2565 message: "index statistics was missing index type".to_string(),
2566 })?;
2567 let loss = stats
2568 .indices
2569 .iter()
2570 .map(|index| index.loss.unwrap_or_default())
2571 .sum::<f64>();
2572
2573 let loss = first_index.loss.map(|first_loss| first_loss + loss);
2574 Ok(Some(IndexStatistics {
2575 num_indexed_rows: stats.num_indexed_rows,
2576 num_unindexed_rows: stats.num_unindexed_rows,
2577 index_type,
2578 distance_type: first_index.metric_type,
2579 num_indices: stats.num_indices,
2580 loss,
2581 }))
2582 }
2583
2584 async fn wait_for_index(
2587 &self,
2588 index_names: &[&str],
2589 timeout: std::time::Duration,
2590 ) -> Result<()> {
2591 wait_for_index(self, index_names, timeout).await
2592 }
2593
2594 async fn stats(&self) -> Result<TableStatistics> {
2595 let num_rows = self.count_rows(None).await?;
2596 let num_indices = self.list_indices().await?.len();
2597 let ds = self.dataset.get().await?;
2598 let ds_clone = (*ds).clone();
2599 let ds_stats = Arc::new(ds_clone).calculate_data_stats().await?;
2600 let total_bytes = ds_stats.fields.iter().map(|f| f.bytes_on_disk).sum::<u64>() as usize;
2601
2602 let frags = ds.get_fragments();
2603 let mut sorted_sizes = join_all(
2604 frags
2605 .iter()
2606 .map(|frag| async move { frag.physical_rows().await.unwrap_or(0) }),
2607 )
2608 .await;
2609 sorted_sizes.sort();
2610
2611 let small_frag_threshold = 100000;
2612 let num_fragments = sorted_sizes.len();
2613 let num_small_fragments = sorted_sizes
2614 .iter()
2615 .filter(|&&size| size < small_frag_threshold)
2616 .count();
2617
2618 let p25 = *sorted_sizes.get(num_fragments / 4).unwrap_or(&0);
2619 let p50 = *sorted_sizes.get(num_fragments / 2).unwrap_or(&0);
2620 let p75 = *sorted_sizes.get(num_fragments * 3 / 4).unwrap_or(&0);
2621 let p99 = *sorted_sizes.get(num_fragments * 99 / 100).unwrap_or(&0);
2622 let min = sorted_sizes.first().copied().unwrap_or(0);
2623 let max = sorted_sizes.last().copied().unwrap_or(0);
2624 let mean = if num_fragments == 0 {
2625 0
2626 } else {
2627 sorted_sizes.iter().copied().sum::<usize>() / num_fragments
2628 };
2629
2630 let frag_stats = FragmentStatistics {
2631 num_fragments,
2632 num_small_fragments,
2633 lengths: FragmentSummaryStats {
2634 min,
2635 max,
2636 mean,
2637 p25,
2638 p50,
2639 p75,
2640 p99,
2641 },
2642 };
2643 let stats = TableStatistics {
2644 total_bytes,
2645 num_rows,
2646 num_indices,
2647 fragment_stats: frag_stats,
2648 };
2649 Ok(stats)
2650 }
2651
2652 async fn create_insert_exec(
2653 &self,
2654 input: Arc<dyn datafusion_physical_plan::ExecutionPlan>,
2655 write_params: WriteParams,
2656 ) -> Result<Arc<dyn datafusion_physical_plan::ExecutionPlan>> {
2657 let ds = self.dataset.get().await?;
2658 let dataset = Arc::new((*ds).clone());
2659 Ok(Arc::new(datafusion::insert::InsertExec::new(
2660 self.dataset.clone(),
2661 dataset,
2662 input,
2663 write_params,
2664 )))
2665 }
2666}
2667
/// Table-level statistics, as produced by `NativeTable::stats`.
#[skip_serializing_none]
#[derive(Debug, Deserialize, PartialEq)]
pub struct TableStatistics {
    /// Total bytes on disk, summed over every field's `bytes_on_disk` from Lance's data
    /// statistics.
    pub total_bytes: usize,

    /// Number of rows in the table (unfiltered `count_rows`).
    pub num_rows: usize,

    /// Number of indices returned by `list_indices`. That listing omits the
    /// fragment-reuse index and any index whose statistics cannot be read.
    pub num_indices: usize,

    /// Summary of the table's fragments.
    pub fragment_stats: FragmentStatistics,
}
2683
/// Statistics about a table's fragments.
#[skip_serializing_none]
#[derive(Debug, Deserialize, PartialEq)]
pub struct FragmentStatistics {
    /// Total number of fragments.
    pub num_fragments: usize,

    /// Number of fragments below the small-fragment threshold (100,000 rows in
    /// `NativeTable::stats`).
    pub num_small_fragments: usize,

    /// Distribution of fragment lengths, measured in rows.
    pub lengths: FragmentSummaryStats,
    }
2699
/// Summary of fragment lengths in rows.
///
/// As computed by `NativeTable::stats`:
/// - every field is 0 for a table with no fragments;
/// - the percentiles are the sorted length at index `len * k / 100`;
/// - `mean` is the integer (floor) mean.
#[skip_serializing_none]
#[derive(Debug, Deserialize, PartialEq)]
pub struct FragmentSummaryStats {
    /// Smallest fragment length.
    pub min: usize,
    /// Largest fragment length.
    pub max: usize,
    /// Mean fragment length, rounded down.
    pub mean: usize,
    /// 25th-percentile fragment length.
    pub p25: usize,
    /// Median fragment length.
    pub p50: usize,
    /// 75th-percentile fragment length.
    pub p75: usize,
    /// 99th-percentile fragment length.
    pub p99: usize,
}
2711
2712#[cfg(test)]
2713#[allow(deprecated)]
2714mod tests {
2715 use std::sync::Arc;
2716 use std::sync::atomic::{AtomicBool, Ordering};
2717 use std::time::Duration;
2718
2719 use arrow_array::{
2720 Array, BooleanArray, FixedSizeListArray, Int32Array, LargeStringArray, RecordBatch,
2721 RecordBatchIterator, RecordBatchReader, StringArray,
2722 builder::{ListBuilder, StringBuilder},
2723 };
2724 use arrow_array::{BinaryArray, LargeBinaryArray};
2725 use arrow_data::ArrayDataBuilder;
2726 use arrow_schema::{DataType, Field, Schema};
2727 use futures::TryStreamExt;
2728 use lance::Dataset;
2729 use lance::io::{ObjectStoreParams, WrappingObjectStore};
2730 use tempfile::tempdir;
2731
2732 use super::*;
2733 use crate::connect;
2734 use crate::connection::ConnectBuilder;
2735 use crate::index::scalar::{BTreeIndexBuilder, BitmapIndexBuilder};
2736 use crate::index::vector::{IvfHnswPqIndexBuilder, IvfHnswSqIndexBuilder};
2737 use crate::query::Select;
2738 use crate::query::{ExecutableQuery, QueryBase};
2739 use crate::test_utils::connection::new_test_connection;
2740 #[tokio::test]
2741 async fn test_open() {
2742 let tmp_dir = tempdir().unwrap();
2743 let dataset_path = tmp_dir.path().join("test.lance");
2744
2745 let batch = make_test_batches();
2746 let reader = RecordBatchIterator::new(vec![Ok(batch.clone())], batch.schema());
2747 Dataset::write(reader, dataset_path.to_str().unwrap(), None)
2748 .await
2749 .unwrap();
2750
2751 let table = NativeTable::open(dataset_path.to_str().unwrap())
2752 .await
2753 .unwrap();
2754
2755 assert_eq!(table.name, "test")
2756 }
2757
2758 #[tokio::test]
2759 async fn test_open_not_found() {
2760 let tmp_dir = tempdir().unwrap();
2761 let uri = tmp_dir.path().to_str().unwrap();
2762 let table = NativeTable::open(uri).await;
2763 assert!(matches!(table.unwrap_err(), Error::TableNotFound { .. }));
2764 }
2765
2766 #[test]
2767 #[cfg(not(windows))]
2768 fn test_object_store_path() {
2769 use std::path::Path as StdPath;
2770 let p = StdPath::new("s3://bucket/path/to/file");
2771 let c = p.join("subfile");
2772 assert_eq!(c.to_str().unwrap(), "s3://bucket/path/to/file/subfile");
2773 }
2774
2775 #[tokio::test]
2776 async fn test_count_rows() {
2777 let tmp_dir = tempdir().unwrap();
2778 let uri = tmp_dir.path().to_str().unwrap();
2779
2780 let batch = make_test_batches();
2781 let reader: Box<dyn RecordBatchReader + Send> = Box::new(RecordBatchIterator::new(
2782 vec![Ok(batch.clone())],
2783 batch.schema(),
2784 ));
2785 let table = NativeTable::create(
2786 uri,
2787 "test",
2788 vec![],
2789 reader,
2790 None,
2791 None,
2792 None,
2793 None,
2794 HashSet::new(),
2795 )
2796 .await
2797 .unwrap();
2798
2799 assert_eq!(table.count_rows(None).await.unwrap(), 10);
2800 assert_eq!(
2801 table
2802 .count_rows(Some(Filter::Sql("i >= 5".to_string())))
2803 .await
2804 .unwrap(),
2805 5
2806 );
2807 }
2808
2809 #[derive(Default, Debug)]
2810 struct NoOpCacheWrapper {
2811 called: AtomicBool,
2812 }
2813
2814 impl NoOpCacheWrapper {
2815 fn called(&self) -> bool {
2816 self.called.load(Ordering::Relaxed)
2817 }
2818 }
2819
2820 impl WrappingObjectStore for NoOpCacheWrapper {
2821 fn wrap(
2822 &self,
2823 _store_prefix: &str,
2824 original: Arc<dyn object_store::ObjectStore>,
2825 ) -> Arc<dyn object_store::ObjectStore> {
2826 self.called.store(true, Ordering::Relaxed);
2827 original
2828 }
2829 }
2830
2831 #[tokio::test]
2832 async fn test_open_table_options() {
2833 let tmp_dir = tempdir().unwrap();
2834 let dataset_path = tmp_dir.path().join("test.lance");
2835 let uri = dataset_path.to_str().unwrap();
2836 let conn = connect(uri).execute().await.unwrap();
2837
2838 let batches = make_test_batches();
2839
2840 conn.create_table("my_table", batches)
2841 .execute()
2842 .await
2843 .unwrap();
2844
2845 let wrapper = Arc::new(NoOpCacheWrapper::default());
2846
2847 let object_store_params = ObjectStoreParams {
2848 object_store_wrapper: Some(wrapper.clone()),
2849 ..Default::default()
2850 };
2851 let param = ReadParams {
2852 store_options: Some(object_store_params),
2853 ..Default::default()
2854 };
2855 assert!(!wrapper.called());
2856 conn.open_table("my_table")
2857 .lance_read_params(param)
2858 .execute()
2859 .await
2860 .unwrap();
2861 assert!(wrapper.called());
2862 }
2863
2864 fn make_test_batches() -> RecordBatch {
2865 let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)]));
2866 RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from_iter_values(0..10))]).unwrap()
2867 }
2868
2869 #[tokio::test]
2870 async fn test_tags() {
2871 let tmp_dir = tempdir().unwrap();
2872 let uri = tmp_dir.path().to_str().unwrap();
2873
2874 let conn = ConnectBuilder::new(uri)
2875 .read_consistency_interval(Duration::from_secs(0))
2876 .execute()
2877 .await
2878 .unwrap();
2879 let table = conn
2880 .create_table("my_table", some_sample_data())
2881 .execute()
2882 .await
2883 .unwrap();
2884 assert_eq!(table.version().await.unwrap(), 1);
2885 table.add(some_sample_data()).execute().await.unwrap();
2886 assert_eq!(table.version().await.unwrap(), 2);
2887 let mut tags_manager = table.tags().await.unwrap();
2888 let tags = tags_manager.list().await.unwrap();
2889 assert!(tags.is_empty(), "Tags should be empty initially");
2890 let tag1 = "tag1";
2891 tags_manager.create(tag1, 1).await.unwrap();
2892 assert_eq!(tags_manager.get_version(tag1).await.unwrap(), 1);
2893 let tags = tags_manager.list().await.unwrap();
2894 assert_eq!(tags.len(), 1);
2895 assert!(tags.contains_key(tag1));
2896 assert_eq!(tags.get(tag1).unwrap().version, 1);
2897 tags_manager.create("tag2", 2).await.unwrap();
2898 assert_eq!(tags_manager.get_version("tag2").await.unwrap(), 2);
2899 let tags = tags_manager.list().await.unwrap();
2900 assert_eq!(tags.len(), 2);
2901 assert!(tags.contains_key(tag1));
2902 assert_eq!(tags.get(tag1).unwrap().version, 1);
2903 assert!(tags.contains_key("tag2"));
2904 assert_eq!(tags.get("tag2").unwrap().version, 2);
2905 table.add(some_sample_data()).execute().await.unwrap();
2907 tags_manager.update(tag1, 3).await.unwrap();
2908 assert_eq!(tags_manager.get_version(tag1).await.unwrap(), 3);
2909 tags_manager.delete("tag2").await.unwrap();
2910 let tags = tags_manager.list().await.unwrap();
2911 assert_eq!(tags.len(), 1);
2912 assert!(tags.contains_key(tag1));
2913 assert_eq!(tags.get(tag1).unwrap().version, 3);
2914 table.add(some_sample_data()).execute().await.unwrap();
2916 assert_eq!(table.version().await.unwrap(), 4);
2917 table.checkout_tag(tag1).await.unwrap();
2918 assert_eq!(table.version().await.unwrap(), 3);
2919 table.checkout_latest().await.unwrap();
2920 assert_eq!(table.version().await.unwrap(), 4);
2921 }
2922
2923 #[tokio::test]
2924 async fn test_create_index() {
2925 use arrow_array::RecordBatch;
2926 use arrow_schema::{DataType, Field, Schema as ArrowSchema};
2927 use rand;
2928 use std::iter::repeat_with;
2929
2930 use arrow_array::Float32Array;
2931
2932 let tmp_dir = tempdir().unwrap();
2933 let uri = tmp_dir.path().to_str().unwrap();
2934 let conn = connect(uri).execute().await.unwrap();
2935
2936 let dimension = 16;
2937 let schema = Arc::new(ArrowSchema::new(vec![Field::new(
2938 "embeddings",
2939 DataType::FixedSizeList(
2940 Arc::new(Field::new("item", DataType::Float32, true)),
2941 dimension,
2942 ),
2943 false,
2944 )]));
2945
2946 let float_arr = Float32Array::from(
2947 repeat_with(rand::random::<f32>)
2948 .take(512 * dimension as usize)
2949 .collect::<Vec<f32>>(),
2950 );
2951
2952 let vectors = Arc::new(create_fixed_size_list(float_arr, dimension).unwrap());
2953 let batch = RecordBatch::try_new(schema.clone(), vec![vectors.clone()]).unwrap();
2954
2955 let table = conn.create_table("test", batch).execute().await.unwrap();
2956
2957 assert_eq!(table.index_stats("my_index").await.unwrap(), None);
2958
2959 table
2960 .create_index(&["embeddings"], Index::Auto)
2961 .execute()
2962 .await
2963 .unwrap();
2964
2965 let index_configs = table.list_indices().await.unwrap();
2966 assert_eq!(index_configs.len(), 1);
2967 let index = index_configs.into_iter().next().unwrap();
2968 assert_eq!(index.index_type, crate::index::IndexType::IvfPq);
2969 assert_eq!(index.columns, vec!["embeddings".to_string()]);
2970 assert_eq!(table.count_rows(None).await.unwrap(), 512);
2971 assert_eq!(table.name(), "test");
2972
2973 let indices = table.as_native().unwrap().load_indices().await.unwrap();
2974 let index_name = &indices[0].index_name;
2975 let stats = table.index_stats(index_name).await.unwrap().unwrap();
2976 assert_eq!(stats.num_indexed_rows, 512);
2977 assert_eq!(stats.num_unindexed_rows, 0);
2978 assert_eq!(stats.index_type, crate::index::IndexType::IvfPq);
2979 assert_eq!(stats.distance_type, Some(crate::DistanceType::L2));
2980 assert!(stats.loss.is_some());
2981
2982 table.drop_index(index_name).await.unwrap();
2983 assert_eq!(table.list_indices().await.unwrap().len(), 0);
2984 }
2985
2986 #[tokio::test]
2987 async fn test_dynamic_select() {
2988 let tc = new_test_connection().await.unwrap();
2989 let db = tc.connection;
2990
2991 let table = db
2992 .create_table("test", some_sample_data())
2993 .execute()
2994 .await
2995 .unwrap();
2996
2997 let query = table.query().select(Select::dynamic(&[("i_alias", "i")]));
2998
2999 let result = query.execute().await;
3000 let batches = result
3001 .expect("should have result")
3002 .try_collect::<Vec<_>>()
3003 .await
3004 .unwrap();
3005
3006 for batch in batches {
3007 assert!(batch.column_by_name("i_alias").is_some());
3008 }
3009 }
3010
3011 #[tokio::test]
3012 async fn test_ivf_pq_uses_default_partition_size_for_num_partitions() {
3013 use arrow_array::{Float32Array, RecordBatch};
3014 use arrow_schema::{DataType, Field, Schema as ArrowSchema};
3015
3016 use crate::index::vector::IvfPqIndexBuilder;
3017
3018 let tmp_dir = tempdir().unwrap();
3019 let uri = tmp_dir.path().to_str().unwrap();
3020 let conn = connect(uri).execute().await.unwrap();
3021
3022 const PARTITION_SIZE: usize = 8192;
3023 let num_rows = PARTITION_SIZE * 2;
3024 let dimension = 8usize;
3025 let schema = Arc::new(ArrowSchema::new(vec![Field::new(
3026 "embeddings",
3027 DataType::FixedSizeList(
3028 Arc::new(Field::new("item", DataType::Float32, true)),
3029 dimension as i32,
3030 ),
3031 false,
3032 )]));
3033
3034 let float_arr =
3035 Float32Array::from_iter_values((0..(num_rows * dimension)).map(|v| v as f32));
3036 let vectors = Arc::new(create_fixed_size_list(float_arr, dimension as i32).unwrap());
3037 let batch = RecordBatch::try_new(schema.clone(), vec![vectors]).unwrap();
3038
3039 let table = conn.create_table("test", batch).execute().await.unwrap();
3040 let native_table = table.as_native().unwrap();
3041 let builder = IvfPqIndexBuilder::default();
3042 table
3043 .create_index(&["embeddings"], Index::IvfPq(builder))
3044 .execute()
3045 .await
3046 .unwrap();
3047 table
3048 .wait_for_index(&["embeddings_idx"], std::time::Duration::from_secs(30))
3049 .await
3050 .unwrap();
3051
3052 use lance::index::DatasetIndexInternalExt;
3053 use lance::index::vector::ivf::v2::IvfPq as LanceIvfPq;
3054 use lance_index::metrics::NoOpMetricsCollector;
3055 use lance_index::vector::VectorIndex as LanceVectorIndex;
3056
3057 let indices = native_table.load_indices().await.unwrap();
3058 let index_uuid = indices[0].index_uuid.clone();
3059
3060 let dataset_guard = native_table.dataset.get().await.unwrap();
3061 let dataset = (*dataset_guard).clone();
3062 drop(dataset_guard);
3063
3064 let lance_index = dataset
3065 .open_vector_index("embeddings", &index_uuid, &NoOpMetricsCollector)
3066 .await
3067 .unwrap();
3068 let ivf_index = lance_index
3069 .as_any()
3070 .downcast_ref::<LanceIvfPq>()
3071 .expect("expected IvfPq index");
3072 let partition_count = ivf_index.ivf_model().num_partitions();
3073
3074 let expected_partitions = num_rows / PARTITION_SIZE;
3075 assert_eq!(partition_count, expected_partitions);
3076 }
3077
3078 #[tokio::test]
3079 async fn test_create_index_ivf_hnsw_sq() {
3080 use arrow_array::RecordBatch;
3081 use arrow_schema::{DataType, Field, Schema as ArrowSchema};
3082 use rand;
3083 use std::iter::repeat_with;
3084
3085 use arrow_array::Float32Array;
3086
3087 let tmp_dir = tempdir().unwrap();
3088 let uri = tmp_dir.path().to_str().unwrap();
3089 let conn = connect(uri).execute().await.unwrap();
3090
3091 let dimension = 16;
3092 let schema = Arc::new(ArrowSchema::new(vec![Field::new(
3093 "embeddings",
3094 DataType::FixedSizeList(
3095 Arc::new(Field::new("item", DataType::Float32, true)),
3096 dimension,
3097 ),
3098 false,
3099 )]));
3100
3101 let float_arr = Float32Array::from(
3102 repeat_with(rand::random::<f32>)
3103 .take(512 * dimension as usize)
3104 .collect::<Vec<f32>>(),
3105 );
3106
3107 let vectors = Arc::new(create_fixed_size_list(float_arr, dimension).unwrap());
3108 let batch = RecordBatch::try_new(schema.clone(), vec![vectors.clone()]).unwrap();
3109
3110 let table = conn.create_table("test", batch).execute().await.unwrap();
3111
3112 let stats = table.index_stats("my_index").await.unwrap();
3113 assert!(stats.is_none());
3114
3115 let index = IvfHnswSqIndexBuilder::default();
3116 table
3117 .create_index(&["embeddings"], Index::IvfHnswSq(index))
3118 .execute()
3119 .await
3120 .unwrap();
3121
3122 let index_configs = table.list_indices().await.unwrap();
3123 assert_eq!(index_configs.len(), 1);
3124 let index = index_configs.into_iter().next().unwrap();
3125 assert_eq!(index.index_type, crate::index::IndexType::IvfHnswSq);
3126 assert_eq!(index.columns, vec!["embeddings".to_string()]);
3127 assert_eq!(table.count_rows(None).await.unwrap(), 512);
3128 assert_eq!(table.name(), "test");
3129
3130 let indices = table.as_native().unwrap().load_indices().await.unwrap();
3131 let index_name = &indices[0].index_name;
3132 let stats = table.index_stats(index_name).await.unwrap().unwrap();
3133 assert_eq!(stats.num_indexed_rows, 512);
3134 assert_eq!(stats.num_unindexed_rows, 0);
3135 }
3136
3137 #[tokio::test]
3138 async fn test_create_index_ivf_hnsw_pq() {
3139 use arrow_array::RecordBatch;
3140 use arrow_schema::{DataType, Field, Schema as ArrowSchema};
3141 use rand;
3142 use std::iter::repeat_with;
3143
3144 use arrow_array::Float32Array;
3145
3146 let tmp_dir = tempdir().unwrap();
3147 let uri = tmp_dir.path().to_str().unwrap();
3148 let conn = connect(uri).execute().await.unwrap();
3149
3150 let dimension = 16;
3151 let schema = Arc::new(ArrowSchema::new(vec![Field::new(
3152 "embeddings",
3153 DataType::FixedSizeList(
3154 Arc::new(Field::new("item", DataType::Float32, true)),
3155 dimension,
3156 ),
3157 false,
3158 )]));
3159
3160 let float_arr = Float32Array::from(
3161 repeat_with(rand::random::<f32>)
3162 .take(512 * dimension as usize)
3163 .collect::<Vec<f32>>(),
3164 );
3165
3166 let vectors = Arc::new(create_fixed_size_list(float_arr, dimension).unwrap());
3167 let batch = RecordBatch::try_new(schema.clone(), vec![vectors.clone()]).unwrap();
3168
3169 let table = conn.create_table("test", batch).execute().await.unwrap();
3170 let stats = table.index_stats("my_index").await.unwrap();
3171 assert!(stats.is_none());
3172
3173 let index = IvfHnswPqIndexBuilder::default();
3174 table
3175 .create_index(&["embeddings"], Index::IvfHnswPq(index))
3176 .execute()
3177 .await
3178 .unwrap();
3179 table
3180 .wait_for_index(&["embeddings_idx"], Duration::from_millis(10))
3181 .await
3182 .unwrap();
3183 let index_configs = table.list_indices().await.unwrap();
3184 assert_eq!(index_configs.len(), 1);
3185 let index = index_configs.into_iter().next().unwrap();
3186 assert_eq!(index.index_type, crate::index::IndexType::IvfHnswPq);
3187 assert_eq!(index.columns, vec!["embeddings".to_string()]);
3188 assert_eq!(table.count_rows(None).await.unwrap(), 512);
3189 assert_eq!(table.name(), "test");
3190
3191 let indices: Vec<VectorIndex> = table.as_native().unwrap().load_indices().await.unwrap();
3192 let index_name = &indices[0].index_name;
3193 let stats = table.index_stats(index_name).await.unwrap().unwrap();
3194 assert_eq!(stats.num_indexed_rows, 512);
3195 assert_eq!(stats.num_unindexed_rows, 0);
3196 }
3197
3198 #[tokio::test]
3199 async fn test_create_index_ivf_hnsw_flat() {
3200 use arrow_array::RecordBatch;
3201 use arrow_schema::{DataType, Field, Schema as ArrowSchema};
3202 use rand;
3203 use std::iter::repeat_with;
3204
3205 use crate::index::vector::IvfHnswFlatIndexBuilder;
3206 use arrow_array::Float32Array;
3207
3208 let tmp_dir = tempdir().unwrap();
3209 let uri = tmp_dir.path().to_str().unwrap();
3210 let conn = connect(uri).execute().await.unwrap();
3211
3212 let dimension = 16;
3213 let schema = Arc::new(ArrowSchema::new(vec![Field::new(
3214 "embeddings",
3215 DataType::FixedSizeList(
3216 Arc::new(Field::new("item", DataType::Float32, true)),
3217 dimension,
3218 ),
3219 false,
3220 )]));
3221
3222 let float_arr = Float32Array::from(
3223 repeat_with(rand::random::<f32>)
3224 .take(512 * dimension as usize)
3225 .collect::<Vec<f32>>(),
3226 );
3227
3228 let vectors = Arc::new(create_fixed_size_list(float_arr, dimension).unwrap());
3229 let batch = RecordBatch::try_new(schema.clone(), vec![vectors.clone()]).unwrap();
3230
3231 let table = conn.create_table("test", batch).execute().await.unwrap();
3232
3233 let index = IvfHnswFlatIndexBuilder::default();
3234 table
3235 .create_index(&["embeddings"], Index::IvfHnswFlat(index))
3236 .execute()
3237 .await
3238 .unwrap();
3239
3240 let index_configs = table.list_indices().await.unwrap();
3241 assert_eq!(index_configs.len(), 1);
3242 let index = index_configs.into_iter().next().unwrap();
3243 assert_eq!(index.index_type, crate::index::IndexType::IvfHnswFlat);
3244 assert_eq!(index.columns, vec!["embeddings".to_string()]);
3245 assert_eq!(table.count_rows(None).await.unwrap(), 512);
3246 }
3247
3248 fn create_fixed_size_list<T: Array>(values: T, list_size: i32) -> Result<FixedSizeListArray> {
3249 let list_type = DataType::FixedSizeList(
3250 Arc::new(Field::new("item", values.data_type().clone(), true)),
3251 list_size,
3252 );
3253 let data = ArrayDataBuilder::new(list_type)
3254 .len(values.len() / list_size as usize)
3255 .add_child_data(values.into_data())
3256 .build()
3257 .unwrap();
3258
3259 Ok(FixedSizeListArray::from(data))
3260 }
3261
3262 fn some_sample_data() -> Box<dyn arrow_array::RecordBatchReader + Send> {
3263 let batch = RecordBatch::try_new(
3264 Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)])),
3265 vec![Arc::new(Int32Array::from(vec![1]))],
3266 )
3267 .unwrap();
3268 let schema = batch.schema().clone();
3269 let batch = Ok(batch);
3270
3271 Box::new(RecordBatchIterator::new(vec![batch], schema))
3272 }
3273
3274 #[tokio::test]
3275 async fn test_create_scalar_index() {
3276 let tmp_dir = tempdir().unwrap();
3277 let uri = tmp_dir.path().to_str().unwrap();
3278
3279 let batch = RecordBatch::try_new(
3280 Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)])),
3281 vec![Arc::new(Int32Array::from(vec![1]))],
3282 )
3283 .unwrap();
3284 let conn = ConnectBuilder::new(uri).execute().await.unwrap();
3285 let table = conn
3286 .create_table("my_table", batch.clone())
3287 .execute()
3288 .await
3289 .unwrap();
3290
3291 table
3293 .create_index(&["i"], Index::Auto)
3294 .execute()
3295 .await
3296 .unwrap();
3297 table
3298 .wait_for_index(&["i_idx"], Duration::from_millis(10))
3299 .await
3300 .unwrap();
3301 let index_configs = table.list_indices().await.unwrap();
3302 assert_eq!(index_configs.len(), 1);
3303 let index = index_configs.into_iter().next().unwrap();
3304 assert_eq!(index.index_type, crate::index::IndexType::BTree);
3305 assert_eq!(index.columns, vec!["i".to_string()]);
3306
3307 table
3309 .create_index(&["i"], Index::BTree(BTreeIndexBuilder::default()))
3310 .execute()
3311 .await
3312 .unwrap();
3313
3314 let index_configs = table.list_indices().await.unwrap();
3315 assert_eq!(index_configs.len(), 1);
3316 let index = index_configs.into_iter().next().unwrap();
3317 assert_eq!(index.index_type, crate::index::IndexType::BTree);
3318 assert_eq!(index.columns, vec!["i".to_string()]);
3319
3320 let indices = table.as_native().unwrap().load_indices().await.unwrap();
3321 let index_name = &indices[0].index_name;
3322 let stats = table.index_stats(index_name).await.unwrap().unwrap();
3323 assert_eq!(stats.num_indexed_rows, 1);
3324 assert_eq!(stats.num_unindexed_rows, 0);
3325 }
3326
3327 #[tokio::test]
3328 async fn test_create_bitmap_index() {
3329 let tmp_dir = tempdir().unwrap();
3330 let uri = tmp_dir.path().to_str().unwrap();
3331
3332 let conn = ConnectBuilder::new(uri).execute().await.unwrap();
3333
3334 let schema = Arc::new(Schema::new(vec![
3335 Field::new("id", DataType::Int32, false),
3336 Field::new("category", DataType::Utf8, true),
3337 Field::new("large_category", DataType::LargeUtf8, true),
3338 Field::new("is_active", DataType::Boolean, true),
3339 Field::new("data", DataType::Binary, true),
3340 Field::new("large_data", DataType::LargeBinary, true),
3341 ]));
3342
3343 let batch = RecordBatch::try_new(
3344 schema.clone(),
3345 vec![
3346 Arc::new(Int32Array::from_iter_values(0..100)),
3347 Arc::new(StringArray::from_iter_values(
3348 (0..100).map(|i| format!("category_{}", i % 5)),
3349 )),
3350 Arc::new(LargeStringArray::from_iter_values(
3351 (0..100).map(|i| format!("large_category_{}", i % 5)),
3352 )),
3353 Arc::new(BooleanArray::from_iter((0..100).map(|i| Some(i % 2 == 0)))),
3354 Arc::new(BinaryArray::from_iter_values(
3355 (0_u32..100).map(|i| i.to_le_bytes()),
3356 )),
3357 Arc::new(LargeBinaryArray::from_iter_values(
3358 (0_u32..100).map(|i| i.to_le_bytes()),
3359 )),
3360 ],
3361 )
3362 .unwrap();
3363
3364 let table = conn
3365 .create_table("test_bitmap", batch.clone())
3366 .execute()
3367 .await
3368 .unwrap();
3369
3370 table
3372 .create_index(&["category"], Index::Bitmap(Default::default()))
3373 .execute()
3374 .await
3375 .unwrap();
3376
3377 table
3379 .create_index(&["is_active"], Index::Bitmap(Default::default()))
3380 .execute()
3381 .await
3382 .unwrap();
3383
3384 table
3386 .create_index(&["data"], Index::Bitmap(Default::default()))
3387 .execute()
3388 .await
3389 .unwrap();
3390
3391 table
3393 .create_index(&["large_data"], Index::Bitmap(Default::default()))
3394 .execute()
3395 .await
3396 .unwrap();
3397
3398 table
3400 .create_index(&["large_category"], Index::Bitmap(Default::default()))
3401 .execute()
3402 .await
3403 .unwrap();
3404
3405 let index_configs = table.list_indices().await.unwrap();
3407 assert_eq!(index_configs.len(), 5);
3408
3409 let mut configs_iter = index_configs.into_iter();
3410 let index = configs_iter.next().unwrap();
3411 assert_eq!(index.index_type, crate::index::IndexType::Bitmap);
3412 assert_eq!(index.columns, vec!["category".to_string()]);
3413
3414 let index = configs_iter.next().unwrap();
3415 assert_eq!(index.index_type, crate::index::IndexType::Bitmap);
3416 assert_eq!(index.columns, vec!["is_active".to_string()]);
3417
3418 let index = configs_iter.next().unwrap();
3419 assert_eq!(index.index_type, crate::index::IndexType::Bitmap);
3420 assert_eq!(index.columns, vec!["data".to_string()]);
3421
3422 let index = configs_iter.next().unwrap();
3423 assert_eq!(index.index_type, crate::index::IndexType::Bitmap);
3424 assert_eq!(index.columns, vec!["large_data".to_string()]);
3425
3426 let index = configs_iter.next().unwrap();
3427 assert_eq!(index.index_type, crate::index::IndexType::Bitmap);
3428 assert_eq!(index.columns, vec!["large_category".to_string()]);
3429 }
3430
3431 #[tokio::test]
3432 async fn test_create_label_list_index() {
3433 let tmp_dir = tempdir().unwrap();
3434 let uri = tmp_dir.path().to_str().unwrap();
3435
3436 let conn = ConnectBuilder::new(uri).execute().await.unwrap();
3437
3438 let schema = Arc::new(Schema::new(vec![
3439 Field::new("id", DataType::Int32, false),
3440 Field::new(
3441 "tags",
3442 DataType::List(Field::new("item", DataType::Utf8, true).into()),
3443 true,
3444 ),
3445 ]));
3446
3447 const TAGS: [&str; 3] = ["cat", "dog", "fish"];
3448
3449 let values_builder = StringBuilder::new();
3450 let mut builder = ListBuilder::new(values_builder);
3451 for i in 0..120 {
3452 builder.values().append_value(TAGS[i % 3]);
3453 if i % 3 == 0 {
3454 builder.append(true)
3455 }
3456 }
3457 let tags = Arc::new(builder.finish());
3458
3459 let batch = RecordBatch::try_new(
3460 schema.clone(),
3461 vec![Arc::new(Int32Array::from_iter_values(0..40)), tags],
3462 )
3463 .unwrap();
3464
3465 let table = conn
3466 .create_table("test_bitmap", batch.clone())
3467 .execute()
3468 .await
3469 .unwrap();
3470
3471 assert!(
3473 table
3474 .create_index(&["tags"], Index::BTree(Default::default()))
3475 .execute()
3476 .await
3477 .is_err()
3478 );
3479 assert!(
3480 table
3481 .create_index(&["tags"], Index::Bitmap(Default::default()))
3482 .execute()
3483 .await
3484 .is_err()
3485 );
3486
3487 table
3489 .create_index(&["tags"], Index::LabelList(Default::default()))
3490 .execute()
3491 .await
3492 .unwrap();
3493
3494 let index_configs = table.list_indices().await.unwrap();
3496 assert_eq!(index_configs.len(), 1);
3497 let index = index_configs.into_iter().next().unwrap();
3498 assert_eq!(index.index_type, crate::index::IndexType::LabelList);
3499 assert_eq!(index.columns, vec!["tags".to_string()]);
3500 }
3501
3502 #[tokio::test]
3503 async fn test_create_inverted_index() {
3504 let tmp_dir = tempdir().unwrap();
3505 let uri = tmp_dir.path().to_str().unwrap();
3506
3507 let conn = ConnectBuilder::new(uri).execute().await.unwrap();
3508 const WORDS: [&str; 3] = ["cat", "dog", "fish"];
3509 let mut text_builder = StringBuilder::new();
3510 let num_rows = 120;
3511 for i in 0..num_rows {
3512 text_builder.append_value(WORDS[i % 3]);
3513 }
3514 let text = Arc::new(text_builder.finish());
3515
3516 let schema = Arc::new(Schema::new(vec![
3517 Field::new("id", DataType::Int32, false),
3518 Field::new("text", DataType::Utf8, true),
3519 ]));
3520 let batch = RecordBatch::try_new(
3521 schema.clone(),
3522 vec![
3523 Arc::new(Int32Array::from_iter_values(0..num_rows as i32)),
3524 text,
3525 ],
3526 )
3527 .unwrap();
3528
3529 let table = conn
3530 .create_table("test_bitmap", batch.clone())
3531 .execute()
3532 .await
3533 .unwrap();
3534
3535 table
3536 .create_index(&["text"], Index::FTS(Default::default()))
3537 .execute()
3538 .await
3539 .unwrap();
3540 let index_configs = table.list_indices().await.unwrap();
3541 assert_eq!(index_configs.len(), 1);
3542 let index = index_configs.into_iter().next().unwrap();
3543 assert_eq!(index.index_type, crate::index::IndexType::FTS);
3544 assert_eq!(index.columns, vec!["text".to_string()]);
3545 assert_eq!(index.name, "text_idx");
3546
3547 let stats = table.index_stats("text_idx").await.unwrap().unwrap();
3548 assert_eq!(stats.num_indexed_rows, num_rows);
3549 assert_eq!(stats.num_unindexed_rows, 0);
3550 assert_eq!(stats.index_type, crate::index::IndexType::FTS);
3551 assert_eq!(stats.distance_type, None);
3552
3553 table.prewarm_index("text_idx").await.unwrap();
3555 }
3556
3557 #[cfg(not(target_os = "windows"))]
3559 #[tokio::test]
3560 async fn test_read_consistency_interval() {
3561 let intervals = vec![
3562 None,
3563 Some(0),
3564 Some(100), ];
3566
3567 for interval in intervals {
3568 let data = some_sample_data();
3569
3570 let tmp_dir = tempdir().unwrap();
3571 let uri = tmp_dir.path().to_str().unwrap();
3572
3573 let conn1 = ConnectBuilder::new(uri).execute().await.unwrap();
3574 let table1 = conn1
3575 .create_empty_table("my_table", RecordBatchReader::schema(&data))
3576 .execute()
3577 .await
3578 .unwrap();
3579
3580 let mut conn2 = ConnectBuilder::new(uri);
3581 if let Some(interval) = interval {
3582 conn2 = conn2.read_consistency_interval(std::time::Duration::from_millis(interval));
3583 }
3584 let conn2 = conn2.execute().await.unwrap();
3585 let table2 = conn2.open_table("my_table").execute().await.unwrap();
3586
3587 assert_eq!(table1.count_rows(None).await.unwrap(), 0);
3588 assert_eq!(table2.count_rows(None).await.unwrap(), 0);
3589
3590 table1.add(data).execute().await.unwrap();
3591 assert_eq!(table1.count_rows(None).await.unwrap(), 1);
3592
3593 match interval {
3594 None => {
3595 assert_eq!(table2.count_rows(None).await.unwrap(), 0);
3596 table2.checkout_latest().await.unwrap();
3597 assert_eq!(table2.count_rows(None).await.unwrap(), 1);
3598 }
3599 Some(0) => {
3600 assert_eq!(table2.count_rows(None).await.unwrap(), 1);
3601 }
3602 Some(100) => {
3603 assert_eq!(table2.count_rows(None).await.unwrap(), 0);
3604 tokio::time::sleep(Duration::from_millis(100)).await;
3605 assert_eq!(table2.count_rows(None).await.unwrap(), 1);
3606 }
3607 _ => unreachable!(),
3608 }
3609 }
3610 }
3611
3612 #[tokio::test]
3613 async fn test_time_travel_write() {
3614 let tmp_dir = tempdir().unwrap();
3615 let uri = tmp_dir.path().to_str().unwrap();
3616
3617 let conn = ConnectBuilder::new(uri)
3618 .read_consistency_interval(Duration::from_secs(0))
3619 .execute()
3620 .await
3621 .unwrap();
3622 let table = conn
3623 .create_table("my_table", some_sample_data())
3624 .execute()
3625 .await
3626 .unwrap();
3627 let version = table.version().await.unwrap();
3628 table.add(some_sample_data()).execute().await.unwrap();
3629 table.checkout(version).await.unwrap();
3630 assert!(table.add(some_sample_data()).execute().await.is_err())
3631 }
3632
3633 #[tokio::test]
3634 async fn test_update_dataset_config() {
3635 let tmp_dir = tempdir().unwrap();
3636 let uri = tmp_dir.path().to_str().unwrap();
3637
3638 let conn = ConnectBuilder::new(uri)
3639 .read_consistency_interval(Duration::from_secs(0))
3640 .execute()
3641 .await
3642 .unwrap();
3643
3644 let table = conn
3645 .create_table("my_table", some_sample_data())
3646 .execute()
3647 .await
3648 .unwrap();
3649 let native_tbl = table.as_native().unwrap();
3650
3651 let manifest = native_tbl.manifest().await.unwrap();
3652 let base_config_len = manifest.config.len();
3653
3654 native_tbl
3655 .update_config(vec![("test_key1".to_string(), "test_val1".to_string())])
3656 .await
3657 .unwrap();
3658
3659 let manifest = native_tbl.manifest().await.unwrap();
3660 assert_eq!(manifest.config.len(), 1 + base_config_len);
3661 assert_eq!(
3662 manifest.config.get("test_key1"),
3663 Some(&"test_val1".to_string())
3664 );
3665
3666 native_tbl
3667 .update_config(vec![("test_key2".to_string(), "test_val2".to_string())])
3668 .await
3669 .unwrap();
3670 let manifest = native_tbl.manifest().await.unwrap();
3671 assert_eq!(manifest.config.len(), 2 + base_config_len);
3672 assert_eq!(
3673 manifest.config.get("test_key1"),
3674 Some(&"test_val1".to_string())
3675 );
3676 assert_eq!(
3677 manifest.config.get("test_key2"),
3678 Some(&"test_val2".to_string())
3679 );
3680
3681 native_tbl
3682 .update_config(vec![(
3683 "test_key2".to_string(),
3684 "test_val2_update".to_string(),
3685 )])
3686 .await
3687 .unwrap();
3688 let manifest = native_tbl.manifest().await.unwrap();
3689 assert_eq!(manifest.config.len(), 2 + base_config_len);
3690 assert_eq!(
3691 manifest.config.get("test_key1"),
3692 Some(&"test_val1".to_string())
3693 );
3694 assert_eq!(
3695 manifest.config.get("test_key2"),
3696 Some(&"test_val2_update".to_string())
3697 );
3698
3699 native_tbl.delete_config_keys(&["test_key1"]).await.unwrap();
3700 let manifest = native_tbl.manifest().await.unwrap();
3701 assert_eq!(manifest.config.len(), 1 + base_config_len);
3702 assert_eq!(
3703 manifest.config.get("test_key2"),
3704 Some(&"test_val2_update".to_string())
3705 );
3706 }
3707
3708 #[tokio::test]
3709 async fn test_schema_metadata_config() {
3710 let tmp_dir = tempdir().unwrap();
3711 let uri = tmp_dir.path().to_str().unwrap();
3712
3713 let conn = ConnectBuilder::new(uri)
3714 .read_consistency_interval(Duration::from_secs(0))
3715 .execute()
3716 .await
3717 .unwrap();
3718 let table = conn
3719 .create_table("my_table", some_sample_data())
3720 .execute()
3721 .await
3722 .unwrap();
3723
3724 let native_tbl = table.as_native().unwrap();
3725 let schema = native_tbl.schema().await.unwrap();
3726 let metadata = schema.metadata();
3727 assert_eq!(metadata.len(), 0);
3728
3729 native_tbl
3730 .replace_schema_metadata(vec![("test_key1".to_string(), "test_val1".to_string())])
3731 .await
3732 .unwrap();
3733
3734 let schema = native_tbl.schema().await.unwrap();
3735 let metadata = schema.metadata();
3736 assert_eq!(metadata.len(), 1);
3737 assert_eq!(metadata.get("test_key1"), Some(&"test_val1".to_string()));
3738
3739 native_tbl
3740 .replace_schema_metadata(vec![
3741 ("test_key1".to_string(), "test_val1_update".to_string()),
3742 ("test_key2".to_string(), "test_val2".to_string()),
3743 ])
3744 .await
3745 .unwrap();
3746 let schema = native_tbl.schema().await.unwrap();
3747 let metadata = schema.metadata();
3748 assert_eq!(metadata.len(), 2);
3749 assert_eq!(
3750 metadata.get("test_key1"),
3751 Some(&"test_val1_update".to_string())
3752 );
3753 assert_eq!(metadata.get("test_key2"), Some(&"test_val2".to_string()));
3754
3755 native_tbl
3756 .replace_schema_metadata(vec![(
3757 "test_key2".to_string(),
3758 "test_val2_update".to_string(),
3759 )])
3760 .await
3761 .unwrap();
3762 let schema = native_tbl.schema().await.unwrap();
3763 let metadata = schema.metadata();
3764 assert_eq!(
3765 metadata.get("test_key2"),
3766 Some(&"test_val2_update".to_string())
3767 );
3768 }
3769
3770 #[tokio::test]
3771 pub async fn test_field_metadata_update() {
3772 let tmp_dir = tempdir().unwrap();
3773 let uri = tmp_dir.path().to_str().unwrap();
3774
3775 let conn = ConnectBuilder::new(uri)
3776 .read_consistency_interval(Duration::from_secs(0))
3777 .execute()
3778 .await
3779 .unwrap();
3780 let table = conn
3781 .create_table("my_table", some_sample_data())
3782 .execute()
3783 .await
3784 .unwrap();
3785
3786 let native_tbl = table.as_native().unwrap();
3787 let schema = native_tbl.manifest().await.unwrap().schema;
3788
3789 let field = schema.field("i").unwrap();
3790 assert_eq!(field.metadata.len(), 0);
3791
3792 native_tbl
3793 .replace_schema_metadata(vec![(
3794 "test_key2".to_string(),
3795 "test_val2_update".to_string(),
3796 )])
3797 .await
3798 .unwrap();
3799
3800 let schema = native_tbl.schema().await.unwrap();
3801 let metadata = schema.metadata();
3802 assert_eq!(metadata.len(), 1);
3803 assert_eq!(
3804 metadata.get("test_key2"),
3805 Some(&"test_val2_update".to_string())
3806 );
3807
3808 let mut new_field_metadata = HashMap::<String, String>::new();
3809 new_field_metadata.insert("test_field_key1".into(), "test_field_val1".into());
3810 native_tbl
3811 .replace_field_metadata(vec![(field.id as u32, new_field_metadata)])
3812 .await
3813 .unwrap();
3814
3815 let schema = native_tbl.manifest().await.unwrap().schema;
3816 let field = schema.field("i").unwrap();
3817 assert_eq!(field.metadata.len(), 1);
3818 assert_eq!(
3819 field.metadata.get("test_field_key1"),
3820 Some(&"test_field_val1".to_string())
3821 );
3822 }
3823
3824 #[tokio::test]
3825 pub async fn test_stats() {
3826 let tmp_dir = tempdir().unwrap();
3827 let uri = tmp_dir.path().to_str().unwrap();
3828
3829 let conn = ConnectBuilder::new(uri).execute().await.unwrap();
3830
3831 let schema = Arc::new(Schema::new(vec![
3832 Field::new("id", DataType::Int32, false),
3833 Field::new("foo", DataType::Int32, true),
3834 ]));
3835 let batch = RecordBatch::try_new(
3836 schema.clone(),
3837 vec![
3838 Arc::new(Int32Array::from_iter_values(0..100)),
3839 Arc::new(Int32Array::from_iter_values(0..100)),
3840 ],
3841 )
3842 .unwrap();
3843
3844 let table = conn
3845 .create_table("test_stats", batch.clone())
3846 .execute()
3847 .await
3848 .unwrap();
3849 for _ in 0..10 {
3850 let batch = RecordBatch::try_new(
3851 schema.clone(),
3852 vec![
3853 Arc::new(Int32Array::from_iter_values(0..15)),
3854 Arc::new(Int32Array::from_iter_values(0..15)),
3855 ],
3856 )
3857 .unwrap();
3858 table.add(batch.clone()).execute().await.unwrap();
3859 }
3860
3861 let empty_table = conn
3862 .create_table("test_stats_empty", RecordBatch::new_empty(batch.schema()))
3863 .execute()
3864 .await
3865 .unwrap();
3866
3867 let res = table.stats().await.unwrap();
3868 println!("{:#?}", res);
3869 assert_eq!(
3870 res,
3871 TableStatistics {
3872 num_rows: 250,
3873 num_indices: 0,
3874 total_bytes: 2300,
3875 fragment_stats: FragmentStatistics {
3876 num_fragments: 11,
3877 num_small_fragments: 11,
3878 lengths: FragmentSummaryStats {
3879 min: 15,
3880 max: 100,
3881 mean: 22,
3882 p25: 15,
3883 p50: 15,
3884 p75: 15,
3885 p99: 100,
3886 },
3887 },
3888 }
3889 );
3890 let res = empty_table.stats().await.unwrap();
3891 println!("{:#?}", res);
3892 assert_eq!(
3893 res,
3894 TableStatistics {
3895 num_rows: 0,
3896 num_indices: 0,
3897 total_bytes: 0,
3898 fragment_stats: FragmentStatistics {
3899 num_fragments: 0,
3900 num_small_fragments: 0,
3901 lengths: FragmentSummaryStats {
3902 min: 0,
3903 max: 0,
3904 mean: 0,
3905 p25: 0,
3906 p50: 0,
3907 p75: 0,
3908 p99: 0,
3909 },
3910 },
3911 }
3912 )
3913 }
3914
3915 #[tokio::test]
3916 pub async fn test_list_indices_skip_frag_reuse() {
3917 let tmp_dir = tempdir().unwrap();
3918 let uri = tmp_dir.path().to_str().unwrap();
3919
3920 let conn = ConnectBuilder::new(uri).execute().await.unwrap();
3921
3922 let schema = Arc::new(Schema::new(vec![
3923 Field::new("id", DataType::Int32, false),
3924 Field::new("foo", DataType::Int32, true),
3925 ]));
3926 let batch = RecordBatch::try_new(
3927 schema.clone(),
3928 vec![
3929 Arc::new(Int32Array::from_iter_values(0..100)),
3930 Arc::new(Int32Array::from_iter_values(0..100)),
3931 ],
3932 )
3933 .unwrap();
3934
3935 let table = conn
3936 .create_table("test_list_indices_skip_frag_reuse", batch.clone())
3937 .execute()
3938 .await
3939 .unwrap();
3940
3941 table.add(batch.clone()).execute().await.unwrap();
3942
3943 table
3944 .create_index(&["id"], Index::Bitmap(BitmapIndexBuilder {}))
3945 .execute()
3946 .await
3947 .unwrap();
3948
3949 table
3950 .optimize(OptimizeAction::Compact {
3951 options: CompactionOptions {
3952 target_rows_per_fragment: 2_000,
3953 defer_index_remap: true,
3954 ..Default::default()
3955 },
3956 remap_options: None,
3957 })
3958 .await
3959 .unwrap();
3960
3961 let result = table.list_indices().await.unwrap();
3962 assert_eq!(result.len(), 1);
3963 assert_eq!(result[0].index_type, crate::index::IndexType::Bitmap);
3964 }
3965}