1use arrow::array::{AsArray, FixedSizeListBuilder, Float32Builder};
7use arrow::datatypes::{Float32Type, UInt8Type};
8use arrow_array::{RecordBatchIterator, RecordBatchReader};
9use arrow_schema::{DataType, Field, Schema, SchemaRef};
10use async_trait::async_trait;
11use datafusion_expr::Expr;
12use datafusion_physical_plan::display::DisplayableExecutionPlan;
13use datafusion_physical_plan::projection::ProjectionExec;
14use datafusion_physical_plan::repartition::RepartitionExec;
15use datafusion_physical_plan::union::UnionExec;
16use datafusion_physical_plan::ExecutionPlan;
17use futures::{FutureExt, StreamExt, TryFutureExt};
18use lance::dataset::builder::DatasetBuilder;
19use lance::dataset::cleanup::RemovalStats;
20use lance::dataset::optimize::{compact_files, CompactionMetrics, IndexRemapperOptions};
21use lance::dataset::scanner::Scanner;
22pub use lance::dataset::ColumnAlteration;
23pub use lance::dataset::NewColumnTransform;
24pub use lance::dataset::ReadParams;
25pub use lance::dataset::Version;
26use lance::dataset::{InsertBuilder, WhenMatched, WriteMode, WriteParams};
27use lance::dataset::{MergeInsertBuilder as LanceMergeInsertBuilder, WhenNotMatchedBySource};
28use lance::index::vector::utils::infer_vector_dim;
29use lance::index::vector::VectorIndexParams;
30use lance::io::{ObjectStoreParams, WrappingObjectStore};
31use lance_datafusion::exec::{analyze_plan as lance_analyze_plan, execute_plan};
32use lance_datafusion::utils::StreamingWriteSource;
33use lance_index::scalar::{BuiltinIndexType, ScalarIndexParams};
34use lance_index::vector::bq::RQBuildParams;
35use lance_index::vector::hnsw::builder::HnswBuildParams;
36use lance_index::vector::ivf::IvfBuildParams;
37use lance_index::vector::pq::PQBuildParams;
38use lance_index::vector::sq::builder::SQBuildParams;
39use lance_index::DatasetIndexExt;
40use lance_index::IndexType;
41use lance_io::object_store::{LanceNamespaceStorageOptionsProvider, StorageOptionsAccessor};
42use lance_namespace::models::{
43 QueryTableRequest as NsQueryTableRequest, QueryTableRequestColumns,
44 QueryTableRequestFullTextQuery, QueryTableRequestVector, StringFtsQuery,
45};
46use lance_namespace::LanceNamespace;
47use lance_table::format::Manifest;
48use lance_table::io::commit::ManifestNamingScheme;
49use log::info;
50use serde::{Deserialize, Serialize};
51use std::collections::HashMap;
52use std::format;
53use std::path::Path;
54use std::sync::Arc;
55
56use crate::arrow::IntoArrow;
57use crate::connection::NoData;
58use crate::database::Database;
59use crate::embeddings::{EmbeddingDefinition, EmbeddingRegistry, MaybeEmbedded, MemoryRegistry};
60use crate::error::{Error, Result};
61use crate::index::vector::VectorIndex;
62use crate::index::IndexStatistics;
63use crate::index::{vector::suggested_num_sub_vectors, Index, IndexBuilder};
64use crate::index::{IndexConfig, IndexStatisticsImpl};
65use crate::query::{
66 IntoQueryVector, Query, QueryExecutionOptions, QueryFilter, QueryRequest, Select, TakeQuery,
67 VectorQuery, VectorQueryRequest, DEFAULT_TOP_K,
68};
69use crate::utils::{
70 default_vector_column, supported_bitmap_data_type, supported_btree_data_type,
71 supported_fts_data_type, supported_label_list_data_type, supported_vector_data_type,
72 PatchReadParam, PatchWriteParam, TimeoutStream,
73};
74
75use self::dataset::DatasetConsistencyWrapper;
76use self::merge::MergeInsertBuilder;
77
78pub mod datafusion;
79pub(crate) mod dataset;
80pub mod delete;
81pub mod merge;
82pub mod schema_evolution;
83pub mod update;
84
85use crate::index::waiter::wait_for_index;
86pub use chrono::Duration;
87pub use delete::DeleteResult;
88use futures::future::{join_all, Either};
89pub use lance::dataset::optimize::CompactionOptions;
90pub use lance::dataset::refs::{TagContents, Tags as LanceTags};
91pub use lance::dataset::scanner::DatasetRecordBatchStream;
92use lance::dataset::statistics::DatasetStatisticsExt;
93use lance_index::frag_reuse::FRAG_REUSE_INDEX_NAME;
94pub use lance_index::optimize::OptimizeOptions;
95pub use schema_evolution::{AddColumnsResult, AlterColumnsResult, DropColumnsResult};
96use serde_with::skip_serializing_none;
97pub use update::{UpdateBuilder, UpdateResult};
98
/// How a column's values are produced.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ColumnKind {
    /// A column whose values are supplied directly by the user.
    Physical,
    /// A column whose values are computed by the configured embedding function.
    Embedding(EmbeddingDefinition),
}
107
/// Metadata describing a single column of a table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ColumnDefinition {
    /// How this column's values are produced.
    pub kind: ColumnKind,
}
114
/// A table's Arrow schema paired with per-column definitions.
#[derive(Debug, Clone)]
pub struct TableDefinition {
    /// One definition per field in `schema`, in the same order.
    pub column_definitions: Vec<ColumnDefinition>,
    /// The Arrow schema of the table.
    pub schema: SchemaRef,
}
120
121impl TableDefinition {
122 pub fn new(schema: SchemaRef, column_definitions: Vec<ColumnDefinition>) -> Self {
123 Self {
124 column_definitions,
125 schema,
126 }
127 }
128
129 pub fn new_from_schema(schema: SchemaRef) -> Self {
130 let column_definitions = schema
131 .fields()
132 .iter()
133 .map(|_| ColumnDefinition {
134 kind: ColumnKind::Physical,
135 })
136 .collect();
137 Self::new(schema, column_definitions)
138 }
139
140 pub fn try_from_rich_schema(schema: SchemaRef) -> Result<Self> {
141 let column_definitions = schema.metadata.get("lancedb::column_definitions");
142 if let Some(column_definitions) = column_definitions {
143 let column_definitions: Vec<ColumnDefinition> =
144 serde_json::from_str(column_definitions).map_err(|e| Error::Runtime {
145 message: format!("Failed to deserialize column definitions: {}", e),
146 })?;
147 Ok(Self::new(schema, column_definitions))
148 } else {
149 let column_definitions = schema
150 .fields()
151 .iter()
152 .map(|_| ColumnDefinition {
153 kind: ColumnKind::Physical,
154 })
155 .collect();
156 Ok(Self::new(schema, column_definitions))
157 }
158 }
159
160 pub fn into_rich_schema(self) -> SchemaRef {
161 let lancedb_metadata = serde_json::to_string(&self.column_definitions).unwrap();
164 let mut schema_with_metadata = (*self.schema).clone();
165 schema_with_metadata
166 .metadata
167 .insert("lancedb::column_definitions".to_string(), lancedb_metadata);
168 Arc::new(schema_with_metadata)
169 }
170}
171
/// Selects which maintenance operation [`Table::optimize`] performs.
pub enum OptimizeAction {
    /// Run compaction, pruning, and index optimization in sequence.
    All,
    /// Compact small data files together.
    Compact {
        options: CompactionOptions,
        remap_options: Option<Arc<dyn IndexRemapperOptions>>,
    },
    /// Remove old, unreferenced table versions.
    Prune {
        /// Only versions older than this duration are considered.
        older_than: Option<Duration>,
        /// NOTE(review): presumably also removes files whose status could not
        /// be verified — confirm against `lance::dataset::cleanup`.
        delete_unverified: Option<bool>,
        /// Whether to error when an old version is still referenced by a tag.
        error_if_tagged_old_versions: Option<bool>,
    },
    /// Bring existing indices up to date with newly written data.
    Index(OptimizeOptions),
}
235
236impl Default for OptimizeAction {
237 fn default() -> Self {
238 Self::All
239 }
240}
241
/// Statistics returned by [`Table::optimize`].
pub struct OptimizeStats {
    /// Metrics from file compaction; `None` when compaction did not run.
    pub compaction: Option<CompactionMetrics>,

    /// Stats from version pruning; `None` when pruning did not run.
    pub prune: Option<RemovalStats>,
}
250
/// Strategy for handling invalid vectors.
/// Currently unused (hence the `dead_code` allow).
#[derive(Clone, Debug, Default)]
#[allow(dead_code)] enum BadVectorHandling {
    /// Report an error (the default).
    #[default]
    Error,
    /// Drop the offending entries.
    Drop,
    /// Substitute the given fill value.
    Fill(f32),
    None,
}
266
/// Options controlling how data is written to a table.
#[derive(Clone, Debug, Default)]
pub struct WriteOptions {
    /// Low-level Lance write parameters; `None` uses Lance defaults.
    pub lance_write_params: Option<WriteParams>,
}
279
/// Whether added data is appended to, or replaces, the table contents.
#[derive(Debug, Clone, Default)]
pub enum AddDataMode {
    /// Append the new data to the existing table (the default).
    #[default]
    Append,
    /// Replace the table contents with the new data.
    Overwrite,
}
288
/// Builder for an add-data operation; created by [`Table::add`].
pub struct AddDataBuilder<T: IntoArrow> {
    // The table the data will be written into.
    parent: Arc<dyn BaseTable>,
    pub(crate) data: T,
    pub(crate) mode: AddDataMode,
    pub(crate) write_options: WriteOptions,
    // Registry used to compute embedding columns, when one is configured.
    embedding_registry: Option<Arc<dyn EmbeddingRegistry>>,
}
298
299impl<T: IntoArrow> std::fmt::Debug for AddDataBuilder<T> {
300 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
301 f.debug_struct("AddDataBuilder")
302 .field("parent", &self.parent)
303 .field("mode", &self.mode)
304 .field("write_options", &self.write_options)
305 .finish()
306 }
307}
308
309impl<T: IntoArrow> AddDataBuilder<T> {
310 pub fn mode(mut self, mode: AddDataMode) -> Self {
311 self.mode = mode;
312 self
313 }
314
315 pub fn write_options(mut self, options: WriteOptions) -> Self {
316 self.write_options = options;
317 self
318 }
319
320 pub async fn execute(self) -> Result<AddResult> {
321 let parent = self.parent.clone();
322 let data = self.data.into_arrow()?;
323 let without_data = AddDataBuilder::<NoData> {
324 data: NoData {},
325 mode: self.mode,
326 parent: self.parent,
327 write_options: self.write_options,
328 embedding_registry: self.embedding_registry,
329 };
330 parent.add(without_data, data).await
331 }
332}
333
/// A row filter, expressed either as SQL text or a DataFusion expression.
pub enum Filter {
    /// A SQL predicate string.
    Sql(String),
    /// A DataFusion logical expression.
    Datafusion(Expr),
}
341
/// Either a plain query or a vector-search query.
#[derive(Debug, Clone)]
pub enum AnyQuery {
    Query(QueryRequest),
    VectorQuery(VectorQueryRequest),
}
348
/// Operations on a table's version tags.
#[async_trait]
pub trait Tags: Send + Sync {
    /// List all tags and their contents.
    async fn list(&self) -> Result<HashMap<String, TagContents>>;

    /// The version the given tag points at.
    async fn get_version(&self, tag: &str) -> Result<u64>;

    /// Create a new tag pointing at `version`.
    async fn create(&mut self, tag: &str, version: u64) -> Result<()>;

    /// Delete the given tag.
    async fn delete(&mut self, tag: &str) -> Result<()>;

    /// Repoint an existing tag at `version`.
    async fn update(&mut self, tag: &str, version: u64) -> Result<()>;
}
366
/// Result of an add operation.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct AddResult {
    /// The table version produced by the write.
    #[serde(default)]
    pub version: u64,
}
375
/// Result of a merge-insert (upsert) operation.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct MergeResult {
    /// The table version produced by the merge.
    #[serde(default)]
    pub version: u64,
    /// Number of rows inserted.
    #[serde(default)]
    pub num_inserted_rows: u64,
    /// Number of rows updated.
    #[serde(default)]
    pub num_updated_rows: u64,
    /// Number of rows deleted.
    #[serde(default)]
    pub num_deleted_rows: u64,
    /// Number of attempts the operation took (e.g. commit retries).
    #[serde(default)]
    pub num_attempts: u32,
}
400
/// The interface shared by all table implementations.
#[async_trait]
pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync {
    /// Self as `Any`, enabling downcasts (see [`NativeTableExt`]).
    fn as_any(&self) -> &dyn std::any::Any;
    /// The table's name.
    fn name(&self) -> &str;
    /// The namespace path the table lives under.
    fn namespace(&self) -> &[String];
    /// The table's unique identifier.
    fn id(&self) -> &str;
    /// The Arrow schema of the table.
    async fn schema(&self) -> Result<SchemaRef>;
    /// Count rows, optionally restricted by `filter`.
    async fn count_rows(&self, filter: Option<Filter>) -> Result<usize>;
    /// Build a physical execution plan for `query`.
    async fn create_plan(
        &self,
        query: &AnyQuery,
        options: QueryExecutionOptions,
    ) -> Result<Arc<dyn ExecutionPlan>>;
    /// Execute `query` and stream back the results.
    async fn query(
        &self,
        query: &AnyQuery,
        options: QueryExecutionOptions,
    ) -> Result<DatasetRecordBatchStream>;
    /// Render `query`'s execution plan as an indented string.
    async fn explain_plan(&self, query: &AnyQuery, verbose: bool) -> Result<String> {
        let plan = self.create_plan(query, Default::default()).await?;
        let display = DisplayableExecutionPlan::new(plan.as_ref());

        Ok(format!("{}", display.indent(verbose)))
    }
    /// Execute `query` and report per-node runtime metrics as a string.
    async fn analyze_plan(
        &self,
        query: &AnyQuery,
        options: QueryExecutionOptions,
    ) -> Result<String>;

    /// Add `data` to the table according to the builder's settings.
    async fn add(
        &self,
        add: AddDataBuilder<NoData>,
        data: Box<dyn arrow_array::RecordBatchReader + Send>,
    ) -> Result<AddResult>;
    /// Delete rows matching `predicate`.
    async fn delete(&self, predicate: &str) -> Result<DeleteResult>;
    /// Apply an update operation.
    async fn update(&self, update: UpdateBuilder) -> Result<UpdateResult>;
    /// Create an index as described by `index`.
    async fn create_index(&self, index: IndexBuilder) -> Result<()>;
    /// List the table's indices.
    async fn list_indices(&self) -> Result<Vec<IndexConfig>>;
    /// Drop the index named `name`.
    async fn drop_index(&self, name: &str) -> Result<()>;
    /// Warm up the index named `name` (implementation-defined).
    async fn prewarm_index(&self, name: &str) -> Result<()>;
    /// Statistics for `index_name`, or `None` if no such index exists.
    async fn index_stats(&self, index_name: &str) -> Result<Option<IndexStatistics>>;
    /// Perform a merge-insert (upsert) of `new_data`.
    async fn merge_insert(
        &self,
        params: MergeInsertBuilder,
        new_data: Box<dyn RecordBatchReader + Send>,
    ) -> Result<MergeResult>;
    /// Access the table's version tags.
    async fn tags(&self) -> Result<Box<dyn Tags + '_>>;
    /// Run the requested maintenance action.
    async fn optimize(&self, action: OptimizeAction) -> Result<OptimizeStats>;
    /// Add new columns computed by `transforms`.
    async fn add_columns(
        &self,
        transforms: NewColumnTransform,
        read_columns: Option<Vec<String>>,
    ) -> Result<AddColumnsResult>;
    /// Alter existing columns.
    async fn alter_columns(&self, alterations: &[ColumnAlteration]) -> Result<AlterColumnsResult>;
    /// Drop the named columns.
    async fn drop_columns(&self, columns: &[&str]) -> Result<DropColumnsResult>;
    /// The current table version.
    async fn version(&self) -> Result<u64>;
    /// Check out a specific version.
    async fn checkout(&self, version: u64) -> Result<()>;
    /// Check out the version a tag points at.
    async fn checkout_tag(&self, tag: &str) -> Result<()>;
    /// Return to tracking the latest version.
    async fn checkout_latest(&self) -> Result<()>;
    /// Restore the checked-out version as the latest version.
    async fn restore(&self) -> Result<()>;
    /// List all versions of the table.
    async fn list_versions(&self) -> Result<Vec<Version>>;
    /// The table's schema plus per-column definitions.
    async fn table_definition(&self) -> Result<TableDefinition>;
    /// The URI the table is stored at.
    async fn uri(&self) -> Result<String>;
    /// Deprecated alias of [`BaseTable::initial_storage_options`].
    #[deprecated(since = "0.25.0", note = "Use initial_storage_options() instead")]
    async fn storage_options(&self) -> Option<HashMap<String, String>>;
    /// Storage options the table was opened with.
    async fn initial_storage_options(&self) -> Option<HashMap<String, String>>;
    /// The most recently resolved storage options (may have been refreshed).
    async fn latest_storage_options(&self) -> Result<Option<HashMap<String, String>>>;
    /// Block until the named indices are fully built, or `timeout` elapses.
    async fn wait_for_index(
        &self,
        index_names: &[&str],
        timeout: std::time::Duration,
    ) -> Result<()>;
    /// Table-level statistics.
    async fn stats(&self) -> Result<TableStatistics>;
    /// Build an execution plan that writes `_input` into the table.
    ///
    /// The default implementation reports the operation as unsupported.
    async fn create_insert_exec(
        &self,
        _input: Arc<dyn datafusion_physical_plan::ExecutionPlan>,
        _write_params: WriteParams,
    ) -> Result<Arc<dyn datafusion_physical_plan::ExecutionPlan>> {
        Err(Error::NotSupported {
            message: "create_insert_exec not implemented".to_string(),
        })
    }
}
539
/// A handle to a LanceDB table.
///
/// Cheap to clone; every clone refers to the same underlying table.
#[derive(Clone, Debug)]
pub struct Table {
    inner: Arc<dyn BaseTable>,
    // `None` only when constructed via `From<Arc<dyn BaseTable>>`.
    database: Option<Arc<dyn Database>>,
    embedding_registry: Arc<dyn EmbeddingRegistry>,
}
549
#[cfg(all(test, feature = "remote"))]
mod test_utils {
    use super::*;

