use std::sync::Arc;
use lance_io::object_store::StorageOptionsProvider;
use crate::{
Error, Result, Table,
connection::{merge_storage_options, set_storage_options_provider},
data::scannable::{Scannable, WithEmbeddingsScannable},
database::{CreateTableMode, CreateTableRequest, Database},
embeddings::{EmbeddingDefinition, EmbeddingFunction, EmbeddingRegistry},
table::WriteOptions,
};
/// Builder that accumulates the settings for a new table and creates it on `execute`.
///
/// Every setter consumes the builder and hands it back, so calls are meant to be chained.
pub struct CreateTableBuilder {
/// Database that `execute` asks to create the table; also handed to the resulting `Table`.
parent: Arc<dyn Database>,
/// Embedding definitions paired with the functions resolved for them by `add_embedding`.
embeddings: Vec<(EmbeddingDefinition, Arc<dyn EmbeddingFunction>)>,
/// Registry in which `add_embedding` looks up embedding functions by name.
embedding_registry: Arc<dyn EmbeddingRegistry>,
/// The request being assembled; passed to `Database::create_table` by `execute`.
request: CreateTableRequest,
}
impl CreateTableBuilder {
pub(super) fn new(
parent: Arc<dyn Database>,
embedding_registry: Arc<dyn EmbeddingRegistry>,
name: String,
data: Box<dyn Scannable>,
) -> Self {
Self {
parent,
embeddings: Vec::new(),
embedding_registry,
request: CreateTableRequest::new(name, data),
}
}
pub fn mode(mut self, mode: CreateTableMode) -> Self {
self.request.mode = mode;
self
}
pub fn write_options(mut self, write_options: WriteOptions) -> Self {
self.request.write_options = write_options;
self
}
pub fn storage_option(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
let store_params = self
.request
.write_options
.lance_write_params
.get_or_insert(Default::default())
.store_params
.get_or_insert(Default::default());
merge_storage_options(store_params, [(key.into(), value.into())]);
self
}
pub fn storage_options(
mut self,
pairs: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
) -> Self {
let store_params = self
.request
.write_options
.lance_write_params
.get_or_insert(Default::default())
.store_params
.get_or_insert(Default::default());
let updates = pairs
.into_iter()
.map(|(key, value)| (key.into(), value.into()));
merge_storage_options(store_params, updates);
self
}
pub fn add_embedding(mut self, definition: EmbeddingDefinition) -> Result<Self> {
let embedding_func = self
.embedding_registry
.get(&definition.embedding_name)
.ok_or_else(|| Error::EmbeddingFunctionNotFound {
name: definition.embedding_name.clone(),
reason: "No embedding function found in the connection's embedding_registry"
.to_string(),
})?;
self.embeddings.push((definition, embedding_func));
Ok(self)
}
pub fn namespace(mut self, namespace: Vec<String>) -> Self {
self.request.namespace = namespace;
self
}
pub fn location(mut self, location: impl Into<String>) -> Self {
self.request.location = Some(location.into());
self
}
pub fn storage_options_provider(mut self, provider: Arc<dyn StorageOptionsProvider>) -> Self {
let store_params = self
.request
.write_options
.lance_write_params
.get_or_insert(Default::default())
.store_params
.get_or_insert(Default::default());
set_storage_options_provider(store_params, provider);
self
}
pub async fn execute(mut self) -> Result<Table> {
let embedding_registry = self.embedding_registry.clone();
let parent = self.parent.clone();
if !self.embeddings.is_empty() {
let wrapped_data: Box<dyn Scannable> = Box::new(WithEmbeddingsScannable::try_new(
self.request.data,
self.embeddings,
)?);
self.request.data = wrapped_data;
}
Ok(Table::new_with_embedding_registry(
parent.create_table(self.request).await?,
parent,
embedding_registry,
))
}
}
#[cfg(test)]
mod tests {
use arrow_array::{
Array, FixedSizeListArray, Float32Array, RecordBatch, RecordBatchIterator, record_batch,
};
use arrow_schema::{ArrowError, DataType, Field, Schema};
use futures::TryStreamExt;
use lance_file::version::LanceFileVersion;
use tempfile::tempdir;
use crate::{
arrow::{SendableRecordBatchStream, SimpleRecordBatchStream},
connect,
database::listing::{ListingDatabaseOptions, NewTableConfig},
embeddings::{EmbeddingDefinition, EmbeddingFunction, MemoryRegistry},
query::{ExecutableQuery, QueryBase, Select},
test_utils::embeddings::MockEmbed,
};
use std::borrow::Cow;
use super::*;
/// A table created from a schema alone can be re-opened by name, reports that schema, and has
/// zero rows.
#[tokio::test]
async fn create_empty_table() {
    let conn = connect("memory://").execute().await.unwrap();
    let expected_schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("value", DataType::Float64, false),
    ]));

    let builder = conn.create_empty_table("name", expected_schema.clone());
    builder.execute().await.unwrap();

    // Look the table up by name again instead of using the handle `execute` returned.
    let reopened = conn.open_table("name").execute().await.unwrap();
    assert_eq!(reopened.schema().await.unwrap(), expected_schema);
    assert_eq!(reopened.count_rows(None).await.unwrap(), 0);
}
/// Shared body for the tests that feed different input types to `create_table`.
///
/// Creates `data_table` from `data` on a new `memory://` connection, re-opens it by name, and
/// checks that it holds exactly three rows and reports the schema `data` advertised beforehand.
async fn test_create_table_with_data<T: Scannable + 'static>(data: T) {
    let conn = connect("memory://").execute().await.unwrap();
    // `data` is moved into `create_table` below, so its schema has to be captured first.
    let expected_schema = data.schema();

    let builder = conn.create_table("data_table", data);
    builder.execute().await.unwrap();

    let reopened = conn.open_table("data_table").execute().await.unwrap();
    assert_eq!(reopened.count_rows(None).await.unwrap(), 3);
    assert_eq!(reopened.schema().await.unwrap(), expected_schema);
}
/// `create_table` accepts a single record batch as input.
#[tokio::test]
async fn create_table_with_batch() {
    let three_rows = record_batch!(("id", Int64, [1, 2, 3])).unwrap();
    test_create_table_with_data(three_rows).await;
}
/// `create_table` accepts a `Vec` of record batches; the three rows are split two and one.
#[tokio::test]
async fn test_create_table_with_vec_batch() {
    let first = record_batch!(("id", Int64, [1, 2])).unwrap();
    let second = record_batch!(("id", Int64, [3])).unwrap();
    test_create_table_with_data(vec![first, second]).await;
}
/// `create_table` accepts a boxed `RecordBatchReader` as input.
#[tokio::test]
async fn test_create_table_with_record_batch_reader() {
    let batches = vec![
        record_batch!(("id", Int64, [1, 2])).unwrap(),
        record_batch!(("id", Int64, [3])).unwrap(),
    ];
    let schema = batches[0].schema();

    // Every batch is wrapped in `Ok`, so the reader never yields an error.
    let batch_iter = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
    let reader: Box<dyn arrow_array::RecordBatchReader + Send> = Box::new(batch_iter);

    test_create_table_with_data(reader).await;
}
/// `create_table` accepts a `SendableRecordBatchStream` as input.
#[tokio::test]
async fn test_create_table_with_stream() {
    let batches = vec![
        record_batch!(("id", Int64, [1, 2])).unwrap(),
        record_batch!(("id", Int64, [3])).unwrap(),
    ];
    let schema = batches[0].schema();

    // Every batch is wrapped in `Ok`, so the stream never yields an error.
    let batch_stream = futures::stream::iter(batches.into_iter().map(Ok));
    let stream: SendableRecordBatchStream = Box::pin(SimpleRecordBatchStream {
        schema,
        stream: batch_stream,
    });

    test_create_table_with_data(stream).await;
}
/// Marker error injected into readers and streams by the error-propagation tests below.
#[derive(Debug)]
struct MyError;

