use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
use http::StatusCode;
use lance_io::object_store::StorageOptions;
use moka::future::Cache;
use reqwest::header::CONTENT_TYPE;
use lance_namespace::models::{
CreateNamespaceRequest, CreateNamespaceResponse, DescribeNamespaceRequest,
DescribeNamespaceResponse, DropNamespaceRequest, DropNamespaceResponse, ListNamespacesRequest,
ListNamespacesResponse, ListTablesRequest, ListTablesResponse,
};
use crate::Error;
use crate::database::{
CloneTableRequest, CreateTableMode, CreateTableRequest, Database, DatabaseOptions,
OpenTableRequest, ReadConsistency, TableNamesRequest,
};
use crate::error::Result;
use crate::remote::util::stream_as_body;
use crate::table::BaseTable;
use super::ARROW_STREAM_CONTENT_TYPE;
use super::client::{ClientConfig, HttpSend, RequestResultExt, RestfulLanceDbClient, Sender};
use super::table::RemoteTable;
use super::util::parse_server_version;
#[derive(serde::Serialize)]
struct RemoteCloneTableRequest {
source_location: String,
#[serde(skip_serializing_if = "Option::is_none")]
source_version: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
source_tag: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
is_shallow: Option<bool>,
}
pub const DEFAULT_SERVER_VERSION: semver::Version = semver::Version::new(0, 1, 0);
#[derive(Debug, Clone)]
pub struct ServerVersion(pub semver::Version);
impl Default for ServerVersion {
fn default() -> Self {
Self(DEFAULT_SERVER_VERSION.clone())
}
}
impl ServerVersion {
pub fn parse(version: &str) -> Result<Self> {
let version = Self(
semver::Version::parse(version).map_err(|e| Error::InvalidInput {
message: e.to_string(),
})?,
);
Ok(version)
}
pub fn support_multivector(&self) -> bool {
self.0 >= semver::Version::new(0, 2, 0)
}
pub fn support_structural_fts(&self) -> bool {
self.0 >= semver::Version::new(0, 3, 0)
}
}
pub const OPT_REMOTE_PREFIX: &str = "remote_database_";
pub const OPT_REMOTE_API_KEY: &str = "remote_database_api_key";
pub const OPT_REMOTE_REGION: &str = "remote_database_region";
pub const OPT_REMOTE_HOST_OVERRIDE: &str = "remote_database_host_override";
#[derive(Clone, Debug, Default)]
pub struct RemoteDatabaseOptions {
pub api_key: Option<String>,
pub region: Option<String>,
pub host_override: Option<String>,
pub storage_options: HashMap<String, String>,
}
impl RemoteDatabaseOptions {
pub fn builder() -> RemoteDatabaseOptionsBuilder {
RemoteDatabaseOptionsBuilder::new()
}
pub(crate) fn parse_from_map(map: &HashMap<String, String>) -> Result<Self> {
let api_key = map.get(OPT_REMOTE_API_KEY).cloned();
let region = map.get(OPT_REMOTE_REGION).cloned();
let host_override = map.get(OPT_REMOTE_HOST_OVERRIDE).cloned();
let storage_options = map
.iter()
.filter(|(key, _)| !key.starts_with(OPT_REMOTE_PREFIX))
.map(|(key, value)| (key.clone(), value.clone()))
.collect();
Ok(Self {
api_key,
region,
host_override,
storage_options,
})
}
}
impl DatabaseOptions for RemoteDatabaseOptions {
fn serialize_into_map(&self, map: &mut HashMap<String, String>) {
for (key, value) in &self.storage_options {
map.insert(key.clone(), value.clone());
}
if let Some(api_key) = &self.api_key {
map.insert(OPT_REMOTE_API_KEY.to_string(), api_key.clone());
}
if let Some(region) = &self.region {
map.insert(OPT_REMOTE_REGION.to_string(), region.clone());
}
if let Some(host_override) = &self.host_override {
map.insert(OPT_REMOTE_HOST_OVERRIDE.to_string(), host_override.clone());
}
}
}
#[derive(Clone, Debug, Default)]
pub struct RemoteDatabaseOptionsBuilder {
options: RemoteDatabaseOptions,
}
impl RemoteDatabaseOptionsBuilder {
pub fn new() -> Self {
Self {
options: RemoteDatabaseOptions::default(),
}
}
pub fn api_key(mut self, api_key: String) -> Self {
self.options.api_key = Some(api_key);
self
}
pub fn region(mut self, region: String) -> Self {
self.options.region = Some(region);
self
}
pub fn host_override(mut self, host_override: String) -> Self {
self.options.host_override = Some(host_override);
self
}
}
#[derive(Debug)]
pub struct RemoteDatabase<S: HttpSend = Sender> {
client: RestfulLanceDbClient<S>,
table_cache: Cache<String, Arc<RemoteTable<S>>>,
uri: String,
namespace_headers: HashMap<String, String>,
tls_config: Option<super::client::TlsConfig>,
}
impl RemoteDatabase {
pub fn try_new(
uri: &str,
api_key: &str,
region: &str,
host_override: Option<String>,
client_config: ClientConfig,
options: RemoteOptions,
) -> Result<Self> {
let parsed = super::client::parse_db_url(uri)?;
let header_map = RestfulLanceDbClient::<Sender>::default_headers(
api_key,
region,
&parsed.db_name,
host_override.is_some(),
&options,
parsed.db_prefix.as_deref(),
&client_config,
)?;
let namespace_headers: HashMap<String, String> = header_map
.iter()
.filter_map(|(k, v)| {
v.to_str()
.ok()
.map(|val| (k.as_str().to_string(), val.to_string()))
})
.collect();
let client = RestfulLanceDbClient::try_new(
&parsed,
region,
host_override,
header_map,
client_config.clone(),
)?;
let table_cache = Cache::builder()
.time_to_live(std::time::Duration::from_secs(300))
.max_capacity(10_000)
.build();
Ok(Self {
client,
table_cache,
uri: uri.to_owned(),
namespace_headers,
tls_config: client_config.tls_config,
})
}
}
#[cfg(all(test, feature = "remote"))]
mod test_utils {
use super::*;
use crate::remote::ClientConfig;
use crate::remote::client::test_utils::MockSender;
use crate::remote::client::test_utils::{client_with_handler, client_with_handler_and_config};
impl RemoteDatabase<MockSender> {
pub fn new_mock<F, T>(handler: F) -> Self
where
F: Fn(reqwest::Request) -> http::Response<T> + Send + Sync + 'static,
T: Into<reqwest::Body>,
{
let client = client_with_handler(handler);
Self {
client,
table_cache: Cache::new(0),
uri: "http://localhost".to_string(),
namespace_headers: HashMap::new(),
tls_config: None,
}
}
pub fn new_mock_with_config<F, T>(handler: F, config: ClientConfig) -> Self
where
F: Fn(reqwest::Request) -> http::Response<T> + Send + Sync + 'static,
T: Into<reqwest::Body>,
{
let client = client_with_handler_and_config(handler, config.clone());
Self {
client,
table_cache: Cache::new(0),
uri: "http://localhost".to_string(),
namespace_headers: config.extra_headers.clone(),
tls_config: config.tls_config.clone(),
}
}
}
}
impl<S: HttpSend> std::fmt::Display for RemoteDatabase<S> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "RemoteDatabase(host={})", self.client.host())
}
}
impl From<&CreateTableMode> for &'static str {
fn from(val: &CreateTableMode) -> Self {
match val {
CreateTableMode::Create => "create",
CreateTableMode::Overwrite => "overwrite",
CreateTableMode::ExistOk(_) => "exist_ok",
}
}
}
fn build_table_identifier(name: &str, namespace: &[String], delimiter: &str) -> String {
if !namespace.is_empty() {
let mut parts = namespace.to_vec();
parts.push(name.to_string());
parts.join(delimiter)
} else {
name.to_string()
}
}
fn build_namespace_identifier(namespace: &[String], delimiter: &str) -> String {
if namespace.is_empty() {
delimiter.to_string()
} else {
namespace.join(delimiter)
}
}
fn build_cache_key(name: &str, namespace: &[String]) -> String {
let mut key = Vec::new();
for ns in namespace {
let bytes = ns.as_bytes();
key.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
key.extend_from_slice(bytes);
}
let name_bytes = name.as_bytes();
key.extend_from_slice(&(name_bytes.len() as u32).to_le_bytes());
key.extend_from_slice(name_bytes);
key.iter().map(|b| format!("{:02x}", b)).collect()
}
#[async_trait]
impl<S: HttpSend> Database for RemoteDatabase<S> {
fn uri(&self) -> &str {
&self.uri
}
async fn read_consistency(&self) -> Result<ReadConsistency> {
Err(Error::NotSupported {
message: "Getting the read consistency of a remote database is not yet supported"
.to_string(),
})
}
async fn table_names(&self, request: TableNamesRequest) -> Result<Vec<String>> {
let mut req = if !request.namespace.is_empty() {
let namespace_id =
build_namespace_identifier(&request.namespace, &self.client.id_delimiter);
self.client
.get(&format!("/v1/namespace/{}/table/list", namespace_id))
} else {
self.client.get("/v1/table/")
};
if let Some(limit) = request.limit {
req = req.query(&[("limit", limit)]);
}
if let Some(start_after) = request.start_after {
req = req.query(&[("page_token", start_after)]);
}
let (request_id, rsp) = self.client.send_with_retry(req, None, true).await?;
let rsp = self.client.check_response(&request_id, rsp).await?;
let version = parse_server_version(&request_id, &rsp)?;
let tables = rsp
.json::<ListTablesResponse>()
.await
.err_to_http(request_id)?
.tables;
for table in &tables {
let table_identifier =
build_table_identifier(table, &request.namespace, &self.client.id_delimiter);
let cache_key = build_cache_key(table, &request.namespace);
let remote_table = Arc::new(RemoteTable::new(
self.client.clone(),
table.clone(),
request.namespace.clone(),
table_identifier.clone(),
version.clone(),
));
self.table_cache.insert(cache_key, remote_table).await;
}
Ok(tables)
}
async fn list_tables(&self, request: ListTablesRequest) -> Result<ListTablesResponse> {
let namespace_parts = request.id.as_deref().unwrap_or(&[]);
let namespace_id = build_namespace_identifier(namespace_parts, &self.client.id_delimiter);
let mut req = self
.client
.get(&format!("/v1/namespace/{}/table/list", namespace_id));
if let Some(limit) = request.limit {
req = req.query(&[("limit", limit)]);
}
if let Some(ref page_token) = request.page_token {
req = req.query(&[("page_token", page_token)]);
}
let (request_id, rsp) = self.client.send_with_retry(req, None, true).await?;
let rsp = self.client.check_response(&request_id, rsp).await?;
let version = parse_server_version(&request_id, &rsp)?;
let response: ListTablesResponse = rsp.json().await.err_to_http(request_id)?;
let namespace_vec = namespace_parts.to_vec();
for table in &response.tables {
let table_identifier =
build_table_identifier(table, &namespace_vec, &self.client.id_delimiter);
let cache_key = build_cache_key(table, &namespace_vec);
let remote_table = Arc::new(RemoteTable::new(
self.client.clone(),
table.clone(),
namespace_vec.clone(),
table_identifier.clone(),
version.clone(),
));
self.table_cache.insert(cache_key, remote_table).await;
}
Ok(response)
}
async fn create_table(&self, mut request: CreateTableRequest) -> Result<Arc<dyn BaseTable>> {
let body = stream_as_body(request.data.scan_as_stream())?;
let identifier =
build_table_identifier(&request.name, &request.namespace, &self.client.id_delimiter);
let req = self
.client
.post(&format!("/v1/table/{}/create/", identifier))
.query(&[("mode", Into::<&str>::into(&request.mode))])
.body(body)
.header(CONTENT_TYPE, ARROW_STREAM_CONTENT_TYPE);
let (request_id, rsp) = self.client.send(req).await?;
if rsp.status() == StatusCode::BAD_REQUEST {
let body = rsp.text().await.err_to_http(request_id.clone())?;
if body.contains("already exists") {
return match request.mode {
CreateTableMode::Create => {
Err(crate::Error::TableAlreadyExists { name: request.name })
}
CreateTableMode::ExistOk(callback) => {
let req = OpenTableRequest {
name: request.name.clone(),
namespace: request.namespace.clone(),
index_cache_size: None,
lance_read_params: None,
location: None,
namespace_client: None,
managed_versioning: None,
};
let req = (callback)(req);
self.open_table(req).await
}
CreateTableMode::Overwrite => Err(crate::Error::Http {
source: format!(
"unexpected response from server for create mode overwrite: {}",
body
)
.into(),
request_id,
status_code: Some(StatusCode::BAD_REQUEST),
}),
};
} else {
return Err(crate::Error::InvalidInput { message: body });
}
}
let rsp = self.client.check_response(&request_id, rsp).await?;
let version = parse_server_version(&request_id, &rsp)?;
let table_identifier =
build_table_identifier(&request.name, &request.namespace, &self.client.id_delimiter);
let cache_key = build_cache_key(&request.name, &request.namespace);
let table = Arc::new(RemoteTable::new(
self.client.clone(),
request.name.clone(),
request.namespace.clone(),
table_identifier,
version,
));
self.table_cache.insert(cache_key, table.clone()).await;
Ok(table)
}
async fn clone_table(&self, request: CloneTableRequest) -> Result<Arc<dyn BaseTable>> {
let table_identifier = build_table_identifier(
&request.target_table_name,
&request.target_namespace,
&self.client.id_delimiter,
);
let remote_request = RemoteCloneTableRequest {
source_location: request.source_uri,
source_version: request.source_version,
source_tag: request.source_tag,
is_shallow: Some(request.is_shallow),
};
let req = self
.client
.post(&format!("/v1/table/{}/clone", table_identifier.clone()))
.json(&remote_request);
let (request_id, rsp) = self.client.send(req).await?;
let status = rsp.status();
if status != StatusCode::OK {
let body = rsp.text().await.err_to_http(request_id.clone())?;
return Err(crate::Error::Http {
source: format!("Failed to clone table: {}", body).into(),
request_id,
status_code: Some(status),
});
}
let version = parse_server_version(&request_id, &rsp)?;
let cache_key = build_cache_key(&request.target_table_name, &request.target_namespace);
let table = Arc::new(RemoteTable::new(
self.client.clone(),
request.target_table_name.clone(),
request.target_namespace.clone(),
table_identifier,
version,
));
self.table_cache.insert(cache_key, table.clone()).await;
Ok(table)
}
async fn open_table(&self, request: OpenTableRequest) -> Result<Arc<dyn BaseTable>> {
let identifier =
build_table_identifier(&request.name, &request.namespace, &self.client.id_delimiter);
let cache_key = build_cache_key(&request.name, &request.namespace);
if let Some(table) = self.table_cache.get(&cache_key).await {
Ok(table.clone())
} else {
let req = self
.client
.post(&format!("/v1/table/{}/describe/", identifier));
let (request_id, rsp) = self.client.send_with_retry(req, None, true).await?;
let rsp =
RemoteTable::<S>::handle_table_not_found(&request.name, rsp, &request_id).await?;
let rsp = self.client.check_response(&request_id, rsp).await?;
let version = parse_server_version(&request_id, &rsp)?;
let table_identifier = build_table_identifier(
&request.name,
&request.namespace,
&self.client.id_delimiter,
);
let table = Arc::new(RemoteTable::new(
self.client.clone(),
request.name.clone(),
request.namespace.clone(),
table_identifier,
version,
));
let cache_key = build_cache_key(&request.name, &request.namespace);
self.table_cache.insert(cache_key, table.clone()).await;
Ok(table)
}
}
async fn rename_table(
&self,
current_name: &str,
new_name: &str,
cur_namespace: &[String],
new_namespace: &[String],
) -> Result<()> {
let current_identifier =
build_table_identifier(current_name, cur_namespace, &self.client.id_delimiter);
let current_cache_key = build_cache_key(current_name, cur_namespace);
let new_cache_key = build_cache_key(new_name, new_namespace);
let mut body = serde_json::json!({ "new_table_name": new_name });
if !new_namespace.is_empty() {
body["new_namespace"] = serde_json::Value::Array(
new_namespace
.iter()
.map(|s| serde_json::Value::String(s.clone()))
.collect(),
);
}
let req = self
.client
.post(&format!("/v1/table/{}/rename/", current_identifier))
.json(&body);
let (request_id, resp) = self.client.send(req).await?;
self.client.check_response(&request_id, resp).await?;
let table = self.table_cache.remove(¤t_cache_key).await;
if let Some(table) = table {
self.table_cache.insert(new_cache_key, table).await;
}
Ok(())
}
async fn drop_table(&self, name: &str, namespace: &[String]) -> Result<()> {
let identifier = build_table_identifier(name, namespace, &self.client.id_delimiter);
let cache_key = build_cache_key(name, namespace);
let req = self.client.post(&format!("/v1/table/{}/drop/", identifier));
let (request_id, resp) = self.client.send(req).await?;
self.client.check_response(&request_id, resp).await?;
self.table_cache.remove(&cache_key).await;
Ok(())
}
async fn drop_all_tables(&self, namespace: &[String]) -> Result<()> {
let _namespace = namespace; Err(crate::Error::NotSupported {
message: "Dropping all tables is not currently supported in the remote API".to_string(),
})
}
async fn list_namespaces(
&self,
request: ListNamespacesRequest,
) -> Result<ListNamespacesResponse> {
let namespace_parts = request.id.as_deref().unwrap_or(&[]);
let namespace_id = build_namespace_identifier(namespace_parts, &self.client.id_delimiter);
let mut req = self
.client
.get(&format!("/v1/namespace/{}/list", namespace_id));
if let Some(limit) = request.limit {
req = req.query(&[("limit", limit)]);
}
if let Some(ref page_token) = request.page_token {
req = req.query(&[("page_token", page_token)]);
}
let (request_id, resp) = self.client.send(req).await?;
let resp = self.client.check_response(&request_id, resp).await?;
resp.json().await.err_to_http(request_id)
}
async fn create_namespace(
&self,
request: CreateNamespaceRequest,
) -> Result<CreateNamespaceResponse> {
let namespace_parts = request.id.as_deref().unwrap_or(&[]);
let namespace_id = build_namespace_identifier(namespace_parts, &self.client.id_delimiter);
let mut req = self
.client
.post(&format!("/v1/namespace/{}/create", namespace_id));
#[derive(serde::Serialize)]
struct CreateNamespaceRequestBody {
#[serde(skip_serializing_if = "Option::is_none")]
mode: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
properties: Option<HashMap<String, String>>,
}
let body = CreateNamespaceRequestBody {
mode: request.mode.as_ref().map(|m| format!("{:?}", m)),
properties: request.properties,
};
req = req.json(&body);
let (request_id, resp) = self.client.send(req).await?;
let resp = self.client.check_response(&request_id, resp).await?;
resp.json().await.err_to_http(request_id)
}
async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<DropNamespaceResponse> {
let namespace_parts = request.id.as_deref().unwrap_or(&[]);
let namespace_id = build_namespace_identifier(namespace_parts, &self.client.id_delimiter);
let mut req = self
.client
.post(&format!("/v1/namespace/{}/drop", namespace_id));
#[derive(serde::Serialize)]
struct DropNamespaceRequestBody {
#[serde(skip_serializing_if = "Option::is_none")]
mode: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
behavior: Option<String>,
}
let body = DropNamespaceRequestBody {
mode: request.mode.as_ref().map(|m| format!("{:?}", m)),
behavior: request.behavior.as_ref().map(|b| format!("{:?}", b)),
};
req = req.json(&body);
let (request_id, resp) = self.client.send(req).await?;
let resp = self.client.check_response(&request_id, resp).await?;
resp.json().await.err_to_http(request_id)
}
async fn describe_namespace(
&self,
request: DescribeNamespaceRequest,
) -> Result<DescribeNamespaceResponse> {
let namespace_parts = request.id.as_deref().unwrap_or(&[]);
let namespace_id = build_namespace_identifier(namespace_parts, &self.client.id_delimiter);
let req = self
.client
.post(&format!("/v1/namespace/{}/describe", namespace_id))
.json(&DescribeNamespaceRequest::default());
let (request_id, resp) = self.client.send(req).await?;
let resp = self.client.check_response(&request_id, resp).await?;
resp.json().await.err_to_http(request_id)
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
async fn namespace_client(&self) -> Result<Arc<dyn lance_namespace::LanceNamespace>> {
let mut builder = lance_namespace_impls::RestNamespaceBuilder::new(self.client.host())
.delimiter(&self.client.id_delimiter)
.headers(self.namespace_headers.clone());
if let Some(tls_config) = &self.tls_config {
if let Some(cert_file) = &tls_config.cert_file {
builder = builder.cert_file(cert_file);
}
if let Some(key_file) = &tls_config.key_file {
builder = builder.key_file(key_file);
}
if let Some(ssl_ca_cert) = &tls_config.ssl_ca_cert {
builder = builder.ssl_ca_cert(ssl_ca_cert);
}
builder = builder.assert_hostname(tls_config.assert_hostname);
}
let namespace = builder.build();
Ok(Arc::new(namespace) as Arc<dyn lance_namespace::LanceNamespace>)
}
}
#[derive(Clone, Debug, Default)]
pub struct RemoteOptions(pub HashMap<String, String>);
impl RemoteOptions {
pub fn new(options: HashMap<String, String>) -> Self {
Self(options)
}
}
impl From<StorageOptions> for RemoteOptions {
fn from(options: StorageOptions) -> Self {
let supported_opts = vec![
"account_name",
"azure_storage_account_name",
"azure_client_id",
"azure_tenant_id",
];
let mut filtered = HashMap::new();
for opt in supported_opts {
if let Some(v) = options.0.get(opt) {
filtered.insert(opt.to_string(), v.clone());
}
}
Self::new(filtered)
}
}
#[cfg(test)]
mod tests {
use super::build_cache_key;
use std::collections::HashMap;
use std::sync::{Arc, OnceLock};
use arrow_array::{Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use crate::connection::ConnectBuilder;
use crate::{
Connection, Error,
database::CreateTableMode,
remote::{ARROW_STREAM_CONTENT_TYPE, ClientConfig, HeaderProvider, JSON_CONTENT_TYPE},
};
#[test]
fn test_cache_key_security() {
let key1 = build_cache_key("table1", &["ns1".to_string(), "ns2".to_string()]);
let key2 = build_cache_key("table1", &["ns1$ns2".to_string()]);
assert_ne!(
key1, key2,
"Cache keys should differ for different namespace structures"
);
let key3 = build_cache_key("ns2$table1", &["ns1".to_string()]);
assert_ne!(
key1, key3,
"Cache key should be different when table name contains delimiter"
);
let key4 = build_cache_key("table1", &[]);
let key5 = build_cache_key("table1", &["".to_string()]);
assert_ne!(
key4, key5,
"Empty namespace should differ from namespace with empty string"
);
let key6 = build_cache_key("table1", &["ns1".to_string(), "ns2".to_string()]);
assert_eq!(key1, key6, "Same inputs should produce same cache key");
}
#[tokio::test]
async fn test_retries() {
let seen_request_id = Arc::new(OnceLock::new());
let seen_request_id_ref = seen_request_id.clone();
let conn = Connection::new_with_handler(move |request| {
let request_id = request.headers()["x-request-id"]
.to_str()
.unwrap()
.to_string();
let seen_id = seen_request_id_ref.get_or_init(|| request_id.clone());
assert_eq!(&request_id, seen_id);
http::Response::builder()
.status(500)
.body("internal server error")
.unwrap()
});
let result = conn.table_names().execute().await;
if let Err(Error::Retry {
request_id,
request_failures,
max_request_failures,
source,
..
}) = result
{
let expected_id = seen_request_id.get().unwrap();
assert_eq!(&request_id, expected_id);
assert_eq!(request_failures, max_request_failures);
assert!(
source.to_string().contains("internal server error"),
"source: {:?}",
source
);
} else {
panic!("unexpected result: {:?}", result);
};
}
#[tokio::test]
async fn test_table_names() {
let conn = Connection::new_with_handler(|request| {
assert_eq!(request.method(), &reqwest::Method::GET);
assert_eq!(request.url().path(), "/v1/table/");
assert_eq!(request.url().query(), None);
http::Response::builder()
.status(200)
.body(r#"{"tables": ["table1", "table2"]}"#)
.unwrap()
});
let names = conn.table_names().execute().await.unwrap();
assert_eq!(names, vec!["table1", "table2"]);
}
#[tokio::test]
async fn test_table_names_pagination() {
let conn = Connection::new_with_handler(|request| {
assert_eq!(request.method(), &reqwest::Method::GET);
assert_eq!(request.url().path(), "/v1/table/");
assert!(request.url().query().unwrap().contains("limit=2"));
assert!(request.url().query().unwrap().contains("page_token=table2"));
http::Response::builder()
.status(200)
.body(r#"{"tables": ["table3", "table4"], "page_token": "token"}"#)
.unwrap()
});
let names = conn
.table_names()
.start_after("table2")
.limit(2)
.execute()
.await
.unwrap();
assert_eq!(names, vec!["table3", "table4"]);
}
#[tokio::test]
async fn test_open_table() {
let conn = Connection::new_with_handler(|request| {
assert_eq!(request.method(), &reqwest::Method::POST);
assert_eq!(request.url().path(), "/v1/table/table1/describe/");
assert_eq!(request.url().query(), None);
http::Response::builder()
.status(200)
.body(r#"{"table": "table1"}"#)
.unwrap()
});
let table = conn.open_table("table1").execute().await.unwrap();
assert_eq!(table.name(), "table1");
let table = conn
.open_table("table1")
.storage_option("key", "value")
.execute()
.await
.unwrap();
assert_eq!(table.name(), "table1");
}
#[tokio::test]
async fn test_open_table_not_found() {
let conn = Connection::new_with_handler(|_| {
http::Response::builder()
.status(404)
.body("table not found")
.unwrap()
});
let result = conn.open_table("table1").execute().await;
assert!(result.is_err());
assert!(matches!(result, Err(crate::Error::TableNotFound { .. })));
}
#[tokio::test]
async fn test_create_table() {
let conn = Connection::new_with_handler(|request| {
assert_eq!(request.method(), &reqwest::Method::POST);
assert_eq!(request.url().path(), "/v1/table/table1/create/");
assert_eq!(
request
.headers()
.get(reqwest::header::CONTENT_TYPE)
.unwrap(),
ARROW_STREAM_CONTENT_TYPE.as_bytes()
);
http::Response::builder().status(200).body("").unwrap()
});
let data = RecordBatch::try_new(
Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])),
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
)
.unwrap();
let table = conn.create_table("table1", data).execute().await.unwrap();
assert_eq!(table.name(), "table1");
}
#[tokio::test]
async fn test_create_table_already_exists() {
let conn = Connection::new_with_handler(|_| {
http::Response::builder()
.status(400)
.body("table table1 already exists")
.unwrap()
});
let data = RecordBatch::try_new(
Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])),
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
)
.unwrap();
let result = conn.create_table("table1", data).execute().await;
assert!(result.is_err());
assert!(
matches!(result, Err(crate::Error::TableAlreadyExists { name }) if name == "table1")
);
}
#[tokio::test]
async fn test_create_table_modes() {
let test_cases = [
(None, "mode=create"),
(Some(CreateTableMode::Create), "mode=create"),
(Some(CreateTableMode::Overwrite), "mode=overwrite"),
(
Some(CreateTableMode::ExistOk(Box::new(|b| b))),
"mode=exist_ok",
),
];
for (mode, expected_query_string) in test_cases {
let conn = Connection::new_with_handler(move |request| {
assert_eq!(request.method(), &reqwest::Method::POST);
assert_eq!(request.url().path(), "/v1/table/table1/create/");
assert_eq!(request.url().query(), Some(expected_query_string));
http::Response::builder().status(200).body("").unwrap()
});
let data = RecordBatch::try_new(
Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])),
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
)
.unwrap();
let mut builder = conn.create_table("table1", data.clone());
if let Some(mode) = mode {
builder = builder.mode(mode);
}
builder.execute().await.unwrap();
}
let conn = Connection::new_with_handler(|request| match request.url().path() {
"/v1/table/table1/create/" => http::Response::builder()
.status(400)
.body("Table table1 already exists")
.unwrap(),
"/v1/table/table1/describe/" => http::Response::builder().status(200).body("").unwrap(),
_ => {
panic!("unexpected path: {:?}", request.url().path());
}
});
let data = RecordBatch::try_new(
Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])),
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
)
.unwrap();
let called: Arc<OnceLock<bool>> = Arc::new(OnceLock::new());
let called_in_cb = called.clone();
conn.create_table("table1", data)
.mode(CreateTableMode::ExistOk(Box::new(move |b| {
called_in_cb.clone().set(true).unwrap();
b
})))
.execute()
.await
.unwrap();
let called = *called.get().unwrap_or(&false);
assert!(called);
}
#[tokio::test]
async fn test_create_table_empty() {
let conn = Connection::new_with_handler(|request| {
assert_eq!(request.method(), &reqwest::Method::POST);
assert_eq!(request.url().path(), "/v1/table/table1/create/");
assert_eq!(
request
.headers()
.get(reqwest::header::CONTENT_TYPE)
.unwrap(),
ARROW_STREAM_CONTENT_TYPE.as_bytes()
);
http::Response::builder().status(200).body("").unwrap()
});
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
conn.create_empty_table("table1", schema)
.execute()
.await
.unwrap();
}
#[tokio::test]
async fn test_drop_table() {
let conn = Connection::new_with_handler(|request| {
assert_eq!(request.method(), &reqwest::Method::POST);
assert_eq!(request.url().path(), "/v1/table/table1/drop/");
assert_eq!(request.url().query(), None);
assert!(request.body().is_none());
http::Response::builder().status(200).body("").unwrap()
});
conn.drop_table("table1", &[]).await.unwrap();
}
#[tokio::test]
async fn test_rename_table() {
let conn = Connection::new_with_handler(|request| {
assert_eq!(request.method(), &reqwest::Method::POST);
assert_eq!(request.url().path(), "/v1/table/table1/rename/");
assert_eq!(
request.headers().get("Content-Type").unwrap(),
JSON_CONTENT_TYPE
);
let body = request.body().unwrap().as_bytes().unwrap();
let body: serde_json::Value = serde_json::from_slice(body).unwrap();
assert_eq!(body["new_table_name"], "table2");
http::Response::builder().status(200).body("").unwrap()
});
conn.rename_table("table1", "table2", &[], &[])
.await
.unwrap();
}
#[tokio::test]
async fn test_connect_remote_options() {
let db_uri = "db://my-container/my-prefix";
let _ = ConnectBuilder::new(db_uri)
.region("us-east-1")
.api_key("my-api-key")
.storage_options(vec![("azure_storage_account_name", "my-storage-account")])
.execute()
.await
.unwrap();
}
#[tokio::test]
async fn test_table_names_with_root_namespace() {
let conn = Connection::new_with_handler(|request| {
assert_eq!(request.method(), &reqwest::Method::GET);
assert_eq!(request.url().path(), "/v1/table/");
assert_eq!(request.url().query(), None);
http::Response::builder()
.status(200)
.body(r#"{"tables": ["table1", "table2"]}"#)
.unwrap()
});
let names = conn
.table_names()
.namespace(vec![])
.execute()
.await
.unwrap();
assert_eq!(names, vec!["table1", "table2"]);
}
#[tokio::test]
async fn test_table_names_with_namespace() {
let conn = Connection::new_with_handler(|request| {
assert_eq!(request.method(), &reqwest::Method::GET);
assert_eq!(request.url().path(), "/v1/namespace/test/table/list");
assert_eq!(request.url().query(), None);
http::Response::builder()
.status(200)
.body(r#"{"tables": ["table1", "table2"]}"#)
.unwrap()
});
let names = conn
.table_names()
.namespace(vec!["test".to_string()])
.execute()
.await
.unwrap();
assert_eq!(names, vec!["table1", "table2"]);
}
#[tokio::test]
async fn test_table_names_with_nested_namespace() {
let conn = Connection::new_with_handler(|request| {
assert_eq!(request.method(), &reqwest::Method::GET);
assert_eq!(request.url().path(), "/v1/namespace/ns1$ns2/table/list");
assert_eq!(request.url().query(), None);
http::Response::builder()
.status(200)
.body(r#"{"tables": ["ns1$ns2$table1", "ns1$ns2$table2"]}"#)
.unwrap()
});
let names = conn
.table_names()
.namespace(vec!["ns1".to_string(), "ns2".to_string()])
.execute()
.await
.unwrap();
assert_eq!(names, vec!["ns1$ns2$table1", "ns1$ns2$table2"]);
}
#[tokio::test]
async fn test_open_table_with_namespace() {
let conn = Connection::new_with_handler(|request| {
assert_eq!(request.method(), &reqwest::Method::POST);
assert_eq!(request.url().path(), "/v1/table/ns1$ns2$table1/describe/");
assert_eq!(request.url().query(), None);
http::Response::builder()
.status(200)
.body(r#"{"table": "table1"}"#)
.unwrap()
});
let table = conn
.open_table("table1")
.namespace(vec!["ns1".to_string(), "ns2".to_string()])
.execute()
.await
.unwrap();
assert_eq!(table.name(), "table1");
}
#[tokio::test]
async fn test_create_table_with_namespace() {
let conn = Connection::new_with_handler(|request| {
assert_eq!(request.method(), &reqwest::Method::POST);
assert_eq!(request.url().path(), "/v1/table/ns1$table1/create/");
assert_eq!(
request
.headers()
.get(reqwest::header::CONTENT_TYPE)
.unwrap(),
ARROW_STREAM_CONTENT_TYPE.as_bytes()
);
http::Response::builder().status(200).body("").unwrap()
});
let data = RecordBatch::try_new(
Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])),
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
)
.unwrap();
let table = conn
.create_table("table1", data)
.namespace(vec!["ns1".to_string()])
.execute()
.await
.unwrap();
assert_eq!(table.name(), "table1");
}
#[tokio::test]
async fn test_drop_table_with_namespace() {
let conn = Connection::new_with_handler(|request| {
assert_eq!(request.method(), &reqwest::Method::POST);
assert_eq!(request.url().path(), "/v1/table/ns1$ns2$table1/drop/");
assert_eq!(request.url().query(), None);
assert!(request.body().is_none());
http::Response::builder().status(200).body("").unwrap()
});
conn.drop_table("table1", &["ns1".to_string(), "ns2".to_string()])
.await
.unwrap();
}
#[tokio::test]
async fn test_rename_table_with_namespace() {
let conn = Connection::new_with_handler(|request| {
assert_eq!(request.method(), &reqwest::Method::POST);
assert_eq!(request.url().path(), "/v1/table/ns1$table1/rename/");
assert_eq!(
request.headers().get("Content-Type").unwrap(),
JSON_CONTENT_TYPE
);
let body = request.body().unwrap().as_bytes().unwrap();
let body: serde_json::Value = serde_json::from_slice(body).unwrap();
assert_eq!(body["new_table_name"], "table2");
assert_eq!(body["new_namespace"], serde_json::json!(["ns2"]));
http::Response::builder().status(200).body("").unwrap()
});
conn.rename_table(
"table1",
"table2",
&["ns1".to_string()],
&["ns2".to_string()],
)
.await
.unwrap();
}
#[tokio::test]
async fn test_create_empty_table_with_namespace() {
let conn = Connection::new_with_handler(|request| {
assert_eq!(request.method(), &reqwest::Method::POST);
assert_eq!(request.url().path(), "/v1/table/prod$data$metrics/create/");
assert_eq!(
request
.headers()
.get(reqwest::header::CONTENT_TYPE)
.unwrap(),
ARROW_STREAM_CONTENT_TYPE.as_bytes()
);
http::Response::builder().status(200).body("").unwrap()
});
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
conn.create_empty_table("metrics", schema)
.namespace(vec!["prod".to_string(), "data".to_string()])
.execute()
.await
.unwrap();
}
#[tokio::test]
async fn test_header_provider_in_request() {
#[derive(Debug, Clone)]
struct TestHeaderProvider {
headers: HashMap<String, String>,
}
#[async_trait::async_trait]
impl HeaderProvider for TestHeaderProvider {
async fn get_headers(&self) -> crate::Result<HashMap<String, String>> {
Ok(self.headers.clone())
}
}
let mut headers = HashMap::new();
headers.insert("X-Custom-Auth".to_string(), "test-token".to_string());
headers.insert("X-Request-Id".to_string(), "test-123".to_string());
let provider = Arc::new(TestHeaderProvider { headers }) as Arc<dyn HeaderProvider>;
let client_config = ClientConfig {
header_provider: Some(provider),
..Default::default()
};
let conn = Connection::new_with_handler_and_config(
move |request| {
assert_eq!(
request.headers().get("X-Custom-Auth").unwrap(),
"test-token"
);
assert_eq!(request.headers().get("X-Request-Id").unwrap(), "test-123");
assert_eq!(request.method(), &reqwest::Method::GET);
assert_eq!(request.url().path(), "/v1/table/");
http::Response::builder()
.status(200)
.body(r#"{"tables": ["table1", "table2"]}"#)
.unwrap()
},
client_config,
);
let names = conn.table_names().execute().await.unwrap();
assert_eq!(names, vec!["table1", "table2"]);
}
#[tokio::test]
async fn test_header_provider_error_handling() {
#[derive(Debug)]
struct ErrorHeaderProvider;
#[async_trait::async_trait]
impl HeaderProvider for ErrorHeaderProvider {
async fn get_headers(&self) -> crate::Result<HashMap<String, String>> {
Err(crate::Error::Runtime {
message: "Failed to fetch auth token".to_string(),
})
}
}
let provider = Arc::new(ErrorHeaderProvider) as Arc<dyn HeaderProvider>;
let client_config = ClientConfig {
header_provider: Some(provider),
..Default::default()
};
let conn = Connection::new_with_handler_and_config(
move |_request| -> http::Response<&'static str> {
panic!("Handler should not be called when header provider fails");
},
client_config,
);
let result = conn.table_names().execute().await;
assert!(result.is_err());
match result.unwrap_err() {
crate::Error::Runtime { message } => {
assert_eq!(message, "Failed to fetch auth token");
}
_ => panic!("Expected Runtime error from header provider"),
}
}
#[tokio::test]
async fn test_clone_table() {
let conn = Connection::new_with_handler(|request| {
assert_eq!(request.method(), &reqwest::Method::POST);
assert_eq!(request.url().path(), "/v1/table/cloned_table/clone");
assert_eq!(
request.headers().get("Content-Type").unwrap(),
JSON_CONTENT_TYPE
);
let body = request.body().unwrap().as_bytes().unwrap();
let body: serde_json::Value = serde_json::from_slice(body).unwrap();
assert_eq!(body["source_location"], "s3://bucket/source_table");
assert_eq!(body["is_shallow"], true);
http::Response::builder().status(200).body("").unwrap()
});
let table = conn
.clone_table("cloned_table", "s3://bucket/source_table")
.execute()
.await
.unwrap();
assert_eq!(table.name(), "cloned_table");
}
#[tokio::test]
async fn test_clone_table_with_version() {
let conn = Connection::new_with_handler(|request| {
assert_eq!(request.method(), &reqwest::Method::POST);
assert_eq!(request.url().path(), "/v1/table/cloned_table/clone");
let body = request.body().unwrap().as_bytes().unwrap();
let body: serde_json::Value = serde_json::from_slice(body).unwrap();
assert_eq!(body["source_location"], "s3://bucket/source_table");
assert_eq!(body["source_version"], 42);
assert_eq!(body["is_shallow"], true);
http::Response::builder().status(200).body("").unwrap()
});
let table = conn
.clone_table("cloned_table", "s3://bucket/source_table")
.source_version(42)
.execute()
.await
.unwrap();
assert_eq!(table.name(), "cloned_table");
}
#[tokio::test]
async fn test_clone_table_with_tag() {
let conn = Connection::new_with_handler(|request| {
assert_eq!(request.method(), &reqwest::Method::POST);
assert_eq!(request.url().path(), "/v1/table/cloned_table/clone");
let body = request.body().unwrap().as_bytes().unwrap();
let body: serde_json::Value = serde_json::from_slice(body).unwrap();
assert_eq!(body["source_location"], "s3://bucket/source_table");
assert_eq!(body["source_tag"], "v1.0");
assert_eq!(body["is_shallow"], true);
http::Response::builder().status(200).body("").unwrap()
});
let table = conn
.clone_table("cloned_table", "s3://bucket/source_table")
.source_tag("v1.0")
.execute()
.await
.unwrap();
assert_eq!(table.name(), "cloned_table");
}
#[tokio::test]
async fn test_clone_table_deep_clone() {
let conn = Connection::new_with_handler(|request| {
assert_eq!(request.method(), &reqwest::Method::POST);
assert_eq!(request.url().path(), "/v1/table/cloned_table/clone");
let body = request.body().unwrap().as_bytes().unwrap();
let body: serde_json::Value = serde_json::from_slice(body).unwrap();
assert_eq!(body["source_location"], "s3://bucket/source_table");
assert_eq!(body["is_shallow"], false);
http::Response::builder().status(200).body("").unwrap()
});
let table = conn
.clone_table("cloned_table", "s3://bucket/source_table")
.is_shallow(false)
.execute()
.await
.unwrap();
assert_eq!(table.name(), "cloned_table");
}
#[tokio::test]
async fn test_clone_table_with_namespace() {
let conn = Connection::new_with_handler(|request| {
assert_eq!(request.method(), &reqwest::Method::POST);
assert_eq!(request.url().path(), "/v1/table/ns1$ns2$cloned_table/clone");
let body = request.body().unwrap().as_bytes().unwrap();
let body: serde_json::Value = serde_json::from_slice(body).unwrap();
assert_eq!(body["source_location"], "s3://bucket/source_table");
assert_eq!(body["is_shallow"], true);
http::Response::builder().status(200).body("").unwrap()
});
let table = conn
.clone_table("cloned_table", "s3://bucket/source_table")
.target_namespace(vec!["ns1".to_string(), "ns2".to_string()])
.execute()
.await
.unwrap();
assert_eq!(table.name(), "cloned_table");
}
#[tokio::test]
async fn test_clone_table_error() {
let conn = Connection::new_with_handler(|_| {
http::Response::builder()
.status(500)
.body("Internal server error")
.unwrap()
});
let result = conn
.clone_table("cloned_table", "s3://bucket/source_table")
.execute()
.await;
assert!(result.is_err());
if let Err(crate::Error::Http { source, .. }) = result {
assert!(source.to_string().contains("Failed to clone table"));
} else {
panic!("Expected HTTP error");
}
}
#[tokio::test]
async fn test_namespace_client() {
let conn = Connection::new_with_handler(|_| {
http::Response::builder()
.status(200)
.body(r#"{"tables": []}"#)
.unwrap()
});
let namespace_client = conn.namespace_client().await;
assert!(namespace_client.is_ok());
}
#[tokio::test]
async fn test_namespace_client_with_tls_config() {
use crate::remote::client::TlsConfig;
let tls_config = TlsConfig {
cert_file: Some("/path/to/cert.pem".to_string()),
key_file: Some("/path/to/key.pem".to_string()),
ssl_ca_cert: Some("/path/to/ca.pem".to_string()),
assert_hostname: true,
};
let client_config = ClientConfig {
tls_config: Some(tls_config),
..Default::default()
};
let conn = Connection::new_with_handler_and_config(
|_| {
http::Response::builder()
.status(200)
.body(r#"{"tables": []}"#)
.unwrap()
},
client_config,
);
let namespace_client = conn.namespace_client().await;
assert!(namespace_client.is_ok());
}
#[tokio::test]
async fn test_namespace_client_with_headers() {
let mut extra_headers = HashMap::new();
extra_headers.insert("X-Custom-Header".to_string(), "custom-value".to_string());
let client_config = ClientConfig {
extra_headers,
..Default::default()
};
let conn = Connection::new_with_handler_and_config(
|_| {
http::Response::builder()
.status(200)
.body(r#"{"tables": []}"#)
.unwrap()
},
client_config,
);
let namespace_client = conn.namespace_client().await;
assert!(namespace_client.is_ok());
}
mod rest_adapter_integration {
use super::*;
use lance_namespace::models::ListTablesRequest;
use lance_namespace_impls::{DirectoryNamespaceBuilder, RestAdapter, RestAdapterConfig};
use std::sync::Arc;
use tempfile::TempDir;
struct RestServerFixture {
_temp_dir: TempDir,
server_handle: lance_namespace_impls::RestAdapterHandle,
server_url: String,
}
impl RestServerFixture {
async fn new() -> Self {
let temp_dir = TempDir::new().unwrap();
let temp_path = temp_dir.path().to_str().unwrap().to_string();
let backend = DirectoryNamespaceBuilder::new(&temp_path)
.build()
.await
.unwrap();
let backend = Arc::new(backend);
let config = RestAdapterConfig {
port: 0,
..Default::default()
};
let server = RestAdapter::new(backend, config);
let server_handle = server.start().await.unwrap();
let actual_port = server_handle.port();
let server_url = format!("http://127.0.0.1:{}", actual_port);
Self {
_temp_dir: temp_dir,
server_handle,
server_url,
}
}
}
impl Drop for RestServerFixture {
fn drop(&mut self) {
self.server_handle.shutdown();
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_remote_database_with_rest_adapter() {
use lance_namespace::models::CreateNamespaceRequest;
let fixture = RestServerFixture::new().await;
let conn = ConnectBuilder::new("db://dummy")
.api_key("test-api-key")
.region("us-east-1")
.host_override(&fixture.server_url)
.execute()
.await
.unwrap();
let namespace = vec!["test_ns".to_string()];
conn.create_namespace(CreateNamespaceRequest {
id: Some(namespace.clone()),
..Default::default()
})
.await
.expect("Failed to create namespace");
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
let data = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
)
.unwrap();
let table = conn
.create_table("test_table", data)
.namespace(namespace.clone())
.execute()
.await;
assert!(table.is_ok(), "Failed to create table: {:?}", table.err());
let list_response = conn
.list_tables(ListTablesRequest {
id: Some(namespace.clone()),
..Default::default()
})
.await
.expect("Failed to list tables");
assert_eq!(list_response.tables, vec!["test_table"]);
let namespace_client = conn.namespace_client().await.unwrap();
let list_response = namespace_client
.list_tables(ListTablesRequest {
id: Some(namespace.clone()),
..Default::default()
})
.await
.unwrap();
assert_eq!(list_response.tables, vec!["test_table"]);
let opened_table = conn
.open_table("test_table")
.namespace(namespace.clone())
.execute()
.await;
assert!(
opened_table.is_ok(),
"Failed to open table: {:?}",
opened_table.err()
);
assert_eq!(opened_table.unwrap().name(), "test_table");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_remote_database_with_multiple_tables() {
use lance_namespace::models::CreateNamespaceRequest;
let fixture = RestServerFixture::new().await;
let conn = ConnectBuilder::new("db://dummy")
.api_key("test-api-key")
.region("us-east-1")
.host_override(&fixture.server_url)
.execute()
.await
.unwrap();
let namespace = vec!["multi_table_ns".to_string()];
conn.create_namespace(CreateNamespaceRequest {
id: Some(namespace.clone()),
..Default::default()
})
.await
.expect("Failed to create namespace");
let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
for i in 1..=3 {
let data =
RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(vec![i]))])
.unwrap();
conn.create_table(format!("table{}", i), data)
.namespace(namespace.clone())
.execute()
.await
.unwrap_or_else(|e| panic!("Failed to create table{}: {:?}", i, e));
}
let list_response = conn
.list_tables(ListTablesRequest {
id: Some(namespace.clone()),
..Default::default()
})
.await
.unwrap();
assert_eq!(list_response.tables.len(), 3);
assert!(list_response.tables.contains(&"table1".to_string()));
assert!(list_response.tables.contains(&"table2".to_string()));
assert!(list_response.tables.contains(&"table3".to_string()));
}
}
}