    impl Table {
        /// Shared construction path for mock remote tables: both public
        /// constructors differ only in whether a server version is pinned.
        fn new_mock_table<T>(
            name: String,
            version: Option<semver::Version>,
            handler: impl Fn(reqwest::Request) -> http::Response<T> + Clone + Send + Sync + 'static,
        ) -> Self
        where
            T: Into<reqwest::Body>,
        {
            let inner = Arc::new(crate::remote::table::RemoteTable::new_mock(
                name,
                handler.clone(),
                version,
            ));
            let database = Arc::new(crate::remote::db::RemoteDatabase::new_mock(handler));
            Self {
                inner,
                database: Some(database),
                embedding_registry: Arc::new(MemoryRegistry::new()),
            }
        }

        /// Build a mock table whose HTTP traffic is served by `handler`.
        pub fn new_with_handler<T>(
            name: impl Into<String>,
            handler: impl Fn(reqwest::Request) -> http::Response<T> + Clone + Send + Sync + 'static,
        ) -> Self
        where
            T: Into<reqwest::Body>,
        {
            Self::new_mock_table(name.into(), None, handler)
        }

        /// Like [`Table::new_with_handler`], but pins a specific server version.
        pub fn new_with_handler_version<T>(
            name: impl Into<String>,
            version: semver::Version,
            handler: impl Fn(reqwest::Request) -> http::Response<T> + Clone + Send + Sync + 'static,
        ) -> Self
        where
            T: Into<reqwest::Body>,
        {
            Self::new_mock_table(name.into(), Some(version), handler)
        }
    }
}
599
600impl std::fmt::Display for Table {
601 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
602 write!(f, "{}", self.inner)
603 }
604}
605
606impl From<Arc<dyn BaseTable>> for Table {
607 fn from(inner: Arc<dyn BaseTable>) -> Self {
608 Self {
609 inner,
610 database: None,
611 embedding_registry: Arc::new(MemoryRegistry::new()),
612 }
613 }
614}
615
616impl Table {
617 pub fn new(inner: Arc<dyn BaseTable>, database: Arc<dyn Database>) -> Self {
618 Self {
619 inner,
620 database: Some(database),
621 embedding_registry: Arc::new(MemoryRegistry::new()),
622 }
623 }
624
625 pub fn base_table(&self) -> &Arc<dyn BaseTable> {
626 &self.inner
627 }
628
629 pub fn database(&self) -> &Arc<dyn Database> {
630 self.database.as_ref().unwrap()
631 }
632
633 pub fn embedding_registry(&self) -> &Arc<dyn EmbeddingRegistry> {
634 &self.embedding_registry
635 }
636
637 pub(crate) fn new_with_embedding_registry(
638 inner: Arc<dyn BaseTable>,
639 database: Arc<dyn Database>,
640 embedding_registry: Arc<dyn EmbeddingRegistry>,
641 ) -> Self {
642 Self {
643 inner,
644 database: Some(database),
645 embedding_registry,
646 }
647 }
648
649 pub fn as_native(&self) -> Option<&NativeTable> {
654 self.inner.as_native()
655 }
656
657 pub fn name(&self) -> &str {
659 self.inner.name()
660 }
661
662 pub fn namespace(&self) -> &[String] {
664 self.inner.namespace()
665 }
666
667 pub fn id(&self) -> &str {
669 self.inner.id()
670 }
671
672 pub fn dataset(&self) -> Option<&dataset::DatasetConsistencyWrapper> {
676 self.inner.as_native().map(|t| &t.dataset)
677 }
678
679 pub async fn schema(&self) -> Result<SchemaRef> {
681 self.inner.schema().await
682 }
683
684 pub async fn count_rows(&self, filter: Option<String>) -> Result<usize> {
690 self.inner.count_rows(filter.map(Filter::Sql)).await
691 }
692
693 pub fn add<T: IntoArrow>(&self, batches: T) -> AddDataBuilder<T> {
700 AddDataBuilder {
701 parent: self.inner.clone(),
702 data: batches,
703 mode: AddDataMode::Append,
704 write_options: WriteOptions::default(),
705 embedding_registry: Some(self.embedding_registry.clone()),
706 }
707 }
708
709 pub fn update(&self) -> UpdateBuilder {
724 UpdateBuilder::new(self.inner.clone())
725 }
726
727 pub async fn delete(&self, predicate: &str) -> Result<DeleteResult> {
777 self.inner.delete(predicate).await
778 }
779
780 pub fn create_index(&self, columns: &[impl AsRef<str>], index: Index) -> IndexBuilder {
842 IndexBuilder::new(
843 self.inner.clone(),
844 columns
845 .iter()
846 .map(|val| val.as_ref().to_string())
847 .collect::<Vec<_>>(),
848 index,
849 )
850 }
851
852 pub fn create_index_with_timeout(
855 &self,
856 columns: &[impl AsRef<str>],
857 index: Index,
858 wait_timeout: Option<std::time::Duration>,
859 ) -> IndexBuilder {
860 let mut builder = IndexBuilder::new(
861 self.inner.clone(),
862 columns
863 .iter()
864 .map(|val| val.as_ref().to_string())
865 .collect::<Vec<_>>(),
866 index,
867 );
868 if let Some(timeout) = wait_timeout {
869 builder = builder.wait_timeout(timeout);
870 }
871 builder
872 }
873
874 pub fn merge_insert(&self, on: &[&str]) -> MergeInsertBuilder {
950 MergeInsertBuilder::new(
951 self.inner.clone(),
952 on.iter().map(|s| s.to_string()).collect(),
953 )
954 }
955
956 pub fn query(&self) -> Query {
1044 Query::new(self.inner.clone())
1045 }
1046
1047 pub fn take_offsets(&self, offsets: Vec<u64>) -> TakeQuery {
1070 TakeQuery::from_offsets(self.inner.clone(), offsets)
1071 }
1072
1073 pub fn take_row_ids(&self, row_ids: Vec<u64>) -> TakeQuery {
1092 TakeQuery::from_row_ids(self.inner.clone(), row_ids)
1093 }
1094
1095 pub fn vector_search(&self, query: impl IntoQueryVector) -> Result<VectorQuery> {
1101 self.query().nearest_to(query)
1102 }
1103
1104 pub async fn optimize(&self, action: OptimizeAction) -> Result<OptimizeStats> {
1131 self.inner.optimize(action).await
1132 }
1133
1134 pub async fn add_columns(
1136 &self,
1137 transforms: NewColumnTransform,
1138 read_columns: Option<Vec<String>>,
1139 ) -> Result<AddColumnsResult> {
1140 self.inner.add_columns(transforms, read_columns).await
1141 }
1142
1143 pub async fn alter_columns(
1145 &self,
1146 alterations: &[ColumnAlteration],
1147 ) -> Result<AlterColumnsResult> {
1148 self.inner.alter_columns(alterations).await
1149 }
1150
1151 pub async fn drop_columns(&self, columns: &[&str]) -> Result<DropColumnsResult> {
1153 self.inner.drop_columns(columns).await
1154 }
1155
1156 pub async fn version(&self) -> Result<u64> {
1163 self.inner.version().await
1164 }
1165
1166 pub async fn checkout(&self, version: u64) -> Result<()> {
1181 self.inner.checkout(version).await
1182 }
1183
1184 pub async fn checkout_tag(&self, tag: &str) -> Result<()> {
1199 self.inner.checkout_tag(tag).await
1200 }
1201
1202 pub async fn checkout_latest(&self) -> Result<()> {
1207 self.inner.checkout_latest().await
1208 }
1209
1210 pub async fn restore(&self) -> Result<()> {
1221 self.inner.restore().await
1222 }
1223
1224 pub async fn list_versions(&self) -> Result<Vec<Version>> {
1226 self.inner.list_versions().await
1227 }
1228
1229 pub async fn list_indices(&self) -> Result<Vec<IndexConfig>> {
1231 self.inner.list_indices().await
1232 }
1233
1234 pub async fn uri(&self) -> Result<String> {
1239 self.inner.uri().await
1240 }
1241
1242 #[deprecated(since = "0.25.0", note = "Use initial_storage_options() instead")]
1246 pub async fn storage_options(&self) -> Option<HashMap<String, String>> {
1247 #[allow(deprecated)]
1248 self.inner.storage_options().await
1249 }
1250
1251 pub async fn initial_storage_options(&self) -> Option<HashMap<String, String>> {
1257 self.inner.initial_storage_options().await
1258 }
1259
1260 pub async fn latest_storage_options(&self) -> Result<Option<HashMap<String, String>>> {
1268 self.inner.latest_storage_options().await
1269 }
1270
1271 pub async fn index_stats(
1274 &self,
1275 index_name: impl AsRef<str>,
1276 ) -> Result<Option<IndexStatistics>> {
1277 self.inner.index_stats(index_name.as_ref()).await
1278 }
1279
1280 pub async fn drop_index(&self, name: &str) -> Result<()> {
1289 self.inner.drop_index(name).await
1290 }
1291
1292 pub async fn prewarm_index(&self, name: &str) -> Result<()> {
1305 self.inner.prewarm_index(name).await
1306 }
1307
1308 pub async fn wait_for_index(
1311 &self,
1312 index_names: &[&str],
1313 timeout: std::time::Duration,
1314 ) -> Result<()> {
1315 self.inner.wait_for_index(index_names, timeout).await
1316 }
1317
1318 pub async fn tags(&self) -> Result<Box<dyn Tags + '_>> {
1320 self.inner.tags().await
1321 }
1322
1323 pub(crate) fn multi_vector_plan(
1326 plans: Vec<Arc<dyn ExecutionPlan>>,
1327 ) -> Result<Arc<dyn ExecutionPlan>> {
1328 if plans.is_empty() {
1329 return Err(Error::InvalidInput {
1330 message: "No plans provided".to_string(),
1331 });
1332 }
1333 let first_plan = plans[0].clone();
1335 let project_all_columns = first_plan
1336 .schema()
1337 .fields()
1338 .iter()
1339 .enumerate()
1340 .map(|(i, field)| {
1341 let expr =
1342 datafusion_physical_plan::expressions::Column::new(field.name().as_str(), i);
1343 let expr = Arc::new(expr) as Arc<dyn datafusion_physical_plan::PhysicalExpr>;
1344 (expr, field.name().clone())
1345 })
1346 .collect::<Vec<_>>();
1347
1348 let projected_plans = plans
1349 .into_iter()
1350 .enumerate()
1351 .map(|(plan_i, plan)| {
1352 let query_index = datafusion_common::ScalarValue::Int32(Some(plan_i as i32));
1353 let query_index_expr =
1354 datafusion_physical_plan::expressions::Literal::new(query_index);
1355 let query_index_expr =
1356 Arc::new(query_index_expr) as Arc<dyn datafusion_physical_plan::PhysicalExpr>;
1357 let mut projections = vec![(query_index_expr, "query_index".to_string())];
1358 projections.extend_from_slice(&project_all_columns);
1359 let projection = ProjectionExec::try_new(projections, plan).unwrap();
1360 Arc::new(projection) as Arc<dyn datafusion_physical_plan::ExecutionPlan>
1361 })
1362 .collect::<Vec<_>>();
1363
1364 let unioned = UnionExec::try_new(projected_plans).map_err(|err| Error::Runtime {
1365 message: err.to_string(),
1366 })?;
1367 let repartitioned = RepartitionExec::try_new(
1369 unioned,
1370 datafusion_physical_plan::Partitioning::RoundRobinBatch(1),
1371 )
1372 .unwrap();
1373 Ok(Arc::new(repartitioned))
1374 }
1375
1376 pub async fn stats(&self) -> Result<TableStatistics> {
1378 self.inner.stats().await
1379 }
1380}
1381
/// [`Tags`] implementation backed by a local Lance dataset.
pub struct NativeTags {
    dataset: dataset::DatasetConsistencyWrapper,
}
1385#[async_trait]
1386impl Tags for NativeTags {
1387 async fn list(&self) -> Result<HashMap<String, TagContents>> {
1388 let dataset = self.dataset.get().await?;
1389 Ok(dataset.tags().list().await?)
1390 }
1391
1392 async fn get_version(&self, tag: &str) -> Result<u64> {
1393 let dataset = self.dataset.get().await?;
1394 Ok(dataset.tags().get_version(tag).await?)
1395 }
1396
1397 async fn create(&mut self, tag: &str, version: u64) -> Result<()> {
1398 let dataset = self.dataset.get().await?;
1399 dataset.tags().create(tag, version).await?;
1400 Ok(())
1401 }
1402
1403 async fn delete(&mut self, tag: &str) -> Result<()> {
1404 let dataset = self.dataset.get().await?;
1405 dataset.tags().delete(tag).await?;
1406 Ok(())
1407 }
1408
1409 async fn update(&mut self, tag: &str, version: u64) -> Result<()> {
1410 let dataset = self.dataset.get().await?;
1411 dataset.tags().update(tag, version).await?;
1412 Ok(())
1413 }
1414}
1415
/// Extension trait for downcasting a [`BaseTable`] to a [`NativeTable`].
pub trait NativeTableExt {
    /// Returns the concrete [`NativeTable`] if that is the implementation.
    fn as_native(&self) -> Option<&NativeTable>;
}
1420
1421impl NativeTableExt for Arc<dyn BaseTable> {
1422 fn as_native(&self) -> Option<&NativeTable> {
1423 self.as_any().downcast_ref::<NativeTable>()
1424 }
1425}
1426
/// A table backed directly by a Lance dataset.
#[derive(Clone)]
pub struct NativeTable {
    name: String,
    namespace: Vec<String>,
    // Unique id built from namespace segments + name (see `build_id`).
    id: String,
    uri: String,
    pub(crate) dataset: dataset::DatasetConsistencyWrapper,
    // Interval passed to DatasetConsistencyWrapper; `None` means no periodic
    // re-check (see wrapper for exact semantics).
    read_consistency_interval: Option<std::time::Duration>,
    // Present only when server-side query execution is enabled.
    namespace_client: Option<Arc<dyn LanceNamespace>>,
}
1442
1443impl std::fmt::Debug for NativeTable {
1444 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1445 f.debug_struct("NativeTable")
1446 .field("name", &self.name)
1447 .field("namespace", &self.namespace)
1448 .field("id", &self.id)
1449 .field("uri", &self.uri)
1450 .field("read_consistency_interval", &self.read_consistency_interval)
1451 .field("namespace_client", &self.namespace_client)
1452 .finish()
1453 }
1454}
1455
1456impl std::fmt::Display for NativeTable {
1457 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1458 write!(
1459 f,
1460 "NativeTable({}, uri={}, read_consistency_interval={})",
1461 self.name,
1462 self.uri,
1463 match self.read_consistency_interval {
1464 None => {
1465 "None".to_string()
1466 }
1467 Some(duration) => {
1468 format!("{}s", duration.as_secs_f64())
1469 }
1470 }
1471 )
1472 }
1473}
1474
1475impl NativeTable {
1476 pub async fn open(uri: &str) -> Result<Self> {
1487 let name = Self::get_table_name(uri)?;
1488 Self::open_with_params(uri, &name, vec![], None, None, None, None).await
1489 }
1490
1491 #[allow(clippy::too_many_arguments)]
1504 pub async fn open_with_params(
1505 uri: &str,
1506 name: &str,
1507 namespace: Vec<String>,
1508 write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
1509 params: Option<ReadParams>,
1510 read_consistency_interval: Option<std::time::Duration>,
1511 namespace_client: Option<Arc<dyn LanceNamespace>>,
1512 ) -> Result<Self> {
1513 let params = params.unwrap_or_default();
1514 let params = match write_store_wrapper.clone() {
1516 Some(wrapper) => params.patch_with_store_wrapper(wrapper)?,
1517 None => params,
1518 };
1519
1520 let dataset = DatasetBuilder::from_uri(uri)
1521 .with_read_params(params)
1522 .load()
1523 .await
1524 .map_err(|e| match e {
1525 lance::Error::DatasetNotFound { .. } => Error::TableNotFound {
1526 name: name.to_string(),
1527 source: Box::new(e),
1528 },
1529 source => Error::Lance { source },
1530 })?;
1531
1532 let dataset = DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval);
1533 let id = Self::build_id(&namespace, name);
1534
1535 Ok(Self {
1536 name: name.to_string(),
1537 namespace,
1538 id,
1539 uri: uri.to_string(),
1540 dataset,
1541 read_consistency_interval,
1542 namespace_client,
1543 })
1544 }
1545
1546 pub fn with_namespace_client(mut self, namespace_client: Arc<dyn LanceNamespace>) -> Self {
1550 self.namespace_client = Some(namespace_client);
1551 self
1552 }
1553
1554 #[allow(clippy::too_many_arguments)]
1578 pub async fn open_from_namespace(
1579 namespace_client: Arc<dyn LanceNamespace>,
1580 name: &str,
1581 namespace: Vec<String>,
1582 write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
1583 params: Option<ReadParams>,
1584 read_consistency_interval: Option<std::time::Duration>,
1585 server_side_query_enabled: bool,
1586 session: Option<Arc<lance::session::Session>>,
1587 ) -> Result<Self> {
1588 let mut params = params.unwrap_or_default();
1589
1590 if let Some(sess) = session {
1592 params.session(sess);
1593 }
1594
1595 let params = match write_store_wrapper.clone() {
1597 Some(wrapper) => params.patch_with_store_wrapper(wrapper)?,
1598 None => params,
1599 };
1600
1601 let mut table_id = namespace.clone();
1603 table_id.push(name.to_string());
1604
1605 let builder = DatasetBuilder::from_namespace(namespace_client.clone(), table_id)
1608 .await
1609 .map_err(|e| match e {
1610 lance::Error::Namespace { source, .. } => Error::Runtime {
1611 message: format!("Failed to get table info from namespace: {:?}", source),
1612 },
1613 source => Error::Lance { source },
1614 })?;
1615
1616 let dataset = builder
1617 .with_read_params(params)
1618 .load()
1619 .await
1620 .map_err(|e| match e {
1621 lance::Error::DatasetNotFound { .. } => Error::TableNotFound {
1622 name: name.to_string(),
1623 source: Box::new(e),
1624 },
1625 source => Error::Lance { source },
1626 })?;
1627
1628 let uri = dataset.uri().to_string();
1629 let dataset = DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval);
1630 let id = Self::build_id(&namespace, name);
1631
1632 let stored_namespace_client = if server_side_query_enabled {
1633 Some(namespace_client)
1634 } else {
1635 None
1636 };
1637
1638 Ok(Self {
1639 name: name.to_string(),
1640 namespace,
1641 id,
1642 uri,
1643 dataset,
1644 read_consistency_interval,
1645 namespace_client: stored_namespace_client,
1646 })
1647 }
1648
1649 fn get_table_name(uri: &str) -> Result<String> {
1650 let path = Path::new(uri);
1651 let name = path
1652 .file_stem()
1653 .ok_or(Error::TableNotFound {
1654 name: uri.to_string(),
1655 source: format!("Could not extract table name from URI: '{}'", uri).into(),
1656 })?
1657 .to_str()
1658 .ok_or(Error::InvalidTableName {
1659 name: uri.to_string(),
1660 reason: "Table name is not valid URL".to_string(),
1661 })?;
1662 Ok(name.to_string())
1663 }
1664
1665 fn build_id(namespace: &[String], name: &str) -> String {
1666 if namespace.is_empty() {
1667 name.to_string()
1668 } else {
1669 let mut parts = namespace.to_vec();
1670 parts.push(name.to_string());
1671 parts.join("$")
1672 }
1673 }
1674
1675 #[allow(clippy::too_many_arguments)]
1691 pub async fn create(
1692 uri: &str,
1693 name: &str,
1694 namespace: Vec<String>,
1695 batches: impl StreamingWriteSource,
1696 write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
1697 params: Option<WriteParams>,
1698 read_consistency_interval: Option<std::time::Duration>,
1699 namespace_client: Option<Arc<dyn LanceNamespace>>,
1700 ) -> Result<Self> {
1701 let params = params.unwrap_or(WriteParams {
1703 ..Default::default()
1704 });
1705 let params = match write_store_wrapper.clone() {
1707 Some(wrapper) => params.patch_with_store_wrapper(wrapper)?,
1708 None => params,
1709 };
1710
1711 let insert_builder = InsertBuilder::new(uri).with_params(¶ms);
1712 let dataset = insert_builder
1713 .execute_stream(batches)
1714 .await
1715 .map_err(|e| match e {
1716 lance::Error::DatasetAlreadyExists { .. } => Error::TableAlreadyExists {
1717 name: name.to_string(),
1718 },
1719 source => Error::Lance { source },
1720 })?;
1721
1722 let id = Self::build_id(&namespace, name);
1723
1724 Ok(Self {
1725 name: name.to_string(),
1726 namespace,
1727 id,
1728 uri: uri.to_string(),
1729 dataset: DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval),
1730 read_consistency_interval,
1731 namespace_client,
1732 })
1733 }
1734
1735 #[allow(clippy::too_many_arguments)]
1736 pub async fn create_empty(
1737 uri: &str,
1738 name: &str,
1739 namespace: Vec<String>,
1740 schema: SchemaRef,
1741 write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
1742 params: Option<WriteParams>,
1743 read_consistency_interval: Option<std::time::Duration>,
1744 namespace_client: Option<Arc<dyn LanceNamespace>>,
1745 ) -> Result<Self> {
1746 let batches = RecordBatchIterator::new(vec![], schema);
1747 Self::create(
1748 uri,
1749 name,
1750 namespace,
1751 batches,
1752 write_store_wrapper,
1753 params,
1754 read_consistency_interval,
1755 namespace_client,
1756 )
1757 .await
1758 }
1759
    /// Create a new table at `uri`, resolving storage options (e.g. temporary
    /// credentials) through `namespace_client`.
    ///
    /// Unlike [`Self::create`], the write params are augmented with a
    /// `StorageOptionsAccessor` that can refresh storage options from the
    /// namespace service. When `server_side_query_enabled` is true the client
    /// is retained on the table so queries are delegated to the server.
    #[allow(clippy::too_many_arguments)]
    pub async fn create_from_namespace(
        namespace_client: Arc<dyn LanceNamespace>,
        uri: &str,
        name: &str,
        namespace: Vec<String>,
        batches: impl StreamingWriteSource,
        write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
        params: Option<WriteParams>,
        read_consistency_interval: Option<std::time::Duration>,
        server_side_query_enabled: bool,
        session: Option<Arc<lance::session::Session>>,
    ) -> Result<Self> {
        // The namespace addresses a table by its full path: namespace + name.
        let mut table_id = namespace.clone();
        table_id.push(name.to_string());

        // Provider that fetches (and can refresh) storage options for this
        // table from the namespace service.
        let storage_options_provider = Arc::new(LanceNamespaceStorageOptionsProvider::new(
            namespace_client.clone(),
            table_id,
        ));

        let mut params = params.unwrap_or_default();

        if let Some(sess) = session {
            params.session = Some(sess);
        }

        // Install the accessor on the store params, seeding it with any
        // caller-supplied storage options.
        let store_params = params
            .store_params
            .get_or_insert_with(ObjectStoreParams::default);
        let accessor = match store_params.storage_options().cloned() {
            Some(options) => {
                StorageOptionsAccessor::with_initial_and_provider(options, storage_options_provider)
            }
            None => StorageOptionsAccessor::with_provider(storage_options_provider),
        };
        store_params.storage_options_accessor = Some(Arc::new(accessor));

        // Apply an optional wrapping object store, if the caller supplied one.
        let params = match write_store_wrapper.clone() {
            Some(wrapper) => params.patch_with_store_wrapper(wrapper)?,
            None => params,
        };

        let insert_builder = InsertBuilder::new(uri).with_params(&params);
        let dataset = insert_builder
            .execute_stream(batches)
            .await
            .map_err(|e| match e {
                // Surface "already exists" as a typed error callers can match.
                lance::Error::DatasetAlreadyExists { .. } => Error::TableAlreadyExists {
                    name: name.to_string(),
                },
                source => Error::Lance { source },
            })?;

        let id = Self::build_id(&namespace, name);

        // Only keep the namespace client when server-side query execution was
        // requested; otherwise queries run locally.
        let stored_namespace_client = if server_side_query_enabled {
            Some(namespace_client)
        } else {
            None
        };

        Ok(Self {
            name: name.to_string(),
            namespace,
            id,
            uri: uri.to_string(),
            dataset: DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval),
            read_consistency_interval,
            namespace_client: stored_namespace_client,
        })
    }
