pub mod manifest;
use arrow::record_batch::RecordBatchIterator;
use arrow_ipc::reader::StreamReader;
use async_trait::async_trait;
use bytes::Bytes;
use futures::TryStreamExt;
use lance::dataset::builder::DatasetBuilder;
use lance::dataset::{Dataset, WriteMode, WriteParams};
use lance::session::Session;
use lance_io::object_store::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry};
use lance_table::io::commit::ManifestNamingScheme;
use object_store::path::Path;
use object_store::{Error as ObjectStoreError, ObjectStore as OSObjectStore, PutMode, PutOptions};
use std::collections::HashMap;
use std::io::Cursor;
use std::sync::Arc;
use crate::context::DynamicContextProvider;
use lance_namespace::models::{
BatchDeleteTableVersionsRequest, BatchDeleteTableVersionsResponse, CreateNamespaceRequest,
CreateNamespaceResponse, CreateTableRequest, CreateTableResponse, CreateTableVersionRequest,
CreateTableVersionResponse, DeclareTableRequest, DeclareTableResponse,
DescribeNamespaceRequest, DescribeNamespaceResponse, DescribeTableRequest,
DescribeTableResponse, DescribeTableVersionRequest, DescribeTableVersionResponse,
DropNamespaceRequest, DropNamespaceResponse, DropTableRequest, DropTableResponse, Identity,
ListNamespacesRequest, ListNamespacesResponse, ListTableVersionsRequest,
ListTableVersionsResponse, ListTablesRequest, ListTablesResponse, NamespaceExistsRequest,
TableExistsRequest, TableVersion,
};
use lance_core::{Error, Result, box_error};
use lance_namespace::LanceNamespace;
use lance_namespace::schema::arrow_schema_to_json;
use crate::credentials::{
CredentialVendor, create_credential_vendor_for_location, has_credential_vendor_config,
};
/// What a listing of a table's `<name>.lance` directory revealed.
///
/// Produced by `DirectoryNamespace::check_table_status`; a failed listing is
/// reported as all-`false`.
pub(crate) struct TableStatus {
/// `true` when the table directory listing returned at least one entry.
pub(crate) exists: bool,
/// `true` when an entry ending in `.lance-deregistered` is present.
pub(crate) is_deregistered: bool,
/// `true` when an entry ending in `.lance-reserved` is present (the marker
/// written by `declare_table` in directory mode).
pub(crate) has_reserved_file: bool,
}
/// Builder for a [`DirectoryNamespace`].
///
/// Create one with `new` or `from_properties`, adjust it with the chained
/// setters, then call `build`.
#[derive(Clone)]
pub struct DirectoryNamespaceBuilder {
/// Root URI of the namespace, with trailing `/` characters stripped.
root: String,
/// Static object-store options; `None` until at least one option is set.
storage_options: Option<HashMap<String, String>>,
/// Optional Lance session whose object-store registry is reused.
session: Option<Arc<Session>>,
/// Whether to back the namespace with a manifest namespace (default `true`).
manifest_enabled: bool,
/// Whether `*.lance` directories are also discovered by listing (default `true`).
dir_listing_enabled: bool,
/// Forwarded to the manifest namespace (default `true`).
inline_optimization_enabled: bool,
/// When `true`, responses report `managed_versioning = Some(true)` (default `false`).
table_version_tracking_enabled: bool,
/// Store table versions in the manifest namespace; requires `manifest_enabled`
/// (default `false`).
table_version_storage_enabled: bool,
/// Configuration for the optional credential vendor (keys without the
/// `credential_vendor.` prefix).
credential_vendor_properties: HashMap<String, String>,
/// Optional dynamic context provider handed to the built namespace.
context_provider: Option<Arc<dyn DynamicContextProvider>>,
/// Commit retry count forwarded to the manifest namespace; `None` leaves it unset.
commit_retries: Option<u32>,
}
/// Debug output for the builder.
///
/// `session` and `context_provider` are shown only as presence markers.
/// `credential_vendor_properties` is intentionally left out because it may
/// carry credentials; `finish_non_exhaustive` marks the omission with `..`.
impl std::fmt::Debug for DirectoryNamespaceBuilder {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DirectoryNamespaceBuilder")
            .field("root", &self.root)
            .field("storage_options", &self.storage_options)
            .field("session", &self.session.as_ref().map(|_| "Some(...)"))
            .field("manifest_enabled", &self.manifest_enabled)
            .field("dir_listing_enabled", &self.dir_listing_enabled)
            .field(
                "inline_optimization_enabled",
                &self.inline_optimization_enabled,
            )
            .field(
                "table_version_tracking_enabled",
                &self.table_version_tracking_enabled,
            )
            .field(
                "table_version_storage_enabled",
                &self.table_version_storage_enabled,
            )
            .field("commit_retries", &self.commit_retries)
            .field(
                "context_provider",
                &self.context_provider.as_ref().map(|_| "Some(...)"),
            )
            .finish_non_exhaustive()
    }
}
impl DirectoryNamespaceBuilder {
/// Creates a builder rooted at `root`, with trailing `/` characters removed.
///
/// Manifest mode, directory listing and inline optimization start enabled.
/// Table version tracking and table version storage start disabled.
pub fn new(root: impl Into<String>) -> Self {
    let raw_root: String = root.into();
    let normalized_root = raw_root.trim_end_matches('/').to_string();
    Self {
        root: normalized_root,
        storage_options: None,
        session: None,
        manifest_enabled: true,
        dir_listing_enabled: true,
        inline_optimization_enabled: true,
        table_version_tracking_enabled: false,
        table_version_storage_enabled: false,
        credential_vendor_properties: HashMap::new(),
        context_provider: None,
        commit_retries: None,
    }
}
pub fn manifest_enabled(mut self, enabled: bool) -> Self {
self.manifest_enabled = enabled;
self
}
pub fn dir_listing_enabled(mut self, enabled: bool) -> Self {
self.dir_listing_enabled = enabled;
self
}
pub fn inline_optimization_enabled(mut self, enabled: bool) -> Self {
self.inline_optimization_enabled = enabled;
self
}
pub fn table_version_tracking_enabled(mut self, enabled: bool) -> Self {
self.table_version_tracking_enabled = enabled;
self
}
pub fn table_version_storage_enabled(mut self, enabled: bool) -> Self {
self.table_version_storage_enabled = enabled;
self
}
pub fn from_properties(
properties: HashMap<String, String>,
session: Option<Arc<Session>>,
) -> Result<Self> {
let root = properties.get("root").cloned().ok_or_else(|| {
Error::namespace_source(
"Missing required property 'root' for directory namespace".into(),
)
})?;
let storage_options: HashMap<String, String> = properties
.iter()
.filter_map(|(k, v)| {
k.strip_prefix("storage.")
.map(|key| (key.to_string(), v.clone()))
})
.collect();
let storage_options = if storage_options.is_empty() {
None
} else {
Some(storage_options)
};
let manifest_enabled = properties
.get("manifest_enabled")
.and_then(|v| v.parse::<bool>().ok())
.unwrap_or(true);
let dir_listing_enabled = properties
.get("dir_listing_enabled")
.and_then(|v| v.parse::<bool>().ok())
.unwrap_or(true);
let inline_optimization_enabled = properties
.get("inline_optimization_enabled")
.and_then(|v| v.parse::<bool>().ok())
.unwrap_or(true);
let table_version_tracking_enabled = properties
.get("table_version_tracking_enabled")
.and_then(|v| v.parse::<bool>().ok())
.unwrap_or(false);
let table_version_storage_enabled = properties
.get("table_version_storage_enabled")
.and_then(|v| v.parse::<bool>().ok())
.unwrap_or(false);
let credential_vendor_properties: HashMap<String, String> = properties
.iter()
.filter_map(|(k, v)| {
k.strip_prefix("credential_vendor.")
.map(|key| (key.to_string(), v.clone()))
})
.collect();
let commit_retries = properties
.get("commit_retries")
.and_then(|v| v.parse::<u32>().ok());
Ok(Self {
root: root.trim_end_matches('/').to_string(),
storage_options,
session,
manifest_enabled,
dir_listing_enabled,
inline_optimization_enabled,
table_version_tracking_enabled,
table_version_storage_enabled,
credential_vendor_properties,
context_provider: None,
commit_retries,
})
}
/// Adds or overwrites a single storage option.
pub fn storage_option(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
    let options = self.storage_options.get_or_insert_with(HashMap::new);
    options.insert(key.into(), value.into());
    self
}
/// Merges `options` into the storage options; later keys overwrite earlier ones.
///
/// Even an empty map turns the storage options from `None` into `Some({})`.
pub fn storage_options(mut self, options: HashMap<String, String>) -> Self {
    let merged = self.storage_options.get_or_insert_with(HashMap::new);
    for (key, value) in options {
        merged.insert(key, value);
    }
    self
}
pub fn session(mut self, session: Arc<Session>) -> Self {
self.session = Some(session);
self
}
pub fn commit_retries(mut self, retries: u32) -> Self {
self.commit_retries = Some(retries);
self
}
/// Adds or overwrites one credential-vendor configuration entry.
pub fn credential_vendor_property(
    mut self,
    key: impl Into<String>,
    value: impl Into<String>,
) -> Self {
    let (key, value) = (key.into(), value.into());
    self.credential_vendor_properties.insert(key, value);
    self
}
/// Merges `properties` into the credential-vendor configuration.
pub fn credential_vendor_properties(mut self, properties: HashMap<String, String>) -> Self {
    for (key, value) in properties {
        self.credential_vendor_properties.insert(key, value);
    }
    self
}
pub fn context_provider(mut self, provider: Arc<dyn DynamicContextProvider>) -> Self {
self.context_provider = Some(provider);
self
}
/// Builds the [`DirectoryNamespace`].
///
/// Opens the object store for `root`. When manifest mode is enabled, it also
/// opens the manifest namespace; if that fails, a warning is logged and the
/// namespace falls back to directory listing only. A credential vendor is
/// created when its configuration is present.
///
/// # Errors
/// Fails when table version storage is requested without manifest mode, when
/// the object store cannot be created, or when credential vendor creation fails.
pub async fn build(self) -> Result<DirectoryNamespace> {
    let storage_needs_manifest = self.table_version_storage_enabled && !self.manifest_enabled;
    if storage_needs_manifest {
        return Err(Error::invalid_input(
            "table_version_storage_enabled requires manifest_enabled=true",
        ));
    }

    let (object_store, base_path) =
        Self::initialize_object_store(&self.root, &self.storage_options, &self.session).await?;

    let manifest_ns = if !self.manifest_enabled {
        None
    } else {
        manifest::ManifestNamespace::from_directory(
            self.root.clone(),
            self.storage_options.clone(),
            self.session.clone(),
            object_store.clone(),
            base_path.clone(),
            self.dir_listing_enabled,
            self.inline_optimization_enabled,
            self.commit_retries,
            self.table_version_storage_enabled,
        )
        .await
        // Manifest failures are non-fatal: log them and continue without a manifest.
        .inspect_err(|e| {
            log::warn!(
                "Failed to initialize manifest namespace, falling back to directory listing only: {}",
                e
            );
        })
        .ok()
        .map(Arc::new)
    };

    let credential_vendor = if !has_credential_vendor_config(&self.credential_vendor_properties) {
        None
    } else {
        let vendor =
            create_credential_vendor_for_location(&self.root, &self.credential_vendor_properties)
                .await?;
        vendor.map(Arc::from)
    };

    Ok(DirectoryNamespace {
        root: self.root,
        storage_options: self.storage_options,
        session: self.session,
        object_store,
        base_path,
        manifest_ns,
        dir_listing_enabled: self.dir_listing_enabled,
        table_version_tracking_enabled: self.table_version_tracking_enabled,
        table_version_storage_enabled: self.table_version_storage_enabled,
        credential_vendor,
        context_provider: self.context_provider,
    })
}
/// Opens the object store for `root` and returns it with the base path inside it.
///
/// Static `storage_options` are passed through a storage-options accessor. When
/// a `session` is supplied, its object-store registry is reused; otherwise a
/// fresh default registry is created.
///
/// # Errors
/// Returns a namespace error if the object store cannot be created.
async fn initialize_object_store(
    root: &str,
    storage_options: &Option<HashMap<String, String>>,
    session: &Option<Arc<Session>>,
) -> Result<(Arc<ObjectStore>, Path)> {
    let accessor = storage_options.clone().map(|opts| {
        Arc::new(lance_io::object_store::StorageOptionsAccessor::with_static_options(opts))
    });
    let params = ObjectStoreParams {
        storage_options_accessor: accessor,
        ..Default::default()
    };
    let registry = match session {
        Some(session) => session.store_registry(),
        None => Arc::new(ObjectStoreRegistry::default()),
    };
    // `&params` — the original text had a mis-encoded `¶ms` here, which does not compile.
    let (object_store, base_path) = ObjectStore::from_uri_and_params(registry, root, &params)
        .await
        .map_err(|e| {
            Error::namespace_source(format!("Failed to create object store: {}", e).into())
        })?;
    Ok((object_store, base_path))
}
}
/// A [`LanceNamespace`] backed by a root directory (local path or object-store URI).
///
/// Directory-mode tables live at `<root>/<name>.lance`. When `manifest_ns` is
/// present, most operations are delegated to it. Without it, only the root
/// namespace and single-component table IDs are supported.
pub struct DirectoryNamespace {
/// Root URI as configured on the builder (trailing `/` stripped).
root: String,
/// Static storage options used when opening or creating datasets.
storage_options: Option<HashMap<String, String>>,
// NOTE(review): reason for the lint suppression is not visible here; the field
// is read by `describe_table` when opening datasets — confirm it is still needed.
#[allow(dead_code)]
session: Option<Arc<Session>>,
/// Object store opened for `root`.
object_store: Arc<ObjectStore>,
/// Path of the namespace root inside `object_store`.
base_path: Path,
/// Manifest-backed namespace; `None` when disabled or when it failed to initialize.
manifest_ns: Option<Arc<manifest::ManifestNamespace>>,
/// Whether `*.lance` directories not tracked by the manifest are also discovered.
dir_listing_enabled: bool,
/// When `true`, table responses report `managed_versioning = Some(true)`.
table_version_tracking_enabled: bool,
/// When `true` (and a manifest exists), versions are listed/recorded via the manifest.
table_version_storage_enabled: bool,
/// Optional vendor used to produce per-table storage options (credentials).
credential_vendor: Option<Arc<dyn CredentialVendor>>,
// NOTE(review): not read anywhere in this part of the file; presumably consumed
// elsewhere or reserved for future use — confirm.
#[allow(dead_code)]
context_provider: Option<Arc<dyn DynamicContextProvider>>,
}
/// Debug output matches the `Display` output: the namespace id.
impl std::fmt::Debug for DirectoryNamespace {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        std::fmt::Display::fmt(self, f)
    }
}
/// Displays the namespace as its namespace id (formatter flags are not applied).
impl std::fmt::Display for DirectoryNamespace {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let id = self.namespace_id();
        f.write_fmt(format_args!("{}", id))
    }
}
/// A table and the version ranges whose manifest files should be deleted.
struct TableDeleteEntry {
/// Table ID, resolved to a location through `describe_table`.
table_id: Option<Vec<String>>,
/// Inclusive `(start, end)` version ranges.
ranges: Vec<(i64, i64)>,
}
impl DirectoryNamespace {
/// Sorts `names` and applies cursor-style pagination in place.
///
/// `page_token` is treated as the last name of the previous page, so only
/// names strictly greater than it are kept. A non-negative `limit` then caps
/// the number of remaining names; a negative limit is ignored.
fn apply_pagination(names: &mut Vec<String>, page_token: Option<String>, limit: Option<i32>) {
    names.sort();
    if let Some(cursor) = page_token {
        // Index of the first name after the cursor; everything before it is dropped.
        let first_kept = names
            .iter()
            .position(|candidate| candidate.as_str() > cursor.as_str())
            .unwrap_or(names.len());
        names.drain(..first_kept);
    }
    match limit {
        Some(max) if max >= 0 => names.truncate(max as usize),
        _ => {}
    }
}
/// Lists table names found as `<name>.lance` entries directly under the root.
///
/// Entries whose table directory has a deregistration or reservation marker
/// are skipped, so declared-but-unwritten tables are not listed. A bare
/// `.lance` entry has no table name and is ignored.
///
/// # Errors
/// Returns an I/O error if the root directory cannot be listed.
async fn list_directory_tables(&self) -> Result<Vec<String>> {
    let entries = self
        .object_store
        .read_dir(self.base_path.clone())
        .await
        .map_err(|e| {
            Error::io_source(box_error(std::io::Error::other(format!(
                "Failed to list directory: {}",
                e
            ))))
        })?;

    let mut tables = Vec::new();
    for entry in entries {
        let Some(table_name) = entry.trim_end_matches('/').strip_suffix(".lance") else {
            continue;
        };
        if table_name.is_empty() {
            continue;
        }
        let status = self.check_table_status(table_name).await;
        if status.is_deregistered || status.has_reserved_file {
            continue;
        }
        tables.push(table_name.to_string());
    }
    Ok(tables)
}
fn validate_root_namespace_id(id: &Option<Vec<String>>) -> Result<()> {
if let Some(id) = id
&& !id.is_empty()
{
return Err(Error::namespace_source(format!(
"Directory namespace only supports root namespace operations, but got namespace ID: {:?}. Expected empty ID.",
id
).into()));
}
Ok(())
}
fn table_name_from_id(id: &Option<Vec<String>>) -> Result<String> {
let id = id.as_ref().ok_or_else(|| {
Error::namespace_source("Directory namespace table ID cannot be empty".into())
})?;
if id.len() != 1 {
return Err(Error::namespace_source(format!(
"Multi-level table IDs are only supported when manifest mode is enabled, but got: {:?}",
id
)
.into()));
}
Ok(id[0].clone())
}
async fn resolve_table_location(&self, id: &Option<Vec<String>>) -> Result<String> {
let mut describe_req = DescribeTableRequest::new();
describe_req.id = id.clone();
describe_req.load_detailed_metadata = Some(false);
let describe_resp = self.describe_table(describe_req).await?;
describe_resp.location.ok_or_else(|| {
Error::namespace_source(format!("Table location not found for: {:?}", id).into())
})
}
/// Full URI of a directory-mode table: `<root>/<table_name>.lance`.
fn table_full_uri(&self, table_name: &str) -> String {
    let root = self.root.as_str();
    format!("{}/{}.lance", root, table_name)
}
/// Converts a table URI into an object-store path.
///
/// `file://` URIs keep their full path. For `s3://`, `gs://` and `az://`,
/// the scheme and the bucket/container (first path segment) are dropped; a
/// URI with no further `/` keeps everything after the scheme. Any other
/// string is used as-is.
fn uri_to_object_store_path(uri: &str) -> Path {
    const BUCKETED_SCHEMES: [&str; 3] = ["s3://", "gs://", "az://"];

    if let Some(local) = uri.strip_prefix("file://") {
        return Path::from(local);
    }
    let relative = BUCKETED_SCHEMES
        .iter()
        .find_map(|scheme| uri.strip_prefix(*scheme))
        .map(|rest| rest.split_once('/').map_or(rest, |(_, key)| key))
        .unwrap_or(uri);
    Path::from(relative)
}
/// Object-store path of a table directory: `<base>/<table_name>.lance`.
fn table_path(&self, table_name: &str) -> Path {
    let dir_name = format!("{}.lance", table_name);
    self.base_path.child(dir_name.as_str())
}
/// Path of the `.lance-reserved` marker inside a table directory.
fn table_reserved_file_path(&self, table_name: &str) -> Path {
    let dir_name = format!("{}.lance", table_name);
    let table_dir = self.base_path.child(dir_name.as_str());
    table_dir.child(".lance-reserved")
}
/// Path of the `.lance-deregistered` marker inside a table directory.
fn table_deregistered_file_path(&self, table_name: &str) -> Path {
    let dir_name = format!("{}.lance", table_name);
    let table_dir = self.base_path.child(dir_name.as_str());
    table_dir.child(".lance-deregistered")
}
/// Inspects a table directory for existence and marker files.
///
/// Any listing error is reported as "does not exist" with no markers.
pub(crate) async fn check_table_status(&self, table_name: &str) -> TableStatus {
    let Ok(entries) = self.object_store.read_dir(self.table_path(table_name)).await else {
        return TableStatus {
            exists: false,
            is_deregistered: false,
            has_reserved_file: false,
        };
    };
    let has_marker = |marker: &str| entries.iter().any(|entry| entry.ends_with(marker));
    TableStatus {
        exists: !entries.is_empty(),
        is_deregistered: has_marker(".lance-deregistered"),
        has_reserved_file: has_marker(".lance-reserved"),
    }
}
/// Creates an empty marker file at `path`, failing if it already exists.
///
/// Uses `PutMode::Create`, so the write only succeeds for the first creator.
/// Errors are returned as human-readable strings built from `file_description`.
/// Both `AlreadyExists` and `Precondition` map to "<description> already exists".
async fn put_marker_file_atomic(
    &self,
    path: &Path,
    file_description: &str,
) -> std::result::Result<(), String> {
    let create_only = PutOptions {
        mode: PutMode::Create,
        ..Default::default()
    };
    let outcome = self
        .object_store
        .inner
        .put_opts(path, bytes::Bytes::new().into(), create_only)
        .await;
    outcome.map(|_| ()).map_err(|err| match err {
        ObjectStoreError::AlreadyExists { .. } | ObjectStoreError::Precondition { .. } => {
            format!("{} already exists", file_description)
        }
        other => format!("Failed to create {}: {}", file_description, other),
    })
}
/// Returns vended storage options for `table_uri`.
///
/// Returns `Ok(None)` when vending is not requested or no credential vendor
/// is configured.
///
/// # Errors
/// Propagates credential vendor failures.
async fn get_storage_options_for_table(
    &self,
    table_uri: &str,
    vend_credentials: bool,
    identity: Option<&Identity>,
) -> Result<Option<HashMap<String, String>>> {
    match (&self.credential_vendor, vend_credentials) {
        (Some(vendor), true) => {
            let vended = vendor.vend_credentials(table_uri, identity).await?;
            Ok(Some(vended.storage_options))
        }
        _ => Ok(None),
    }
}
/// Registers directory-only tables in the manifest namespace.
///
/// A table is skipped when the manifest already tracks it, under either its
/// relative location (`<name>.lance`) or its full URI. This is the same rule
/// `list_tables` uses. New registrations use the relative location. Without a
/// manifest namespace this is a no-op that returns `0`.
///
/// # Errors
/// Propagates listing and registration failures.
pub async fn migrate(&self) -> Result<usize> {
    let Some(ref manifest_ns) = self.manifest_ns else {
        return Ok(0);
    };
    let manifest_locations = manifest_ns.list_manifest_table_locations().await?;
    let mut migrated_count = 0;
    for table_name in self.list_directory_tables().await? {
        let relative_location = format!("{}.lance", table_name);
        let full_location = self.table_full_uri(&table_name);
        // Checking only the relative form would re-register tables tracked by full URI.
        if manifest_locations.contains(&relative_location)
            || manifest_locations.contains(&full_location)
        {
            continue;
        }
        manifest_ns
            .register_table(&table_name, relative_location)
            .await?;
        migrated_count += 1;
    }
    Ok(migrated_count)
}
async fn delete_physical_version_files(
&self,
table_entries: &[TableDeleteEntry],
best_effort: bool,
) -> Result<i64> {
let mut deleted_count = 0i64;
for te in table_entries {
let table_uri = self.resolve_table_location(&te.table_id).await?;
let table_path = Self::uri_to_object_store_path(&table_uri);
let table_path_str = table_path.as_ref();
let versions_dir_path = Path::from(format!("{}_versions", table_path_str));
for (start, end) in &te.ranges {
for version in *start..=*end {
let version_path =
versions_dir_path.child(format!("{}.manifest", version as u64));
match self.object_store.inner.delete(&version_path).await {
Ok(_) => {
deleted_count += 1;
}
Err(object_store::Error::NotFound { .. }) => {}
Err(e) => {
if best_effort {
log::warn!(
"Failed to delete manifest file for version {} of table {:?}: {:?}",
version,
te.table_id,
e
);
} else {
return Err(Error::namespace_source(
format!(
"Failed to delete version {} for table at '{}': {}",
version, table_uri, e
)
.into(),
));
}
}
}
}
}
}
Ok(deleted_count)
}
}
#[async_trait]
impl LanceNamespace for DirectoryNamespace {
/// Lists child namespaces.
///
/// Delegates to the manifest namespace when present. Otherwise only the root
/// namespace is valid, and it has no children.
async fn list_namespaces(
    &self,
    request: ListNamespacesRequest,
) -> Result<ListNamespacesResponse> {
    match self.manifest_ns {
        Some(ref manifest_ns) => manifest_ns.list_namespaces(request).await,
        None => {
            Self::validate_root_namespace_id(&request.id)?;
            Ok(ListNamespacesResponse::new(vec![]))
        }
    }
}
/// Describes a namespace.
///
/// Delegates to the manifest namespace when present. Otherwise only the root
/// namespace is valid, and it is reported with an empty property map.
async fn describe_namespace(
    &self,
    request: DescribeNamespaceRequest,
) -> Result<DescribeNamespaceResponse> {
    match self.manifest_ns {
        Some(ref manifest_ns) => manifest_ns.describe_namespace(request).await,
        None => {
            Self::validate_root_namespace_id(&request.id)?;
            // The `..Default::default()` may be redundant today, hence the lint allow.
            #[allow(clippy::needless_update)]
            let response = DescribeNamespaceResponse {
                properties: Some(HashMap::new()),
                ..Default::default()
            };
            Ok(response)
        }
    }
}
async fn create_namespace(
&self,
request: CreateNamespaceRequest,
) -> Result<CreateNamespaceResponse> {
if let Some(ref manifest_ns) = self.manifest_ns {
return manifest_ns.create_namespace(request).await;
}
if request.id.is_none() || request.id.as_ref().unwrap().is_empty() {
return Err(Error::namespace_source(
"Root namespace already exists and cannot be created".into(),
));
}
Err(Error::not_supported_source(
"Child namespaces are only supported when manifest mode is enabled".into(),
))
}
async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<DropNamespaceResponse> {
if let Some(ref manifest_ns) = self.manifest_ns {
return manifest_ns.drop_namespace(request).await;
}
if request.id.is_none() || request.id.as_ref().unwrap().is_empty() {
return Err(Error::namespace_source(
"Root namespace cannot be dropped".into(),
));
}
Err(Error::not_supported_source(
"Child namespaces are only supported when manifest mode is enabled".into(),
))
}
/// Checks that a namespace exists.
///
/// Delegates to the manifest namespace when present. Otherwise only the root
/// namespace (a `None` or empty ID) exists.
async fn namespace_exists(&self, request: NamespaceExistsRequest) -> Result<()> {
    if let Some(ref manifest_ns) = self.manifest_ns {
        return manifest_ns.namespace_exists(request).await;
    }
    if request.id.as_ref().is_none_or(|id| id.is_empty()) {
        Ok(())
    } else {
        Err(Error::namespace_source(
            "Child namespaces are only supported when manifest mode is enabled".into(),
        ))
    }
}
async fn list_tables(&self, request: ListTablesRequest) -> Result<ListTablesResponse> {
let namespace_id = request
.id
.as_ref()
.ok_or_else(|| Error::invalid_input_source("Namespace ID is required".into()))?;
if !namespace_id.is_empty() {
if let Some(ref manifest_ns) = self.manifest_ns {
return manifest_ns.list_tables(request).await;
}
return Err(Error::not_supported_source(
"Child namespaces are only supported when manifest mode is enabled".into(),
));
}
if let Some(ref manifest_ns) = self.manifest_ns
&& !self.dir_listing_enabled
{
return manifest_ns.list_tables(request).await;
}
let mut tables = if self.manifest_ns.is_some() && self.dir_listing_enabled {
let manifest_locations = if let Some(ref manifest_ns) = self.manifest_ns {
manifest_ns.list_manifest_table_locations().await?
} else {
std::collections::HashSet::new()
};
let mut manifest_request = request.clone();
manifest_request.limit = None;
manifest_request.page_token = None;
let manifest_tables = if let Some(ref manifest_ns) = self.manifest_ns {
let manifest_response = manifest_ns.list_tables(manifest_request).await?;
manifest_response.tables
} else {
vec![]
};
let mut all_tables: Vec<String> = manifest_tables;
let dir_tables = self.list_directory_tables().await?;
for table_name in dir_tables {
let full_location = format!("{}/{}.lance", self.root, table_name);
let relative_location = format!("{}.lance", table_name);
if !manifest_locations.contains(&full_location)
&& !manifest_locations.contains(&relative_location)
{
all_tables.push(table_name);
}
}
all_tables
} else {
self.list_directory_tables().await?
};
Self::apply_pagination(&mut tables, request.page_token, request.limit);
let response = ListTablesResponse::new(tables);
Ok(response)
}
async fn describe_table(&self, request: DescribeTableRequest) -> Result<DescribeTableResponse> {
if let Some(ref manifest_ns) = self.manifest_ns {
match manifest_ns.describe_table(request.clone()).await {
Ok(mut response) => {
if let Some(ref table_uri) = response.table_uri {
let vend = request.vend_credentials.unwrap_or(true);
let identity = request.identity.as_deref();
response.storage_options = self
.get_storage_options_for_table(table_uri, vend, identity)
.await?;
}
if self.table_version_tracking_enabled {
response.managed_versioning = Some(true);
}
return Ok(response);
}
Err(_)
if self.dir_listing_enabled
&& request.id.as_ref().is_some_and(|id| id.len() == 1) =>
{
}
Err(e) => return Err(e),
}
}
let table_name = Self::table_name_from_id(&request.id)?;
let table_uri = self.table_full_uri(&table_name);
let status = self.check_table_status(&table_name).await;
if !status.exists {
return Err(Error::namespace_source(
format!("Table does not exist: {}", table_name).into(),
));
}
if status.is_deregistered {
return Err(Error::namespace_source(
format!("Table is deregistered: {}", table_name).into(),
));
}
let load_detailed_metadata = request.load_detailed_metadata.unwrap_or(false);
let vend_credentials = request.vend_credentials.unwrap_or(true);
let identity = request.identity.as_deref();
if !load_detailed_metadata {
let storage_options = self
.get_storage_options_for_table(&table_uri, vend_credentials, identity)
.await?;
return Ok(DescribeTableResponse {
table: Some(table_name),
namespace: request.id.as_ref().map(|id| {
if id.len() > 1 {
id[..id.len() - 1].to_vec()
} else {
vec![]
}
}),
location: Some(table_uri.clone()),
table_uri: Some(table_uri),
storage_options,
managed_versioning: if self.table_version_tracking_enabled {
Some(true)
} else {
None
},
..Default::default()
});
}
let mut builder = DatasetBuilder::from_uri(&table_uri);
if let Some(opts) = &self.storage_options {
builder = builder.with_storage_options(opts.clone());
}
if let Some(sess) = &self.session {
builder = builder.with_session(sess.clone());
}
match builder.load().await {
Ok(mut dataset) => {
if let Some(requested_version) = request.version {
dataset = dataset.checkout_version(requested_version as u64).await?;
}
let version_info = dataset.version();
let lance_schema = dataset.schema();
let arrow_schema: arrow_schema::Schema = lance_schema.into();
let json_schema = arrow_schema_to_json(&arrow_schema)?;
let storage_options = self
.get_storage_options_for_table(&table_uri, vend_credentials, identity)
.await?;
let metadata: std::collections::HashMap<String, String> =
version_info.metadata.into_iter().collect();
Ok(DescribeTableResponse {
table: Some(table_name),
namespace: request.id.as_ref().map(|id| {
if id.len() > 1 {
id[..id.len() - 1].to_vec()
} else {
vec![]
}
}),
version: Some(version_info.version as i64),
location: Some(table_uri.clone()),
table_uri: Some(table_uri),
schema: Some(Box::new(json_schema)),
storage_options,
metadata: Some(metadata),
managed_versioning: if self.table_version_tracking_enabled {
Some(true)
} else {
None
},
..Default::default()
})
}
Err(err) => {
if status.has_reserved_file {
let storage_options = self
.get_storage_options_for_table(&table_uri, vend_credentials, identity)
.await?;
Ok(DescribeTableResponse {
table: Some(table_name),
namespace: request.id.as_ref().map(|id| {
if id.len() > 1 {
id[..id.len() - 1].to_vec()
} else {
vec![]
}
}),
location: Some(table_uri.clone()),
table_uri: Some(table_uri),
storage_options,
managed_versioning: if self.table_version_tracking_enabled {
Some(true)
} else {
None
},
..Default::default()
})
} else {
Err(Error::namespace_source(
format!(
"Table directory exists but cannot load dataset {}: {:?}",
table_name, err
)
.into(),
))
}
}
}
}
/// Checks that a table exists and is not deregistered.
///
/// The manifest namespace is consulted first when present. A manifest miss
/// falls back to the directory only when directory listing is enabled and the
/// ID has a single component, matching `describe_table`. Otherwise the
/// manifest's own error is returned.
///
/// # Errors
/// Fails when the table is missing or deregistered, or with the manifest's error.
async fn table_exists(&self, request: TableExistsRequest) -> Result<()> {
    if let Some(ref manifest_ns) = self.manifest_ns {
        match manifest_ns.table_exists(request.clone()).await {
            Ok(()) => return Ok(()),
            // Only single-component IDs can map to `<root>/<name>.lance`; for
            // deeper IDs the fallback would mask the manifest's error with an
            // unrelated "multi-level ID" error.
            Err(_)
                if self.dir_listing_enabled
                    && request.id.as_ref().is_some_and(|id| id.len() == 1) => {}
            Err(e) => return Err(e),
        }
    }
    let table_name = Self::table_name_from_id(&request.id)?;
    let status = self.check_table_status(&table_name).await;
    if !status.exists {
        return Err(Error::namespace_source(
            format!("Table does not exist: {}", table_name).into(),
        ));
    }
    if status.is_deregistered {
        return Err(Error::namespace_source(
            format!("Table is deregistered: {}", table_name).into(),
        ));
    }
    Ok(())
}
async fn drop_table(&self, request: DropTableRequest) -> Result<DropTableResponse> {
if let Some(ref manifest_ns) = self.manifest_ns {
return manifest_ns.drop_table(request).await;
}
let table_name = Self::table_name_from_id(&request.id)?;
let table_uri = self.table_full_uri(&table_name);
let table_path = self.table_path(&table_name);
self.object_store
.remove_dir_all(table_path)
.await
.map_err(|e| {
Error::namespace_source(
format!("Failed to drop table {}: {}", table_name, e).into(),
)
})?;
Ok(DropTableResponse {
id: request.id,
location: Some(table_uri),
..Default::default()
})
}
/// Creates a table from an Arrow IPC stream.
///
/// Delegates to the manifest namespace when present. In directory mode, the
/// stream is decoded and written as a new Lance dataset at
/// `<root>/<name>.lance` with `WriteMode::Create`. A stream without batches
/// still creates the table, using one empty batch that carries the schema.
///
/// # Errors
/// Fails on an invalid table ID, empty request data, a malformed IPC stream,
/// or when the dataset write fails.
async fn create_table(
    &self,
    request: CreateTableRequest,
    request_data: Bytes,
) -> Result<CreateTableResponse> {
    if let Some(ref manifest_ns) = self.manifest_ns {
        return manifest_ns.create_table(request, request_data).await;
    }
    let table_name = Self::table_name_from_id(&request.id)?;
    let table_uri = self.table_full_uri(&table_name);
    if request_data.is_empty() {
        return Err(Error::namespace_source(
            "Request data (Arrow IPC stream) is required for create_table".into(),
        ));
    }

    let stream_reader = StreamReader::try_new(Cursor::new(request_data.to_vec()), None)
        .map_err(|e| {
            Error::namespace_source(format!("Invalid Arrow IPC stream: {}", e).into())
        })?;
    let arrow_schema = stream_reader.schema();
    let mut batches = stream_reader
        .map(|batch_result| {
            batch_result.map_err(|e| {
                Error::namespace_source(
                    format!("Failed to read batch from IPC stream: {}", e).into(),
                )
            })
        })
        .collect::<Result<Vec<_>>>()?;
    // An empty stream still needs one (empty) batch so the schema is written.
    if batches.is_empty() {
        batches.push(arrow::record_batch::RecordBatch::new_empty(
            arrow_schema.clone(),
        ));
    }
    let batch_results: Vec<_> = batches.into_iter().map(Ok).collect();
    let reader = RecordBatchIterator::new(batch_results, arrow_schema);

    let store_params = self.storage_options.as_ref().map(|opts| ObjectStoreParams {
        storage_options_accessor: Some(Arc::new(
            lance_io::object_store::StorageOptionsAccessor::with_static_options(opts.clone()),
        )),
        ..Default::default()
    });
    let write_params = WriteParams {
        mode: WriteMode::Create,
        store_params,
        ..Default::default()
    };
    if let Err(e) = Dataset::write(reader, &table_uri, Some(write_params)).await {
        return Err(Error::namespace_source(
            format!("Failed to create Lance dataset: {}", e).into(),
        ));
    }

    Ok(CreateTableResponse {
        version: Some(1),
        location: Some(table_uri),
        storage_options: self.storage_options.clone(),
        ..Default::default()
    })
}
async fn declare_table(&self, request: DeclareTableRequest) -> Result<DeclareTableResponse> {
if let Some(ref manifest_ns) = self.manifest_ns {
let mut response = manifest_ns.declare_table(request.clone()).await?;
if let Some(ref location) = response.location {
let vend = request.vend_credentials.unwrap_or(true);
let identity = request.identity.as_deref();
response.storage_options = self
.get_storage_options_for_table(location, vend, identity)
.await?;
}
if self.table_version_tracking_enabled {
response.managed_versioning = Some(true);
}
return Ok(response);
}
let table_name = Self::table_name_from_id(&request.id)?;
let table_uri = self.table_full_uri(&table_name);
if let Some(location) = &request.location {
let location = location.trim_end_matches('/');
if location != table_uri {
return Err(Error::namespace_source(
format!(
"Cannot declare table {} at location {}, must be at location {}",
table_name, location, table_uri
)
.into(),
));
}
}
let status = self.check_table_status(&table_name).await;
if status.exists && !status.has_reserved_file {
return Err(Error::namespace_source(
format!("Table already exists: {}", table_name).into(),
));
}
let reserved_file_path = self.table_reserved_file_path(&table_name);
self.put_marker_file_atomic(&reserved_file_path, &format!("table {}", table_name))
.await
.map_err(|e| Error::namespace_source(e.into()))?;
let vend_credentials = request.vend_credentials.unwrap_or(true);
let identity = request.identity.as_deref();
let storage_options = self
.get_storage_options_for_table(&table_uri, vend_credentials, identity)
.await?;
Ok(DeclareTableResponse {
location: Some(table_uri),
storage_options,
managed_versioning: if self.table_version_tracking_enabled {
Some(true)
} else {
None
},
..Default::default()
})
}
async fn register_table(
&self,
request: lance_namespace::models::RegisterTableRequest,
) -> Result<lance_namespace::models::RegisterTableResponse> {
if let Some(ref manifest_ns) = self.manifest_ns {
return LanceNamespace::register_table(manifest_ns.as_ref(), request).await;
}
Err(Error::not_supported_source(
"register_table is only supported when manifest mode is enabled".into(),
))
}
async fn deregister_table(
&self,
request: lance_namespace::models::DeregisterTableRequest,
) -> Result<lance_namespace::models::DeregisterTableResponse> {
if let Some(ref manifest_ns) = self.manifest_ns {
return LanceNamespace::deregister_table(manifest_ns.as_ref(), request).await;
}
let table_name = Self::table_name_from_id(&request.id)?;
let table_uri = self.table_full_uri(&table_name);
let status = self.check_table_status(&table_name).await;
if !status.exists {
return Err(Error::namespace_source(
format!("Table does not exist: {}", table_name).into(),
));
}
if status.is_deregistered {
return Err(Error::namespace_source(
format!("Table is already deregistered: {}", table_name).into(),
));
}
let deregistered_path = self.table_deregistered_file_path(&table_name);
self.put_marker_file_atomic(
&deregistered_path,
&format!("deregistration marker for table {}", table_name),
)
.await
.map_err(|e| {
let message = if e.contains("already exists") {
format!("Table is already deregistered: {}", table_name)
} else {
e
};
Error::namespace_source(message.into())
})?;
Ok(lance_namespace::models::DeregisterTableResponse {
id: request.id,
location: Some(table_uri),
..Default::default()
})
}
/// Lists the committed versions of a table.
///
/// When table version storage is enabled and a manifest namespace exists, the
/// request is delegated to it. Otherwise the table's `_versions` directory is
/// listed. Detached manifests (`d…`) are skipped, and inverted V2 file numbers
/// are mapped back to version numbers. The result is sorted by version
/// (descending if requested) and truncated to a non-negative `limit`.
///
/// # Errors
/// Fails when the table location cannot be resolved or the listing fails.
async fn list_table_versions(
    &self,
    request: ListTableVersionsRequest,
) -> Result<ListTableVersionsResponse> {
    let want_descending = request.descending == Some(true);
    if self.table_version_storage_enabled
        && let Some(ref manifest_ns) = self.manifest_ns
    {
        let table_id = request.id.clone().unwrap_or_default();
        return manifest_ns
            .list_table_versions(&table_id, want_descending, request.limit)
            .await;
    }

    let table_uri = self.resolve_table_location(&request.id).await?;
    let table_path = Self::uri_to_object_store_path(&table_uri);
    let versions_dir = table_path.child("_versions");
    let manifest_metas: Vec<_> = self
        .object_store
        .read_dir_all(&versions_dir, None)
        .try_collect()
        .await
        .map_err(|e| {
            Error::namespace_source(
                format!(
                    "Failed to list manifest files for table at '{}': {}",
                    table_uri, e
                )
                .into(),
            )
        })?;

    let mut table_versions: Vec<TableVersion> = manifest_metas
        .into_iter()
        .filter_map(|meta| {
            let filename = meta.location.filename()?;
            let version_str = filename.strip_suffix(".manifest")?;
            if version_str.starts_with('d') {
                return None;
            }
            let file_version: u64 = version_str.parse().ok()?;
            // V2 names store `u64::MAX - version`; invert those back.
            let actual_version = if file_version > u64::MAX / 2 {
                u64::MAX - file_version
            } else {
                file_version
            };
            Some(TableVersion {
                version: actual_version as i64,
                manifest_path: meta.location.to_string(),
                manifest_size: Some(meta.size as i64),
                e_tag: meta.e_tag,
                timestamp_millis: Some(meta.last_modified.timestamp_millis()),
                metadata: None,
            })
        })
        .collect();

    // Always sort: V1 names ("1.manifest", "10.manifest", "2.manifest") are not
    // zero-padded, so even a lexically ordered listing is not numerically ordered.
    if want_descending {
        table_versions.sort_by(|a, b| b.version.cmp(&a.version));
    } else {
        table_versions.sort_by(|a, b| a.version.cmp(&b.version));
    }
    if let Some(limit) = request.limit.and_then(|limit| usize::try_from(limit).ok()) {
        table_versions.truncate(limit);
    }

    Ok(ListTableVersionsResponse {
        versions: table_versions,
        page_token: None,
    })
}
async fn create_table_version(
&self,
request: CreateTableVersionRequest,
) -> Result<CreateTableVersionResponse> {
let table_uri = self.resolve_table_location(&request.id).await?;
let staging_manifest_path = &request.manifest_path;
let version = request.version as u64;
let table_path = Self::uri_to_object_store_path(&table_uri);
let naming_scheme = match request.naming_scheme.as_deref() {
Some("V1") => ManifestNamingScheme::V1,
_ => ManifestNamingScheme::V2,
};
let final_path = naming_scheme.manifest_path(&table_path, version);
let staging_path = Self::uri_to_object_store_path(staging_manifest_path);
let manifest_data = self
.object_store
.inner
.get(&staging_path)
.await
.map_err(|e| {
Error::namespace_source(
format!(
"Failed to read staging manifest at '{}': {}",
staging_manifest_path, e
)
.into(),
)
})?
.bytes()
.await
.map_err(|e| {
Error::namespace_source(
format!(
"Failed to read staging manifest bytes at '{}': {}",
staging_manifest_path, e
)
.into(),
)
})?;
let manifest_size = manifest_data.len() as i64;
let put_result = self
.object_store
.inner
.put_opts(
&final_path,
manifest_data.into(),
PutOptions {
mode: PutMode::Create,
..Default::default()
},
)
.await
.map_err(|e| match e {
object_store::Error::AlreadyExists { .. }
| object_store::Error::Precondition { .. } => Error::namespace_source(
format!(
"Version {} already exists for table at '{}'",
version, table_uri
)
.into(),
),
_ => Error::namespace_source(
format!(
"Failed to create version {} for table at '{}': {}",
version, table_uri, e
)
.into(),
),
})?;
if let Err(e) = self.object_store.inner.delete(&staging_path).await {
log::warn!(
"Failed to delete staging manifest at '{}': {:?}",
staging_path,
e
);
}
if self.table_version_storage_enabled
&& let Some(ref manifest_ns) = self.manifest_ns
{
let table_id_str =
manifest::ManifestNamespace::str_object_id(&request.id.clone().unwrap_or_default());
let object_id =
manifest::ManifestNamespace::build_version_object_id(&table_id_str, version as i64);
let metadata_json = serde_json::json!({
"manifest_path": final_path.to_string(),
"manifest_size": manifest_size,
"e_tag": put_result.e_tag,
"naming_scheme": request.naming_scheme.as_deref().unwrap_or("V2"),
})
.to_string();
if let Err(e) = manifest_ns
.insert_into_manifest_with_metadata(
vec![manifest::ManifestEntry {
object_id,
object_type: manifest::ObjectType::TableVersion,
location: None,
metadata: Some(metadata_json),
}],
None,
)
.await
{
log::warn!(
"Failed to record table version in __manifest (best-effort): {:?}",
e
);
}
}
Ok(CreateTableVersionResponse {
transaction_id: None,
version: Some(Box::new(TableVersion {
version: version as i64,
manifest_path: final_path.to_string(),
manifest_size: Some(manifest_size),
e_tag: put_result.e_tag,
timestamp_millis: None,
metadata: None,
})),
})
}
/// Describe one version of a table, or the latest version when
/// `request.version` is `None`.
///
/// If table version storage is enabled and a specific version is requested,
/// the answer is delegated to the `__manifest`-backed namespace. Otherwise the
/// dataset is opened with this namespace's storage options and session. It is
/// checked out at the requested version if one is given, and its manifest
/// location, size, e-tag, timestamp and (non-empty) metadata are reported.
async fn describe_table_version(
    &self,
    request: DescribeTableVersionRequest,
) -> Result<DescribeTableVersionResponse> {
    if self.table_version_storage_enabled
        && let (Some(manifest_ns), Some(version)) = (&self.manifest_ns, request.version)
    {
        let table_id = request.id.clone().unwrap_or_default();
        return manifest_ns.describe_table_version(&table_id, version).await;
    }

    let table_uri = self.resolve_table_location(&request.id).await?;

    let mut builder = DatasetBuilder::from_uri(&table_uri);
    if let Some(storage_options) = self.storage_options.as_ref() {
        builder = builder.with_storage_options(storage_options.clone());
    }
    if let Some(session) = self.session.as_ref() {
        builder = builder.with_session(session.clone());
    }
    let latest = builder.load().await.map_err(|err| {
        Error::namespace_source(
            format!("Failed to open table at '{}': {}", table_uri, err).into(),
        )
    })?;

    // Without an explicit version, report whatever the load resolved to.
    let dataset = match request.version {
        None => latest,
        Some(requested) => latest
            .checkout_version(requested as u64)
            .await
            .map_err(|err| {
                Error::namespace_source(
                    format!(
                        "Failed to checkout version {} for table at '{}': {}",
                        requested, table_uri, err
                    )
                    .into(),
                )
            })?,
    };

    let version_info = dataset.version();
    let manifest_location = dataset.manifest_location();
    let metadata: std::collections::HashMap<String, String> =
        version_info.metadata.into_iter().collect();

    Ok(DescribeTableVersionResponse {
        version: Box::new(TableVersion {
            version: version_info.version as i64,
            manifest_path: manifest_location.path.to_string(),
            manifest_size: manifest_location.size.map(|s| s as i64),
            e_tag: manifest_location.e_tag.clone(),
            timestamp_millis: Some(version_info.timestamp.timestamp_millis()),
            // An empty metadata map is reported as absent.
            metadata: (!metadata.is_empty()).then_some(metadata),
        }),
    })
}
async fn batch_delete_table_versions(
&self,
request: BatchDeleteTableVersionsRequest,
) -> Result<BatchDeleteTableVersionsResponse> {
let ranges: Vec<(i64, i64)> = request
.ranges
.iter()
.map(|r| {
let start = r.start_version;
let end = if r.end_version > 0 {
r.end_version
} else {
start
};
(start, end)
})
.collect();
let table_entries = vec![TableDeleteEntry {
table_id: request.id.clone(),
ranges,
}];
let mut total_deleted_count = 0i64;
if self.table_version_storage_enabled
&& let Some(ref manifest_ns) = self.manifest_ns
{
let mut all_object_ids: Vec<String> = Vec::new();
for te in &table_entries {
let table_id_str = manifest::ManifestNamespace::str_object_id(
&te.table_id.clone().unwrap_or_default(),
);
for (start, end) in &te.ranges {
for version in *start..=*end {
let object_id = manifest::ManifestNamespace::build_version_object_id(
&table_id_str,
version,
);
all_object_ids.push(object_id);
}
}
}
if !all_object_ids.is_empty() {
total_deleted_count = manifest_ns
.batch_delete_table_versions_by_object_ids(&all_object_ids)
.await?;
}
let _ = self
.delete_physical_version_files(&table_entries, true)
.await;
return Ok(BatchDeleteTableVersionsResponse {
deleted_count: Some(total_deleted_count),
transaction_id: None,
});
}
total_deleted_count = self
.delete_physical_version_files(&table_entries, false)
.await?;
Ok(BatchDeleteTableVersionsResponse {
deleted_count: Some(total_deleted_count),
transaction_id: None,
})
}
/// Human-readable identifier for this namespace; the root is rendered with
/// its `Debug` representation.
fn namespace_id(&self) -> String {
    format!("DirectoryNamespace {{ root: {root:?} }}", root = self.root)
}
}
#[cfg(test)]
mod tests {
use super::*;
use arrow_ipc::reader::StreamReader;
use lance::dataset::Dataset;
use lance_core::utils::tempfile::{TempStdDir, TempStrDir};
use lance_namespace::models::{
CreateTableRequest, JsonArrowDataType, JsonArrowField, JsonArrowSchema, ListTablesRequest,
};
use lance_namespace::schema::convert_json_arrow_schema;
use std::io::Cursor;
use std::sync::Arc;
/// Build a `DirectoryNamespace` rooted in a fresh temporary directory.
///
/// The directory guard is returned too, so the caller keeps it alive for the
/// duration of the test.
async fn create_test_namespace() -> (DirectoryNamespace, TempStdDir) {
    let dir = TempStdDir::default();
    let root = dir.to_str().unwrap();
    let ns = DirectoryNamespaceBuilder::new(root).build().await.unwrap();
    (ns, dir)
}
/// Encode a single empty record batch with `schema` as an Arrow IPC stream.
fn create_test_ipc_data(schema: &JsonArrowSchema) -> Vec<u8> {
    use arrow::ipc::writer::StreamWriter;
    let arrow_schema = Arc::new(convert_json_arrow_schema(schema).unwrap());
    let empty = arrow::record_batch::RecordBatch::new_empty(Arc::clone(&arrow_schema));
    let mut out = Vec::new();
    let mut writer = StreamWriter::try_new(&mut out, &arrow_schema).unwrap();
    writer.write(&empty).unwrap();
    writer.finish().unwrap();
    // Release the writer's borrow of `out` before handing the buffer back.
    drop(writer);
    out
}
/// Two-column schema shared by most tests: non-nullable `id: int32` and
/// nullable `name: utf8`.
fn create_test_schema() -> JsonArrowSchema {
    let field = |name: &str, type_name: &str, nullable: bool| JsonArrowField {
        name: name.to_string(),
        r#type: Box::new(JsonArrowDataType::new(type_name.to_string())),
        nullable,
        metadata: None,
    };
    JsonArrowSchema {
        fields: vec![field("id", "int32", false), field("name", "utf8", true)],
        metadata: None,
    }
}
/// Creating a table returns its `.lance` location and starts at version 1.
#[tokio::test]
async fn test_create_table() {
    let (namespace, _tmp) = create_test_namespace().await;
    let ipc_data = create_test_ipc_data(&create_test_schema());

    let mut request = CreateTableRequest::new();
    request.id = Some(vec!["test_table".to_string()]);
    let response = namespace
        .create_table(request, bytes::Bytes::from(ipc_data))
        .await
        .unwrap();

    let location = response.location.as_deref();
    assert!(location.is_some());
    assert!(location.unwrap().ends_with("test_table.lance"));
    assert_eq!(response.version, Some(1));
}
/// An empty request body is not a valid Arrow IPC stream and must be rejected.
#[tokio::test]
async fn test_create_table_without_data() {
    let (namespace, _tmp) = create_test_namespace().await;
    let mut request = CreateTableRequest::new();
    request.id = Some(vec!["test_table".to_string()]);

    let err = namespace
        .create_table(request, bytes::Bytes::new())
        .await
        .unwrap_err();
    assert!(err.to_string().contains("Arrow IPC stream) is required"));
}
/// An empty table id is rejected, while a two-level id works once its parent
/// namespace exists.
#[tokio::test]
async fn test_create_table_with_invalid_id() {
    let (namespace, _tmp) = create_test_namespace().await;
    let ipc_data = create_test_ipc_data(&create_test_schema());

    let mut empty_id_req = CreateTableRequest::new();
    empty_id_req.id = Some(vec![]);
    let empty_id_result = namespace
        .create_table(empty_id_req, bytes::Bytes::from(ipc_data.clone()))
        .await;
    assert!(empty_id_result.is_err());

    let mut ns_req = CreateNamespaceRequest::new();
    ns_req.id = Some(vec!["test_namespace".to_string()]);
    namespace.create_namespace(ns_req).await.unwrap();

    let mut nested_req = CreateTableRequest::new();
    nested_req.id = Some(vec!["test_namespace".to_string(), "table".to_string()]);
    let nested_result = namespace
        .create_table(nested_req, bytes::Bytes::from(ipc_data))
        .await;
    assert!(
        nested_result.is_ok(),
        "Multi-level table IDs should work with manifest enabled"
    );
}
/// The root listing is empty at first and reflects tables as they are created.
#[tokio::test]
async fn test_list_tables() {
    let (namespace, _tmp) = create_test_namespace().await;
    let root_listing = || {
        let mut req = ListTablesRequest::new();
        req.id = Some(vec![]);
        req
    };

    let initial = namespace.list_tables(root_listing()).await.unwrap();
    assert_eq!(initial.tables.len(), 0);

    let ipc_data = create_test_ipc_data(&create_test_schema());
    for name in ["table1", "table2"] {
        let mut req = CreateTableRequest::new();
        req.id = Some(vec![name.to_string()]);
        namespace
            .create_table(req, bytes::Bytes::from(ipc_data.clone()))
            .await
            .unwrap();
    }

    let tables = namespace.list_tables(root_listing()).await.unwrap().tables;
    assert_eq!(tables.len(), 2);
    assert!(tables.contains(&"table1".to_string()));
    assert!(tables.contains(&"table2".to_string()));
}
/// Listing tables inside a freshly created child namespace succeeds and is
/// empty.
#[tokio::test]
async fn test_list_tables_with_namespace_id() {
    let (namespace, _tmp) = create_test_namespace().await;
    let child = vec!["test_namespace".to_string()];

    let mut ns_req = CreateNamespaceRequest::new();
    ns_req.id = Some(child.clone());
    namespace.create_namespace(ns_req).await.unwrap();

    let mut list_req = ListTablesRequest::new();
    list_req.id = Some(child);
    let listed = namespace.list_tables(list_req).await;
    assert!(
        listed.is_ok(),
        "list_tables should work with child namespace when manifest is enabled"
    );
    assert_eq!(
        listed.unwrap().tables.len(),
        0,
        "Namespace should have no tables yet"
    );
}
/// Describing an existing table reports its `.lance` location.
#[tokio::test]
async fn test_describe_table() {
    let (namespace, _tmp) = create_test_namespace().await;
    let table_id = vec!["test_table".to_string()];

    let mut create_req = CreateTableRequest::new();
    create_req.id = Some(table_id.clone());
    let ipc_data = create_test_ipc_data(&create_test_schema());
    namespace
        .create_table(create_req, bytes::Bytes::from(ipc_data))
        .await
        .unwrap();

    let mut describe_req = DescribeTableRequest::new();
    describe_req.id = Some(table_id);
    let location = namespace
        .describe_table(describe_req)
        .await
        .unwrap()
        .location;
    assert!(
        location
            .as_deref()
            .is_some_and(|loc| loc.ends_with("test_table.lance"))
    );
}
/// Describing an unknown table fails with a "does not exist" error.
#[tokio::test]
async fn test_describe_nonexistent_table() {
    let (namespace, _tmp) = create_test_namespace().await;
    let mut req = DescribeTableRequest::new();
    req.id = Some(vec!["nonexistent".to_string()]);

    let message = namespace.describe_table(req).await.unwrap_err().to_string();
    assert!(message.contains("Table does not exist"));
}
/// `table_exists` succeeds for a created table and errors for an unknown one.
#[tokio::test]
async fn test_table_exists() {
    let (namespace, _tmp) = create_test_namespace().await;
    let ipc_data = create_test_ipc_data(&create_test_schema());

    let mut create_req = CreateTableRequest::new();
    create_req.id = Some(vec!["existing_table".to_string()]);
    namespace
        .create_table(create_req, bytes::Bytes::from(ipc_data))
        .await
        .unwrap();

    let exists_req = |name: &str| {
        let mut req = TableExistsRequest::new();
        req.id = Some(vec![name.to_string()]);
        req
    };
    assert!(
        namespace
            .table_exists(exists_req("existing_table"))
            .await
            .is_ok()
    );

    let missing = namespace.table_exists(exists_req("nonexistent")).await;
    assert!(
        missing
            .unwrap_err()
            .to_string()
            .contains("Table does not exist")
    );
}
/// Dropping a table returns its location and makes `table_exists` fail.
#[tokio::test]
async fn test_drop_table() {
    let (namespace, _tmp) = create_test_namespace().await;
    let table_id = vec!["table_to_drop".to_string()];

    let mut create_req = CreateTableRequest::new();
    create_req.id = Some(table_id.clone());
    let ipc_data = create_test_ipc_data(&create_test_schema());
    namespace
        .create_table(create_req, bytes::Bytes::from(ipc_data))
        .await
        .unwrap();

    let mut exists_req = TableExistsRequest::new();
    exists_req.id = Some(table_id.clone());
    assert!(namespace.table_exists(exists_req.clone()).await.is_ok());

    let mut drop_req = DropTableRequest::new();
    drop_req.id = Some(table_id);
    let dropped = namespace.drop_table(drop_req).await.unwrap();
    assert!(dropped.location.is_some());
    assert!(namespace.table_exists(exists_req).await.is_err());
}
#[tokio::test]
async fn test_drop_nonexistent_table() {
let (namespace, _temp_dir) = create_test_namespace().await;
let mut request = DropTableRequest::new();
request.id = Some(vec!["nonexistent".to_string()]);
let result = namespace.drop_table(request).await;
let _ = result;
}
/// The root namespace (empty id) can be listed, described and probed, but it
/// can be neither re-created nor dropped.
#[tokio::test]
async fn test_root_namespace_operations() {
    let (namespace, _tmp) = create_test_namespace().await;
    let root: Vec<String> = Vec::new();

    let mut list_req = ListNamespacesRequest::new();
    list_req.id = Some(root.clone());
    let listed = namespace.list_namespaces(list_req).await;
    assert!(listed.is_ok());
    assert_eq!(listed.unwrap().namespaces.len(), 0);

    let mut describe_req = DescribeNamespaceRequest::new();
    describe_req.id = Some(root.clone());
    assert!(namespace.describe_namespace(describe_req).await.is_ok());

    let mut exists_req = NamespaceExistsRequest::new();
    exists_req.id = Some(root.clone());
    assert!(namespace.namespace_exists(exists_req).await.is_ok());

    let mut create_req = CreateNamespaceRequest::new();
    create_req.id = Some(root.clone());
    let create_err = namespace.create_namespace(create_req).await.unwrap_err();
    assert!(create_err.to_string().contains("already exists"));

    let mut drop_req = DropNamespaceRequest::new();
    drop_req.id = Some(root);
    let drop_err = namespace.drop_namespace(drop_req).await.unwrap_err();
    assert!(drop_err.to_string().contains("cannot be dropped"));
}
/// A child namespace goes through its full lifecycle: create, exists, drop,
/// then gone.
#[tokio::test]
async fn test_non_root_namespace_operations() {
    let (namespace, _tmp) = create_test_namespace().await;
    let child = || Some(vec!["child".to_string()]);

    let mut create_req = CreateNamespaceRequest::new();
    create_req.id = child();
    assert!(
        namespace.create_namespace(create_req).await.is_ok(),
        "Child namespace creation should succeed with manifest enabled"
    );

    let mut exists_req = NamespaceExistsRequest::new();
    exists_req.id = child();
    assert!(
        namespace.namespace_exists(exists_req).await.is_ok(),
        "Child namespace should exist after creation"
    );

    let mut drop_req = DropNamespaceRequest::new();
    drop_req.id = child();
    assert!(
        namespace.drop_namespace(drop_req).await.is_ok(),
        "Child namespace drop should succeed with manifest enabled"
    );

    let mut gone_req = NamespaceExistsRequest::new();
    gone_req.id = child();
    assert!(
        namespace.namespace_exists(gone_req).await.is_err(),
        "Child namespace should not exist after drop"
    );
}
/// Tables are created under a custom root directory when one is configured.
#[tokio::test]
async fn test_config_custom_root() {
    let temp_dir = TempStdDir::default();
    let custom_root = temp_dir.join("custom");
    std::fs::create_dir(&custom_root).unwrap();
    let root_str = custom_root.to_string_lossy().to_string();
    let namespace = DirectoryNamespaceBuilder::new(root_str)
        .build()
        .await
        .unwrap();

    let mut request = CreateTableRequest::new();
    request.id = Some(vec!["test_table".to_string()]);
    let ipc_data = create_test_ipc_data(&create_test_schema());
    let location = namespace
        .create_table(request, bytes::Bytes::from(ipc_data))
        .await
        .unwrap()
        .location
        .unwrap();
    assert!(location.contains("custom"));
}
/// Storage options set on the builder are echoed back by `create_table`.
#[tokio::test]
async fn test_config_storage_options() {
    let temp_dir = TempStdDir::default();
    let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
        .storage_option("option1", "value1")
        .storage_option("option2", "value2")
        .build()
        .await
        .unwrap();

    let mut request = CreateTableRequest::new();
    request.id = Some(vec!["test_table".to_string()]);
    let ipc_data = create_test_ipc_data(&create_test_schema());
    let options = namespace
        .create_table(request, bytes::Bytes::from(ipc_data))
        .await
        .unwrap()
        .storage_options
        .unwrap();

    for (key, value) in [("option1", "value1"), ("option2", "value2")] {
        assert_eq!(options.get(key), Some(&value.to_string()));
    }
}
/// With `manifest_enabled(false)` and no credential vendor configured,
/// `describe_table` and `declare_table` return no storage options, even though
/// static credentials were set on the builder.
#[tokio::test]
async fn test_no_storage_options_without_vendor() {
    use lance_namespace::models::DeclareTableRequest;
    let temp_dir = TempStdDir::default();
    let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
        .manifest_enabled(false)
        .storage_option("aws_access_key_id", "AKID")
        .storage_option("aws_secret_access_key", "SECRET")
        .storage_option("region", "us-east-1")
        .build()
        .await
        .unwrap();

    let mut create_req = CreateTableRequest::new();
    create_req.id = Some(vec!["t1".to_string()]);
    let ipc_data = create_test_ipc_data(&create_test_schema());
    namespace
        .create_table(create_req, bytes::Bytes::from(ipc_data))
        .await
        .unwrap();

    let mut describe_req = DescribeTableRequest::new();
    describe_req.id = Some(vec!["t1".to_string()]);
    let described = namespace.describe_table(describe_req).await.unwrap();
    assert!(described.storage_options.is_none());

    let mut declare_req = DeclareTableRequest::new();
    declare_req.id = Some(vec!["t2".to_string()]);
    let declared = namespace.declare_table(declare_req).await.unwrap();
    assert!(declared.storage_options.is_none());
}
/// Same as the non-manifest variant, but without overriding
/// `manifest_enabled`: `describe_table` must return no storage options when no
/// credential vendor is configured.
#[tokio::test]
async fn test_no_storage_options_without_vendor_manifest() {
    let temp_dir = TempStdDir::default();
    let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
        .storage_option("aws_access_key_id", "AKID")
        .storage_option("aws_secret_access_key", "SECRET")
        .storage_option("region", "us-east-1")
        .build()
        .await
        .unwrap();

    let table_id = vec!["t1".to_string()];
    let mut create_req = CreateTableRequest::new();
    create_req.id = Some(table_id.clone());
    let ipc_data = create_test_ipc_data(&create_test_schema());
    namespace
        .create_table(create_req, bytes::Bytes::from(ipc_data))
        .await
        .unwrap();

    let mut describe_req = DescribeTableRequest::new();
    describe_req.id = Some(table_id);
    let described = namespace.describe_table(describe_req).await.unwrap();
    assert!(described.storage_options.is_none());
}
/// Properties can enable the manifest and disable directory listing, and the
/// resulting namespace can create tables.
#[tokio::test]
async fn test_from_properties_manifest_enabled() {
    let temp_dir = TempStdDir::default();
    let properties = HashMap::from([
        ("root".to_string(), temp_dir.to_str().unwrap().to_string()),
        ("manifest_enabled".to_string(), "true".to_string()),
        ("dir_listing_enabled".to_string(), "false".to_string()),
    ]);

    let builder = DirectoryNamespaceBuilder::from_properties(properties, None).unwrap();
    assert!(builder.manifest_enabled);
    assert!(!builder.dir_listing_enabled);
    let namespace = builder.build().await.unwrap();

    let mut request = CreateTableRequest::new();
    request.id = Some(vec!["test_table".to_string()]);
    let ipc_data = create_test_ipc_data(&create_test_schema());
    let created = namespace
        .create_table(request, bytes::Bytes::from(ipc_data))
        .await
        .unwrap();
    assert!(created.location.is_some());
}
/// Properties can disable the manifest and enable directory listing, and the
/// resulting namespace can create tables.
#[tokio::test]
async fn test_from_properties_dir_listing_enabled() {
    let temp_dir = TempStdDir::default();
    let properties = HashMap::from([
        ("root".to_string(), temp_dir.to_str().unwrap().to_string()),
        ("manifest_enabled".to_string(), "false".to_string()),
        ("dir_listing_enabled".to_string(), "true".to_string()),
    ]);

    let builder = DirectoryNamespaceBuilder::from_properties(properties, None).unwrap();
    assert!(!builder.manifest_enabled);
    assert!(builder.dir_listing_enabled);
    let namespace = builder.build().await.unwrap();

    let mut request = CreateTableRequest::new();
    request.id = Some(vec!["test_table".to_string()]);
    let ipc_data = create_test_ipc_data(&create_test_schema());
    let created = namespace
        .create_table(request, bytes::Bytes::from(ipc_data))
        .await
        .unwrap();
    assert!(created.location.is_some());
}
/// With only `root` given, both the manifest and directory listing are
/// enabled.
#[tokio::test]
async fn test_from_properties_defaults() {
    let temp_dir = TempStdDir::default();
    let properties = HashMap::from([(
        "root".to_string(),
        temp_dir.to_str().unwrap().to_string(),
    )]);
    let builder = DirectoryNamespaceBuilder::from_properties(properties, None).unwrap();
    assert!(builder.manifest_enabled);
    assert!(builder.dir_listing_enabled);
}
/// `storage.`-prefixed properties become storage options with the prefix
/// stripped.
#[tokio::test]
async fn test_from_properties_with_storage_options() {
    let temp_dir = TempStdDir::default();
    let properties = HashMap::from([
        ("root".to_string(), temp_dir.to_str().unwrap().to_string()),
        ("manifest_enabled".to_string(), "true".to_string()),
        ("storage.region".to_string(), "us-west-2".to_string()),
        ("storage.bucket".to_string(), "my-bucket".to_string()),
    ]);

    let builder = DirectoryNamespaceBuilder::from_properties(properties, None).unwrap();
    assert!(builder.manifest_enabled);
    assert!(builder.storage_options.is_some());

    let opts = builder.storage_options.unwrap();
    assert_eq!(opts.get("region"), Some(&"us-west-2".to_string()));
    assert_eq!(opts.get("bucket"), Some(&"my-bucket".to_string()));
}
/// A table can be created from a schema that mixes bool, int8, float64 and
/// binary columns.
#[tokio::test]
async fn test_various_arrow_types() {
    let (namespace, _tmp) = create_test_namespace().await;
    let fields = [
        ("bool_col", "bool"),
        ("int8_col", "int8"),
        ("float64_col", "float64"),
        ("binary_col", "binary"),
    ]
    .into_iter()
    .map(|(name, type_name)| JsonArrowField {
        name: name.to_string(),
        r#type: Box::new(JsonArrowDataType::new(type_name.to_string())),
        nullable: true,
        metadata: None,
    })
    .collect();
    let schema = JsonArrowSchema {
        fields,
        metadata: None,
    };

    let mut request = CreateTableRequest::new();
    request.id = Some(vec!["complex_table".to_string()]);
    let created = namespace
        .create_table(request, bytes::Bytes::from(create_test_ipc_data(&schema)))
        .await
        .unwrap();
    assert!(created.location.is_some());
}
/// A namespace over a fresh directory lists no tables at its root.
#[tokio::test]
async fn test_connect_dir() {
    let temp_dir = TempStdDir::default();
    let root = temp_dir.to_str().unwrap();
    let namespace = DirectoryNamespaceBuilder::new(root).build().await.unwrap();

    let mut list_req = ListTablesRequest::new();
    list_req.id = Some(Vec::new());
    let tables = namespace.list_tables(list_req).await.unwrap().tables;
    assert!(tables.is_empty());
}
/// A table created from a non-empty IPC stream starts at version 1 and is then
/// reported as existing.
#[tokio::test]
async fn test_create_table_with_ipc_data() {
    use arrow::array::{Int32Array, StringArray};
    use arrow::ipc::writer::StreamWriter;
    let (namespace, _tmp) = create_test_namespace().await;

    let arrow_schema = Arc::new(convert_json_arrow_schema(&create_test_schema()).unwrap());
    let batch = arrow::record_batch::RecordBatch::try_new(
        Arc::clone(&arrow_schema),
        vec![
            Arc::new(Int32Array::from(vec![1, 2, 3])),
            Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])),
        ],
    )
    .unwrap();

    let mut ipc_bytes = Vec::new();
    let mut writer = StreamWriter::try_new(&mut ipc_bytes, &arrow_schema).unwrap();
    writer.write(&batch).unwrap();
    writer.finish().unwrap();
    drop(writer);

    let table_id = vec!["test_table_with_data".to_string()];
    let mut request = CreateTableRequest::new();
    request.id = Some(table_id.clone());
    let response = namespace
        .create_table(request, Bytes::from(ipc_bytes))
        .await
        .unwrap();
    assert_eq!(response.version, Some(1));
    assert!(
        response
            .location
            .unwrap()
            .contains("test_table_with_data.lance")
    );

    let mut exists_req = TableExistsRequest::new();
    exists_req.id = Some(table_id);
    namespace.table_exists(exists_req).await.unwrap();
}
/// Three child namespaces created under the root all show up in the root
/// listing.
#[tokio::test]
async fn test_child_namespace_create_and_list() {
    let (namespace, _tmp) = create_test_namespace().await;
    for i in 1..=3 {
        let mut req = CreateNamespaceRequest::new();
        req.id = Some(vec![format!("ns{}", i)]);
        assert!(
            namespace.create_namespace(req).await.is_ok(),
            "Failed to create child namespace ns{}",
            i
        );
    }

    let listed = namespace
        .list_namespaces(ListNamespacesRequest {
            id: Some(vec![]),
            ..Default::default()
        })
        .await;
    assert!(listed.is_ok());
    let namespaces = listed.unwrap().namespaces;
    assert_eq!(namespaces.len(), 3);
    for expected in ["ns1", "ns2", "ns3"] {
        assert!(namespaces.contains(&expected.to_string()));
    }
}
/// Listing a parent returns only its direct children, and the root listing
/// returns only the parent.
#[tokio::test]
async fn test_nested_namespace_hierarchy() {
    let (namespace, _tmp) = create_test_namespace().await;
    let paths: [&[&str]; 3] = [&["parent"], &["parent", "child1"], &["parent", "child2"]];
    for path in paths {
        let mut req = CreateNamespaceRequest::new();
        req.id = Some(path.iter().map(|s| s.to_string()).collect());
        namespace.create_namespace(req).await.unwrap();
    }

    let children = namespace
        .list_namespaces(ListNamespacesRequest {
            id: Some(vec!["parent".to_string()]),
            ..Default::default()
        })
        .await;
    assert!(children.is_ok());
    let children = children.unwrap().namespaces;
    assert_eq!(children.len(), 2);
    assert!(children.contains(&"child1".to_string()));
    assert!(children.contains(&"child2".to_string()));

    let roots = namespace
        .list_namespaces(ListNamespacesRequest {
            id: Some(vec![]),
            ..Default::default()
        })
        .await;
    assert!(roots.is_ok());
    let roots = roots.unwrap().namespaces;
    assert_eq!(roots.len(), 1);
    assert_eq!(roots[0], "parent");
}
/// A table inside a child namespace can be created, listed, probed and
/// described.
#[tokio::test]
async fn test_table_in_child_namespace() {
    let (namespace, _tmp) = create_test_namespace().await;
    let mut ns_req = CreateNamespaceRequest::new();
    ns_req.id = Some(vec!["test_ns".to_string()]);
    namespace.create_namespace(ns_req).await.unwrap();

    let table_id = || Some(vec!["test_ns".to_string(), "table1".to_string()]);

    let mut create_req = CreateTableRequest::new();
    create_req.id = table_id();
    let ipc_data = create_test_ipc_data(&create_test_schema());
    let created = namespace
        .create_table(create_req, bytes::Bytes::from(ipc_data))
        .await;
    assert!(created.is_ok(), "Failed to create table in child namespace");

    let listed = namespace
        .list_tables(ListTablesRequest {
            id: Some(vec!["test_ns".to_string()]),
            ..Default::default()
        })
        .await;
    assert!(listed.is_ok());
    let tables = listed.unwrap().tables;
    assert_eq!(tables.len(), 1);
    assert_eq!(tables[0], "table1");

    let mut exists_req = TableExistsRequest::new();
    exists_req.id = table_id();
    assert!(namespace.table_exists(exists_req).await.is_ok());

    let mut describe_req = DescribeTableRequest::new();
    describe_req.id = table_id();
    let described = namespace.describe_table(describe_req).await;
    assert!(described.is_ok());
    assert!(described.unwrap().location.is_some());
}
/// Several tables in one child namespace are all listed.
#[tokio::test]
async fn test_multiple_tables_in_child_namespace() {
    let (namespace, _tmp) = create_test_namespace().await;
    let mut ns_req = CreateNamespaceRequest::new();
    ns_req.id = Some(vec!["test_ns".to_string()]);
    namespace.create_namespace(ns_req).await.unwrap();

    let ipc_data = create_test_ipc_data(&create_test_schema());
    for n in 1..=3 {
        let mut req = CreateTableRequest::new();
        req.id = Some(vec!["test_ns".to_string(), format!("table{}", n)]);
        namespace
            .create_table(req, bytes::Bytes::from(ipc_data.clone()))
            .await
            .unwrap();
    }

    let listed = namespace
        .list_tables(ListTablesRequest {
            id: Some(vec!["test_ns".to_string()]),
            ..Default::default()
        })
        .await;
    assert!(listed.is_ok());
    let tables = listed.unwrap().tables;
    assert_eq!(tables.len(), 3);
    for n in 1..=3 {
        assert!(tables.contains(&format!("table{}", n)));
    }
}
/// A table inside a child namespace can be dropped and no longer exists
/// afterwards.
#[tokio::test]
async fn test_drop_table_in_child_namespace() {
    let (namespace, _tmp) = create_test_namespace().await;
    let mut ns_req = CreateNamespaceRequest::new();
    ns_req.id = Some(vec!["test_ns".to_string()]);
    namespace.create_namespace(ns_req).await.unwrap();

    let table_id = || Some(vec!["test_ns".to_string(), "table1".to_string()]);

    let mut create_req = CreateTableRequest::new();
    create_req.id = table_id();
    let ipc_data = create_test_ipc_data(&create_test_schema());
    namespace
        .create_table(create_req, bytes::Bytes::from(ipc_data))
        .await
        .unwrap();

    let mut drop_req = DropTableRequest::new();
    drop_req.id = table_id();
    assert!(
        namespace.drop_table(drop_req).await.is_ok(),
        "Failed to drop table in child namespace"
    );

    let mut exists_req = TableExistsRequest::new();
    exists_req.id = table_id();
    assert!(namespace.table_exists(exists_req).await.is_err());
}
/// Tables work three namespace levels deep, with each level created in turn.
#[tokio::test]
async fn test_deeply_nested_namespace() {
    let (namespace, _tmp) = create_test_namespace().await;
    let levels = ["level1", "level2", "level3"];

    // Create level1, then level1/level2, then level1/level2/level3.
    for depth in 1..=levels.len() {
        let mut req = CreateNamespaceRequest::new();
        req.id = Some(levels[..depth].iter().map(|s| s.to_string()).collect());
        namespace.create_namespace(req).await.unwrap();
    }

    let table_id = || {
        let mut id: Vec<String> = levels.iter().map(|s| s.to_string()).collect();
        id.push("table1".to_string());
        Some(id)
    };

    let mut create_req = CreateTableRequest::new();
    create_req.id = table_id();
    let ipc_data = create_test_ipc_data(&create_test_schema());
    let created = namespace
        .create_table(create_req, bytes::Bytes::from(ipc_data))
        .await;
    assert!(
        created.is_ok(),
        "Failed to create table in deeply nested namespace"
    );

    let mut exists_req = TableExistsRequest::new();
    exists_req.id = table_id();
    assert!(namespace.table_exists(exists_req).await.is_ok());
}
/// Properties supplied at namespace creation are returned by
/// `describe_namespace`.
#[tokio::test]
async fn test_namespace_with_properties() {
    let (namespace, _tmp) = create_test_namespace().await;
    let properties = HashMap::from([
        ("owner".to_string(), "test_user".to_string()),
        ("description".to_string(), "Test namespace".to_string()),
    ]);

    let mut create_req = CreateNamespaceRequest::new();
    create_req.id = Some(vec!["test_ns".to_string()]);
    create_req.properties = Some(properties);
    namespace.create_namespace(create_req).await.unwrap();

    let described = namespace
        .describe_namespace(DescribeNamespaceRequest {
            id: Some(vec!["test_ns".to_string()]),
            ..Default::default()
        })
        .await;
    assert!(described.is_ok());
    let props = described.unwrap().properties;
    assert!(props.is_some());

    let props = props.unwrap();
    assert_eq!(props.get("owner"), Some(&"test_user".to_string()));
    assert_eq!(
        props.get("description"),
        Some(&"Test namespace".to_string())
    );
}
/// A namespace that still contains a table cannot be dropped.
#[tokio::test]
async fn test_cannot_drop_namespace_with_tables() {
    let (namespace, _temp_dir) = create_test_namespace().await;
    let ns_id = vec!["test_ns".to_string()];

    let mut ns_req = CreateNamespaceRequest::new();
    ns_req.id = Some(ns_id.clone());
    namespace.create_namespace(ns_req).await.unwrap();

    // Put one table in the namespace so it is non-empty.
    let ipc_bytes = bytes::Bytes::from(create_test_ipc_data(&create_test_schema()));
    let mut table_req = CreateTableRequest::new();
    table_req.id = Some(vec!["test_ns".to_string(), "table1".to_string()]);
    namespace.create_table(table_req, ipc_bytes).await.unwrap();

    let mut drop_req = DropNamespaceRequest::new();
    drop_req.id = Some(ns_id);
    let outcome = namespace.drop_namespace(drop_req).await;
    assert!(
        outcome.is_err(),
        "Should not be able to drop namespace with tables"
    );
}
/// Same-named tables in sibling namespaces are independent: each namespace
/// lists only its own table, and dropping one does not affect the other.
#[tokio::test]
async fn test_isolation_between_namespaces() {
    let (namespace, _temp_dir) = create_test_namespace().await;
    let siblings = ["ns1", "ns2"];

    for ns in siblings {
        let mut ns_req = CreateNamespaceRequest::new();
        ns_req.id = Some(vec![ns.to_string()]);
        namespace.create_namespace(ns_req).await.unwrap();
    }

    let schema = create_test_schema();
    let ipc_data = create_test_ipc_data(&schema);
    for ns in siblings {
        let mut table_req = CreateTableRequest::new();
        table_req.id = Some(vec![ns.to_string(), "table1".to_string()]);
        namespace
            .create_table(table_req, bytes::Bytes::from(ipc_data.clone()))
            .await
            .unwrap();
    }

    for ns in siblings {
        let listing = namespace
            .list_tables(ListTablesRequest {
                id: Some(vec![ns.to_string()]),
                page_token: None,
                limit: None,
                ..Default::default()
            })
            .await
            .unwrap();
        assert_eq!(listing.tables.len(), 1);
        assert_eq!(listing.tables[0], "table1");
    }

    let mut drop_req = DropTableRequest::new();
    drop_req.id = Some(vec!["ns1".to_string(), "table1".to_string()]);
    namespace.drop_table(drop_req).await.unwrap();

    // Only ns1's copy is gone; ns2's table of the same name is untouched.
    let exists_req_for = |ns: &str| {
        let mut req = TableExistsRequest::new();
        req.id = Some(vec![ns.to_string(), "table1".to_string()]);
        req
    };
    assert!(namespace.table_exists(exists_req_for("ns1")).await.is_err());
    assert!(namespace.table_exists(exists_req_for("ns2")).await.is_ok());
}
/// End-to-end migration from directory listing to the manifest.
///
/// Creates three tables with directory listing only, reopens in dual mode and
/// migrates them (a second migration is a no-op), then reopens in
/// manifest-only mode and checks that all three tables are still listed.
#[tokio::test]
async fn test_migrate_directory_tables() {
    let temp_dir = TempStdDir::default();
    let temp_path = temp_dir.to_str().unwrap();
    // Request that lists tables under the root namespace.
    let root_list_req = || {
        let mut req = ListTablesRequest::new();
        req.id = Some(vec![]);
        req
    };

    // Phase 1: directory listing only. Create three tables.
    let legacy_ns = DirectoryNamespaceBuilder::new(temp_path)
        .manifest_enabled(false)
        .dir_listing_enabled(true)
        .build()
        .await
        .unwrap();
    let schema = create_test_schema();
    let ipc_data = create_test_ipc_data(&schema);
    for idx in 1..=3 {
        let mut table_req = CreateTableRequest::new();
        table_req.id = Some(vec![format!("table{}", idx)]);
        legacy_ns
            .create_table(table_req, bytes::Bytes::from(ipc_data.clone()))
            .await
            .unwrap();
    }
    drop(legacy_ns);

    // Phase 2: manifest plus directory listing. Migrate, then re-migrate.
    let hybrid_ns = DirectoryNamespaceBuilder::new(temp_path)
        .manifest_enabled(true)
        .dir_listing_enabled(true)
        .build()
        .await
        .unwrap();
    let before = hybrid_ns.list_tables(root_list_req()).await.unwrap().tables;
    assert_eq!(before.len(), 3);
    let first_pass = hybrid_ns.migrate().await.unwrap();
    assert_eq!(first_pass, 3, "Should migrate all 3 tables");
    let after = hybrid_ns.list_tables(root_list_req()).await.unwrap().tables;
    assert_eq!(after.len(), 3);
    let second_pass = hybrid_ns.migrate().await.unwrap();
    assert_eq!(
        second_pass, 0,
        "Should not migrate already-migrated tables"
    );
    drop(hybrid_ns);

    // Phase 3: manifest only. Every migrated table must still be listed.
    let manifest_ns = DirectoryNamespaceBuilder::new(temp_path)
        .manifest_enabled(true)
        .dir_listing_enabled(false)
        .build()
        .await
        .unwrap();
    let tables = manifest_ns.list_tables(root_list_req()).await.unwrap().tables;
    assert_eq!(tables.len(), 3);
    for expected in ["table1", "table2", "table3"] {
        assert!(tables.contains(&expected.to_string()));
    }
}
/// Calling `migrate` on a namespace with the manifest disabled migrates
/// nothing and reports a count of zero.
#[tokio::test]
async fn test_migrate_without_manifest() {
    let temp_dir = TempStdDir::default();
    let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
        .manifest_enabled(false)
        .dir_listing_enabled(true)
        .build()
        .await
        .unwrap();
    assert_eq!(namespace.migrate().await.unwrap(), 0);
}
/// Registering an existing on-disk dataset by relative location makes it
/// visible via `table_exists` and `list_tables` under the registered name.
#[tokio::test]
async fn test_register_table() {
    use lance_namespace::models::{RegisterTableRequest, TableExistsRequest};
    let temp_dir = TempStdDir::default();
    let temp_path = temp_dir.to_str().unwrap();
    let namespace = DirectoryNamespaceBuilder::new(temp_path)
        .build()
        .await
        .unwrap();

    // Write a dataset directly, outside the namespace, then register it.
    let table_uri = format!("{}/external_table.lance", temp_path);
    let ipc_data = create_test_ipc_data(&create_test_schema());
    let batches = StreamReader::try_new(Cursor::new(ipc_data), None)
        .unwrap()
        .collect::<std::result::Result<Vec<_>, _>>()
        .unwrap();
    let batch_schema = batches[0].schema();
    let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), batch_schema);
    Dataset::write(Box::new(reader), &table_uri, None)
        .await
        .unwrap();

    let mut register_req = RegisterTableRequest::new("external_table.lance".to_string());
    register_req.id = Some(vec!["registered_table".to_string()]);
    let registered = namespace.register_table(register_req).await.unwrap();
    assert_eq!(registered.location.as_deref(), Some("external_table.lance"));

    let mut exists_req = TableExistsRequest::new();
    exists_req.id = Some(vec!["registered_table".to_string()]);
    assert!(namespace.table_exists(exists_req).await.is_ok());

    let mut list_req = ListTablesRequest::new();
    list_req.id = Some(Vec::new());
    let listed = namespace.list_tables(list_req).await.unwrap().tables;
    assert!(listed.iter().any(|name| name == "registered_table"));
}
/// Registering the same table id twice fails on the second attempt with an
/// "already exists" error.
#[tokio::test]
async fn test_register_table_duplicate_fails() {
    use lance_namespace::models::RegisterTableRequest;
    let temp_dir = TempStdDir::default();
    let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
        .build()
        .await
        .unwrap();
    let request = {
        let mut req = RegisterTableRequest::new("test_table.lance".to_string());
        req.id = Some(vec!["test_table".to_string()]);
        req
    };

    namespace.register_table(request.clone()).await.unwrap();
    let err = namespace.register_table(request).await.unwrap_err();
    assert!(err.to_string().contains("already exists"));
}
/// In manifest-only mode, deregistering a table:
/// - returns its id and a location under the namespace root URL;
/// - makes `table_exists` fail;
/// - leaves the dataset openable at the returned location.
#[tokio::test]
async fn test_deregister_table() {
    use lance_namespace::models::{DeregisterTableRequest, TableExistsRequest};
    let temp_dir = TempStdDir::default();
    let temp_path = temp_dir.to_str().unwrap();
    let namespace = DirectoryNamespaceBuilder::new(temp_path)
        .manifest_enabled(true)
        .dir_listing_enabled(false)
        .build()
        .await
        .unwrap();
    let table_id = vec!["test_table".to_string()];

    let ipc_data = create_test_ipc_data(&create_test_schema());
    let mut create_req = CreateTableRequest::new();
    create_req.id = Some(table_id.clone());
    namespace
        .create_table(create_req, bytes::Bytes::from(ipc_data))
        .await
        .unwrap();

    let mut exists_req = TableExistsRequest::new();
    exists_req.id = Some(table_id.clone());
    assert!(namespace.table_exists(exists_req.clone()).await.is_ok());

    let mut deregister_req = DeregisterTableRequest::new();
    deregister_req.id = Some(table_id.clone());
    let response = namespace.deregister_table(deregister_req).await.unwrap();
    assert!(
        response.location.is_some(),
        "Deregister should return location"
    );
    let location = response.location.as_deref().unwrap();

    let expected_prefix = lance_io::object_store::uri_to_url(temp_path)
        .expect("Failed to convert temp path to URL")
        .to_string();
    assert!(
        location.starts_with(&expected_prefix),
        "Location should start with '{}', got: {}",
        expected_prefix,
        location
    );
    assert!(
        location.contains("test_table"),
        "Location should contain table name: {}",
        location
    );
    assert_eq!(response.id, Some(table_id));
    assert!(namespace.table_exists(exists_req).await.is_err());

    // Deregistration drops only the catalog entry; the data must remain.
    let reopened = Dataset::open(location).await;
    assert!(
        reopened.is_ok(),
        "Physical table data should still exist at {}",
        location
    );
}
/// Deregistering a table inside a child namespace returns a location under the
/// namespace root URL that contains both the namespace and table names, and
/// the table no longer passes `table_exists`.
#[tokio::test]
async fn test_deregister_table_in_child_namespace() {
    use lance_namespace::models::{
        CreateNamespaceRequest, DeregisterTableRequest, TableExistsRequest,
    };
    let temp_dir = TempStdDir::default();
    let temp_path = temp_dir.to_str().unwrap();
    let namespace = DirectoryNamespaceBuilder::new(temp_path)
        .build()
        .await
        .unwrap();
    let table_id = vec!["test_ns".to_string(), "test_table".to_string()];

    let mut ns_req = CreateNamespaceRequest::new();
    ns_req.id = Some(table_id[..1].to_vec());
    namespace.create_namespace(ns_req).await.unwrap();

    let ipc_data = create_test_ipc_data(&create_test_schema());
    let mut create_req = CreateTableRequest::new();
    create_req.id = Some(table_id.clone());
    namespace
        .create_table(create_req, bytes::Bytes::from(ipc_data))
        .await
        .unwrap();

    let mut deregister_req = DeregisterTableRequest::new();
    deregister_req.id = Some(table_id.clone());
    let response = namespace.deregister_table(deregister_req).await.unwrap();
    assert!(
        response.location.is_some(),
        "Deregister should return location"
    );
    let location = response.location.as_deref().unwrap();

    let expected_prefix = lance_io::object_store::uri_to_url(temp_path)
        .expect("Failed to convert temp path to URL")
        .to_string();
    assert!(
        location.starts_with(&expected_prefix),
        "Location should start with '{}', got: {}",
        expected_prefix,
        location
    );
    assert!(
        location.contains("test_ns") && location.contains("test_table"),
        "Location should contain namespace and table name: {}",
        location
    );
    assert_eq!(response.id, Some(table_id.clone()));

    let mut exists_req = TableExistsRequest::new();
    exists_req.id = Some(table_id);
    assert!(namespace.table_exists(exists_req).await.is_err());
}
/// `register_table` is rejected when the manifest is disabled. The error
/// message must mention "manifest mode is enabled".
#[tokio::test]
async fn test_register_without_manifest_fails() {
    use lance_namespace::models::RegisterTableRequest;
    let temp_dir = TempStdDir::default();
    let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
        .manifest_enabled(false)
        .build()
        .await
        .unwrap();
    let mut request = RegisterTableRequest::new("test_table.lance".to_string());
    request.id = Some(vec!["test_table".to_string()]);

    let outcome = namespace.register_table(request).await;
    assert!(outcome.is_err());
    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("manifest mode is enabled"));
}
/// `register_table` rejects a location given as an absolute URI (here an
/// `s3://` URI). The error must say absolute URIs are not allowed.
#[tokio::test]
async fn test_register_table_rejects_absolute_uri() {
    use lance_namespace::models::RegisterTableRequest;
    let temp_dir = TempStdDir::default();
    let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
        .build()
        .await
        .unwrap();
    let mut request = RegisterTableRequest::new("s3://bucket/table.lance".to_string());
    request.id = Some(vec!["test_table".to_string()]);

    let outcome = namespace.register_table(request).await;
    assert!(outcome.is_err());
    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("Absolute URIs are not allowed"));
}
/// `register_table` rejects a location given as an absolute filesystem path.
/// The error must say absolute paths are not allowed.
#[tokio::test]
async fn test_register_table_rejects_absolute_path() {
    use lance_namespace::models::RegisterTableRequest;
    let temp_dir = TempStdDir::default();
    let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
        .build()
        .await
        .unwrap();
    let mut request = RegisterTableRequest::new("/tmp/table.lance".to_string());
    request.id = Some(vec!["test_table".to_string()]);

    let outcome = namespace.register_table(request).await;
    assert!(outcome.is_err());
    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("Absolute paths are not allowed"));
}
/// `register_table` rejects a relative location that uses `..` to escape the
/// namespace root. The error must say path traversal is not allowed.
#[tokio::test]
async fn test_register_table_rejects_path_traversal() {
    use lance_namespace::models::RegisterTableRequest;
    let temp_dir = TempStdDir::default();
    let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
        .build()
        .await
        .unwrap();
    let mut request = RegisterTableRequest::new("../outside/table.lance".to_string());
    request.id = Some(vec!["test_table".to_string()]);

    let outcome = namespace.register_table(request).await;
    assert!(outcome.is_err());
    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("Path traversal is not allowed"));
}
/// Writes a table through the namespace three times (create, append,
/// overwrite) and checks the row count and version after each step. It also
/// checks that the overwrite replaced the data.
#[tokio::test]
async fn test_namespace_write() {
    use arrow::array::Int32Array;
    use arrow::datatypes::{DataType, Field as ArrowField, Schema as ArrowSchema};
    use arrow::record_batch::{RecordBatch, RecordBatchIterator};
    use lance::dataset::{Dataset, WriteMode, WriteParams};
    use lance_namespace::LanceNamespace;

    let (namespace, _temp_dir) = create_test_namespace().await;
    let namespace: Arc<dyn LanceNamespace> = Arc::new(namespace);
    let table_id = vec!["test_ns".to_string(), "test_table".to_string()];
    let schema = Arc::new(ArrowSchema::new(vec![
        ArrowField::new("a", DataType::Int32, false),
        ArrowField::new("b", DataType::Int32, false),
    ]));

    // Builds a single-batch reader with the given `a` and `b` column values.
    let make_reader = |a_vals: Vec<i32>, b_vals: Vec<i32>| {
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int32Array::from(a_vals)),
                Arc::new(Int32Array::from(b_vals)),
            ],
        )
        .unwrap();
        RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone())
    };

    // Version 1: initial create (default params).
    let dataset = Dataset::write_into_namespace(
        make_reader(vec![1, 2, 3], vec![10, 20, 30]),
        namespace.clone(),
        table_id.clone(),
        None,
    )
    .await
    .unwrap();
    assert_eq!(dataset.count_rows(None).await.unwrap(), 3);
    assert_eq!(dataset.version().version, 1);

    // Version 2: append two rows.
    let dataset = Dataset::write_into_namespace(
        make_reader(vec![4, 5], vec![40, 50]),
        namespace.clone(),
        table_id.clone(),
        Some(WriteParams {
            mode: WriteMode::Append,
            ..Default::default()
        }),
    )
    .await
    .unwrap();
    assert_eq!(dataset.count_rows(None).await.unwrap(), 5);
    assert_eq!(dataset.version().version, 2);

    // Version 3: overwrite with two fresh rows.
    let dataset = Dataset::write_into_namespace(
        make_reader(vec![100, 200], vec![1000, 2000]),
        namespace.clone(),
        table_id.clone(),
        Some(WriteParams {
            mode: WriteMode::Overwrite,
            ..Default::default()
        }),
    )
    .await
    .unwrap();
    assert_eq!(dataset.count_rows(None).await.unwrap(), 2);
    assert_eq!(dataset.version().version, 3);

    let scanned = dataset.scan().try_into_batch().await.unwrap();
    let a_values = scanned
        .column_by_name("a")
        .unwrap()
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    assert_eq!(a_values.values(), &[100, 200]);
}
/// In directory (non-manifest) mode, a declared table:
/// - gets a location ending in `test_table.lance`;
/// - passes `table_exists`;
/// - is described with a location but no version and no schema.
#[tokio::test]
async fn test_declare_table_v1_mode() {
    use lance_namespace::models::{
        DeclareTableRequest, DescribeTableRequest, TableExistsRequest,
    };
    let temp_dir = TempStdDir::default();
    let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
        .manifest_enabled(false)
        .build()
        .await
        .unwrap();
    let table_id = vec!["test_table".to_string()];

    let mut declare_req = DeclareTableRequest::new();
    declare_req.id = Some(table_id.clone());
    let declared = namespace.declare_table(declare_req).await.unwrap();
    let declared_location = declared.location.as_deref();
    assert!(declared_location.is_some());
    assert!(declared_location.unwrap().ends_with("test_table.lance"));

    let mut exists_req = TableExistsRequest::new();
    exists_req.id = Some(table_id.clone());
    assert!(namespace.table_exists(exists_req).await.is_ok());

    let mut describe_req = DescribeTableRequest::new();
    describe_req.id = Some(table_id);
    let described = namespace.describe_table(describe_req).await.unwrap();
    assert!(described.location.is_some());
    assert!(described.version.is_none());
    assert!(described.schema.is_none());
}
/// In manifest-only mode, declaring a table returns a location and the table
/// then passes `table_exists`.
#[tokio::test]
async fn test_declare_table_with_manifest() {
    use lance_namespace::models::{DeclareTableRequest, TableExistsRequest};
    let temp_dir = TempStdDir::default();
    let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
        .manifest_enabled(true)
        .dir_listing_enabled(false)
        .build()
        .await
        .unwrap();
    let table_id = || Some(vec!["test_table".to_string()]);

    let mut declare_req = DeclareTableRequest::new();
    declare_req.id = table_id();
    let declared = namespace.declare_table(declare_req).await.unwrap();
    assert!(declared.location.is_some());

    let mut exists_req = TableExistsRequest::new();
    exists_req.id = table_id();
    assert!(namespace.table_exists(exists_req).await.is_ok());
}
/// In directory (non-manifest) mode, declaring a table whose id already
/// belongs to a created table fails.
#[tokio::test]
async fn test_declare_table_when_table_exists() {
    use lance_namespace::models::DeclareTableRequest;
    let temp_dir = TempStdDir::default();
    let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
        .manifest_enabled(false)
        .build()
        .await
        .unwrap();

    let ipc_data = create_test_ipc_data(&create_test_schema());
    let mut create_req = CreateTableRequest::new();
    create_req.id = Some(vec!["test_table".to_string()]);
    namespace
        .create_table(create_req, bytes::Bytes::from(ipc_data))
        .await
        .unwrap();

    let mut declare_req = DeclareTableRequest::new();
    declare_req.id = Some(vec!["test_table".to_string()]);
    assert!(namespace.declare_table(declare_req).await.is_err());
}
/// In directory (non-manifest) mode, deregistering a table:
/// - returns its location;
/// - makes `table_exists` fail with an error mentioning "deregistered";
/// - leaves the dataset openable at that location.
#[tokio::test]
async fn test_deregister_table_v1_mode() {
    use lance_namespace::models::{DeregisterTableRequest, TableExistsRequest};
    let temp_dir = TempStdDir::default();
    let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
        .manifest_enabled(false)
        .dir_listing_enabled(true)
        .build()
        .await
        .unwrap();
    let table_id = vec!["test_table".to_string()];

    let ipc_data = create_test_ipc_data(&create_test_schema());
    let mut create_req = CreateTableRequest::new();
    create_req.id = Some(table_id.clone());
    namespace
        .create_table(create_req, bytes::Bytes::from(ipc_data))
        .await
        .unwrap();

    let mut exists_req = TableExistsRequest::new();
    exists_req.id = Some(table_id.clone());
    assert!(namespace.table_exists(exists_req.clone()).await.is_ok());

    let mut deregister_req = DeregisterTableRequest::new();
    deregister_req.id = Some(table_id);
    let deregistered = namespace.deregister_table(deregister_req).await.unwrap();
    assert!(deregistered.location.is_some());
    let location = deregistered.location.as_deref().unwrap();
    assert!(location.contains("test_table"));

    let err = namespace.table_exists(exists_req).await.unwrap_err();
    assert!(err.to_string().contains("deregistered"));

    // The physical dataset must survive deregistration.
    let reopened = Dataset::open(location).await;
    assert!(reopened.is_ok(), "Physical table data should still exist");
}
/// In directory (non-manifest) mode, deregistering the same table twice fails
/// on the second call with an "already deregistered" error.
#[tokio::test]
async fn test_deregister_table_v1_already_deregistered() {
    use lance_namespace::models::DeregisterTableRequest;
    let temp_dir = TempStdDir::default();
    let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
        .manifest_enabled(false)
        .dir_listing_enabled(true)
        .build()
        .await
        .unwrap();

    let ipc_data = create_test_ipc_data(&create_test_schema());
    let mut create_req = CreateTableRequest::new();
    create_req.id = Some(vec!["test_table".to_string()]);
    namespace
        .create_table(create_req, bytes::Bytes::from(ipc_data))
        .await
        .unwrap();

    let mut deregister_req = DeregisterTableRequest::new();
    deregister_req.id = Some(vec!["test_table".to_string()]);
    namespace
        .deregister_table(deregister_req.clone())
        .await
        .unwrap();

    let message = namespace
        .deregister_table(deregister_req)
        .await
        .unwrap_err()
        .to_string();
    assert!(message.contains("already deregistered"));
}
/// In directory (non-manifest) mode, `list_tables` omits a deregistered table
/// but keeps listing the others.
#[tokio::test]
async fn test_list_tables_skips_deregistered_v1() {
    use lance_namespace::models::DeregisterTableRequest;
    let temp_dir = TempStdDir::default();
    let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
        .manifest_enabled(false)
        .dir_listing_enabled(true)
        .build()
        .await
        .unwrap();

    let ipc_data = create_test_ipc_data(&create_test_schema());
    for name in ["table1", "table2"] {
        let mut create_req = CreateTableRequest::new();
        create_req.id = Some(vec![name.to_string()]);
        namespace
            .create_table(create_req, bytes::Bytes::from(ipc_data.clone()))
            .await
            .unwrap();
    }

    let mut list_req = ListTablesRequest::new();
    list_req.id = Some(vec![]);
    let before = namespace.list_tables(list_req.clone()).await.unwrap().tables;
    assert_eq!(before.len(), 2);

    let mut deregister_req = DeregisterTableRequest::new();
    deregister_req.id = Some(vec!["table1".to_string()]);
    namespace.deregister_table(deregister_req).await.unwrap();

    let after = namespace.list_tables(list_req).await.unwrap().tables;
    assert_eq!(after.len(), 1);
    assert!(after.contains(&"table2".to_string()));
    assert!(!after.contains(&"table1".to_string()));
}
/// In directory (non-manifest) mode, `describe_table` succeeds before
/// deregistration and fails afterwards with an error mentioning
/// "deregistered".
#[tokio::test]
async fn test_describe_table_fails_for_deregistered_v1() {
    use lance_namespace::models::{DeregisterTableRequest, DescribeTableRequest};
    let temp_dir = TempStdDir::default();
    let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
        .manifest_enabled(false)
        .dir_listing_enabled(true)
        .build()
        .await
        .unwrap();
    let table_id = vec!["test_table".to_string()];

    let ipc_data = create_test_ipc_data(&create_test_schema());
    let mut create_req = CreateTableRequest::new();
    create_req.id = Some(table_id.clone());
    namespace
        .create_table(create_req, bytes::Bytes::from(ipc_data))
        .await
        .unwrap();

    let mut describe_req = DescribeTableRequest::new();
    describe_req.id = Some(table_id.clone());
    assert!(namespace.describe_table(describe_req.clone()).await.is_ok());

    let mut deregister_req = DeregisterTableRequest::new();
    deregister_req.id = Some(table_id);
    namespace.deregister_table(deregister_req).await.unwrap();

    let err = namespace.describe_table(describe_req).await.unwrap_err();
    assert!(err.to_string().contains("deregistered"));
}
/// In directory (non-manifest) mode, `table_exists` succeeds before
/// deregistration and fails afterwards with an error mentioning
/// "deregistered".
#[tokio::test]
async fn test_table_exists_fails_for_deregistered_v1() {
    use lance_namespace::models::{DeregisterTableRequest, TableExistsRequest};
    let temp_dir = TempStdDir::default();
    let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
        .manifest_enabled(false)
        .dir_listing_enabled(true)
        .build()
        .await
        .unwrap();
    let table_id = vec!["test_table".to_string()];

    let ipc_data = create_test_ipc_data(&create_test_schema());
    let mut create_req = CreateTableRequest::new();
    create_req.id = Some(table_id.clone());
    namespace
        .create_table(create_req, bytes::Bytes::from(ipc_data))
        .await
        .unwrap();

    let mut exists_req = TableExistsRequest::new();
    exists_req.id = Some(table_id.clone());
    assert!(namespace.table_exists(exists_req.clone()).await.is_ok());

    let mut deregister_req = DeregisterTableRequest::new();
    deregister_req.id = Some(table_id);
    namespace.deregister_table(deregister_req).await.unwrap();

    let err = namespace.table_exists(exists_req).await.unwrap_err();
    assert!(err.to_string().contains("deregistered"));
}
#[tokio::test]
async fn test_atomic_table_status_check() {
let temp_dir = TempStdDir::default();
let temp_path = temp_dir.to_str().unwrap();
let namespace = DirectoryNamespaceBuilder::new(temp_path)
.manifest_enabled(false)
.dir_listing_enabled(true)
.build()
.await
.unwrap();
let schema = create_test_schema();
let ipc_data = create_test_ipc_data(&schema);
let mut create_req = CreateTableRequest::new();
create_req.id = Some(vec!["test_table".to_string()]);
namespace
.create_table(create_req, bytes::Bytes::from(ipc_data))
.await
.unwrap();
let status = namespace.check_table_status("test_table").await;
assert!(status.exists);
assert!(!status.is_deregistered);
assert!(!status.has_reserved_file);
}
/// With `table_version_tracking_enabled(true)`, `describe_table` reports
/// `managed_versioning == Some(true)`.
#[tokio::test]
async fn test_table_version_tracking_enabled_managed_versioning() {
    use lance_namespace::models::DescribeTableRequest;
    let temp_dir = TempStdDir::default();
    let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
        .table_version_tracking_enabled(true)
        .build()
        .await
        .unwrap();
    let table_id = || Some(vec!["test_table".to_string()]);

    let ipc_data = create_test_ipc_data(&create_test_schema());
    let mut create_req = CreateTableRequest::new();
    create_req.id = table_id();
    namespace
        .create_table(create_req, bytes::Bytes::from(ipc_data))
        .await
        .unwrap();

    let mut describe_req = DescribeTableRequest::new();
    describe_req.id = table_id();
    let managed = namespace
        .describe_table(describe_req)
        .await
        .unwrap()
        .managed_versioning;
    assert_eq!(
        managed,
        Some(true),
        "managed_versioning should be true when table_version_tracking_enabled=true"
    );
}
/// With `table_version_tracking_enabled(false)`, `describe_table` leaves
/// `managed_versioning` unset (`None`).
#[tokio::test]
async fn test_table_version_tracking_disabled_no_managed_versioning() {
    use lance_namespace::models::DescribeTableRequest;
    let temp_dir = TempStdDir::default();
    let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
        .table_version_tracking_enabled(false)
        .build()
        .await
        .unwrap();
    let table_id = || Some(vec!["test_table".to_string()]);

    let ipc_data = create_test_ipc_data(&create_test_schema());
    let mut create_req = CreateTableRequest::new();
    create_req.id = table_id();
    namespace
        .create_table(create_req, bytes::Bytes::from(ipc_data))
        .await
        .unwrap();

    let mut describe_req = DescribeTableRequest::new();
    describe_req.id = table_id();
    let managed = namespace
        .describe_table(describe_req)
        .await
        .unwrap()
        .managed_versioning;
    assert!(
        managed.is_none(),
        "managed_versioning should be None when table_version_tracking_enabled=false, got: {:?}",
        managed
    );
}
/// With table version tracking enabled, `list_table_versions` reports exactly
/// three versions for a table written through the namespace (one create plus
/// two appends). Each version has:
/// - a non-empty manifest path containing `.manifest`;
/// - a manifest size greater than zero;
/// - a timestamp.
#[tokio::test]
#[cfg(not(windows))]
async fn test_list_table_versions() {
    use arrow::array::{Int32Array, RecordBatchIterator};
    use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
    use arrow::record_batch::RecordBatch;
    use lance::dataset::{Dataset, WriteMode, WriteParams};
    use lance_namespace::models::{CreateNamespaceRequest, ListTableVersionsRequest};
    let temp_dir = TempStrDir::default();
    let temp_path: &str = &temp_dir;
    let namespace: Arc<dyn LanceNamespace> = Arc::new(
        DirectoryNamespaceBuilder::new(temp_path)
            .table_version_tracking_enabled(true)
            .build()
            .await
            .unwrap(),
    );
    let mut create_ns_req = CreateNamespaceRequest::new();
    create_ns_req.id = Some(vec!["workspace".to_string()]);
    namespace.create_namespace(create_ns_req).await.unwrap();
    let table_id = vec!["workspace".to_string(), "test_table".to_string()];
    let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
        "id",
        DataType::Int32,
        false,
    )]));

    // Version 1: create. `batch` is not used again, so it is moved in rather
    // than cloned.
    let batch = RecordBatch::try_new(
        arrow_schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )
    .unwrap();
    let batches = RecordBatchIterator::new(vec![Ok(batch)], arrow_schema.clone());
    let write_params = WriteParams {
        mode: WriteMode::Create,
        ..Default::default()
    };
    let mut dataset = Dataset::write_into_namespace(
        batches,
        namespace.clone(),
        table_id.clone(),
        Some(write_params),
    )
    .await
    .unwrap();

    // Version 2: append.
    let batch2 = RecordBatch::try_new(
        arrow_schema.clone(),
        vec![Arc::new(Int32Array::from(vec![100, 200]))],
    )
    .unwrap();
    let batches = RecordBatchIterator::new(vec![Ok(batch2)], arrow_schema.clone());
    dataset.append(batches, None).await.unwrap();

    // Version 3: append.
    let batch3 = RecordBatch::try_new(
        arrow_schema.clone(),
        vec![Arc::new(Int32Array::from(vec![300, 400]))],
    )
    .unwrap();
    let batches = RecordBatchIterator::new(vec![Ok(batch3)], arrow_schema);
    dataset.append(batches, None).await.unwrap();

    let mut list_req = ListTableVersionsRequest::new();
    list_req.id = Some(table_id.clone());
    let list_resp = namespace.list_table_versions(list_req).await.unwrap();
    assert_eq!(
        list_resp.versions.len(),
        3,
        "Should have 3 versions, got: {:?}",
        list_resp.versions
    );

    // The listing's order is not asserted, so look up each version by number.
    for expected_version in 1..=3 {
        let version = list_resp
            .versions
            .iter()
            .find(|v| v.version == expected_version)
            .unwrap_or_else(|| panic!("Expected version {}", expected_version));
        assert!(
            !version.manifest_path.is_empty(),
            "manifest_path should be set for version {}",
            expected_version
        );
        assert!(
            version.manifest_path.contains(".manifest"),
            "manifest_path should contain .manifest for version {}",
            expected_version
        );
        assert!(
            version.manifest_size.is_some(),
            "manifest_size should be set for version {}",
            expected_version
        );
        assert!(
            version.manifest_size.unwrap() > 0,
            "manifest_size should be > 0 for version {}",
            expected_version
        );
        assert!(
            version.timestamp_millis.is_some(),
            "timestamp_millis should be set for version {}",
            expected_version
        );
    }
}
/// With table version tracking enabled, `describe_table_version` returns the
/// requested version of a namespace-written table (created, then appended
/// once). Both versions must carry:
/// - the right version number;
/// - a timestamp;
/// - a non-empty manifest path containing `.manifest`;
/// - a manifest size greater than zero.
#[tokio::test]
#[cfg(not(windows))]
async fn test_describe_table_version() {
    use arrow::array::{Int32Array, RecordBatchIterator};
    use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
    use arrow::record_batch::RecordBatch;
    use lance::dataset::{Dataset, WriteMode, WriteParams};
    use lance_namespace::models::{CreateNamespaceRequest, DescribeTableVersionRequest};
    let temp_dir = TempStrDir::default();
    let temp_path: &str = &temp_dir;
    let namespace: Arc<dyn LanceNamespace> = Arc::new(
        DirectoryNamespaceBuilder::new(temp_path)
            .table_version_tracking_enabled(true)
            .build()
            .await
            .unwrap(),
    );
    let mut create_ns_req = CreateNamespaceRequest::new();
    create_ns_req.id = Some(vec!["workspace".to_string()]);
    namespace.create_namespace(create_ns_req).await.unwrap();
    let table_id = vec!["workspace".to_string(), "test_table".to_string()];
    let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
        "id",
        DataType::Int32,
        false,
    )]));

    // Version 1: create.
    let batch = RecordBatch::try_new(
        arrow_schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )
    .unwrap();
    let batches = RecordBatchIterator::new(vec![Ok(batch)], arrow_schema.clone());
    let write_params = WriteParams {
        mode: WriteMode::Create,
        ..Default::default()
    };
    let mut dataset = Dataset::write_into_namespace(
        batches,
        namespace.clone(),
        table_id.clone(),
        Some(write_params),
    )
    .await
    .unwrap();

    // Version 2: append.
    let batch2 = RecordBatch::try_new(
        arrow_schema.clone(),
        vec![Arc::new(Int32Array::from(vec![100, 200]))],
    )
    .unwrap();
    let batches = RecordBatchIterator::new(vec![Ok(batch2)], arrow_schema);
    dataset.append(batches, None).await.unwrap();

    let mut describe_req = DescribeTableVersionRequest::new();
    describe_req.id = Some(table_id.clone());
    describe_req.version = Some(1);
    let describe_resp = namespace
        .describe_table_version(describe_req)
        .await
        .unwrap();
    let version = &describe_resp.version;
    assert_eq!(version.version, 1);
    assert!(version.timestamp_millis.is_some());
    assert!(
        !version.manifest_path.is_empty(),
        "manifest_path should be set"
    );
    assert!(
        version.manifest_path.contains(".manifest"),
        "manifest_path should contain .manifest"
    );
    assert!(
        version.manifest_size.is_some(),
        "manifest_size should be set"
    );
    assert!(
        version.manifest_size.unwrap() > 0,
        "manifest_size should be > 0"
    );

    let mut describe_req = DescribeTableVersionRequest::new();
    describe_req.id = Some(table_id.clone());
    describe_req.version = Some(2);
    let describe_resp = namespace
        .describe_table_version(describe_req)
        .await
        .unwrap();
    let version = &describe_resp.version;
    assert_eq!(version.version, 2);
    assert!(version.timestamp_millis.is_some());
    assert!(
        !version.manifest_path.is_empty(),
        "manifest_path should be set"
    );
    // Version 2 must meet the same manifest-path requirement as version 1.
    assert!(
        version.manifest_path.contains(".manifest"),
        "manifest_path should contain .manifest"
    );
    assert!(
        version.manifest_size.is_some(),
        "manifest_size should be set"
    );
    assert!(
        version.manifest_size.unwrap() > 0,
        "manifest_size should be > 0"
    );
}
/// When no version is given, `describe_table_version` resolves to the newest
/// version. After one create and two appends, that is version 3.
#[tokio::test]
#[cfg(not(windows))]
async fn test_describe_table_version_latest() {
    use arrow::array::{Int32Array, RecordBatchIterator};
    use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
    use arrow::record_batch::RecordBatch;
    use lance::dataset::{Dataset, WriteMode, WriteParams};
    use lance_namespace::models::{CreateNamespaceRequest, DescribeTableVersionRequest};

    let temp_dir = TempStrDir::default();
    let root: &str = &temp_dir;
    let directory_ns = DirectoryNamespaceBuilder::new(root)
        .table_version_tracking_enabled(true)
        .build()
        .await
        .unwrap();
    let namespace: Arc<dyn LanceNamespace> = Arc::new(directory_ns);

    let mut ns_request = CreateNamespaceRequest::new();
    ns_request.id = Some(vec!["workspace".to_string()]);
    namespace.create_namespace(ns_request).await.unwrap();

    let table_id = vec!["workspace".to_string(), "test_table".to_string()];
    let schema = Arc::new(ArrowSchema::new(vec![Field::new(
        "id",
        DataType::Int32,
        false,
    )]));

    // Version 1: initial create.
    let initial = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )
    .unwrap();
    let mut dataset = Dataset::write_into_namespace(
        RecordBatchIterator::new(vec![Ok(initial)], schema.clone()),
        namespace.clone(),
        table_id.clone(),
        Some(WriteParams {
            mode: WriteMode::Create,
            ..Default::default()
        }),
    )
    .await
    .unwrap();

    // Versions 2 and 3: one append per payload.
    for values in [vec![100, 200], vec![300, 400]] {
        let appended = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int32Array::from(values))],
        )
        .unwrap();
        let reader = RecordBatchIterator::new(vec![Ok(appended)], schema.clone());
        dataset.append(reader, None).await.unwrap();
    }

    let mut latest_request = DescribeTableVersionRequest::new();
    latest_request.id = Some(table_id.clone());
    latest_request.version = None;
    let latest = namespace
        .describe_table_version(latest_request)
        .await
        .unwrap();
    assert_eq!(latest.version.version, 3);
}
/// `create_table_version` promotes a staged manifest to a new table version.
/// The final manifest must exist afterwards, and the staging object must have
/// been removed.
#[tokio::test]
#[cfg(not(windows))]
async fn test_create_table_version() {
    use futures::TryStreamExt;
    use lance::dataset::builder::DatasetBuilder;
    use lance_namespace::models::CreateTableVersionRequest;

    let temp_dir = TempStrDir::default();
    let root: &str = &temp_dir;
    let directory_ns = DirectoryNamespaceBuilder::new(root)
        .table_version_tracking_enabled(true)
        .build()
        .await
        .unwrap();
    let namespace: Arc<dyn LanceNamespace> = Arc::new(directory_ns);

    let table_id = vec!["test_table".to_string()];
    let schema = create_test_schema();
    let ipc_data = create_test_ipc_data(&schema);
    let mut table_request = CreateTableRequest::new();
    table_request.id = Some(table_id.clone());
    namespace
        .create_table(table_request, bytes::Bytes::from(ipc_data))
        .await
        .unwrap();

    let dataset = DatasetBuilder::from_namespace(namespace.clone(), table_id.clone())
        .await
        .unwrap()
        .load()
        .await
        .unwrap();
    let store = &dataset.object_store().inner;

    // Copy the table's existing manifest into a staging object that will be
    // promoted to version 2.
    let listing: Vec<_> = store
        .list(Some(&dataset.versions_dir()))
        .try_collect()
        .await
        .unwrap();
    let source_location = &listing
        .iter()
        .find(|meta| {
            meta.location
                .filename()
                .is_some_and(|name| name.ends_with(".manifest"))
        })
        .expect("No manifest file found")
        .location;
    let manifest_bytes = store
        .get(source_location)
        .await
        .unwrap()
        .bytes()
        .await
        .unwrap();
    let staging_path = dataset.versions_dir().child("staging_manifest");
    store
        .put(&staging_path, manifest_bytes.into())
        .await
        .unwrap();

    let mut version_request = CreateTableVersionRequest::new(2, staging_path.to_string());
    version_request.id = Some(table_id.clone());
    version_request.naming_scheme = Some("V2".to_string());
    let result = namespace.create_table_version(version_request).await;
    assert!(
        result.is_ok(),
        "create_table_version should succeed: {:?}",
        result
    );
    let version_2_path = Path::from(
        result
            .unwrap()
            .version
            .expect("response should contain version info")
            .manifest_path,
    );

    assert!(
        store.head(&version_2_path).await.is_ok(),
        "Version 2 manifest should exist at {}",
        version_2_path
    );
    assert!(
        store.head(&staging_path).await.is_err(),
        "Staging manifest should have been deleted after create_table_version"
    );
}
/// Creating the same table version twice must fail the second time and leave
/// the manifest written by the first call intact.
#[tokio::test]
#[cfg(not(windows))]
async fn test_create_table_version_conflict() {
    use futures::TryStreamExt;
    use lance::dataset::builder::DatasetBuilder;
    use lance_namespace::models::CreateTableVersionRequest;

    let temp_dir = TempStrDir::default();
    let root: &str = &temp_dir;
    let directory_ns = DirectoryNamespaceBuilder::new(root)
        .table_version_tracking_enabled(true)
        .build()
        .await
        .unwrap();
    let namespace: Arc<dyn LanceNamespace> = Arc::new(directory_ns);

    let table_id = vec!["test_table".to_string()];
    let schema = create_test_schema();
    let ipc_data = create_test_ipc_data(&schema);
    let mut table_request = CreateTableRequest::new();
    table_request.id = Some(table_id.clone());
    namespace
        .create_table(table_request, bytes::Bytes::from(ipc_data))
        .await
        .unwrap();

    let dataset = DatasetBuilder::from_namespace(namespace.clone(), table_id.clone())
        .await
        .unwrap()
        .load()
        .await
        .unwrap();
    let store = &dataset.object_store().inner;

    // Stage a copy of the current manifest so it can be promoted to version 2.
    let listing: Vec<_> = store
        .list(Some(&dataset.versions_dir()))
        .try_collect()
        .await
        .unwrap();
    let source_location = &listing
        .iter()
        .find(|meta| {
            meta.location
                .filename()
                .is_some_and(|name| name.ends_with(".manifest"))
        })
        .expect("No manifest file found")
        .location;
    let manifest_bytes = store
        .get(source_location)
        .await
        .unwrap()
        .bytes()
        .await
        .unwrap();
    let staging_path = dataset.versions_dir().child("staging_manifest");
    store
        .put(&staging_path, manifest_bytes.into())
        .await
        .unwrap();

    // Both attempts use an identical request for version 2.
    let make_request = || {
        let mut request = CreateTableVersionRequest::new(2, staging_path.to_string());
        request.id = Some(table_id.clone());
        request.naming_scheme = Some("V2".to_string());
        request
    };

    let first_result = namespace.create_table_version(make_request()).await;
    assert!(
        first_result.is_ok(),
        "First create_table_version for version 2 should succeed: {:?}",
        first_result
    );
    let version_2_path = Path::from(
        first_result
            .unwrap()
            .version
            .expect("response should contain version info")
            .manifest_path,
    );

    let second_result = namespace.create_table_version(make_request()).await;
    assert!(
        second_result.is_err(),
        "create_table_version should fail for existing version"
    );
    assert!(
        store.head(&version_2_path).await.is_ok(),
        "Version 2 manifest should still exist at {}",
        version_2_path
    );
}
/// Creating a version for a table that was never created must fail with an
/// error whose message mentions "does not exist".
#[tokio::test]
async fn test_create_table_version_table_not_found() {
    use lance_namespace::models::CreateTableVersionRequest;

    let temp_dir = TempStdDir::default();
    let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
        .table_version_tracking_enabled(true)
        .build()
        .await
        .unwrap();

    let mut request = CreateTableVersionRequest::new(1, "/some/staging/path".to_string());
    request.id = Some(vec!["non_existent_table".to_string()]);
    let outcome = namespace.create_table_version(request).await;
    assert!(
        outcome.is_err(),
        "create_table_version should fail for non-existent table"
    );

    let message = outcome.unwrap_err().to_string();
    assert!(
        message.contains("does not exist"),
        "Error should mention table does not exist, got: {}",
        message
    );
}
mod e2e_table_version_tracking {
use super::*;
use std::sync::atomic::{AtomicUsize, Ordering};
/// Test wrapper around a real [`DirectoryNamespace`] that counts how often the
/// table-version APIs are invoked.
struct TrackingNamespace {
    /// Calls observed on `create_table_version`.
    create_table_version_count: AtomicUsize,
    /// Calls observed on `describe_table_version`.
    describe_table_version_count: AtomicUsize,
    /// Calls observed on `list_table_versions`.
    list_table_versions_count: AtomicUsize,
    /// Namespace that actually serves every request.
    inner: DirectoryNamespace,
}
impl TrackingNamespace {
    /// Wraps `inner` with every call counter starting at zero.
    fn new(inner: DirectoryNamespace) -> Self {
        let zero = || AtomicUsize::new(0);
        Self {
            inner,
            create_table_version_count: zero(),
            describe_table_version_count: zero(),
            list_table_versions_count: zero(),
        }
    }