impl std::error::Error for MyError {}

impl std::fmt::Display for MyError {
    /// Always renders the fixed message `MyError occurred`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("MyError occurred")
    }
}
/// A reader that yields one good batch followed by an error makes `create_table` fail.
///
/// NOTE(review): despite the name, only `is_err()` is asserted; the test does not check that the
/// returned error still carries `MyError`.
#[tokio::test]
async fn test_create_preserves_reader_error() {
    let good_batch = record_batch!(("id", Int64, [1, 2])).unwrap();
    let schema = good_batch.schema();

    let failure = ArrowError::ExternalError(Box::new(MyError));
    let items = vec![Ok(good_batch), Err(failure)];
    let batch_iter = RecordBatchIterator::new(items.into_iter(), schema.clone());
    let reader: Box<dyn arrow_array::RecordBatchReader + Send> = Box::new(batch_iter);

    let db = connect("memory://").execute().await.unwrap();
    let outcome = db.create_table("failing_table", reader).execute().await;
    assert!(outcome.is_err());
}
#[tokio::test]
async fn test_create_preserves_stream_error() {
let first_batch = record_batch!(("id", Int64, [1, 2])).unwrap();
let schema = first_batch.schema();
let iterator = vec![
Ok(first_batch),
Err(Error::External {
source: Box::new(MyError),
}),
];
let stream = futures::stream::iter(iterator);
let stream: SendableRecordBatchStream = Box::pin(SimpleRecordBatchStream {
schema: schema.clone(),
stream,
});
let db = connect("memory://").execute().await.unwrap();
let result = db
.create_table("failing_stream_table", stream)
.execute()
.await;
assert!(result.is_err());
}
/// Options supplied through `storage_option` and `storage_options` are both reported by the
/// created table's `storage_options()`.
// `allow(deprecated)` is carried over unchanged; presumably `Table::storage_options` is the
// deprecated call. TODO confirm.
#[tokio::test]
#[allow(deprecated)]
async fn test_create_table_with_storage_options() {
    let db = connect("memory://").execute().await.unwrap();
    let batch = record_batch!(("id", Int64, [1, 2, 3])).unwrap();

    // One option through the single-pair setter, one through the bulk setter.
    let builder = db
        .create_table("options_table", batch)
        .storage_option("timeout", "30s")
        .storage_options([("retry_count", "3")]);
    let table = builder.execute().await.unwrap();

    let final_options = table.storage_options().await.unwrap();
    for (key, expected) in [("timeout", "30s"), ("retry_count", "3")] {
        assert_eq!(final_options.get(key), Some(&expected.to_string()));
    }
}
#[tokio::test]
async fn test_create_table_unregistered_embedding() {
let db = connect("memory://").execute().await.unwrap();
let batch = record_batch!(("text", Utf8, ["hello", "world"])).unwrap();
let result = db
.create_table("embed_table", batch)
.add_embedding(EmbeddingDefinition::new(
"text",
"nonexistent_embedding_function",
None::<&str>,
));
match result {
Err(Error::EmbeddingFunctionNotFound { name, .. }) => {
assert_eq!(name, "nonexistent_embedding_function");
}
Err(other) => panic!("Expected EmbeddingFunctionNotFound error, got: {:?}", other),
Ok(_) => panic!("Expected error, but got Ok"),
}
}
/// Exercises creation against a name that is already taken: `exist_ok` succeeds, the default
/// mode fails, and `Overwrite` succeeds and yields the new schema.
#[tokio::test]
async fn test_create_table_already_exists() {
    // `tmp_dir` must stay bound for the whole test so the directory is not removed early.
    let tmp_dir = tempdir().unwrap();
    let db = connect(tmp_dir.path().to_str().unwrap())
        .execute()
        .await
        .unwrap();

    let original_schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
    let first_create = db.create_empty_table("test", original_schema.clone());
    first_create.execute().await.unwrap();

    // Same name and schema again, this time with `exist_ok`: must succeed.
    let exist_ok_mode = CreateTableMode::exist_ok(|mut req| {
        req.index_cache_size = Some(16);
        req
    });
    db.create_empty_table("test", original_schema)
        .mode(exist_ok_mode)
        .execute()
        .await
        .unwrap();

    // Same name with the default mode: must fail.
    let replacement_schema =
        Arc::new(Schema::new(vec![Field::new("y", DataType::Int32, false)]));
    assert!(
        db.create_empty_table("test", replacement_schema.clone())
            .execute()
            .await
            .is_err()
    );

    // Same name with `Overwrite`: must succeed and report the replacement schema.
    let overwritten = db
        .create_empty_table("test", replacement_schema.clone())
        .mode(CreateTableMode::Overwrite)
        .execute()
        .await
        .unwrap();
    assert_eq!(replacement_schema, overwritten.schema().await.unwrap());
}
/// The `data_storage_version` configured on the connection's `NewTableConfig` is the file
/// version recorded in a newly created table's manifest (compared after `resolve()`).
#[tokio::test]
#[rstest::rstest]
#[case(LanceFileVersion::Legacy)]
#[case(LanceFileVersion::Stable)]
async fn test_create_table_with_storage_version(
    #[case] data_storage_version: LanceFileVersion,
) {
    let options = ListingDatabaseOptions {
        new_table_config: NewTableConfig {
            data_storage_version: Some(data_storage_version),
            ..Default::default()
        },
        ..Default::default()
    };
    let db = connect("memory://")
        .database_options(&options)
        .execute()
        .await
        .unwrap();

    let batch = record_batch!(("id", Int64, [1, 2, 3])).unwrap();
    let builder = db.create_table("legacy_table", batch);
    let table = builder.execute().await.unwrap();

    let manifest = table.as_native().unwrap().manifest().await.unwrap();
    let storage_format = manifest.data_storage_format.lance_file_version().unwrap();
    assert_eq!(storage_format.resolve(), data_storage_version.resolve());
}
#[tokio::test]
async fn test_create_table_with_embedding() {
let registry = Arc::new(MemoryRegistry::new());
let mock_embedding: Arc<dyn EmbeddingFunction> = Arc::new(MockEmbed::new("mock", 4));
registry.register("mock", mock_embedding).unwrap();
let conn = connect("memory://")
.embedding_registry(registry)
.execute()
.await
.unwrap();
let batch = record_batch!(("text", Utf8, ["hello", "world", "test"])).unwrap();
let table = conn
.create_table("embed_test", batch)
.add_embedding(EmbeddingDefinition::new(
"text",
"mock",
Some("text_embedding"),
))
.unwrap()
.execute()
.await
.unwrap();
assert_eq!(table.count_rows(None).await.unwrap(), 3);
let result_schema = table.schema().await.unwrap();
assert_eq!(result_schema.fields().len(), 2);
assert_eq!(result_schema.field(0).name(), "text");
assert_eq!(result_schema.field(1).name(), "text_embedding");
assert!(matches!(
result_schema.field(1).data_type(),
DataType::FixedSizeList(_, 4)
));
let results: Vec<RecordBatch> = table
.query()
.select(Select::columns(&["text", "text_embedding"]))
.execute()
.await
.unwrap()
.try_collect()
.await
.unwrap();
let total_rows: usize = results.iter().map(|b| b.num_rows()).sum();
assert_eq!(total_rows, 3);
for batch in &results {
let embedding_col = batch.column(1);
assert_eq!(embedding_col.null_count(), 0);
assert_eq!(embedding_col.len(), batch.num_rows());
}
assert!(
result_schema
.metadata
.contains_key("lancedb::column_definitions"),
"Schema metadata should contain column definitions"
);
}
#[tokio::test]
async fn test_create_empty_table_with_embeddings() {
#[derive(Debug, Clone)]
struct MockEmbedding {
dim: usize,
}
impl EmbeddingFunction for MockEmbedding {
fn name(&self) -> &str {
"test_embedding"
}
fn source_type(&self) -> Result<Cow<'_, DataType>> {
Ok(Cow::Owned(DataType::Utf8))
}
fn dest_type(&self) -> Result<Cow<'_, DataType>> {
Ok(Cow::Owned(DataType::new_fixed_size_list(
DataType::Float32,
self.dim as i32,
true,
)))
}
fn compute_source_embeddings(&self, source: Arc<dyn Array>) -> Result<Arc<dyn Array>> {
let len = source.len();
let values = vec![1.0f32; len * self.dim];
let values = Arc::new(Float32Array::from(values));
let field = Arc::new(Field::new("item", DataType::Float32, true));
Ok(Arc::new(FixedSizeListArray::new(
field,
self.dim as i32,
values,
None,
)))
}
fn compute_query_embeddings(&self, _input: Arc<dyn Array>) -> Result<Arc<dyn Array>> {
unimplemented!()
}
}
let tmp_dir = tempdir().unwrap();
let uri = tmp_dir.path().to_str().unwrap();
let db = connect(uri).execute().await.unwrap();
let embed_func = Arc::new(MockEmbedding { dim: 128 });
db.embedding_registry()
.register("test_embedding", embed_func.clone())
.unwrap();
let schema = Arc::new(Schema::new(vec![Field::new("name", DataType::Utf8, true)]));
let ed = EmbeddingDefinition {
source_column: "name".to_owned(),
dest_column: Some("name_embedding".to_owned()),
embedding_name: "test_embedding".to_owned(),
};
let table = db
.create_empty_table("test", schema)
.mode(CreateTableMode::Overwrite)
.add_embedding(ed)
.unwrap()
.execute()
.await
.unwrap();
let table_schema = table.schema().await.unwrap();
assert!(table_schema.column_with_name("name").is_some());
assert!(table_schema.column_with_name("name_embedding").is_some());
let embedding_field = table_schema.field_with_name("name_embedding").unwrap();
assert_eq!(
embedding_field.data_type(),
&DataType::new_fixed_size_list(DataType::Float32, 128, true)
);
let input_batch = record_batch!(("name", Utf8, ["Alice", "Bob", "Charlie"])).unwrap();
table.add(input_batch).execute().await.unwrap();
let results = table
.query()
.execute()
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
assert_eq!(results.len(), 1);
let batch = &results[0];
assert_eq!(batch.num_rows(), 3);
assert!(batch.column_by_name("name_embedding").is_some());
let embedding_col = batch
.column_by_name("name_embedding")
.unwrap()
.as_any()
.downcast_ref::<FixedSizeListArray>()
.unwrap();
assert_eq!(embedding_col.len(), 3);
}
}