1861
1862 async fn optimize_indices(&self, options: &OptimizeOptions) -> Result<()> {
1863 info!("LanceDB: optimizing indices: {:?}", options);
1864 self.dataset
1865 .get_mut()
1866 .await?
1867 .optimize_indices(options)
1868 .await?;
1869 Ok(())
1870 }
1871
1872 pub async fn merge(
1874 &mut self,
1875 batches: impl RecordBatchReader + Send + 'static,
1876 left_on: &str,
1877 right_on: &str,
1878 ) -> Result<()> {
1879 self.dataset
1880 .get_mut()
1881 .await?
1882 .merge(batches, left_on, right_on)
1883 .await?;
1884 Ok(())
1885 }
1886
1887 async fn cleanup_old_versions(
1899 &self,
1900 older_than: Duration,
1901 delete_unverified: Option<bool>,
1902 error_if_tagged_old_versions: Option<bool>,
1903 ) -> Result<RemovalStats> {
1904 Ok(self
1905 .dataset
1906 .get_mut()
1907 .await?
1908 .cleanup_old_versions(older_than, delete_unverified, error_if_tagged_old_versions)
1909 .await?)
1910 }
1911
1912 async fn compact_files(
1919 &self,
1920 options: CompactionOptions,
1921 remap_options: Option<Arc<dyn IndexRemapperOptions>>,
1922 ) -> Result<CompactionMetrics> {
1923 let mut dataset_mut = self.dataset.get_mut().await?;
1924 let metrics = compact_files(&mut dataset_mut, options, remap_options).await?;
1925 Ok(metrics)
1926 }
1927
1928 pub async fn count_fragments(&self) -> Result<usize> {
1930 Ok(self.dataset.get().await?.count_fragments())
1931 }
1932
1933 pub async fn count_deleted_rows(&self) -> Result<usize> {
1934 Ok(self.dataset.get().await?.count_deleted_rows().await?)
1935 }
1936
1937 pub async fn num_small_files(&self, max_rows_per_group: usize) -> Result<usize> {
1938 Ok(self
1939 .dataset
1940 .get()
1941 .await?
1942 .num_small_files(max_rows_per_group)
1943 .await)
1944 }
1945
1946 pub async fn load_indices(&self) -> Result<Vec<VectorIndex>> {
1947 let dataset = self.dataset.get().await?;
1948 let mf = dataset.manifest();
1949 let indices = dataset.load_indices().await?;
1950 Ok(indices
1951 .iter()
1952 .map(|i| VectorIndex::new_from_format(mf, i))
1953 .collect())
1954 }
1955
1956 fn validate_index_type(
1958 field: &Field,
1959 index_name: &str,
1960 supported_fn: impl Fn(&DataType) -> bool,
1961 ) -> Result<()> {
1962 if !supported_fn(field.data_type()) {
1963 return Err(Error::Schema {
1964 message: format!(
1965 "A {} index cannot be created on the field `{}` which has data type {}",
1966 index_name,
1967 field.name(),
1968 field.data_type()
1969 ),
1970 });
1971 }
1972 Ok(())
1973 }
1974
1975 fn build_ivf_params(
1977 num_partitions: Option<u32>,
1978 target_partition_size: Option<u32>,
1979 sample_rate: u32,
1980 max_iterations: u32,
1981 ) -> IvfBuildParams {
1982 let mut ivf_params = match (num_partitions, target_partition_size) {
1983 (Some(num_partitions), _) => IvfBuildParams::new(num_partitions as usize),
1984 (None, Some(target_partition_size)) => {
1985 IvfBuildParams::with_target_partition_size(target_partition_size as usize)
1986 }
1987 (None, None) => IvfBuildParams::default(),
1988 };
1989 ivf_params.sample_rate = sample_rate as usize;
1990 ivf_params.max_iters = max_iterations as usize;
1991 ivf_params
1992 }
1993
1994 fn get_num_sub_vectors(provided: Option<u32>, dim: u32, num_bits: Option<u32>) -> u32 {
1996 if let Some(provided) = provided {
1997 return provided;
1998 }
1999 let suggested = suggested_num_sub_vectors(dim);
2000 if num_bits.is_some_and(|num_bits| num_bits == 4) && !suggested.is_multiple_of(2) {
2001 suggested + 1
2003 } else {
2004 suggested
2005 }
2006 }
2007
2008 fn get_vector_dimension(field: &Field) -> Result<u32> {
2010 match field.data_type() {
2011 arrow_schema::DataType::FixedSizeList(_, n) => Ok(*n as u32),
2012 _ => Ok(infer_vector_dim(field.data_type())? as u32),
2013 }
2014 }
2015
    /// Translate the public `Index` configuration into Lance index build
    /// parameters for `field`.
    ///
    /// `Index::Auto` picks IVF_PQ (L2 distance) for vector columns and BTree
    /// for supported scalar columns; every other variant validates the field
    /// type and builds the corresponding parameter set.
    async fn make_index_params(
        &self,
        field: &Field,
        index_opts: Index,
    ) -> Result<Box<dyn lance::index::IndexParams>> {
        match index_opts {
            Index::Auto => {
                if supported_vector_data_type(field.data_type()) {
                    // Default vector index: IVF_PQ with L2 distance and a
                    // sub-vector count derived from the dimension.
                    let dim = Self::get_vector_dimension(field)?;
                    let ivf_params = lance_index::vector::ivf::IvfBuildParams::default();
                    let num_sub_vectors = Self::get_num_sub_vectors(None, dim, None);
                    let pq_params =
                        lance_index::vector::pq::PQBuildParams::new(num_sub_vectors as usize, 8);
                    let lance_idx_params =
                        lance::index::vector::VectorIndexParams::with_ivf_pq_params(
                            lance_linalg::distance::MetricType::L2,
                            ivf_params,
                            pq_params,
                        );
                    Ok(Box::new(lance_idx_params))
                } else if supported_btree_data_type(field.data_type()) {
                    Ok(Box::new(ScalarIndexParams::for_builtin(
                        BuiltinIndexType::BTree,
                    )))
                } else {
                    Err(Error::InvalidInput {
                        message: format!(
                            "there are no indices supported for the field `{}` with the data type {}",
                            field.name(),
                            field.data_type()
                        ),
                    })?
                }
            }
            // Scalar indices: validate the column type, then use the builtin.
            Index::BTree(_) => {
                Self::validate_index_type(field, "BTree", supported_btree_data_type)?;
                Ok(Box::new(ScalarIndexParams::for_builtin(
                    BuiltinIndexType::BTree,
                )))
            }
            Index::Bitmap(_) => {
                Self::validate_index_type(field, "Bitmap", supported_bitmap_data_type)?;
                Ok(Box::new(ScalarIndexParams::for_builtin(
                    BuiltinIndexType::Bitmap,
                )))
            }
            Index::LabelList(_) => {
                Self::validate_index_type(field, "LabelList", supported_label_list_data_type)?;
                Ok(Box::new(ScalarIndexParams::for_builtin(
                    BuiltinIndexType::LabelList,
                )))
            }
            // Full-text search: the options struct itself is the params.
            Index::FTS(fts_opts) => {
                Self::validate_index_type(field, "FTS", supported_fts_data_type)?;
                Ok(Box::new(fts_opts))
            }
            Index::IvfFlat(index) => {
                Self::validate_index_type(field, "IVF Flat", supported_vector_data_type)?;
                let ivf_params = Self::build_ivf_params(
                    index.num_partitions,
                    index.target_partition_size,
                    index.sample_rate,
                    index.max_iterations,
                );
                let lance_idx_params =
                    VectorIndexParams::with_ivf_flat_params(index.distance_type.into(), ivf_params);
                Ok(Box::new(lance_idx_params))
            }
            Index::IvfSq(index) => {
                Self::validate_index_type(field, "IVF SQ", supported_vector_data_type)?;
                let ivf_params = Self::build_ivf_params(
                    index.num_partitions,
                    index.target_partition_size,
                    index.sample_rate,
                    index.max_iterations,
                );
                let sq_params = SQBuildParams {
                    sample_rate: index.sample_rate as usize,
                    ..Default::default()
                };
                let lance_idx_params = VectorIndexParams::with_ivf_sq_params(
                    index.distance_type.into(),
                    ivf_params,
                    sq_params,
                );
                Ok(Box::new(lance_idx_params))
            }
            Index::IvfPq(index) => {
                Self::validate_index_type(field, "IVF PQ", supported_vector_data_type)?;
                // Sub-vector count depends on the column's vector dimension
                // when not supplied explicitly.
                let dim = Self::get_vector_dimension(field)?;
                let ivf_params = Self::build_ivf_params(
                    index.num_partitions,
                    index.target_partition_size,
                    index.sample_rate,
                    index.max_iterations,
                );
                let num_sub_vectors =
                    Self::get_num_sub_vectors(index.num_sub_vectors, dim, index.num_bits);
                // 8-bit PQ codes by default.
                let num_bits = index.num_bits.unwrap_or(8) as usize;
                let mut pq_params = PQBuildParams::new(num_sub_vectors as usize, num_bits);
                pq_params.max_iters = index.max_iterations as usize;
                let lance_idx_params = VectorIndexParams::with_ivf_pq_params(
                    index.distance_type.into(),
                    ivf_params,
                    pq_params,
                );
                Ok(Box::new(lance_idx_params))
            }
            Index::IvfRq(index) => {
                Self::validate_index_type(field, "IVF RQ", supported_vector_data_type)?;
                let ivf_params = Self::build_ivf_params(
                    index.num_partitions,
                    index.target_partition_size,
                    index.sample_rate,
                    index.max_iterations,
                );
                // RQ defaults to 1 bit when unspecified.
                let rq_params = RQBuildParams::new(index.num_bits.unwrap_or(1) as u8);
                let lance_idx_params = VectorIndexParams::with_ivf_rq_params(
                    index.distance_type.into(),
                    ivf_params,
                    rq_params,
                );
                Ok(Box::new(lance_idx_params))
            }
            Index::IvfHnswPq(index) => {
                Self::validate_index_type(field, "IVF HNSW PQ", supported_vector_data_type)?;
                let dim = Self::get_vector_dimension(field)?;
                let ivf_params = Self::build_ivf_params(
                    index.num_partitions,
                    index.target_partition_size,
                    index.sample_rate,
                    index.max_iterations,
                );
                let num_sub_vectors =
                    Self::get_num_sub_vectors(index.num_sub_vectors, dim, index.num_bits);
                let hnsw_params = HnswBuildParams::default()
                    .num_edges(index.m as usize)
                    .ef_construction(index.ef_construction as usize);
                let pq_params = PQBuildParams::new(
                    num_sub_vectors as usize,
                    index.num_bits.unwrap_or(8) as usize,
                );
                let lance_idx_params = VectorIndexParams::with_ivf_hnsw_pq_params(
                    index.distance_type.into(),
                    ivf_params,
                    hnsw_params,
                    pq_params,
                );
                Ok(Box::new(lance_idx_params))
            }
            Index::IvfHnswSq(index) => {
                Self::validate_index_type(field, "IVF HNSW SQ", supported_vector_data_type)?;
                let ivf_params = Self::build_ivf_params(
                    index.num_partitions,
                    index.target_partition_size,
                    index.sample_rate,
                    index.max_iterations,
                );
                let hnsw_params = HnswBuildParams::default()
                    .num_edges(index.m as usize)
                    .ef_construction(index.ef_construction as usize);
                let sq_params = SQBuildParams {
                    sample_rate: index.sample_rate as usize,
                    ..Default::default()
                };
                let lance_idx_params = VectorIndexParams::with_ivf_hnsw_sq_params(
                    index.distance_type.into(),
                    ivf_params,
                    hnsw_params,
                    sq_params,
                );
                Ok(Box::new(lance_idx_params))
            }
        }
    }