    /// Reads one counter with sequentially-consistent ordering.
    fn read(counter: &AtomicUsize) -> usize {
        counter.load(Ordering::SeqCst)
    }

    /// Number of `create_table_version` calls seen so far.
    fn create_table_version_calls(&self) -> usize {
        Self::read(&self.create_table_version_count)
    }

    /// Number of `describe_table_version` calls seen so far.
    fn describe_table_version_calls(&self) -> usize {
        Self::read(&self.describe_table_version_count)
    }

    /// Number of `list_table_versions` calls seen so far.
    fn list_table_versions_calls(&self) -> usize {
        Self::read(&self.list_table_versions_count)
    }
}
impl std::fmt::Debug for TrackingNamespace {
    /// Shows all three call counters. The wrapped namespace is deliberately left
    /// out, and `finish_non_exhaustive` marks that omission with a trailing `..`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("TrackingNamespace")
            .field(
                "create_table_version_calls",
                &self.create_table_version_calls(),
            )
            .field(
                "describe_table_version_calls",
                &self.describe_table_version_calls(),
            )
            .field(
                "list_table_versions_calls",
                &self.list_table_versions_calls(),
            )
            .finish_non_exhaustive()
    }
}
#[async_trait]
impl LanceNamespace for TrackingNamespace {
async fn create_namespace(
&self,
request: CreateNamespaceRequest,
) -> Result<CreateNamespaceResponse> {
self.inner.create_namespace(request).await
}
async fn describe_namespace(
&self,
request: DescribeNamespaceRequest,
) -> Result<DescribeNamespaceResponse> {
self.inner.describe_namespace(request).await
}
async fn namespace_exists(&self, request: NamespaceExistsRequest) -> Result<()> {
self.inner.namespace_exists(request).await
}
async fn list_namespaces(
&self,
request: ListNamespacesRequest,
) -> Result<ListNamespacesResponse> {
self.inner.list_namespaces(request).await
}
async fn drop_namespace(
&self,
request: DropNamespaceRequest,
) -> Result<DropNamespaceResponse> {
self.inner.drop_namespace(request).await
}
async fn list_tables(&self, request: ListTablesRequest) -> Result<ListTablesResponse> {
self.inner.list_tables(request).await
}
async fn describe_table(
&self,
request: DescribeTableRequest,
) -> Result<DescribeTableResponse> {
self.inner.describe_table(request).await
}
async fn table_exists(&self, request: TableExistsRequest) -> Result<()> {
self.inner.table_exists(request).await
}
async fn drop_table(&self, request: DropTableRequest) -> Result<DropTableResponse> {
self.inner.drop_table(request).await
}
async fn create_table(
&self,
request: CreateTableRequest,
request_data: Bytes,
) -> Result<CreateTableResponse> {
self.inner.create_table(request, request_data).await
}
async fn declare_table(
&self,
request: DeclareTableRequest,
) -> Result<DeclareTableResponse> {
self.inner.declare_table(request).await
}
async fn list_table_versions(
&self,
request: ListTableVersionsRequest,
) -> Result<ListTableVersionsResponse> {
self.list_table_versions_count
.fetch_add(1, Ordering::SeqCst);
self.inner.list_table_versions(request).await
}
async fn create_table_version(
&self,
request: CreateTableVersionRequest,
) -> Result<CreateTableVersionResponse> {
self.create_table_version_count
.fetch_add(1, Ordering::SeqCst);
self.inner.create_table_version(request).await
}
async fn describe_table_version(
&self,
request: DescribeTableVersionRequest,
) -> Result<DescribeTableVersionResponse> {
self.describe_table_version_count
.fetch_add(1, Ordering::SeqCst);
self.inner.describe_table_version(request).await
}
async fn batch_delete_table_versions(
&self,
request: BatchDeleteTableVersionsRequest,
) -> Result<BatchDeleteTableVersionsResponse> {
self.inner.batch_delete_table_versions(request).await
}
fn namespace_id(&self) -> String {
self.inner.namespace_id()
}
}
/// With table version tracking enabled, `describe_table` must report
/// `managed_versioning == Some(true)`.
#[tokio::test]
async fn test_describe_table_returns_managed_versioning() {
    use lance_namespace::models::{CreateNamespaceRequest, DescribeTableRequest};

    let temp_dir = TempStdDir::default();
    let ns = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
        .table_version_tracking_enabled(true)
        .manifest_enabled(true)
        .build()
        .await
        .unwrap();

    let mut ns_request = CreateNamespaceRequest::new();
    ns_request.id = Some(vec!["workspace".to_string()]);
    ns.create_namespace(ns_request).await.unwrap();

    let table_id = vec!["workspace".to_string(), "test_table".to_string()];
    let schema = create_test_schema();
    let ipc_data = create_test_ipc_data(&schema);
    let mut table_request = CreateTableRequest::new();
    table_request.id = Some(table_id.clone());
    ns.create_table(table_request, bytes::Bytes::from(ipc_data))
        .await
        .unwrap();

    let mut describe_request = DescribeTableRequest::new();
    describe_request.id = Some(table_id);
    let managed = ns
        .describe_table(describe_request)
        .await
        .unwrap()
        .managed_versioning;
    assert_eq!(
        managed,
        Some(true),
        "managed_versioning should be true when table_version_tracking_enabled=true"
    );
}
/// End-to-end check that a dataset bound to a version-tracking namespace sends
/// its version bookkeeping through the namespace APIs:
/// - each commit calls `create_table_version`;
/// - loading the latest version calls `list_table_versions` exactly once;
/// - loading a pinned version calls `describe_table_version` exactly once.
#[tokio::test]
#[cfg(not(windows))]
async fn test_external_manifest_store_invokes_namespace_apis() {
    use arrow::array::{Int32Array, StringArray};
    use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
    use arrow::record_batch::RecordBatch;
    use lance::Dataset;
    use lance::dataset::builder::DatasetBuilder;
    use lance::dataset::{WriteMode, WriteParams};
    use lance_namespace::models::CreateNamespaceRequest;
    let temp_dir = TempStdDir::default();
    let temp_path = temp_dir.to_str().unwrap();
    let inner_ns = DirectoryNamespaceBuilder::new(temp_path)
        .table_version_tracking_enabled(true)
        .manifest_enabled(true)
        .build()
        .await
        .unwrap();
    // Keep the concrete Arc for reading counters; hand out the trait object.
    let tracking_ns = Arc::new(TrackingNamespace::new(inner_ns));
    let ns: Arc<dyn LanceNamespace> = tracking_ns.clone();
    let mut create_ns_req = CreateNamespaceRequest::new();
    create_ns_req.id = Some(vec!["workspace".to_string()]);
    ns.create_namespace(create_ns_req).await.unwrap();
    let table_id = vec!["workspace".to_string(), "test_table".to_string()];
    let arrow_schema = Arc::new(ArrowSchema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, true),
    ]));
    let batch = RecordBatch::try_new(
        arrow_schema.clone(),
        vec![
            Arc::new(Int32Array::from(vec![1, 2, 3])),
            Arc::new(StringArray::from(vec!["a", "b", "c"])),
        ],
    )
    .unwrap();
    // `batch` is not used again, so move it instead of cloning.
    let batches = RecordBatchIterator::new(vec![Ok(batch)], arrow_schema.clone());
    let write_params = WriteParams {
        mode: WriteMode::Create,
        ..Default::default()
    };
    let mut dataset = Dataset::write_into_namespace(
        batches,
        ns.clone(),
        table_id.clone(),
        Some(write_params),
    )
    .await
    .unwrap();
    assert_eq!(dataset.version().version, 1);
    assert_eq!(
        tracking_ns.create_table_version_calls(),
        1,
        "create_table_version should have been called once during initial write_into_namespace"
    );
    let append_batch = RecordBatch::try_new(
        arrow_schema.clone(),
        vec![
            Arc::new(Int32Array::from(vec![4, 5, 6])),
            Arc::new(StringArray::from(vec!["d", "e", "f"])),
        ],
    )
    .unwrap();
    let append_batches = RecordBatchIterator::new(vec![Ok(append_batch)], arrow_schema);
    dataset.append(append_batches, None).await.unwrap();
    assert_eq!(
        tracking_ns.create_table_version_calls(),
        2,
        "create_table_version should have been called twice (once for create, once for append)"
    );

    // Loading without a version resolves the latest one via the namespace.
    let initial_list_calls = tracking_ns.list_table_versions_calls();
    let latest_dataset = DatasetBuilder::from_namespace(ns.clone(), table_id.clone())
        .await
        .unwrap()
        .load()
        .await
        .unwrap();
    assert_eq!(latest_dataset.version().version, 2);
    assert_eq!(
        tracking_ns.list_table_versions_calls(),
        initial_list_calls + 1,
        "list_table_versions should have been called exactly once during checkout_latest"
    );

    // Loading a pinned version describes that version via the namespace.
    let initial_describe_calls = tracking_ns.describe_table_version_calls();
    let v1_dataset = DatasetBuilder::from_namespace(ns.clone(), table_id.clone())
        .await
        .unwrap()
        .with_version(1)
        .load()
        .await
        .unwrap();
    assert_eq!(v1_dataset.version().version, 1);
    assert_eq!(
        tracking_ns.describe_table_version_calls(),
        initial_describe_calls + 1,
        "describe_table_version should have been called exactly once during checkout to version 1"
    );
}
/// Appending through `write_into_namespace` in `Append` mode, on a namespace
/// that tracks table versions, must leave a V2-named manifest for version 2 in
/// the dataset's versions directory.
#[tokio::test]
#[cfg(not(windows))]
async fn test_dataset_commit_with_external_manifest_store() {
    use arrow::array::{Int32Array, StringArray};
    use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
    use arrow::record_batch::RecordBatch;
    use futures::TryStreamExt;
    use lance::dataset::{Dataset, WriteMode, WriteParams};
    use lance_namespace::models::CreateNamespaceRequest;
    use lance_table::io::commit::ManifestNamingScheme;

    let temp_dir = TempStdDir::default();
    let root = temp_dir.to_str().unwrap();
    let directory_ns = DirectoryNamespaceBuilder::new(root)
        .table_version_tracking_enabled(true)
        .manifest_enabled(true)
        .build()
        .await
        .unwrap();
    let tracking_ns: Arc<dyn LanceNamespace> = Arc::new(TrackingNamespace::new(directory_ns));

    let mut ns_request = CreateNamespaceRequest::new();
    ns_request.id = Some(vec!["workspace".to_string()]);
    tracking_ns.create_namespace(ns_request).await.unwrap();

    let table_id = vec!["workspace".to_string(), "test_table".to_string()];
    let schema = Arc::new(ArrowSchema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, true),
    ]));

    // Version 1: create the table.
    let created_rows = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(Int32Array::from(vec![1, 2, 3])),
            Arc::new(StringArray::from(vec!["a", "b", "c"])),
        ],
    )
    .unwrap();
    let dataset = Dataset::write_into_namespace(
        RecordBatchIterator::new(vec![Ok(created_rows)], schema.clone()),
        tracking_ns.clone(),
        table_id.clone(),
        Some(WriteParams {
            mode: WriteMode::Create,
            ..Default::default()
        }),
    )
    .await
    .unwrap();
    assert_eq!(dataset.version().version, 1);

    // Version 2: append through the namespace write path.
    let appended_rows = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(Int32Array::from(vec![4, 5, 6])),
            Arc::new(StringArray::from(vec!["d", "e", "f"])),
        ],
    )
    .unwrap();
    Dataset::write_into_namespace(
        RecordBatchIterator::new(vec![Ok(appended_rows)], schema),
        tracking_ns.clone(),
        table_id.clone(),
        Some(WriteParams {
            mode: WriteMode::Append,
            ..Default::default()
        }),
    )
    .await
    .unwrap();

    let listing: Vec<_> = dataset
        .object_store()
        .inner
        .list(Some(&dataset.versions_dir()))
        .try_collect()
        .await
        .unwrap();
    let version_2_found = listing
        .iter()
        .filter_map(|meta| meta.location.filename())
        .filter(|name| name.ends_with(".manifest"))
        .any(|name| ManifestNamingScheme::V2.parse_version(name) == Some(2));
    assert!(
        version_2_found,
        "Version 2 manifest should exist in versions directory"
    );
}
}
mod multi_table_transactions {
use super::*;
use futures::TryStreamExt;
use lance::dataset::builder::DatasetBuilder;
use lance_namespace::models::CreateTableVersionRequest;
/// Builds a `DirectoryNamespace` rooted at `temp_path` with table version
/// tracking, table version storage and the manifest all enabled. Panics if the
/// build fails.
async fn create_managed_namespace(temp_path: &str) -> Arc<DirectoryNamespace> {
    let namespace = DirectoryNamespaceBuilder::new(temp_path)
        .table_version_tracking_enabled(true)
        .table_version_storage_enabled(true)
        .manifest_enabled(true)
        .build()
        .await
        .unwrap();
    Arc::new(namespace)
}
/// Creates `table_name` at the namespace root and copies its manifest to the
/// object `staging_<table_name>` inside the dataset's versions directory.
///
/// Returns the table identifier together with the staging path, ready to be
/// passed to `create_table_version`. Panics on any I/O or namespace error.
async fn create_table_and_get_staging(
    namespace: Arc<dyn LanceNamespace>,
    table_name: &str,
) -> (Vec<String>, object_store::path::Path) {
    let table_id = vec![table_name.to_string()];
    let schema = create_test_schema();
    let ipc_data = create_test_ipc_data(&schema);
    let mut table_request = CreateTableRequest::new();
    table_request.id = Some(table_id.clone());
    namespace
        .create_table(table_request, bytes::Bytes::from(ipc_data))
        .await
        .unwrap();

    let dataset = DatasetBuilder::from_namespace(namespace.clone(), table_id.clone())
        .await
        .unwrap()
        .load()
        .await
        .unwrap();
    let store = &dataset.object_store().inner;
    let versions_dir = dataset.versions_dir();

    let listing: Vec<_> = store.list(Some(&versions_dir)).try_collect().await.unwrap();
    let source = listing
        .iter()
        .find(|meta| {
            meta.location
                .filename()
                .is_some_and(|name| name.ends_with(".manifest"))
        })
        .expect("No manifest file found");
    let manifest_bytes = store
        .get(&source.location)
        .await
        .unwrap()
        .bytes()
        .await
        .unwrap();

    let staging_path = versions_dir.child(format!("staging_{}", table_name));
    store
        .put(&staging_path, manifest_bytes.into())
        .await
        .unwrap();
    (table_id, staging_path)
}
/// Building with `table_version_storage_enabled(true)` while the manifest is
/// disabled must be rejected.
#[tokio::test]
async fn test_table_version_storage_enabled_requires_manifest() {
    let temp_dir = TempStdDir::default();
    let build_result = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
        .table_version_storage_enabled(true)
        .manifest_enabled(false)
        .build()
        .await;
    assert!(
        build_result.is_err(),
        "Should fail when table_version_storage_enabled=true but manifest_enabled=false"
    );
}
/// With table version storage enabled, `create_table_version` must record the
/// new version in the namespace's `__manifest`. This is read back here through
/// `query_table_versions`.
#[tokio::test]
#[cfg(not(windows))]
async fn test_create_table_version_records_in_manifest() {
    let temp_dir = TempStrDir::default();
    let temp_path: &str = &temp_dir;
    let namespace = create_managed_namespace(temp_path).await;
    let ns: Arc<dyn LanceNamespace> = namespace.clone();
    let (table_id, staging_path) =
        create_table_and_get_staging(ns.clone(), "table_managed").await;

    let mut create_req = CreateTableVersionRequest::new(2, staging_path.to_string());
    create_req.id = Some(table_id.clone());
    create_req.naming_scheme = Some("V2".to_string());
    let response = namespace.create_table_version(create_req).await.unwrap();
    // Take the version out directly instead of `is_some()` + `unwrap()`.
    let version = response
        .version
        .expect("create_table_version response should contain version info");
    assert_eq!(version.version, 2);

    let manifest_ns = namespace
        .manifest_ns
        .as_ref()
        .expect("manifest namespace should be enabled");
    let table_id_str = manifest::ManifestNamespace::str_object_id(&table_id);
    let versions = manifest_ns
        .query_table_versions(&table_id_str, false, None)
        .await
        .unwrap();
    let (ver, _path) = versions
        .first()
        .expect("Version should be recorded in __manifest");
    assert_eq!(*ver, 2, "Recorded version should be 2");
}
}
}