2193
2194 fn get_index_type_for_field(&self, field: &Field, index: &Index) -> IndexType {
2196 match index {
2197 Index::Auto => {
2198 if supported_vector_data_type(field.data_type()) {
2199 IndexType::Vector
2200 } else if supported_btree_data_type(field.data_type()) {
2201 IndexType::BTree
2202 } else {
2203 IndexType::BTree
2205 }
2206 }
2207 Index::BTree(_) => IndexType::BTree,
2208 Index::Bitmap(_) => IndexType::Bitmap,
2209 Index::LabelList(_) => IndexType::LabelList,
2210 Index::FTS(_) => IndexType::Inverted,
2211 Index::IvfFlat(_)
2212 | Index::IvfSq(_)
2213 | Index::IvfPq(_)
2214 | Index::IvfRq(_)
2215 | Index::IvfHnswPq(_)
2216 | Index::IvfHnswSq(_) => IndexType::Vector,
2217 }
2218 }
2219
2220 async fn generic_query(
2221 &self,
2222 query: &AnyQuery,
2223 options: QueryExecutionOptions,
2224 ) -> Result<DatasetRecordBatchStream> {
2225 let plan = self.create_plan(query, options.clone()).await?;
2226 let inner = execute_plan(plan, Default::default())?;
2227 let inner = if let Some(timeout) = options.timeout {
2228 TimeoutStream::new_boxed(inner, timeout)
2229 } else {
2230 inner
2231 };
2232 Ok(DatasetRecordBatchStream::new(inner))
2233 }
2234
2235 async fn namespace_query(
2237 &self,
2238 namespace_client: Arc<dyn LanceNamespace>,
2239 query: &AnyQuery,
2240 _options: QueryExecutionOptions,
2241 ) -> Result<DatasetRecordBatchStream> {
2242 let mut table_id = self.namespace.clone();
2244 table_id.push(self.name.clone());
2245
2246 let mut ns_request = self.convert_to_namespace_query(query)?;
2248 ns_request.id = Some(table_id);
2250
2251 let response_bytes = namespace_client
2253 .query_table(ns_request)
2254 .await
2255 .map_err(|e| Error::Runtime {
2256 message: format!("Failed to execute server-side query: {}", e),
2257 })?;
2258
2259 self.parse_arrow_ipc_response(response_bytes).await
2261 }
2262
2263 fn filter_to_sql(&self, filter: &QueryFilter) -> Result<String> {
2265 match filter {
2266 QueryFilter::Sql(sql) => Ok(sql.clone()),
2267 QueryFilter::Substrait(_) => Err(Error::NotSupported {
2268 message: "Substrait filters are not supported for server-side queries".to_string(),
2269 }),
2270 QueryFilter::Datafusion(_) => Err(Error::NotSupported {
2271 message: "Datafusion expression filters are not supported for server-side queries. Use SQL filter instead.".to_string(),
2272 }),
2273 }
2274 }
2275
    /// Translate a local query into a namespace `QueryTableRequest`.
    ///
    /// The table `id` is left unset here; `namespace_query` fills it in.
    /// Features that cannot be expressed server-side (rerankers, dynamic
    /// column selection, non-SQL filters) are rejected with `NotSupported`.
    fn convert_to_namespace_query(&self, query: &AnyQuery) -> Result<NsQueryTableRequest> {
        match query {
            AnyQuery::VectorQuery(vq) => {
                let vector = self.extract_query_vector(&vq.query_vector)?;

                // Only SQL filters survive the trip to the server.
                let filter = match &vq.base.filter {
                    Some(f) => Some(self.filter_to_sql(f)?),
                    None => None,
                };

                let columns = match &vq.base.select {
                    Select::All => None,
                    Select::Columns(cols) => Some(Box::new(QueryTableRequestColumns {
                        column_names: Some(cols.clone()),
                        column_aliases: None,
                    })),
                    Select::Dynamic(_) => {
                        return Err(Error::NotSupported {
                            message:
                                "Dynamic column selection is not supported for server-side queries"
                                    .to_string(),
                        });
                    }
                };

                if vq.base.reranker.is_some() {
                    return Err(Error::NotSupported {
                        message: "Reranker is not supported for server-side queries".to_string(),
                    });
                }

                // Optional full-text component; an empty column list is sent
                // as None (server default columns).
                let full_text_query = vq.base.full_text_search.as_ref().map(|fts| {
                    let columns = fts.columns();
                    let columns_vec = if columns.is_empty() {
                        None
                    } else {
                        Some(columns.into_iter().collect())
                    };
                    Box::new(QueryTableRequestFullTextQuery {
                        string_query: Some(Box::new(StringFtsQuery {
                            query: fts.query.to_string(),
                            columns: columns_vec,
                        })),
                        structured_query: None,
                    })
                });

                Ok(NsQueryTableRequest {
                    // `id` is filled in by the caller (namespace_query).
                    id: None,
                    // `k` defaults to 10 when no limit was given.
                    k: vq.base.limit.unwrap_or(10) as i32,
                    vector: Box::new(vector),
                    vector_column: vq.column.clone(),
                    filter,
                    columns,
                    offset: vq.base.offset.map(|o| o as i32),
                    distance_type: vq.distance_type.map(|dt| dt.to_string()),
                    nprobes: Some(vq.minimum_nprobes as i32),
                    ef: vq.ef.map(|e| e as i32),
                    refine_factor: vq.refine_factor.map(|r| r as i32),
                    lower_bound: vq.lower_bound,
                    upper_bound: vq.upper_bound,
                    prefilter: Some(vq.base.prefilter),
                    fast_search: Some(vq.base.fast_search),
                    with_row_id: Some(vq.base.with_row_id),
                    bypass_vector_index: Some(!vq.use_index),
                    full_text_query,
                    ..Default::default()
                })
            }
            AnyQuery::Query(q) => {
                if q.reranker.is_some() {
                    return Err(Error::NotSupported {
                        message: "Reranker is not supported for server-side query execution"
                            .to_string(),
                    });
                }

                let filter = q
                    .filter
                    .as_ref()
                    .map(|f| self.filter_to_sql(f))
                    .transpose()?;

                let columns = match &q.select {
                    Select::All => None,
                    Select::Columns(cols) => Some(Box::new(QueryTableRequestColumns {
                        column_names: Some(cols.clone()),
                        column_aliases: None,
                    })),
                    Select::Dynamic(_) => {
                        return Err(Error::NotSupported {
                            message: "Dynamic columns are not supported for server-side query"
                                .to_string(),
                        });
                    }
                };

                let full_text_query = q.full_text_search.as_ref().map(|fts| {
                    let columns_vec = if fts.columns().is_empty() {
                        None
                    } else {
                        Some(fts.columns().iter().cloned().collect())
                    };
                    Box::new(QueryTableRequestFullTextQuery {
                        string_query: Some(Box::new(StringFtsQuery {
                            query: fts.query.to_string(),
                            columns: columns_vec,
                        })),
                        structured_query: None,
                    })
                });

                // Plain (non-vector) query: send an empty vector and bypass
                // the vector index entirely.
                let vector = Box::new(QueryTableRequestVector {
                    single_vector: Some(vec![]),
                    multi_vector: None,
                });

                Ok(NsQueryTableRequest {
                    // `id` is filled in by the caller (namespace_query).
                    id: None,
                    vector,
                    k: q.limit.unwrap_or(10) as i32,
                    filter,
                    columns,
                    prefilter: Some(q.prefilter),
                    offset: q.offset.map(|o| o as i32),
                    vector_column: None,
                    with_row_id: Some(q.with_row_id),
                    bypass_vector_index: Some(true),
                    full_text_query,
                    ..Default::default()
                })
            }
        }
    }
2419
2420 fn extract_query_vector(
2422 &self,
2423 query_vectors: &[Arc<dyn arrow_array::Array>],
2424 ) -> Result<QueryTableRequestVector> {
2425 if query_vectors.is_empty() {
2426 return Err(Error::InvalidInput {
2427 message: "Query vector is required for vector search".to_string(),
2428 });
2429 }
2430
2431 if query_vectors.len() == 1 {
2433 let arr = &query_vectors[0];
2434 let single_vector = self.array_to_f32_vec(arr)?;
2435 Ok(QueryTableRequestVector {
2436 single_vector: Some(single_vector),
2437 multi_vector: None,
2438 })
2439 } else {
2440 let multi_vector: Result<Vec<Vec<f32>>> = query_vectors
2442 .iter()
2443 .map(|arr| self.array_to_f32_vec(arr))
2444 .collect();
2445 Ok(QueryTableRequestVector {
2446 single_vector: None,
2447 multi_vector: Some(multi_vector?),
2448 })
2449 }
2450 }
2451
2452 fn array_to_f32_vec(&self, arr: &Arc<dyn arrow_array::Array>) -> Result<Vec<f32>> {
2454 if let Some(fsl) = arr
2456 .as_any()
2457 .downcast_ref::<arrow_array::FixedSizeListArray>()
2458 {
2459 let values = fsl.values();
2460 if let Some(f32_arr) = values.as_any().downcast_ref::<arrow_array::Float32Array>() {
2461 return Ok(f32_arr.values().to_vec());
2462 }
2463 }
2464
2465 if let Some(f32_arr) = arr.as_any().downcast_ref::<arrow_array::Float32Array>() {
2467 return Ok(f32_arr.values().to_vec());
2468 }
2469
2470 Err(Error::InvalidInput {
2471 message: "Query vector must be Float32 type".to_string(),
2472 })
2473 }
2474
2475 async fn parse_arrow_ipc_response(
2477 &self,
2478 bytes: bytes::Bytes,
2479 ) -> Result<DatasetRecordBatchStream> {
2480 use arrow_ipc::reader::StreamReader;
2481 use std::io::Cursor;
2482
2483 let cursor = Cursor::new(bytes);
2484 let reader = StreamReader::try_new(cursor, None).map_err(|e| Error::Runtime {
2485 message: format!("Failed to parse Arrow IPC response: {}", e),
2486 })?;
2487
2488 let schema = reader.schema();
2490 let batches: Vec<_> = reader
2491 .into_iter()
2492 .collect::<std::result::Result<Vec<_>, _>>()
2493 .map_err(|e| Error::Runtime {
2494 message: format!("Failed to read Arrow IPC batches: {}", e),
2495 })?;
2496
2497 let stream = futures::stream::iter(batches.into_iter().map(Ok));
2499 let record_batch_stream = Box::pin(
2500 datafusion_physical_plan::stream::RecordBatchStreamAdapter::new(schema, stream),
2501 );
2502
2503 Ok(DatasetRecordBatchStream::new(record_batch_stream))
2504 }
2505
2506 pub async fn uses_v2_manifest_paths(&self) -> Result<bool> {
2511 let dataset = self.dataset.get().await?;
2512 Ok(dataset.manifest_location().naming_scheme == ManifestNamingScheme::V2)
2513 }
2514
2515 pub async fn migrate_manifest_paths_v2(&self) -> Result<()> {
2530 let mut dataset = self.dataset.get_mut().await?;
2531 dataset.migrate_manifest_paths_v2().await?;
2532 Ok(())
2533 }
2534
2535 pub async fn manifest(&self) -> Result<Manifest> {
2537 let dataset = self.dataset.get().await?;
2538 Ok(dataset.manifest().clone())
2539 }
2540
2541 pub async fn update_config(
2543 &self,
2544 upsert_values: impl IntoIterator<Item = (String, String)>,
2545 ) -> Result<()> {
2546 let mut dataset = self.dataset.get_mut().await?;
2547 dataset.update_config(upsert_values).await?;
2548 Ok(())
2549 }
2550
2551 pub async fn delete_config_keys(&self, delete_keys: &[&str]) -> Result<()> {
2553 let mut dataset = self.dataset.get_mut().await?;
2554 #[allow(deprecated)]
2556 dataset.delete_config_keys(delete_keys).await?;
2557 Ok(())
2558 }
2559
2560 pub async fn replace_schema_metadata(
2562 &self,
2563 upsert_values: impl IntoIterator<Item = (String, String)>,
2564 ) -> Result<()> {
2565 let mut dataset = self.dataset.get_mut().await?;
2566 #[allow(deprecated)]
2568 dataset.replace_schema_metadata(upsert_values).await?;
2569 Ok(())
2570 }
2571
2572 pub async fn replace_field_metadata(
2580 &self,
2581 new_values: impl IntoIterator<Item = (u32, HashMap<String, String>)>,
2582 ) -> Result<()> {
2583 let mut dataset = self.dataset.get_mut().await?;
2584 dataset.replace_field_metadata(new_values).await?;
2585 Ok(())
2586 }
2587}
2588
2589#[async_trait::async_trait]
2590impl BaseTable for NativeTable {
    /// Expose the concrete type for downcasting through the `BaseTable` trait.
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
2594
2595 fn name(&self) -> &str {
2596 self.name.as_str()
2597 }
2598
2599 fn namespace(&self) -> &[String] {
2600 &self.namespace
2601 }
2602
2603 fn id(&self) -> &str {
2604 &self.id
2605 }
2606
2607 async fn version(&self) -> Result<u64> {
2608 Ok(self.dataset.get().await?.version().version)
2609 }
2610
    /// Pin the table to a specific historical `version` (time travel).
    async fn checkout(&self, version: u64) -> Result<()> {
        self.dataset.as_time_travel(version).await
    }
2614
    /// Pin the table to the version referenced by `tag` (time travel).
    async fn checkout_tag(&self, tag: &str) -> Result<()> {
        self.dataset.as_time_travel(tag).await
    }
2618
    /// Stop time-traveling and resume tracking the latest version.
    async fn checkout_latest(&self) -> Result<()> {
        self.dataset
            .as_latest(self.read_consistency_interval)
            .await?;
        // Reload immediately so callers see the latest data right away.
        self.dataset.reload().await
    }
2625
    /// List all versions of the underlying dataset.
    async fn list_versions(&self) -> Result<Vec<Version>> {
        Ok(self.dataset.get().await?.versions().await?)
    }
2629
    /// Restore the currently checked-out (time-travel) version, making it the
    /// new latest version, then resume tracking latest.
    async fn restore(&self) -> Result<()> {
        // Restore only makes sense while pinned to an older version.
        let version =
            self.dataset
                .time_travel_version()
                .await
                .ok_or_else(|| Error::InvalidInput {
                    message: "you must run checkout before running restore".to_string(),
                })?;
        {
            // NOTE(review): `get_mut_unchecked` presumably skips the usual
            // latest-version guard, which is needed while time-traveling —
            // confirm against `DatasetConsistencyWrapper`.
            let mut dataset = self.dataset.get_mut_unchecked().await?;
            debug_assert_eq!(dataset.version().version, version);
            dataset.restore().await?;
        }
        // After restoring, start tracking the latest version again.
        self.dataset
            .as_latest(self.read_consistency_interval)
            .await?;
        Ok(())
    }
2651
2652 async fn schema(&self) -> Result<SchemaRef> {
2653 let lance_schema = self.dataset.get().await?.schema().clone();
2654 Ok(Arc::new(Schema::from(&lance_schema)))
2655 }
2656
2657 async fn table_definition(&self) -> Result<TableDefinition> {
2658 let schema = self.schema().await?;
2659 TableDefinition::try_from_rich_schema(schema)
2660 }
2661
2662 async fn count_rows(&self, filter: Option<Filter>) -> Result<usize> {
2663 let dataset = self.dataset.get().await?;
2664 match filter {
2665 None => Ok(dataset.count_rows(None).await?),
2666 Some(Filter::Sql(sql)) => Ok(dataset.count_rows(Some(sql)).await?),
2667 Some(Filter::Datafusion(_)) => Err(Error::NotSupported {
2668 message: "Datafusion filters are not yet supported".to_string(),
2669 }),
2670 }
2671 }
2672
    /// Append (or overwrite) data into the table.
    ///
    /// Incoming batches are routed through `MaybeEmbedded`, which can apply
    /// registered embedding functions before the write.
    async fn add(
        &self,
        add: AddDataBuilder<NoData>,
        data: Box<dyn RecordBatchReader + Send>,
    ) -> Result<AddResult> {
        let data = Box::new(MaybeEmbedded::try_new(
            data,
            self.table_definition().await?,
            add.embedding_registry,
        )?) as Box<dyn RecordBatchReader + Send>;

        // Caller-supplied Lance write params win; otherwise derive the write
        // mode from the requested add mode.
        let lance_params = add.write_options.lance_write_params.unwrap_or(WriteParams {
            mode: match add.mode {
                AddDataMode::Append => WriteMode::Append,
                AddDataMode::Overwrite => WriteMode::Overwrite,
            },
            ..Default::default()
        });

        // Scope the mutable guard to just the insert.
        let dataset = {
            let ds = self.dataset.get_mut().await?;
            InsertBuilder::new(Arc::new(ds.clone()))
                .with_params(&lance_params)
                .execute_stream(data)
                .await?
        };
        let version = dataset.manifest().version;
        // Publish the post-insert dataset as the new latest.
        self.dataset.set_latest(dataset).await;
        Ok(AddResult { version })
    }
2704
2705 async fn create_index(&self, opts: IndexBuilder) -> Result<()> {
2706 if opts.columns.len() != 1 {
2707 return Err(Error::Schema {
2708 message: "Multi-column (composite) indices are not yet supported".to_string(),
2709 });
2710 }
2711 let schema = self.schema().await?;
2712
2713 let field = schema.field_with_name(&opts.columns[0])?;
2714
2715 let lance_idx_params = self.make_index_params(field, opts.index.clone()).await?;
2716 let index_type = self.get_index_type_for_field(field, &opts.index);
2717 let columns = [field.name().as_str()];
2718 let mut dataset = self.dataset.get_mut().await?;
2719 let mut builder = dataset
2720 .create_index_builder(&columns, index_type, lance_idx_params.as_ref())
2721 .train(opts.train)
2722 .replace(opts.replace);
2723
2724 if let Some(name) = opts.name {
2725 builder = builder.name(name);
2726 }
2727 builder.await?;
2728 Ok(())
2729 }
2730
2731 async fn drop_index(&self, index_name: &str) -> Result<()> {
2732 let mut dataset = self.dataset.get_mut().await?;
2733 dataset.drop_index(index_name).await?;
2734 Ok(())
2735 }
2736
2737 async fn prewarm_index(&self, index_name: &str) -> Result<()> {
2738 let dataset = self.dataset.get().await?;
2739 Ok(dataset.prewarm_index(index_name).await?)
2740 }
2741
    /// Execute an update operation built by `UpdateBuilder`.
    async fn update(&self, update: UpdateBuilder) -> Result<UpdateResult> {
        update::execute_update(self, update).await
    }
2746
2747 async fn create_plan(
2748 &self,
2749 query: &AnyQuery,
2750 options: QueryExecutionOptions,
2751 ) -> Result<Arc<dyn ExecutionPlan>> {
2752 let query = match query {
2753 AnyQuery::VectorQuery(query) => query.clone(),
2754 AnyQuery::Query(query) => VectorQueryRequest::from_plain_query(query.clone()),
2755 };
2756
2757 let ds_ref = self.dataset.get().await?;
2758 let schema = ds_ref.schema();
2759 let mut column = query.column.clone();
2760
2761 let mut query_vector = query.query_vector.first().cloned();
2762 if query.query_vector.len() > 1 {
2763 if column.is_none() {
2764 let arrow_schema = Schema::from(ds_ref.schema());
2766 column = Some(default_vector_column(
2767 &arrow_schema,
2768 Some(query.query_vector[0].len() as i32),
2769 )?);
2770 }
2771 let vector_field = schema.field(column.as_ref().unwrap()).unwrap();
2772 if let DataType::List(_) = vector_field.data_type() {
2773 let vectors = query
2778 .query_vector
2779 .iter()
2780 .map(|arr| arr.as_ref())
2781 .collect::<Vec<_>>();
2782 let dim = vectors[0].len();
2783 let mut fsl_builder = FixedSizeListBuilder::with_capacity(
2784 Float32Builder::with_capacity(dim),
2785 dim as i32,
2786 vectors.len(),
2787 );
2788 for vec in vectors {
2789 fsl_builder
2790 .values()
2791 .append_slice(vec.as_primitive::<Float32Type>().values());
2792 fsl_builder.append(true);
2793 }
2794 query_vector = Some(Arc::new(fsl_builder.finish()));
2795 } else {
2796 let query_vecs = query.query_vector.clone();
2798 let plan_futures = query_vecs
2799 .into_iter()
2800 .map(|query_vector| {
2801 let mut sub_query = query.clone();
2802 sub_query.query_vector = vec![query_vector];
2803 let options_ref = options.clone();
2804 async move {
2805 self.create_plan(&AnyQuery::VectorQuery(sub_query), options_ref)
2806 .await
2807 }
2808 })
2809 .collect::<Vec<_>>();
2810 let plans = futures::future::try_join_all(plan_futures).await?;
2811 return Table::multi_vector_plan(plans);
2812 }
2813 }
2814
2815 let mut scanner: Scanner = ds_ref.scan();
2816
2817 if let Some(query_vector) = query_vector {
2818 let column = if let Some(col) = column {
2820 col
2821 } else {
2822 let arrow_schema = Schema::from(ds_ref.schema());
2824 default_vector_column(&arrow_schema, Some(query_vector.len() as i32))?
2825 };
2826
2827 let (_, element_type) = lance::index::vector::utils::get_vector_type(schema, &column)?;
2828 let is_binary = matches!(element_type, DataType::UInt8);
2829 let top_k = query.base.limit.unwrap_or(DEFAULT_TOP_K) + query.base.offset.unwrap_or(0);
2830 if is_binary {
2831 let query_vector = arrow::compute::cast(&query_vector, &DataType::UInt8)?;
2832 let query_vector = query_vector.as_primitive::<UInt8Type>();
2833 scanner.nearest(&column, query_vector, top_k)?;
2834 } else {
2835 scanner.nearest(&column, query_vector.as_ref(), top_k)?;
2836 }
2837 scanner.minimum_nprobes(query.minimum_nprobes);
2838 if let Some(maximum_nprobes) = query.maximum_nprobes {
2839 scanner.maximum_nprobes(maximum_nprobes);
2840 }
2841 }
2842 scanner.limit(
2843 query.base.limit.map(|limit| limit as i64),
2844 query.base.offset.map(|offset| offset as i64),
2845 )?;
2846 if let Some(ef) = query.ef {
2847 scanner.ef(ef);
2848 }
2849 scanner.distance_range(query.lower_bound, query.upper_bound);
2850 scanner.use_index(query.use_index);
2851 scanner.prefilter(query.base.prefilter);
2852 match query.base.select {
2853 Select::Columns(ref columns) => {
2854 scanner.project(columns.as_slice())?;
2855 }
2856 Select::Dynamic(ref select_with_transform) => {
2857 scanner.project_with_transform(select_with_transform.as_slice())?;
2858 }
2859 Select::All => {}
2860 }
2861
2862 if query.base.with_row_id {
2863 scanner.with_row_id();
2864 }
2865
2866 scanner.batch_size(options.max_batch_length as usize);
2867
2868 if query.base.fast_search {
2869 scanner.fast_search();
2870 }
2871
2872 match &query.base.select {
2873 Select::Columns(select) => {
2874 scanner.project(select.as_slice())?;
2875 }
2876 Select::Dynamic(select_with_transform) => {
2877 scanner.project_with_transform(select_with_transform.as_slice())?;
2878 }
2879 Select::All => { }
2880 }
2881
2882 if let Some(filter) = &query.base.filter {
2883 match filter {
2884 QueryFilter::Sql(sql) => {
2885 scanner.filter(sql)?;
2886 }
2887 QueryFilter::Substrait(substrait) => {
2888 scanner.filter_substrait(substrait)?;
2889 }
2890 QueryFilter::Datafusion(expr) => {
2891 scanner.filter_expr(expr.clone());
2892 }
2893 }
2894 }
2895
2896 if let Some(fts) = &query.base.full_text_search {
2897 scanner.full_text_search(fts.clone())?;
2898 }
2899
2900 if let Some(refine_factor) = query.refine_factor {
2901 scanner.refine(refine_factor);
2902 }
2903
2904 if let Some(distance_type) = query.distance_type {
2905 scanner.distance_metric(distance_type.into());
2906 }
2907
2908 if query.base.disable_scoring_autoprojection {
2909 scanner.disable_scoring_autoprojection();
2910 }
2911
2912 Ok(scanner.create_plan().await?)
2913 }
2914
2915 async fn query(
2916 &self,
2917 query: &AnyQuery,
2918 options: QueryExecutionOptions,
2919 ) -> Result<DatasetRecordBatchStream> {
2920 if let Some(ref namespace_client) = self.namespace_client {
2922 return self
2923 .namespace_query(namespace_client.clone(), query, options)
2924 .await;
2925 }
2926 self.generic_query(query, options).await
2927 }
2928
2929 async fn analyze_plan(
2930 &self,
2931 query: &AnyQuery,
2932 options: QueryExecutionOptions,
2933 ) -> Result<String> {
2934 let plan = self.create_plan(query, options).await?;
2935 Ok(lance_analyze_plan(plan, Default::default()).await?)
2936 }
2937
    /// Execute a merge-insert ("upsert") against the table.
    ///
    /// Builds a Lance merge-insert job from `params`, runs it over
    /// `new_data`, and publishes the resulting dataset version.
    async fn merge_insert(
        &self,
        params: MergeInsertBuilder,
        new_data: Box<dyn RecordBatchReader + Send>,
    ) -> Result<MergeResult> {
        let dataset = Arc::new(self.dataset.get().await?.clone());
        let mut builder = LanceMergeInsertBuilder::try_new(dataset.clone(), params.on)?;
        // Matched rows: do nothing, update all, or update where the filter holds.
        match (
            params.when_matched_update_all,
            params.when_matched_update_all_filt,
        ) {
            (false, _) => builder.when_matched(WhenMatched::DoNothing),
            (true, None) => builder.when_matched(WhenMatched::UpdateAll),
            (true, Some(filt)) => builder.when_matched(WhenMatched::update_if(&dataset, &filt)?),
        };
        if params.when_not_matched_insert_all {
            builder.when_not_matched(lance::dataset::WhenNotMatched::InsertAll);
        } else {
            builder.when_not_matched(lance::dataset::WhenNotMatched::DoNothing);
        }
        // Rows present in the table but absent from the source: optionally
        // delete them (possibly filtered), otherwise keep them.
        if params.when_not_matched_by_source_delete {
            let behavior = if let Some(filter) = params.when_not_matched_by_source_delete_filt {
                WhenNotMatchedBySource::delete_if(dataset.as_ref(), &filter)?
            } else {
                WhenNotMatchedBySource::Delete
            };
            builder.when_not_matched_by_source(behavior);
        } else {
            builder.when_not_matched_by_source(WhenNotMatchedBySource::Keep);
        }
        builder.use_index(params.use_index);

        // With a timeout: bound both the job's internal retry window and the
        // overall execution (via tokio's timer). Without: run to completion.
        let future = if let Some(timeout) = params.timeout {
            let future = builder
                .retry_timeout(timeout)
                .try_build()?
                .execute_reader(new_data);
            Either::Left(tokio::time::timeout(timeout, future).map(|res| match res {
                Ok(Ok((new_dataset, stats))) => Ok((new_dataset, stats)),
                Ok(Err(e)) => Err(e.into()),
                Err(_) => Err(Error::Runtime {
                    message: "merge insert timed out".to_string(),
                }),
            }))
        } else {
            let job = builder.try_build()?;
            Either::Right(job.execute_reader(new_data).map_err(|e| e.into()))
        };
        let (new_dataset, stats) = future.await?;
        let version = new_dataset.manifest().version;
        // Publish the post-merge dataset as the new latest.
        self.dataset.set_latest(new_dataset.as_ref().clone()).await;
        Ok(MergeResult {
            version,
            num_updated_rows: stats.num_updated_rows,
            num_inserted_rows: stats.num_inserted_rows,
            num_deleted_rows: stats.num_deleted_rows,
            num_attempts: stats.num_attempts,
        })
    }
2999
    /// Delete all rows matching the SQL `predicate`; delegates to the
    /// `delete` module's implementation.
    async fn delete(&self, predicate: &str) -> Result<DeleteResult> {
        delete::execute_delete(self, predicate).await
    }
3005
    /// Return a tag manager backed by this table's shared dataset handle.
    async fn tags(&self) -> Result<Box<dyn Tags + '_>> {
        Ok(Box::new(NativeTags {
            dataset: self.dataset.clone(),
        }))
    }
3011
3012 async fn optimize(&self, action: OptimizeAction) -> Result<OptimizeStats> {
3013 let mut stats = OptimizeStats {
3014 compaction: None,
3015 prune: None,
3016 };
3017 match action {
3018 OptimizeAction::All => {
3019 stats.compaction = self
3020 .optimize(OptimizeAction::Compact {
3021 options: CompactionOptions::default(),
3022 remap_options: None,
3023 })
3024 .await?
3025 .compaction;
3026 stats.prune = self
3027 .optimize(OptimizeAction::Prune {
3028 older_than: None,
3029 delete_unverified: None,
3030 error_if_tagged_old_versions: None,
3031 })
3032 .await?
3033 .prune;
3034 self.optimize(OptimizeAction::Index(OptimizeOptions::default()))
3035 .await?;
3036 }
3037 OptimizeAction::Compact {
3038 options,
3039 remap_options,
3040 } => {
3041 stats.compaction = Some(self.compact_files(options, remap_options).await?);
3042 }
3043 OptimizeAction::Prune {
3044 older_than,
3045 delete_unverified,
3046 error_if_tagged_old_versions,
3047 } => {
3048 stats.prune = Some(
3049 self.cleanup_old_versions(
3050 older_than.unwrap_or(Duration::try_days(7).expect("valid delta")),
3051 delete_unverified,
3052 error_if_tagged_old_versions,
3053 )
3054 .await?,
3055 );
3056 }
3057 OptimizeAction::Index(options) => {
3058 self.optimize_indices(&options).await?;
3059 }
3060 }
3061 Ok(stats)
3062 }
3063
    /// Add new columns to the table described by `transforms`, optionally
    /// restricting which existing columns are read to compute them;
    /// delegates to the `schema_evolution` module.
    async fn add_columns(
        &self,
        transforms: NewColumnTransform,
        read_columns: Option<Vec<String>>,
    ) -> Result<AddColumnsResult> {
        schema_evolution::execute_add_columns(self, transforms, read_columns).await
    }
3071
    /// Apply the given column alterations (rename/cast/nullability changes);
    /// delegates to the `schema_evolution` module.
    async fn alter_columns(&self, alterations: &[ColumnAlteration]) -> Result<AlterColumnsResult> {
        schema_evolution::execute_alter_columns(self, alterations).await
    }
3075
    /// Drop the named columns from the table; delegates to the
    /// `schema_evolution` module.
    async fn drop_columns(&self, columns: &[&str]) -> Result<DropColumnsResult> {
        schema_evolution::execute_drop_columns(self, columns).await
    }
3079
3080 async fn list_indices(&self) -> Result<Vec<IndexConfig>> {
3081 let dataset = self.dataset.get().await?;
3082 let indices = dataset.load_indices().await?;
3083 let results = futures::stream::iter(indices.as_slice()).then(|idx| async {
3084
3085 if idx.name == FRAG_REUSE_INDEX_NAME {
3087 return None;
3088 }
3089
3090 let stats = match dataset.index_statistics(idx.name.as_str()).await {
3091 Ok(stats) => stats,
3092 Err(e) => {
3093 log::warn!("Failed to get statistics for index {} ({}): {}", idx.name, idx.uuid, e);
3094 return None;
3095 }
3096 };
3097
3098 let stats: serde_json::Value = match serde_json::from_str(&stats) {
3099 Ok(stats) => stats,
3100 Err(e) => {
3101 log::warn!("Failed to deserialize index statistics for index {} ({}): {}", idx.name, idx.uuid, e);
3102 return None;
3103 }
3104 };
3105
3106 let Some(index_type) = stats.get("index_type").and_then(|v| v.as_str()) else {
3107 log::warn!("Index statistics was missing 'index_type' field for index {} ({})", idx.name, idx.uuid);
3108 return None;
3109 };
3110
3111 let index_type: crate::index::IndexType = match index_type.parse() {
3112 Ok(index_type) => index_type,
3113 Err(e) => {
3114 log::warn!("Failed to parse index type for index {} ({}): {}", idx.name, idx.uuid, e);
3115 return None;
3116 }
3117 };
3118
3119 let mut columns = Vec::with_capacity(idx.fields.len());
3120 for field_id in &idx.fields {
3121 let Some(field) = dataset.schema().field_by_id(*field_id) else {
3122 log::warn!("The index {} ({}) referenced a field with id {} which does not exist in the schema", idx.name, idx.uuid, field_id);
3123 return None;
3124 };
3125 columns.push(field.name.clone());
3126 }
3127
3128 let name = idx.name.clone();
3129 Some(IndexConfig { index_type, columns, name })
3130 }).collect::<Vec<_>>().await;
3131
3132 Ok(results.into_iter().flatten().collect())
3133 }
3134
    /// The URI this table was opened or created with.
    async fn uri(&self) -> Result<String> {
        Ok(self.uri.clone())
    }
3138
    /// Storage options for this table; currently an alias for
    /// `initial_storage_options`.
    async fn storage_options(&self) -> Option<HashMap<String, String>> {
        self.initial_storage_options().await
    }
3142
3143 async fn initial_storage_options(&self) -> Option<HashMap<String, String>> {
3144 self.dataset
3145 .get()
3146 .await
3147 .ok()
3148 .and_then(|dataset| dataset.initial_storage_options().cloned())
3149 }
3150
    /// Storage options recorded on the latest dataset version, if any.
    async fn latest_storage_options(&self) -> Result<Option<HashMap<String, String>>> {
        let dataset = self.dataset.get().await?;
        // The options come back in a newtype wrapper; unwrap to the inner map.
        Ok(dataset.latest_storage_options().await?.map(|o| o.0))
    }
3155
3156 async fn index_stats(&self, index_name: &str) -> Result<Option<IndexStatistics>> {
3157 let stats = match self
3158 .dataset
3159 .get()
3160 .await?
3161 .index_statistics(index_name.as_ref())
3162 .await
3163 {
3164 Ok(stats) => stats,
3165 Err(lance_core::Error::IndexNotFound { .. }) => return Ok(None),
3166 Err(e) => return Err(Error::from(e)),
3167 };
3168
3169 let mut stats: IndexStatisticsImpl =
3170 serde_json::from_str(&stats).map_err(|e| Error::InvalidInput {
3171 message: format!("error deserializing index statistics: {}", e),
3172 })?;
3173
3174 let first_index = stats.indices.pop().ok_or_else(|| Error::InvalidInput {
3175 message: "index statistics is empty".to_string(),
3176 })?;
3177 let index_type =
3179 stats
3180 .index_type
3181 .or(first_index.index_type)
3182 .ok_or_else(|| Error::InvalidInput {
3183 message: "index statistics was missing index type".to_string(),
3184 })?;
3185 let loss = stats
3186 .indices
3187 .iter()
3188 .map(|index| index.loss.unwrap_or_default())
3189 .sum::<f64>();
3190
3191 let loss = first_index.loss.map(|first_loss| first_loss + loss);
3192 Ok(Some(IndexStatistics {
3193 num_indexed_rows: stats.num_indexed_rows,
3194 num_unindexed_rows: stats.num_unindexed_rows,
3195 index_type,
3196 distance_type: first_index.metric_type,
3197 num_indices: stats.num_indices,
3198 loss,
3199 }))
3200 }
3201
    /// Wait until the indices in `index_names` are ready, failing after
    /// `timeout`; delegates to the module-level `wait_for_index` helper.
    async fn wait_for_index(
        &self,
        index_names: &[&str],
        timeout: std::time::Duration,
    ) -> Result<()> {
        wait_for_index(self, index_names, timeout).await
    }
3211
3212 async fn stats(&self) -> Result<TableStatistics> {
3213 let num_rows = self.count_rows(None).await?;
3214 let num_indices = self.list_indices().await?.len();
3215 let ds = self.dataset.get().await?;
3216 let ds_clone = (*ds).clone();
3217 let ds_stats = Arc::new(ds_clone).calculate_data_stats().await?;
3218 let total_bytes = ds_stats.fields.iter().map(|f| f.bytes_on_disk).sum::<u64>() as usize;
3219
3220 let frags = ds.get_fragments();
3221 let mut sorted_sizes = join_all(
3222 frags
3223 .iter()
3224 .map(|frag| async move { frag.physical_rows().await.unwrap_or(0) }),
3225 )
3226 .await;
3227 sorted_sizes.sort();
3228
3229 let small_frag_threshold = 100000;
3230 let num_fragments = sorted_sizes.len();
3231 let num_small_fragments = sorted_sizes
3232 .iter()
3233 .filter(|&&size| size < small_frag_threshold)
3234 .count();
3235
3236 let p25 = *sorted_sizes.get(num_fragments / 4).unwrap_or(&0);
3237 let p50 = *sorted_sizes.get(num_fragments / 2).unwrap_or(&0);
3238 let p75 = *sorted_sizes.get(num_fragments * 3 / 4).unwrap_or(&0);
3239 let p99 = *sorted_sizes.get(num_fragments * 99 / 100).unwrap_or(&0);
3240 let min = sorted_sizes.first().copied().unwrap_or(0);
3241 let max = sorted_sizes.last().copied().unwrap_or(0);
3242 let mean = if num_fragments == 0 {
3243 0
3244 } else {
3245 sorted_sizes.iter().copied().sum::<usize>() / num_fragments
3246 };
3247
3248 let frag_stats = FragmentStatistics {
3249 num_fragments,
3250 num_small_fragments,
3251 lengths: FragmentSummaryStats {
3252 min,
3253 max,
3254 mean,
3255 p25,
3256 p50,
3257 p75,
3258 p99,
3259 },
3260 };
3261 let stats = TableStatistics {
3262 total_bytes,
3263 num_rows,
3264 num_indices,
3265 fragment_stats: frag_stats,
3266 };
3267 Ok(stats)
3268 }
3269
    /// Build a DataFusion execution-plan node that writes `input`'s record
    /// batches into this table with `write_params`, backed by a clone of the
    /// current dataset.
    async fn create_insert_exec(
        &self,
        input: Arc<dyn datafusion_physical_plan::ExecutionPlan>,
        write_params: WriteParams,
    ) -> Result<Arc<dyn datafusion_physical_plan::ExecutionPlan>> {
        let ds = self.dataset.get().await?;
        let dataset = Arc::new((*ds).clone());
        Ok(Arc::new(datafusion::insert::InsertExec::new(
            self.dataset.clone(),
            dataset,
            input,
            write_params,
        )))
    }
3284}
3285
/// Table-level statistics as produced by the `stats` method.
#[skip_serializing_none]
#[derive(Debug, Deserialize, PartialEq)]
pub struct TableStatistics {
    /// Total size, in bytes, of all field data on disk.
    pub total_bytes: usize,

    /// Number of rows in the table.
    pub num_rows: usize,

    /// Number of user-visible indices on the table.
    pub num_indices: usize,

    /// Summary statistics over the table's fragments.
    pub fragment_stats: FragmentStatistics,
}
3301
/// Fragment-level statistics for a table.
#[skip_serializing_none]
#[derive(Debug, Deserialize, PartialEq)]
pub struct FragmentStatistics {
    /// Total number of fragments in the table.
    pub num_fragments: usize,

    /// Number of fragments below the "small" row-count threshold
    /// (compaction candidates).
    pub num_small_fragments: usize,

    /// Distribution summary of per-fragment row counts.
    pub lengths: FragmentSummaryStats,
}
3317
/// Distribution summary (min/max/mean and percentiles) of per-fragment
/// row counts.
#[skip_serializing_none]
#[derive(Debug, Deserialize, PartialEq)]
pub struct FragmentSummaryStats {
    /// Smallest fragment row count.
    pub min: usize,
    /// Largest fragment row count.
    pub max: usize,
    /// Mean fragment row count (integer division; 0 when no fragments).
    pub mean: usize,
    /// 25th percentile of fragment row counts.
    pub p25: usize,
    /// 50th percentile (median) of fragment row counts.
    pub p50: usize,
    /// 75th percentile of fragment row counts.
    pub p75: usize,
    /// 99th percentile of fragment row counts.
    pub p99: usize,
}
3329
3330#[cfg(test)]
3331#[allow(deprecated)]
3332mod tests {
3333 use std::sync::atomic::{AtomicBool, Ordering};
3334 use std::sync::Arc;
3335 use std::time::Duration;
3336
3337 use arrow_array::{
3338 builder::{ListBuilder, StringBuilder},
3339 Array, BooleanArray, FixedSizeListArray, Float32Array, Int32Array, LargeStringArray,
3340 RecordBatch, RecordBatchIterator, RecordBatchReader, StringArray,
3341 };
3342 use arrow_array::{BinaryArray, LargeBinaryArray};
3343 use arrow_data::ArrayDataBuilder;
3344 use arrow_schema::{DataType, Field, Schema};
3345 use lance::dataset::WriteMode;
3346 use lance::io::{ObjectStoreParams, WrappingObjectStore};
3347 use lance::Dataset;
3348 use rand::Rng;
3349 use tempfile::tempdir;
3350
3351 use super::*;
3352 use crate::connect;
3353 use crate::connection::ConnectBuilder;
3354 use crate::index::scalar::{BTreeIndexBuilder, BitmapIndexBuilder};
3355 use crate::index::vector::{IvfHnswPqIndexBuilder, IvfHnswSqIndexBuilder};
3356
    /// Opening an existing dataset infers the table name from the directory
    /// stem ("test.lance" -> "test").
    #[tokio::test]
    async fn test_open() {
        let tmp_dir = tempdir().unwrap();
        let dataset_path = tmp_dir.path().join("test.lance");

        let batches = make_test_batches();
        Dataset::write(batches, dataset_path.to_str().unwrap(), None)
            .await
            .unwrap();

        let table = NativeTable::open(dataset_path.to_str().unwrap())
            .await
            .unwrap();

        assert_eq!(table.name, "test")
    }
3373
    /// Opening a URI with no dataset fails with `Error::TableNotFound`.
    #[tokio::test]
    async fn test_open_not_found() {
        let tmp_dir = tempdir().unwrap();
        let uri = tmp_dir.path().to_str().unwrap();
        let table = NativeTable::open(uri).await;
        assert!(matches!(table.unwrap_err(), Error::TableNotFound { .. }));
    }
3381
    /// Sanity check (non-Windows): `Path::join` on a URI-like string simply
    /// appends a segment, which the object-store path handling relies on.
    #[test]
    #[cfg(not(windows))]
    fn test_object_store_path() {
        use std::path::Path as StdPath;
        let p = StdPath::new("s3://bucket/path/to/file");
        let c = p.join("subfile");
        assert_eq!(c.to_str().unwrap(), "s3://bucket/path/to/file/subfile");
    }
3390
    /// `count_rows` with no filter counts all rows; an SQL filter restricts
    /// the count to matching rows.
    #[tokio::test]
    async fn test_count_rows() {
        let tmp_dir = tempdir().unwrap();
        let uri = tmp_dir.path().to_str().unwrap();

        let batches = make_test_batches();
        let batches = Box::new(batches) as Box<dyn RecordBatchReader + Send>;
        let table = NativeTable::create(uri, "test", vec![], batches, None, None, None, None)
            .await
            .unwrap();

        assert_eq!(table.count_rows(None).await.unwrap(), 10);
        assert_eq!(
            table
                .count_rows(Some(Filter::Sql("i >= 5".to_string())))
                .await
                .unwrap(),
            5
        );
    }
3411
    /// Appending a second batch via `add` grows the row count (10 -> 20)
    /// without changing the table name.
    #[tokio::test]
    async fn test_add() {
        let tmp_dir = tempdir().unwrap();
        let uri = tmp_dir.path().to_str().unwrap();
        let conn = connect(uri).execute().await.unwrap();

        let batches = make_test_batches();
        let schema = batches.schema().clone();
        let table = conn.create_table("test", batches).execute().await.unwrap();
        assert_eq!(table.count_rows(None).await.unwrap(), 10);

        let new_batches = RecordBatchIterator::new(
            vec![RecordBatch::try_new(
                schema.clone(),
                vec![Arc::new(Int32Array::from_iter_values(100..110))],
            )
            .unwrap()]
            .into_iter()
            .map(Ok),
            schema.clone(),
        );

        table.add(new_batches).execute().await.unwrap();
        assert_eq!(table.count_rows(None).await.unwrap(), 20);
        assert_eq!(table.name(), "test");
    }
3438
    /// Exercises the three merge-insert modes: insert-only (not-matched),
    /// unconditional update-all (matched), and filtered update-all.
    #[tokio::test]
    async fn test_merge_insert() {
        let tmp_dir = tempdir().unwrap();
        let uri = tmp_dir.path().to_str().unwrap();
        let conn = connect(uri).execute().await.unwrap();

        let batches = merge_insert_test_batches(0, 0);
        let table = conn
            .create_table("my_table", batches)
            .execute()
            .await
            .unwrap();
        assert_eq!(table.count_rows(None).await.unwrap(), 10);

        // Insert-only: rows 5..15 overlap 0..10 on 5 keys, so 5 are inserted.
        let new_batches = Box::new(merge_insert_test_batches(5, 1));

        let mut merge_insert_builder = table.merge_insert(&["i"]);
        merge_insert_builder.when_not_matched_insert_all();
        let result = merge_insert_builder.execute(new_batches).await.unwrap();
        assert_eq!(table.count_rows(None).await.unwrap(), 15);
        assert_eq!(result.num_inserted_rows, 5);
        assert_eq!(result.num_updated_rows, 0);
        assert_eq!(result.num_deleted_rows, 0);
        assert_eq!(result.num_attempts, 1);

        // Update-only: no keys match (15..25 vs 0..15), so nothing changes.
        let new_batches = Box::new(merge_insert_test_batches(15, 2));
        let mut merge_insert_builder = table.merge_insert(&["i"]);
        merge_insert_builder.when_matched_update_all(None);
        merge_insert_builder.execute(new_batches).await.unwrap();
        assert_eq!(table.count_rows(None).await.unwrap(), 15);
        assert_eq!(
            table.count_rows(Some("age = 2".to_string())).await.unwrap(),
            0
        );

        // Filtered update: only matched rows with target.age = 0 get age = 3.
        let new_batches = Box::new(merge_insert_test_batches(5, 3));
        let mut merge_insert_builder = table.merge_insert(&["i"]);
        merge_insert_builder.when_matched_update_all(Some("target.age = 0".to_string()));
        merge_insert_builder.execute(new_batches).await.unwrap();
        assert_eq!(
            table.count_rows(Some("age = 3".to_string())).await.unwrap(),
            5
        );
    }
3491
    /// Merge-insert produces the same results whether or not the join is
    /// allowed to use an index (`use_index(true)` vs `use_index(false)`).
    #[tokio::test]
    async fn test_merge_insert_use_index() {
        let tmp_dir = tempdir().unwrap();
        let uri = tmp_dir.path().to_str().unwrap();
        let conn = connect(uri).execute().await.unwrap();

        let batches = merge_insert_test_batches(0, 0);
        let table = conn
            .create_table("my_table", batches)
            .execute()
            .await
            .unwrap();
        assert_eq!(table.count_rows(None).await.unwrap(), 10);

        // With index: rows 5..15 add 5 new keys.
        let new_batches = Box::new(merge_insert_test_batches(5, 1));
        let mut merge_insert_builder = table.merge_insert(&["i"]);
        merge_insert_builder.when_not_matched_insert_all();
        merge_insert_builder.use_index(true);
        merge_insert_builder.execute(new_batches).await.unwrap();
        assert_eq!(table.count_rows(None).await.unwrap(), 15);

        // Without index: rows 15..25 add 10 new keys.
        let new_batches = Box::new(merge_insert_test_batches(15, 2));
        let mut merge_insert_builder = table.merge_insert(&["i"]);
        merge_insert_builder.when_not_matched_insert_all();
        merge_insert_builder.use_index(false);
        merge_insert_builder.execute(new_batches).await.unwrap();
        assert_eq!(table.count_rows(None).await.unwrap(), 25);
    }
3523
    /// Overwrite semantics: both `AddDataMode::Overwrite` and a raw
    /// `WriteParams` with `WriteMode::Overwrite` replace the table contents
    /// (count stays at 10 instead of accumulating).
    #[tokio::test]
    async fn test_add_overwrite() {
        let tmp_dir = tempdir().unwrap();
        let uri = tmp_dir.path().to_str().unwrap();
        let conn = connect(uri).execute().await.unwrap();

        let batches = make_test_batches();
        let schema = batches.schema().clone();
        let table = conn.create_table("test", batches).execute().await.unwrap();
        assert_eq!(table.count_rows(None).await.unwrap(), 10);

        let batches = vec![RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int32Array::from_iter_values(100..110))],
        )
        .unwrap()]
        .into_iter()
        .map(Ok);

        let new_batches = RecordBatchIterator::new(batches.clone(), schema.clone());

        // Overwrite via the high-level AddDataMode.
        table
            .add(new_batches)
            .mode(AddDataMode::Overwrite)
            .execute()
            .await
            .unwrap();
        assert_eq!(table.count_rows(None).await.unwrap(), 10);
        assert_eq!(table.name(), "test");

        // Overwrite via low-level WriteParams; these take precedence over the
        // Append mode requested on the builder.
        let param: WriteParams = WriteParams {
            mode: WriteMode::Overwrite,
            ..Default::default()
        };

        let new_batches = RecordBatchIterator::new(batches.clone(), schema.clone());
        table
            .add(new_batches)
            .write_options(WriteOptions {
                lance_write_params: Some(param),
            })
            .mode(AddDataMode::Append)
            .execute()
            .await
            .unwrap();
        assert_eq!(table.count_rows(None).await.unwrap(), 10);
        assert_eq!(table.name(), "test");
    }
3576
    /// Test-only object-store wrapper that performs no wrapping but records
    /// whether `wrap` was ever invoked.
    #[derive(Default, Debug)]
    struct NoOpCacheWrapper {
        // Set to true the first time `wrap` is called.
        called: AtomicBool,
    }
3581
    impl NoOpCacheWrapper {
        /// Whether `wrap` has been invoked on this wrapper.
        fn called(&self) -> bool {
            self.called.load(Ordering::Relaxed)
        }
    }
3587
    impl WrappingObjectStore for NoOpCacheWrapper {
        /// Record the invocation and return the store unchanged.
        fn wrap(
            &self,
            _store_prefix: &str,
            original: Arc<dyn object_store::ObjectStore>,
        ) -> Arc<dyn object_store::ObjectStore> {
            self.called.store(true, Ordering::Relaxed);
            original
        }
    }
3598
    /// A custom object-store wrapper passed through `lance_read_params` is
    /// actually invoked when opening a table.
    #[tokio::test]
    async fn test_open_table_options() {
        let tmp_dir = tempdir().unwrap();
        let dataset_path = tmp_dir.path().join("test.lance");
        let uri = dataset_path.to_str().unwrap();
        let conn = connect(uri).execute().await.unwrap();

        let batches = make_test_batches();

        conn.create_table("my_table", batches)
            .execute()
            .await
            .unwrap();

        let wrapper = Arc::new(NoOpCacheWrapper::default());

        let object_store_params = ObjectStoreParams {
            object_store_wrapper: Some(wrapper.clone()),
            ..Default::default()
        };
        let param = ReadParams {
            store_options: Some(object_store_params),
            ..Default::default()
        };
        assert!(!wrapper.called());
        conn.open_table("my_table")
            .lance_read_params(param)
            .execute()
            .await
            .unwrap();
        assert!(wrapper.called());
    }
3631
    /// One batch of 10 rows with key column `i` in `offset..offset + 10` and
    /// a constant `age` column — used to drive the merge-insert tests.
    fn merge_insert_test_batches(
        offset: i32,
        age: i32,
    ) -> impl RecordBatchReader + Send + Sync + 'static {
        let schema = Arc::new(Schema::new(vec![
            Field::new("i", DataType::Int32, false),
            Field::new("age", DataType::Int32, false),
        ]));
        RecordBatchIterator::new(
            vec![RecordBatch::try_new(
                schema.clone(),
                vec![
                    Arc::new(Int32Array::from_iter_values(offset..(offset + 10))),
                    Arc::new(Int32Array::from_iter_values(std::iter::repeat_n(age, 10))),
                ],
            )],
            schema,
        )
    }
3651
    /// One batch of 10 rows (single Int32 column `i` = 0..10).
    fn make_test_batches() -> impl RecordBatchReader + Send + Sync + 'static {
        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)]));
        RecordBatchIterator::new(
            vec![RecordBatch::try_new(
                schema.clone(),
                vec![Arc::new(Int32Array::from_iter_values(0..10))],
            )],
            schema,
        )
    }
3662
    /// End-to-end tag lifecycle: create, list, get_version, update, delete,
    /// and checking out a tagged version then returning to latest.
    #[tokio::test]
    async fn test_tags() {
        let tmp_dir = tempdir().unwrap();
        let uri = tmp_dir.path().to_str().unwrap();

        // Zero consistency interval so version reads are always fresh.
        let conn = ConnectBuilder::new(uri)
            .read_consistency_interval(Duration::from_secs(0))
            .execute()
            .await
            .unwrap();
        let table = conn
            .create_table("my_table", some_sample_data())
            .execute()
            .await
            .unwrap();
        assert_eq!(table.version().await.unwrap(), 1);
        table.add(some_sample_data()).execute().await.unwrap();
        assert_eq!(table.version().await.unwrap(), 2);
        let mut tags_manager = table.tags().await.unwrap();
        let tags = tags_manager.list().await.unwrap();
        assert!(tags.is_empty(), "Tags should be empty initially");
        let tag1 = "tag1";
        tags_manager.create(tag1, 1).await.unwrap();
        assert_eq!(tags_manager.get_version(tag1).await.unwrap(), 1);
        let tags = tags_manager.list().await.unwrap();
        assert_eq!(tags.len(), 1);
        assert!(tags.contains_key(tag1));
        assert_eq!(tags.get(tag1).unwrap().version, 1);
        tags_manager.create("tag2", 2).await.unwrap();
        assert_eq!(tags_manager.get_version("tag2").await.unwrap(), 2);
        let tags = tags_manager.list().await.unwrap();
        assert_eq!(tags.len(), 2);
        assert!(tags.contains_key(tag1));
        assert_eq!(tags.get(tag1).unwrap().version, 1);
        assert!(tags.contains_key("tag2"));
        assert_eq!(tags.get("tag2").unwrap().version, 2);
        table.add(some_sample_data()).execute().await.unwrap();
        // Move tag1 forward and drop tag2.
        tags_manager.update(tag1, 3).await.unwrap();
        assert_eq!(tags_manager.get_version(tag1).await.unwrap(), 3);
        tags_manager.delete("tag2").await.unwrap();
        let tags = tags_manager.list().await.unwrap();
        assert_eq!(tags.len(), 1);
        assert!(tags.contains_key(tag1));
        assert_eq!(tags.get(tag1).unwrap().version, 3);
        table.add(some_sample_data()).execute().await.unwrap();
        assert_eq!(table.version().await.unwrap(), 4);
        // Checking out a tag pins the table to the tagged version.
        table.checkout_tag(tag1).await.unwrap();
        assert_eq!(table.version().await.unwrap(), 3);
        table.checkout_latest().await.unwrap();
        assert_eq!(table.version().await.unwrap(), 4);
    }
3716
    /// `Index::Auto` on a 512x16 float vector column builds an IVF_PQ index;
    /// verifies listing, statistics, and dropping the index.
    #[tokio::test]
    async fn test_create_index() {
        use arrow_array::RecordBatch;
        use arrow_schema::{DataType, Field, Schema as ArrowSchema};
        use rand;
        use std::iter::repeat_with;

        use arrow_array::Float32Array;

        let tmp_dir = tempdir().unwrap();
        let uri = tmp_dir.path().to_str().unwrap();
        let conn = connect(uri).execute().await.unwrap();

        let dimension = 16;
        let schema = Arc::new(ArrowSchema::new(vec![Field::new(
            "embeddings",
            DataType::FixedSizeList(
                Arc::new(Field::new("item", DataType::Float32, true)),
                dimension,
            ),
            false,
        )]));

        // 512 random vectors of the configured dimension.
        let mut rng = rand::thread_rng();
        let float_arr = Float32Array::from(
            repeat_with(|| rng.gen::<f32>())
                .take(512 * dimension as usize)
                .collect::<Vec<f32>>(),
        );

        let vectors = Arc::new(create_fixed_size_list(float_arr, dimension).unwrap());
        let batches = RecordBatchIterator::new(
            vec![RecordBatch::try_new(schema.clone(), vec![vectors.clone()]).unwrap()]
                .into_iter()
                .map(Ok),
            schema,
        );

        let table = conn.create_table("test", batches).execute().await.unwrap();

        // No index exists yet.
        assert_eq!(table.index_stats("my_index").await.unwrap(), None);

        table
            .create_index(&["embeddings"], Index::Auto)
            .execute()
            .await
            .unwrap();

        let index_configs = table.list_indices().await.unwrap();
        assert_eq!(index_configs.len(), 1);
        let index = index_configs.into_iter().next().unwrap();
        assert_eq!(index.index_type, crate::index::IndexType::IvfPq);
        assert_eq!(index.columns, vec!["embeddings".to_string()]);
        assert_eq!(table.count_rows(None).await.unwrap(), 512);
        assert_eq!(table.name(), "test");

        let indices = table.as_native().unwrap().load_indices().await.unwrap();
        let index_name = &indices[0].index_name;
        let stats = table.index_stats(index_name).await.unwrap().unwrap();
        assert_eq!(stats.num_indexed_rows, 512);
        assert_eq!(stats.num_unindexed_rows, 0);
        assert_eq!(stats.index_type, crate::index::IndexType::IvfPq);
        assert_eq!(stats.distance_type, Some(crate::DistanceType::L2));
        assert!(stats.loss.is_some());

        table.drop_index(index_name).await.unwrap();
        assert_eq!(table.list_indices().await.unwrap().len(), 0);
    }
3785
    /// With default IVF_PQ options, the number of partitions is derived from
    /// the default partition size (rows / PARTITION_SIZE).
    #[tokio::test]
    async fn test_ivf_pq_uses_default_partition_size_for_num_partitions() {
        use arrow_array::{Float32Array, RecordBatch};
        use arrow_schema::{DataType, Field, Schema as ArrowSchema};

        use crate::index::vector::IvfPqIndexBuilder;

        let tmp_dir = tempdir().unwrap();
        let uri = tmp_dir.path().to_str().unwrap();
        let conn = connect(uri).execute().await.unwrap();

        // Two partitions' worth of rows.
        const PARTITION_SIZE: usize = 8192;
        let num_rows = PARTITION_SIZE * 2;
        let dimension = 8usize;
        let schema = Arc::new(ArrowSchema::new(vec![Field::new(
            "embeddings",
            DataType::FixedSizeList(
                Arc::new(Field::new("item", DataType::Float32, true)),
                dimension as i32,
            ),
            false,
        )]));

        let float_arr =
            Float32Array::from_iter_values((0..(num_rows * dimension)).map(|v| v as f32));
        let vectors = Arc::new(create_fixed_size_list(float_arr, dimension as i32).unwrap());
        let batches = RecordBatchIterator::new(
            vec![RecordBatch::try_new(schema.clone(), vec![vectors]).unwrap()]
                .into_iter()
                .map(Ok),
            schema,
        );

        let table = conn.create_table("test", batches).execute().await.unwrap();
        let native_table = table.as_native().unwrap();
        let builder = IvfPqIndexBuilder::default();
        table
            .create_index(&["embeddings"], Index::IvfPq(builder))
            .execute()
            .await
            .unwrap();
        table
            .wait_for_index(&["embeddings_idx"], std::time::Duration::from_secs(30))
            .await
            .unwrap();

        use lance::index::vector::ivf::v2::IvfPq as LanceIvfPq;
        use lance::index::DatasetIndexInternalExt;
        use lance_index::metrics::NoOpMetricsCollector;
        use lance_index::vector::VectorIndex as LanceVectorIndex;

        let indices = native_table.load_indices().await.unwrap();
        let index_uuid = indices[0].index_uuid.clone();

        // Clone the dataset out of the guard so the lock is released before
        // opening the index.
        let dataset_guard = native_table.dataset.get().await.unwrap();
        let dataset = (*dataset_guard).clone();
        drop(dataset_guard);

        let lance_index = dataset
            .open_vector_index("embeddings", &index_uuid, &NoOpMetricsCollector)
            .await
            .unwrap();
        let ivf_index = lance_index
            .as_any()
            .downcast_ref::<LanceIvfPq>()
            .expect("expected IvfPq index");
        let partition_count = ivf_index.ivf_model().num_partitions();

        let expected_partitions = num_rows / PARTITION_SIZE;
        assert_eq!(partition_count, expected_partitions);
    }
3857
    /// Building an IVF_HNSW_SQ index on a 512x16 vector column and checking
    /// the listed config and statistics.
    #[tokio::test]
    async fn test_create_index_ivf_hnsw_sq() {
        use arrow_array::RecordBatch;
        use arrow_schema::{DataType, Field, Schema as ArrowSchema};
        use rand;
        use std::iter::repeat_with;

        use arrow_array::Float32Array;

        let tmp_dir = tempdir().unwrap();
        let uri = tmp_dir.path().to_str().unwrap();
        let conn = connect(uri).execute().await.unwrap();

        let dimension = 16;
        let schema = Arc::new(ArrowSchema::new(vec![Field::new(
            "embeddings",
            DataType::FixedSizeList(
                Arc::new(Field::new("item", DataType::Float32, true)),
                dimension,
            ),
            false,
        )]));

        // 512 random vectors of the configured dimension.
        let mut rng = rand::thread_rng();
        let float_arr = Float32Array::from(
            repeat_with(|| rng.gen::<f32>())
                .take(512 * dimension as usize)
                .collect::<Vec<f32>>(),
        );

        let vectors = Arc::new(create_fixed_size_list(float_arr, dimension).unwrap());
        let batches = RecordBatchIterator::new(
            vec![RecordBatch::try_new(schema.clone(), vec![vectors.clone()]).unwrap()]
                .into_iter()
                .map(Ok),
            schema,
        );

        let table = conn.create_table("test", batches).execute().await.unwrap();

        let stats = table.index_stats("my_index").await.unwrap();
        assert!(stats.is_none());

        let index = IvfHnswSqIndexBuilder::default();
        table
            .create_index(&["embeddings"], Index::IvfHnswSq(index))
            .execute()
            .await
            .unwrap();

        let index_configs = table.list_indices().await.unwrap();
        assert_eq!(index_configs.len(), 1);
        let index = index_configs.into_iter().next().unwrap();
        assert_eq!(index.index_type, crate::index::IndexType::IvfHnswSq);
        assert_eq!(index.columns, vec!["embeddings".to_string()]);
        assert_eq!(table.count_rows(None).await.unwrap(), 512);
        assert_eq!(table.name(), "test");

        let indices = table.as_native().unwrap().load_indices().await.unwrap();
        let index_name = &indices[0].index_name;
        let stats = table.index_stats(index_name).await.unwrap().unwrap();
        assert_eq!(stats.num_indexed_rows, 512);
        assert_eq!(stats.num_unindexed_rows, 0);
    }
3922
    /// Building an IVF_HNSW_PQ index on a 512x16 vector column, waiting for
    /// it, and checking the listed config and statistics.
    #[tokio::test]
    async fn test_create_index_ivf_hnsw_pq() {
        use arrow_array::RecordBatch;
        use arrow_schema::{DataType, Field, Schema as ArrowSchema};
        use rand;
        use std::iter::repeat_with;

        use arrow_array::Float32Array;

        let tmp_dir = tempdir().unwrap();
        let uri = tmp_dir.path().to_str().unwrap();
        let conn = connect(uri).execute().await.unwrap();

        let dimension = 16;
        let schema = Arc::new(ArrowSchema::new(vec![Field::new(
            "embeddings",
            DataType::FixedSizeList(
                Arc::new(Field::new("item", DataType::Float32, true)),
                dimension,
            ),
            false,
        )]));

        // 512 random vectors of the configured dimension.
        let mut rng = rand::thread_rng();
        let float_arr = Float32Array::from(
            repeat_with(|| rng.gen::<f32>())
                .take(512 * dimension as usize)
                .collect::<Vec<f32>>(),
        );

        let vectors = Arc::new(create_fixed_size_list(float_arr, dimension).unwrap());
        let batches = RecordBatchIterator::new(
            vec![RecordBatch::try_new(schema.clone(), vec![vectors.clone()]).unwrap()]
                .into_iter()
                .map(Ok),
            schema,
        );

        let table = conn.create_table("test", batches).execute().await.unwrap();
        let stats = table.index_stats("my_index").await.unwrap();
        assert!(stats.is_none());

        let index = IvfHnswPqIndexBuilder::default();
        table
            .create_index(&["embeddings"], Index::IvfHnswPq(index))
            .execute()
            .await
            .unwrap();
        // Index creation is synchronous here, so even a short wait suffices.
        table
            .wait_for_index(&["embeddings_idx"], Duration::from_millis(10))
            .await
            .unwrap();
        let index_configs = table.list_indices().await.unwrap();
        assert_eq!(index_configs.len(), 1);
        let index = index_configs.into_iter().next().unwrap();
        assert_eq!(index.index_type, crate::index::IndexType::IvfHnswPq);
        assert_eq!(index.columns, vec!["embeddings".to_string()]);
        assert_eq!(table.count_rows(None).await.unwrap(), 512);
        assert_eq!(table.name(), "test");

        let indices: Vec<VectorIndex> = table.as_native().unwrap().load_indices().await.unwrap();
        let index_name = &indices[0].index_name;
        let stats = table.index_stats(index_name).await.unwrap().unwrap();
        assert_eq!(stats.num_indexed_rows, 512);
        assert_eq!(stats.num_unindexed_rows, 0);
    }
3989
3990 fn create_fixed_size_list<T: Array>(values: T, list_size: i32) -> Result<FixedSizeListArray> {
3991 let list_type = DataType::FixedSizeList(
3992 Arc::new(Field::new("item", values.data_type().clone(), true)),
3993 list_size,
3994 );
3995 let data = ArrayDataBuilder::new(list_type)
3996 .len(values.len() / list_size as usize)
3997 .add_child_data(values.into_data())
3998 .build()
3999 .unwrap();
4000
4001 Ok(FixedSizeListArray::from(data))
4002 }
4003
4004 fn some_sample_data() -> Box<dyn RecordBatchReader + Send> {
4005 let batch = RecordBatch::try_new(
4006 Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)])),
4007 vec![Arc::new(Int32Array::from(vec![1]))],
4008 )
4009 .unwrap();
4010 let schema = batch.schema().clone();
4011 let batch = Ok(batch);
4012
4013 Box::new(RecordBatchIterator::new(vec![batch], schema))
4014 }
4015
    /// Creates a scalar index with `Index::Auto` (which resolves to BTree for
    /// an Int32 column), then re-creates it explicitly as BTree and verifies
    /// the index is replaced rather than duplicated.
    #[tokio::test]
    async fn test_create_scalar_index() {
        let tmp_dir = tempdir().unwrap();
        let uri = tmp_dir.path().to_str().unwrap();

        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)])),
            vec![Arc::new(Int32Array::from(vec![1]))],
        )
        .unwrap();
        let conn = ConnectBuilder::new(uri).execute().await.unwrap();
        let table = conn
            .create_table(
                "my_table",
                RecordBatchIterator::new(vec![Ok(batch.clone())], batch.schema()),
            )
            .execute()
            .await
            .unwrap();

        // Auto index selection: expected to produce a BTree on Int32.
        table
            .create_index(&["i"], Index::Auto)
            .execute()
            .await
            .unwrap();
        table
            .wait_for_index(&["i_idx"], Duration::from_millis(10))
            .await
            .unwrap();
        let index_configs = table.list_indices().await.unwrap();
        assert_eq!(index_configs.len(), 1);
        let index = index_configs.into_iter().next().unwrap();
        assert_eq!(index.index_type, crate::index::IndexType::BTree);
        assert_eq!(index.columns, vec!["i".to_string()]);

        // Explicitly re-creating an index on the same column replaces the
        // existing one — the count below must stay 1.
        table
            .create_index(&["i"], Index::BTree(BTreeIndexBuilder::default()))
            .execute()
            .await
            .unwrap();

        let index_configs = table.list_indices().await.unwrap();
        assert_eq!(index_configs.len(), 1);
        let index = index_configs.into_iter().next().unwrap();
        assert_eq!(index.index_type, crate::index::IndexType::BTree);
        assert_eq!(index.columns, vec!["i".to_string()]);

        // The single row is fully covered by the index.
        let indices = table.as_native().unwrap().load_indices().await.unwrap();
        let index_name = &indices[0].index_name;
        let stats = table.index_stats(index_name).await.unwrap().unwrap();
        assert_eq!(stats.num_indexed_rows, 1);
        assert_eq!(stats.num_unindexed_rows, 0);
    }
4071
    /// Creates bitmap indices on five supported column types (Utf8, LargeUtf8,
    /// Boolean, Binary, LargeBinary) and verifies `list_indices` reports all
    /// five with the expected type and column, in creation order.
    #[tokio::test]
    async fn test_create_bitmap_index() {
        let tmp_dir = tempdir().unwrap();
        let uri = tmp_dir.path().to_str().unwrap();

        let conn = ConnectBuilder::new(uri).execute().await.unwrap();

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("category", DataType::Utf8, true),
            Field::new("large_category", DataType::LargeUtf8, true),
            Field::new("is_active", DataType::Boolean, true),
            Field::new("data", DataType::Binary, true),
            Field::new("large_data", DataType::LargeBinary, true),
        ]));

        // 100 rows with low-cardinality values (5 categories, 2 booleans),
        // the shape bitmap indices are designed for.
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int32Array::from_iter_values(0..100)),
                Arc::new(StringArray::from_iter_values(
                    (0..100).map(|i| format!("category_{}", i % 5)),
                )),
                Arc::new(LargeStringArray::from_iter_values(
                    (0..100).map(|i| format!("large_category_{}", i % 5)),
                )),
                Arc::new(BooleanArray::from_iter((0..100).map(|i| Some(i % 2 == 0)))),
                Arc::new(BinaryArray::from_iter_values(
                    (0_u32..100).map(|i| i.to_le_bytes()),
                )),
                Arc::new(LargeBinaryArray::from_iter_values(
                    (0_u32..100).map(|i| i.to_le_bytes()),
                )),
            ],
        )
        .unwrap();

        let table = conn
            .create_table(
                "test_bitmap",
                RecordBatchIterator::new(vec![Ok(batch.clone())], batch.schema()),
            )
            .execute()
            .await
            .unwrap();

        // One bitmap index per supported column type.
        table
            .create_index(&["category"], Index::Bitmap(Default::default()))
            .execute()
            .await
            .unwrap();

        table
            .create_index(&["is_active"], Index::Bitmap(Default::default()))
            .execute()
            .await
            .unwrap();

        table
            .create_index(&["data"], Index::Bitmap(Default::default()))
            .execute()
            .await
            .unwrap();

        table
            .create_index(&["large_data"], Index::Bitmap(Default::default()))
            .execute()
            .await
            .unwrap();

        table
            .create_index(&["large_category"], Index::Bitmap(Default::default()))
            .execute()
            .await
            .unwrap();

        let index_configs = table.list_indices().await.unwrap();
        assert_eq!(index_configs.len(), 5);

        // The assertions below assume list_indices returns indices in
        // creation order.
        let mut configs_iter = index_configs.into_iter();
        let index = configs_iter.next().unwrap();
        assert_eq!(index.index_type, crate::index::IndexType::Bitmap);
        assert_eq!(index.columns, vec!["category".to_string()]);

        let index = configs_iter.next().unwrap();
        assert_eq!(index.index_type, crate::index::IndexType::Bitmap);
        assert_eq!(index.columns, vec!["is_active".to_string()]);

        let index = configs_iter.next().unwrap();
        assert_eq!(index.index_type, crate::index::IndexType::Bitmap);
        assert_eq!(index.columns, vec!["data".to_string()]);

        let index = configs_iter.next().unwrap();
        assert_eq!(index.index_type, crate::index::IndexType::Bitmap);
        assert_eq!(index.columns, vec!["large_data".to_string()]);

        let index = configs_iter.next().unwrap();
        assert_eq!(index.index_type, crate::index::IndexType::Bitmap);
        assert_eq!(index.columns, vec!["large_category".to_string()]);
    }
4178
4179 #[tokio::test]
4180 async fn test_create_label_list_index() {
4181 let tmp_dir = tempdir().unwrap();
4182 let uri = tmp_dir.path().to_str().unwrap();
4183
4184 let conn = ConnectBuilder::new(uri).execute().await.unwrap();
4185
4186 let schema = Arc::new(Schema::new(vec![
4187 Field::new("id", DataType::Int32, false),
4188 Field::new(
4189 "tags",
4190 DataType::List(Field::new("item", DataType::Utf8, true).into()),
4191 true,
4192 ),
4193 ]));
4194
4195 const TAGS: [&str; 3] = ["cat", "dog", "fish"];
4196
4197 let values_builder = StringBuilder::new();
4198 let mut builder = ListBuilder::new(values_builder);
4199 for i in 0..120 {
4200 builder.values().append_value(TAGS[i % 3]);
4201 if i % 3 == 0 {
4202 builder.append(true)
4203 }
4204 }
4205 let tags = Arc::new(builder.finish());
4206
4207 let batch = RecordBatch::try_new(
4208 schema.clone(),
4209 vec![Arc::new(Int32Array::from_iter_values(0..40)), tags],
4210 )
4211 .unwrap();
4212
4213 let table = conn
4214 .create_table(
4215 "test_bitmap",
4216 RecordBatchIterator::new(vec![Ok(batch.clone())], batch.schema()),
4217 )
4218 .execute()
4219 .await
4220 .unwrap();
4221
4222 assert!(table
4224 .create_index(&["tags"], Index::BTree(Default::default()))
4225 .execute()
4226 .await
4227 .is_err());
4228 assert!(table
4229 .create_index(&["tags"], Index::Bitmap(Default::default()))
4230 .execute()
4231 .await
4232 .is_err());
4233
4234 table
4236 .create_index(&["tags"], Index::LabelList(Default::default()))
4237 .execute()
4238 .await
4239 .unwrap();
4240
4241 let index_configs = table.list_indices().await.unwrap();
4243 assert_eq!(index_configs.len(), 1);
4244 let index = index_configs.into_iter().next().unwrap();
4245 assert_eq!(index.index_type, crate::index::IndexType::LabelList);
4246 assert_eq!(index.columns, vec!["tags".to_string()]);
4247 }
4248
4249 #[tokio::test]
4250 async fn test_create_inverted_index() {
4251 let tmp_dir = tempdir().unwrap();
4252 let uri = tmp_dir.path().to_str().unwrap();
4253
4254 let conn = ConnectBuilder::new(uri).execute().await.unwrap();
4255 const WORDS: [&str; 3] = ["cat", "dog", "fish"];
4256 let mut text_builder = StringBuilder::new();
4257 let num_rows = 120;
4258 for i in 0..num_rows {
4259 text_builder.append_value(WORDS[i % 3]);
4260 }
4261 let text = Arc::new(text_builder.finish());
4262
4263 let schema = Arc::new(Schema::new(vec![
4264 Field::new("id", DataType::Int32, false),
4265 Field::new("text", DataType::Utf8, true),
4266 ]));
4267 let batch = RecordBatch::try_new(
4268 schema.clone(),
4269 vec![
4270 Arc::new(Int32Array::from_iter_values(0..num_rows as i32)),
4271 text,
4272 ],
4273 )
4274 .unwrap();
4275
4276 let table = conn
4277 .create_table(
4278 "test_bitmap",
4279 RecordBatchIterator::new(vec![Ok(batch.clone())], batch.schema()),
4280 )
4281 .execute()
4282 .await
4283 .unwrap();
4284
4285 table
4286 .create_index(&["text"], Index::FTS(Default::default()))
4287 .execute()
4288 .await
4289 .unwrap();
4290 let index_configs = table.list_indices().await.unwrap();
4291 assert_eq!(index_configs.len(), 1);
4292 let index = index_configs.into_iter().next().unwrap();
4293 assert_eq!(index.index_type, crate::index::IndexType::FTS);
4294 assert_eq!(index.columns, vec!["text".to_string()]);
4295 assert_eq!(index.name, "text_idx");
4296
4297 let stats = table.index_stats("text_idx").await.unwrap().unwrap();
4298 assert_eq!(stats.num_indexed_rows, num_rows);
4299 assert_eq!(stats.num_unindexed_rows, 0);
4300 assert_eq!(stats.index_type, crate::index::IndexType::FTS);
4301 assert_eq!(stats.distance_type, None);
4302
4303 table.prewarm_index("text_idx").await.unwrap();
4305 }
4306
4307 #[cfg(not(target_os = "windows"))]
4309 #[tokio::test]
4310 async fn test_read_consistency_interval() {
4311 let intervals = vec![
4312 None,
4313 Some(0),
4314 Some(100), ];
4316
4317 for interval in intervals {
4318 let data = some_sample_data();
4319
4320 let tmp_dir = tempdir().unwrap();
4321 let uri = tmp_dir.path().to_str().unwrap();
4322
4323 let conn1 = ConnectBuilder::new(uri).execute().await.unwrap();
4324 let table1 = conn1
4325 .create_empty_table("my_table", data.schema())
4326 .execute()
4327 .await
4328 .unwrap();
4329
4330 let mut conn2 = ConnectBuilder::new(uri);
4331 if let Some(interval) = interval {
4332 conn2 = conn2.read_consistency_interval(std::time::Duration::from_millis(interval));
4333 }
4334 let conn2 = conn2.execute().await.unwrap();
4335 let table2 = conn2.open_table("my_table").execute().await.unwrap();
4336
4337 assert_eq!(table1.count_rows(None).await.unwrap(), 0);
4338 assert_eq!(table2.count_rows(None).await.unwrap(), 0);
4339
4340 table1.add(data).execute().await.unwrap();
4341 assert_eq!(table1.count_rows(None).await.unwrap(), 1);
4342
4343 match interval {
4344 None => {
4345 assert_eq!(table2.count_rows(None).await.unwrap(), 0);
4346 table2.checkout_latest().await.unwrap();
4347 assert_eq!(table2.count_rows(None).await.unwrap(), 1);
4348 }
4349 Some(0) => {
4350 assert_eq!(table2.count_rows(None).await.unwrap(), 1);
4351 }
4352 Some(100) => {
4353 assert_eq!(table2.count_rows(None).await.unwrap(), 0);
4354 tokio::time::sleep(Duration::from_millis(100)).await;
4355 assert_eq!(table2.count_rows(None).await.unwrap(), 1);
4356 }
4357 _ => unreachable!(),
4358 }
4359 }
4360 }
4361
4362 #[tokio::test]
4363 async fn test_time_travel_write() {
4364 let tmp_dir = tempdir().unwrap();
4365 let uri = tmp_dir.path().to_str().unwrap();
4366
4367 let conn = ConnectBuilder::new(uri)
4368 .read_consistency_interval(Duration::from_secs(0))
4369 .execute()
4370 .await
4371 .unwrap();
4372 let table = conn
4373 .create_table("my_table", some_sample_data())
4374 .execute()
4375 .await
4376 .unwrap();
4377 let version = table.version().await.unwrap();
4378 table.add(some_sample_data()).execute().await.unwrap();
4379 table.checkout(version).await.unwrap();
4380 assert!(table.add(some_sample_data()).execute().await.is_err())
4381 }
4382
    /// Exercises manifest-config key/value maintenance on a native table:
    /// insert, upsert (overwrite), and delete, verifying map size and values
    /// after each step. `base_config_len` accounts for any config entries the
    /// dataset writes by default.
    #[tokio::test]
    async fn test_update_dataset_config() {
        let tmp_dir = tempdir().unwrap();
        let uri = tmp_dir.path().to_str().unwrap();

        let conn = ConnectBuilder::new(uri)
            .read_consistency_interval(Duration::from_secs(0))
            .execute()
            .await
            .unwrap();

        let table = conn
            .create_table("my_table", some_sample_data())
            .execute()
            .await
            .unwrap();
        let native_tbl = table.as_native().unwrap();

        // Baseline: config may already contain default entries.
        let manifest = native_tbl.manifest().await.unwrap();
        let base_config_len = manifest.config.len();

        // Insert a new key.
        native_tbl
            .update_config(vec![("test_key1".to_string(), "test_val1".to_string())])
            .await
            .unwrap();

        let manifest = native_tbl.manifest().await.unwrap();
        assert_eq!(manifest.config.len(), 1 + base_config_len);
        assert_eq!(
            manifest.config.get("test_key1"),
            Some(&"test_val1".to_string())
        );

        // Insert a second key; the first is untouched (update merges, it does
        // not replace the whole map).
        native_tbl
            .update_config(vec![("test_key2".to_string(), "test_val2".to_string())])
            .await
            .unwrap();
        let manifest = native_tbl.manifest().await.unwrap();
        assert_eq!(manifest.config.len(), 2 + base_config_len);
        assert_eq!(
            manifest.config.get("test_key1"),
            Some(&"test_val1".to_string())
        );
        assert_eq!(
            manifest.config.get("test_key2"),
            Some(&"test_val2".to_string())
        );

        // Updating an existing key overwrites its value without changing size.
        native_tbl
            .update_config(vec![(
                "test_key2".to_string(),
                "test_val2_update".to_string(),
            )])
            .await
            .unwrap();
        let manifest = native_tbl.manifest().await.unwrap();
        assert_eq!(manifest.config.len(), 2 + base_config_len);
        assert_eq!(
            manifest.config.get("test_key1"),
            Some(&"test_val1".to_string())
        );
        assert_eq!(
            manifest.config.get("test_key2"),
            Some(&"test_val2_update".to_string())
        );

        // Deleting a key removes only that entry.
        native_tbl.delete_config_keys(&["test_key1"]).await.unwrap();
        let manifest = native_tbl.manifest().await.unwrap();
        assert_eq!(manifest.config.len(), 1 + base_config_len);
        assert_eq!(
            manifest.config.get("test_key2"),
            Some(&"test_val2_update".to_string())
        );
    }
4457
    /// Exercises `replace_schema_metadata`: each call supplies the complete
    /// replacement map for the schema-level metadata, verified via the
    /// table's Arrow schema after each call.
    #[tokio::test]
    async fn test_schema_metadata_config() {
        let tmp_dir = tempdir().unwrap();
        let uri = tmp_dir.path().to_str().unwrap();

        let conn = ConnectBuilder::new(uri)
            .read_consistency_interval(Duration::from_secs(0))
            .execute()
            .await
            .unwrap();
        let table = conn
            .create_table("my_table", some_sample_data())
            .execute()
            .await
            .unwrap();

        // A freshly created table has no schema metadata.
        let native_tbl = table.as_native().unwrap();
        let schema = native_tbl.schema().await.unwrap();
        let metadata = schema.metadata();
        assert_eq!(metadata.len(), 0);

        native_tbl
            .replace_schema_metadata(vec![("test_key1".to_string(), "test_val1".to_string())])
            .await
            .unwrap();

        let schema = native_tbl.schema().await.unwrap();
        let metadata = schema.metadata();
        assert_eq!(metadata.len(), 1);
        assert_eq!(metadata.get("test_key1"), Some(&"test_val1".to_string()));

        // Replacing with a two-entry map both updates key1 and adds key2.
        native_tbl
            .replace_schema_metadata(vec![
                ("test_key1".to_string(), "test_val1_update".to_string()),
                ("test_key2".to_string(), "test_val2".to_string()),
            ])
            .await
            .unwrap();
        let schema = native_tbl.schema().await.unwrap();
        let metadata = schema.metadata();
        assert_eq!(metadata.len(), 2);
        assert_eq!(
            metadata.get("test_key1"),
            Some(&"test_val1_update".to_string())
        );
        assert_eq!(metadata.get("test_key2"), Some(&"test_val2".to_string()));

        // Replace with a map containing only key2; only key2's value is
        // asserted afterwards (no size check here).
        native_tbl
            .replace_schema_metadata(vec![(
                "test_key2".to_string(),
                "test_val2_update".to_string(),
            )])
            .await
            .unwrap();
        let schema = native_tbl.schema().await.unwrap();
        let metadata = schema.metadata();
        assert_eq!(
            metadata.get("test_key2"),
            Some(&"test_val2_update".to_string())
        );
    }
4519
    /// Exercises `replace_field_metadata`, which sets per-field metadata keyed
    /// by the field's numeric id, independently of schema-level metadata.
    #[tokio::test]
    pub async fn test_field_metadata_update() {
        let tmp_dir = tempdir().unwrap();
        let uri = tmp_dir.path().to_str().unwrap();

        let conn = ConnectBuilder::new(uri)
            .read_consistency_interval(Duration::from_secs(0))
            .execute()
            .await
            .unwrap();
        let table = conn
            .create_table("my_table", some_sample_data())
            .execute()
            .await
            .unwrap();

        let native_tbl = table.as_native().unwrap();
        let schema = native_tbl.manifest().await.unwrap().schema;

        // Field "i" starts with no metadata.
        let field = schema.field("i").unwrap();
        assert_eq!(field.metadata.len(), 0);

        // Sanity check: schema-level metadata replacement still works and is
        // separate from field metadata.
        native_tbl
            .replace_schema_metadata(vec![(
                "test_key2".to_string(),
                "test_val2_update".to_string(),
            )])
            .await
            .unwrap();

        let schema = native_tbl.schema().await.unwrap();
        let metadata = schema.metadata();
        assert_eq!(metadata.len(), 1);
        assert_eq!(
            metadata.get("test_key2"),
            Some(&"test_val2_update".to_string())
        );

        // Field metadata is addressed by the field's id, not its name.
        let mut new_field_metadata = HashMap::<String, String>::new();
        new_field_metadata.insert("test_field_key1".into(), "test_field_val1".into());
        native_tbl
            .replace_field_metadata(vec![(field.id as u32, new_field_metadata)])
            .await
            .unwrap();

        let schema = native_tbl.manifest().await.unwrap().schema;
        let field = schema.field("i").unwrap();
        assert_eq!(field.metadata.len(), 1);
        assert_eq!(
            field.metadata.get("test_field_key1"),
            Some(&"test_field_val1".to_string())
        );
    }
4573
    /// Verifies `Table::stats()` against a table with 11 fragments
    /// (one 100-row create + ten 15-row appends = 250 rows) and against an
    /// empty table, pinning the exact expected statistics.
    #[tokio::test]
    pub async fn test_stats() {
        let tmp_dir = tempdir().unwrap();
        let uri = tmp_dir.path().to_str().unwrap();

        let conn = ConnectBuilder::new(uri).execute().await.unwrap();

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("foo", DataType::Int32, true),
        ]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int32Array::from_iter_values(0..100)),
                Arc::new(Int32Array::from_iter_values(0..100)),
            ],
        )
        .unwrap();

        let table = conn
            .create_table(
                "test_stats",
                RecordBatchIterator::new(vec![Ok(batch.clone())], batch.schema()),
            )
            .execute()
            .await
            .unwrap();
        // Ten separate appends -> ten additional 15-row fragments.
        for _ in 0..10 {
            let batch = RecordBatch::try_new(
                schema.clone(),
                vec![
                    Arc::new(Int32Array::from_iter_values(0..15)),
                    Arc::new(Int32Array::from_iter_values(0..15)),
                ],
            )
            .unwrap();
            table
                .add(RecordBatchIterator::new(
                    vec![Ok(batch.clone())],
                    batch.schema(),
                ))
                .execute()
                .await
                .unwrap();
        }

        let empty_table = conn
            .create_table(
                "test_stats_empty",
                RecordBatchIterator::new(vec![], batch.schema()),
            )
            .execute()
            .await
            .unwrap();

        // total_bytes 2000 = 250 rows x 2 Int32 columns x 4 bytes.
        // Fragment lengths: one of 100 plus ten of 15, hence the percentiles.
        let res = table.stats().await.unwrap();
        println!("{:#?}", res);
        assert_eq!(
            res,
            TableStatistics {
                num_rows: 250,
                num_indices: 0,
                total_bytes: 2000,
                fragment_stats: FragmentStatistics {
                    num_fragments: 11,
                    num_small_fragments: 11,
                    lengths: FragmentSummaryStats {
                        min: 15,
                        max: 100,
                        mean: 22,
                        p25: 15,
                        p50: 15,
                        p75: 15,
                        p99: 100,
                    },
                },
            }
        );
        // An empty table reports all-zero statistics rather than erroring.
        let res = empty_table.stats().await.unwrap();
        println!("{:#?}", res);
        assert_eq!(
            res,
            TableStatistics {
                num_rows: 0,
                num_indices: 0,
                total_bytes: 0,
                fragment_stats: FragmentStatistics {
                    num_fragments: 0,
                    num_small_fragments: 0,
                    lengths: FragmentSummaryStats {
                        min: 0,
                        max: 0,
                        mean: 0,
                        p25: 0,
                        p50: 0,
                        p75: 0,
                        p99: 0,
                    },
                },
            }
        )
    }
4677
    /// Compacts with `defer_index_remap` enabled and verifies that
    /// `list_indices` still reports only the user-created bitmap index —
    /// i.e. the internal index created by deferred fragment remapping is
    /// skipped from the listing.
    #[tokio::test]
    pub async fn test_list_indices_skip_frag_reuse() {
        let tmp_dir = tempdir().unwrap();
        let uri = tmp_dir.path().to_str().unwrap();

        let conn = ConnectBuilder::new(uri).execute().await.unwrap();

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("foo", DataType::Int32, true),
        ]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int32Array::from_iter_values(0..100)),
                Arc::new(Int32Array::from_iter_values(0..100)),
            ],
        )
        .unwrap();

        let table = conn
            .create_table(
                "test_list_indices_skip_frag_reuse",
                RecordBatchIterator::new(vec![Ok(batch.clone())], batch.schema()),
            )
            .execute()
            .await
            .unwrap();

        // Second append creates a second fragment so compaction has work to do.
        table
            .add(RecordBatchIterator::new(
                vec![Ok(batch.clone())],
                batch.schema(),
            ))
            .execute()
            .await
            .unwrap();

        table
            .create_index(&["id"], Index::Bitmap(BitmapIndexBuilder {}))
            .execute()
            .await
            .unwrap();

        // Compact both fragments into one, deferring the index remap.
        table
            .optimize(OptimizeAction::Compact {
                options: CompactionOptions {
                    target_rows_per_fragment: 2_000,
                    defer_index_remap: true,
                    ..Default::default()
                },
                remap_options: None,
            })
            .await
            .unwrap();

        // Only the user-visible bitmap index should be listed.
        let result = table.list_indices().await.unwrap();
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].index_type, crate::index::IndexType::Bitmap);
    }
4738
    /// Converts a vector query into a namespace `QueryTableRequest` and
    /// verifies the field mapping: limit -> k, plus offset, filter, selected
    /// columns, vector column, distance type, and the query vector payload.
    #[tokio::test]
    async fn test_convert_to_namespace_query_vector() {
        let tmp_dir = tempdir().unwrap();
        let dataset_path = tmp_dir.path().join("test_ns_query.lance");

        let batches = make_test_batches();
        Dataset::write(batches, dataset_path.to_str().unwrap(), None)
            .await
            .unwrap();

        let table = NativeTable::open(dataset_path.to_str().unwrap())
            .await
            .unwrap();

        let query_vector = Arc::new(Float32Array::from(vec![1.0, 2.0, 3.0, 4.0]));
        let vq = VectorQueryRequest {
            base: QueryRequest {
                limit: Some(10),
                offset: Some(5),
                filter: Some(QueryFilter::Sql("id > 0".to_string())),
                select: Select::Columns(vec!["id".to_string()]),
                ..Default::default()
            },
            column: Some("vector".to_string()),
            query_vector: vec![query_vector as Arc<dyn Array>],
            minimum_nprobes: 20,
            distance_type: Some(crate::DistanceType::L2),
            ..Default::default()
        };

        let any_query = AnyQuery::VectorQuery(vq);
        let ns_request = table.convert_to_namespace_query(&any_query).unwrap();

        // limit maps to k; the rest carry over directly.
        assert_eq!(ns_request.k, 10);
        assert_eq!(ns_request.offset, Some(5));
        assert_eq!(ns_request.filter, Some("id > 0".to_string()));
        assert_eq!(
            ns_request
                .columns
                .as_ref()
                .and_then(|c| c.column_names.as_ref()),
            Some(&vec!["id".to_string()])
        );
        assert_eq!(ns_request.vector_column, Some("vector".to_string()));
        // Distance type is serialized in lowercase.
        assert_eq!(ns_request.distance_type, Some("l2".to_string()));
        // A single query vector lands in `single_vector` with its values intact.
        assert!(ns_request.vector.single_vector.is_some());
        assert_eq!(
            ns_request.vector.single_vector.as_ref().unwrap(),
            &vec![1.0, 2.0, 3.0, 4.0]
        );
    }
4791
4792 #[tokio::test]
4793 async fn test_convert_to_namespace_query_plain_query() {
4794 let tmp_dir = tempdir().unwrap();
4795 let dataset_path = tmp_dir.path().join("test_ns_plain.lance");
4796
4797 let batches = make_test_batches();
4798 Dataset::write(batches, dataset_path.to_str().unwrap(), None)
4799 .await
4800 .unwrap();
4801
4802 let table = NativeTable::open(dataset_path.to_str().unwrap())
4803 .await
4804 .unwrap();
4805
4806 let q = QueryRequest {
4808 limit: Some(20),
4809 offset: Some(5),
4810 filter: Some(QueryFilter::Sql("id > 5".to_string())),
4811 select: Select::Columns(vec!["id".to_string()]),
4812 with_row_id: true,
4813 ..Default::default()
4814 };
4815
4816 let any_query = AnyQuery::Query(q);
4817 let ns_request = table.convert_to_namespace_query(&any_query).unwrap();
4818
4819 assert_eq!(ns_request.k, 20);
4821 assert_eq!(ns_request.offset, Some(5));
4822 assert_eq!(ns_request.filter, Some("id > 5".to_string()));
4823 assert_eq!(
4824 ns_request
4825 .columns
4826 .as_ref()
4827 .and_then(|c| c.column_names.as_ref()),
4828 Some(&vec!["id".to_string()])
4829 );
4830 assert_eq!(ns_request.with_row_id, Some(true));
4831 assert_eq!(ns_request.bypass_vector_index, Some(true));
4832 assert!(ns_request.vector_column.is_none()); assert!(ns_request.vector.single_vector.as_ref().unwrap().is_empty());
4836 }
4837}