1pub mod manifest;
10
11use arrow::record_batch::RecordBatchIterator;
12use arrow_ipc::reader::StreamReader;
13use async_trait::async_trait;
14use bytes::Bytes;
15use lance::dataset::{Dataset, WriteParams};
16use lance::session::Session;
17use lance_io::object_store::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry};
18use object_store::path::Path;
19use object_store::{Error as ObjectStoreError, ObjectStore as OSObjectStore, PutMode, PutOptions};
20use std::collections::HashMap;
21use std::io::Cursor;
22use std::sync::Arc;
23
24use crate::context::DynamicContextProvider;
25use lance_namespace::models::{
26 CreateEmptyTableRequest, CreateEmptyTableResponse, CreateNamespaceRequest,
27 CreateNamespaceResponse, CreateTableRequest, CreateTableResponse, DeclareTableRequest,
28 DeclareTableResponse, DescribeNamespaceRequest, DescribeNamespaceResponse,
29 DescribeTableRequest, DescribeTableResponse, DropNamespaceRequest, DropNamespaceResponse,
30 DropTableRequest, DropTableResponse, Identity, ListNamespacesRequest, ListNamespacesResponse,
31 ListTablesRequest, ListTablesResponse, NamespaceExistsRequest, TableExistsRequest,
32};
33
34use lance_core::{box_error, Error, Result};
35use lance_namespace::schema::arrow_schema_to_json;
36use lance_namespace::LanceNamespace;
37
38use crate::credentials::{
39 create_credential_vendor_for_location, has_credential_vendor_config, CredentialVendor,
40};
41
/// Snapshot of what a table's directory listing revealed, as produced by
/// `DirectoryNamespace::check_table_status`.
pub(crate) struct TableStatus {
    /// Whether listing the table directory returned at least one entry.
    pub(crate) exists: bool,
    /// Whether an entry ending in `.lance-deregistered` was found.
    pub(crate) is_deregistered: bool,
    /// Whether an entry ending in `.lance-reserved` was found.
    pub(crate) has_reserved_file: bool,
}
54
/// Builder that collects the configuration needed to construct a
/// [`DirectoryNamespace`].
#[derive(Clone)]
pub struct DirectoryNamespaceBuilder {
    /// Root location of the namespace; the constructors strip trailing `/`.
    root: String,
    /// Options used when opening the object store and forwarded to the
    /// manifest namespace, if any were supplied.
    storage_options: Option<HashMap<String, String>>,
    /// Optional session; when present its store registry is used to open the
    /// object store.
    session: Option<Arc<Session>>,
    /// Whether `build` attempts to initialize a manifest namespace.
    manifest_enabled: bool,
    /// Forwarded to the manifest namespace and stored on the built namespace.
    dir_listing_enabled: bool,
    /// Forwarded to the manifest namespace during initialization.
    inline_optimization_enabled: bool,
    /// Properties handed to the credential vendor factory in `build`.
    credential_vendor_properties: HashMap<String, String>,
    /// Optional dynamic context provider, stored on the built namespace.
    context_provider: Option<Arc<dyn DynamicContextProvider>>,
}
100
101impl std::fmt::Debug for DirectoryNamespaceBuilder {
102 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
103 f.debug_struct("DirectoryNamespaceBuilder")
104 .field("root", &self.root)
105 .field("storage_options", &self.storage_options)
106 .field("manifest_enabled", &self.manifest_enabled)
107 .field("dir_listing_enabled", &self.dir_listing_enabled)
108 .field(
109 "inline_optimization_enabled",
110 &self.inline_optimization_enabled,
111 )
112 .field(
113 "context_provider",
114 &self.context_provider.as_ref().map(|_| "Some(...)"),
115 )
116 .finish()
117 }
118}
119
120impl DirectoryNamespaceBuilder {
121 pub fn new(root: impl Into<String>) -> Self {
127 Self {
128 root: root.into().trim_end_matches('/').to_string(),
129 storage_options: None,
130 session: None,
131 manifest_enabled: true,
132 dir_listing_enabled: true, inline_optimization_enabled: true,
134 credential_vendor_properties: HashMap::new(),
135 context_provider: None,
136 }
137 }
138
139 pub fn manifest_enabled(mut self, enabled: bool) -> Self {
144 self.manifest_enabled = enabled;
145 self
146 }
147
148 pub fn dir_listing_enabled(mut self, enabled: bool) -> Self {
153 self.dir_listing_enabled = enabled;
154 self
155 }
156
157 pub fn inline_optimization_enabled(mut self, enabled: bool) -> Self {
163 self.inline_optimization_enabled = enabled;
164 self
165 }
166
167 pub fn from_properties(
232 properties: HashMap<String, String>,
233 session: Option<Arc<Session>>,
234 ) -> Result<Self> {
235 let root = properties
237 .get("root")
238 .cloned()
239 .ok_or_else(|| Error::Namespace {
240 source: "Missing required property 'root' for directory namespace".into(),
241 location: snafu::location!(),
242 })?;
243
244 let storage_options: HashMap<String, String> = properties
246 .iter()
247 .filter_map(|(k, v)| {
248 k.strip_prefix("storage.")
249 .map(|key| (key.to_string(), v.clone()))
250 })
251 .collect();
252
253 let storage_options = if storage_options.is_empty() {
254 None
255 } else {
256 Some(storage_options)
257 };
258
259 let manifest_enabled = properties
261 .get("manifest_enabled")
262 .and_then(|v| v.parse::<bool>().ok())
263 .unwrap_or(true);
264
265 let dir_listing_enabled = properties
267 .get("dir_listing_enabled")
268 .and_then(|v| v.parse::<bool>().ok())
269 .unwrap_or(true);
270
271 let inline_optimization_enabled = properties
273 .get("inline_optimization_enabled")
274 .and_then(|v| v.parse::<bool>().ok())
275 .unwrap_or(true);
276
277 let credential_vendor_properties: HashMap<String, String> = properties
281 .iter()
282 .filter_map(|(k, v)| {
283 k.strip_prefix("credential_vendor.")
284 .map(|key| (key.to_string(), v.clone()))
285 })
286 .collect();
287
288 Ok(Self {
289 root: root.trim_end_matches('/').to_string(),
290 storage_options,
291 session,
292 manifest_enabled,
293 dir_listing_enabled,
294 inline_optimization_enabled,
295 credential_vendor_properties,
296 context_provider: None,
297 })
298 }
299
300 pub fn storage_option(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
307 self.storage_options
308 .get_or_insert_with(HashMap::new)
309 .insert(key.into(), value.into());
310 self
311 }
312
313 pub fn storage_options(mut self, options: HashMap<String, String>) -> Self {
319 self.storage_options
320 .get_or_insert_with(HashMap::new)
321 .extend(options);
322 self
323 }
324
325 pub fn session(mut self, session: Arc<Session>) -> Self {
335 self.session = Some(session);
336 self
337 }
338
339 pub fn credential_vendor_property(
367 mut self,
368 key: impl Into<String>,
369 value: impl Into<String>,
370 ) -> Self {
371 self.credential_vendor_properties
372 .insert(key.into(), value.into());
373 self
374 }
375
376 pub fn credential_vendor_properties(mut self, properties: HashMap<String, String>) -> Self {
384 self.credential_vendor_properties.extend(properties);
385 self
386 }
387
388 pub fn context_provider(mut self, provider: Arc<dyn DynamicContextProvider>) -> Self {
398 self.context_provider = Some(provider);
399 self
400 }
401
402 pub async fn build(self) -> Result<DirectoryNamespace> {
415 let (object_store, base_path) =
416 Self::initialize_object_store(&self.root, &self.storage_options, &self.session).await?;
417
418 let manifest_ns = if self.manifest_enabled {
419 match manifest::ManifestNamespace::from_directory(
420 self.root.clone(),
421 self.storage_options.clone(),
422 self.session.clone(),
423 object_store.clone(),
424 base_path.clone(),
425 self.dir_listing_enabled,
426 self.inline_optimization_enabled,
427 )
428 .await
429 {
430 Ok(ns) => Some(Arc::new(ns)),
431 Err(e) => {
432 log::warn!(
434 "Failed to initialize manifest namespace, falling back to directory listing only: {}",
435 e
436 );
437 None
438 }
439 }
440 } else {
441 None
442 };
443
444 let credential_vendor = if has_credential_vendor_config(&self.credential_vendor_properties)
446 {
447 create_credential_vendor_for_location(&self.root, &self.credential_vendor_properties)
448 .await?
449 .map(Arc::from)
450 } else {
451 None
452 };
453
454 Ok(DirectoryNamespace {
455 root: self.root,
456 storage_options: self.storage_options,
457 session: self.session,
458 object_store,
459 base_path,
460 manifest_ns,
461 dir_listing_enabled: self.dir_listing_enabled,
462 credential_vendor,
463 context_provider: self.context_provider,
464 })
465 }
466
467 async fn initialize_object_store(
469 root: &str,
470 storage_options: &Option<HashMap<String, String>>,
471 session: &Option<Arc<Session>>,
472 ) -> Result<(Arc<ObjectStore>, Path)> {
473 let accessor = storage_options.clone().map(|opts| {
475 Arc::new(lance_io::object_store::StorageOptionsAccessor::with_static_options(opts))
476 });
477 let params = ObjectStoreParams {
478 storage_options_accessor: accessor,
479 ..Default::default()
480 };
481
482 let registry = if let Some(session) = session {
484 session.store_registry()
485 } else {
486 Arc::new(ObjectStoreRegistry::default())
487 };
488
489 let (object_store, base_path) = ObjectStore::from_uri_and_params(registry, root, ¶ms)
491 .await
492 .map_err(|e| Error::Namespace {
493 source: format!("Failed to create object store: {}", e).into(),
494 location: snafu::location!(),
495 })?;
496
497 Ok((object_store, base_path))
498 }
499}
500
/// Namespace implementation backed by a directory in an object store, in which
/// each table lives at `<root>/<table_name>.lance`.
///
/// When a manifest namespace is available most operations are delegated to it;
/// otherwise they are served from directory listings and marker files.
pub struct DirectoryNamespace {
    /// Root location, used to build table URIs.
    root: String,
    /// Static storage options returned to callers when no credential vendor is
    /// configured, and used when writing new tables.
    storage_options: Option<HashMap<String, String>>,
    /// Session supplied at build time; not read by the code in this section.
    #[allow(dead_code)]
    session: Option<Arc<Session>>,
    /// Object store opened for `root`.
    object_store: Arc<ObjectStore>,
    /// Path within `object_store` that corresponds to `root`.
    base_path: Path,
    /// Manifest namespace, present when manifest mode was enabled and
    /// initialized successfully.
    manifest_ns: Option<Arc<manifest::ManifestNamespace>>,
    /// Whether directory listing is used as a fallback next to the manifest.
    dir_listing_enabled: bool,
    /// Optional vendor that produces per-table storage options.
    credential_vendor: Option<Arc<dyn CredentialVendor>>,
    /// Context provider supplied at build time; not read by the code in this
    /// section.
    #[allow(dead_code)]
    context_provider: Option<Arc<dyn DynamicContextProvider>>,
}
541
542impl std::fmt::Debug for DirectoryNamespace {
543 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
544 write!(f, "{}", self.namespace_id())
545 }
546}
547
548impl std::fmt::Display for DirectoryNamespace {
549 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
550 write!(f, "{}", self.namespace_id())
551 }
552}
553
554impl DirectoryNamespace {
555 fn apply_pagination(names: &mut Vec<String>, page_token: Option<String>, limit: Option<i32>) {
564 names.sort();
566
567 if let Some(start_after) = page_token {
569 if let Some(index) = names
570 .iter()
571 .position(|name| name.as_str() > start_after.as_str())
572 {
573 names.drain(0..index);
574 } else {
575 names.clear();
576 }
577 }
578
579 if let Some(limit) = limit {
581 if limit >= 0 {
582 names.truncate(limit as usize);
583 }
584 }
585 }
586
587 async fn list_directory_tables(&self) -> Result<Vec<String>> {
589 let mut tables = Vec::new();
590 let entries = self
591 .object_store
592 .read_dir(self.base_path.clone())
593 .await
594 .map_err(|e| Error::IO {
595 source: box_error(std::io::Error::other(format!(
596 "Failed to list directory: {}",
597 e
598 ))),
599 location: snafu::location!(),
600 })?;
601
602 for entry in entries {
603 let path = entry.trim_end_matches('/');
604 if !path.ends_with(".lance") {
605 continue;
606 }
607
608 let table_name = &path[..path.len() - 6];
609
610 let status = self.check_table_status(table_name).await;
612 if status.is_deregistered || status.has_reserved_file {
613 continue;
614 }
615
616 tables.push(table_name.to_string());
617 }
618
619 Ok(tables)
620 }
621
622 fn validate_root_namespace_id(id: &Option<Vec<String>>) -> Result<()> {
624 if let Some(id) = id {
625 if !id.is_empty() {
626 return Err(Error::Namespace {
627 source: format!(
628 "Directory namespace only supports root namespace operations, but got namespace ID: {:?}. Expected empty ID.",
629 id
630 ).into(),
631 location: snafu::location!(),
632 });
633 }
634 }
635 Ok(())
636 }
637
638 fn table_name_from_id(id: &Option<Vec<String>>) -> Result<String> {
640 let id = id.as_ref().ok_or_else(|| Error::Namespace {
641 source: "Directory namespace table ID cannot be empty".into(),
642 location: snafu::location!(),
643 })?;
644
645 if id.len() != 1 {
646 return Err(Error::Namespace {
647 source: format!(
648 "Multi-level table IDs are only supported when manifest mode is enabled, but got: {:?}",
649 id
650 )
651 .into(),
652 location: snafu::location!(),
653 });
654 }
655
656 Ok(id[0].clone())
657 }
658
659 fn table_full_uri(&self, table_name: &str) -> String {
661 format!("{}/{}.lance", &self.root, table_name)
662 }
663
664 fn table_path(&self, table_name: &str) -> Path {
666 self.base_path
667 .child(format!("{}.lance", table_name).as_str())
668 }
669
670 fn table_reserved_file_path(&self, table_name: &str) -> Path {
672 self.base_path
673 .child(format!("{}.lance", table_name).as_str())
674 .child(".lance-reserved")
675 }
676
677 fn table_deregistered_file_path(&self, table_name: &str) -> Path {
679 self.base_path
680 .child(format!("{}.lance", table_name).as_str())
681 .child(".lance-deregistered")
682 }
683
684 pub(crate) async fn check_table_status(&self, table_name: &str) -> TableStatus {
690 let table_path = self.table_path(table_name);
691 match self.object_store.read_dir(table_path).await {
692 Ok(entries) => {
693 let exists = !entries.is_empty();
694 let is_deregistered = entries.iter().any(|e| e.ends_with(".lance-deregistered"));
695 let has_reserved_file = entries.iter().any(|e| e.ends_with(".lance-reserved"));
696 TableStatus {
697 exists,
698 is_deregistered,
699 has_reserved_file,
700 }
701 }
702 Err(_) => TableStatus {
703 exists: false,
704 is_deregistered: false,
705 has_reserved_file: false,
706 },
707 }
708 }
709
710 async fn put_marker_file_atomic(
718 &self,
719 path: &Path,
720 file_description: &str,
721 ) -> std::result::Result<(), String> {
722 let put_opts = PutOptions {
723 mode: PutMode::Create,
724 ..Default::default()
725 };
726
727 match self
728 .object_store
729 .inner
730 .put_opts(path, bytes::Bytes::new().into(), put_opts)
731 .await
732 {
733 Ok(_) => Ok(()),
734 Err(ObjectStoreError::AlreadyExists { .. })
735 | Err(ObjectStoreError::Precondition { .. }) => {
736 Err(format!("{} already exists", file_description))
737 }
738 Err(e) => Err(format!("Failed to create {}: {}", file_description, e)),
739 }
740 }
741
742 async fn get_storage_options_for_table(
762 &self,
763 table_uri: &str,
764 identity: Option<&Identity>,
765 ) -> Result<Option<HashMap<String, String>>> {
766 if let Some(ref vendor) = self.credential_vendor {
767 let vended = vendor.vend_credentials(table_uri, identity).await?;
768 return Ok(Some(vended.storage_options));
769 }
770 Ok(self.storage_options.clone())
771 }
772
773 pub async fn migrate(&self) -> Result<usize> {
826 let Some(ref manifest_ns) = self.manifest_ns else {
828 return Ok(0); };
830
831 let manifest_locations = manifest_ns.list_manifest_table_locations().await?;
833
834 let dir_tables = self.list_directory_tables().await?;
836
837 let mut migrated_count = 0;
842 for table_name in dir_tables {
843 let dir_name = format!("{}.lance", table_name);
845 if !manifest_locations.contains(&dir_name) {
846 manifest_ns.register_table(&table_name, dir_name).await?;
847 migrated_count += 1;
848 }
849 }
850
851 Ok(migrated_count)
852 }
853}
854
855#[async_trait]
856impl LanceNamespace for DirectoryNamespace {
857 async fn list_namespaces(
858 &self,
859 request: ListNamespacesRequest,
860 ) -> Result<ListNamespacesResponse> {
861 if let Some(ref manifest_ns) = self.manifest_ns {
862 return manifest_ns.list_namespaces(request).await;
863 }
864
865 Self::validate_root_namespace_id(&request.id)?;
866 Ok(ListNamespacesResponse::new(vec![]))
867 }
868
869 async fn describe_namespace(
870 &self,
871 request: DescribeNamespaceRequest,
872 ) -> Result<DescribeNamespaceResponse> {
873 if let Some(ref manifest_ns) = self.manifest_ns {
874 return manifest_ns.describe_namespace(request).await;
875 }
876
877 Self::validate_root_namespace_id(&request.id)?;
878 #[allow(clippy::needless_update)]
879 Ok(DescribeNamespaceResponse {
880 properties: Some(HashMap::new()),
881 ..Default::default()
882 })
883 }
884
885 async fn create_namespace(
886 &self,
887 request: CreateNamespaceRequest,
888 ) -> Result<CreateNamespaceResponse> {
889 if let Some(ref manifest_ns) = self.manifest_ns {
890 return manifest_ns.create_namespace(request).await;
891 }
892
893 if request.id.is_none() || request.id.as_ref().unwrap().is_empty() {
894 return Err(Error::Namespace {
895 source: "Root namespace already exists and cannot be created".into(),
896 location: snafu::location!(),
897 });
898 }
899
900 Err(Error::NotSupported {
901 source: "Child namespaces are only supported when manifest mode is enabled".into(),
902 location: snafu::location!(),
903 })
904 }
905
906 async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<DropNamespaceResponse> {
907 if let Some(ref manifest_ns) = self.manifest_ns {
908 return manifest_ns.drop_namespace(request).await;
909 }
910
911 if request.id.is_none() || request.id.as_ref().unwrap().is_empty() {
912 return Err(Error::Namespace {
913 source: "Root namespace cannot be dropped".into(),
914 location: snafu::location!(),
915 });
916 }
917
918 Err(Error::NotSupported {
919 source: "Child namespaces are only supported when manifest mode is enabled".into(),
920 location: snafu::location!(),
921 })
922 }
923
924 async fn namespace_exists(&self, request: NamespaceExistsRequest) -> Result<()> {
925 if let Some(ref manifest_ns) = self.manifest_ns {
926 return manifest_ns.namespace_exists(request).await;
927 }
928
929 if request.id.is_none() || request.id.as_ref().unwrap().is_empty() {
930 return Ok(());
931 }
932
933 Err(Error::Namespace {
934 source: "Child namespaces are only supported when manifest mode is enabled".into(),
935 location: snafu::location!(),
936 })
937 }
938
939 async fn list_tables(&self, request: ListTablesRequest) -> Result<ListTablesResponse> {
940 let namespace_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
942 source: "Namespace ID is required".into(),
943 location: snafu::location!(),
944 })?;
945
946 if !namespace_id.is_empty() {
948 if let Some(ref manifest_ns) = self.manifest_ns {
949 return manifest_ns.list_tables(request).await;
950 }
951 return Err(Error::NotSupported {
952 source: "Child namespaces are only supported when manifest mode is enabled".into(),
953 location: snafu::location!(),
954 });
955 }
956
957 if let Some(ref manifest_ns) = self.manifest_ns {
959 if !self.dir_listing_enabled {
960 return manifest_ns.list_tables(request).await;
961 }
962 }
963
964 let mut tables = if self.manifest_ns.is_some() && self.dir_listing_enabled {
966 let manifest_locations = if let Some(ref manifest_ns) = self.manifest_ns {
968 manifest_ns.list_manifest_table_locations().await?
969 } else {
970 std::collections::HashSet::new()
971 };
972
973 let mut manifest_request = request.clone();
975 manifest_request.limit = None;
976 manifest_request.page_token = None;
977 let manifest_tables = if let Some(ref manifest_ns) = self.manifest_ns {
978 let manifest_response = manifest_ns.list_tables(manifest_request).await?;
979 manifest_response.tables
980 } else {
981 vec![]
982 };
983
984 let mut all_tables: Vec<String> = manifest_tables;
987 let dir_tables = self.list_directory_tables().await?;
988 for table_name in dir_tables {
989 let full_location = format!("{}/{}.lance", self.root, table_name);
992 let relative_location = format!("{}.lance", table_name);
993 if !manifest_locations.contains(&full_location)
994 && !manifest_locations.contains(&relative_location)
995 {
996 all_tables.push(table_name);
997 }
998 }
999
1000 all_tables
1001 } else {
1002 self.list_directory_tables().await?
1003 };
1004
1005 Self::apply_pagination(&mut tables, request.page_token, request.limit);
1007 let response = ListTablesResponse::new(tables);
1008 Ok(response)
1009 }
1010
1011 async fn describe_table(&self, request: DescribeTableRequest) -> Result<DescribeTableResponse> {
1012 if let Some(ref manifest_ns) = self.manifest_ns {
1013 match manifest_ns.describe_table(request.clone()).await {
1014 Ok(mut response) => {
1015 if request.vend_credentials == Some(true) && self.credential_vendor.is_some() {
1017 if let Some(ref table_uri) = response.table_uri {
1018 let identity = request.identity.as_deref();
1019 response.storage_options = self
1020 .get_storage_options_for_table(table_uri, identity)
1021 .await?;
1022 }
1023 } else if request.vend_credentials == Some(false) {
1024 response.storage_options = None;
1025 }
1026 return Ok(response);
1027 }
1028 Err(_)
1029 if self.dir_listing_enabled
1030 && request.id.as_ref().is_some_and(|id| id.len() == 1) =>
1031 {
1032 }
1034 Err(e) => return Err(e),
1035 }
1036 }
1037
1038 let table_name = Self::table_name_from_id(&request.id)?;
1039 let table_uri = self.table_full_uri(&table_name);
1040
1041 let status = self.check_table_status(&table_name).await;
1043
1044 if !status.exists {
1045 return Err(Error::Namespace {
1046 source: format!("Table does not exist: {}", table_name).into(),
1047 location: snafu::location!(),
1048 });
1049 }
1050
1051 if status.is_deregistered {
1052 return Err(Error::Namespace {
1053 source: format!("Table is deregistered: {}", table_name).into(),
1054 location: snafu::location!(),
1055 });
1056 }
1057
1058 let load_detailed_metadata = request.load_detailed_metadata.unwrap_or(false);
1059 let vend_credentials = request.vend_credentials.unwrap_or(true);
1061 let identity = request.identity.as_deref();
1062
1063 if !load_detailed_metadata {
1065 let storage_options = if vend_credentials {
1066 self.get_storage_options_for_table(&table_uri, identity)
1067 .await?
1068 } else {
1069 None
1070 };
1071 return Ok(DescribeTableResponse {
1072 table: Some(table_name),
1073 namespace: request.id.as_ref().map(|id| {
1074 if id.len() > 1 {
1075 id[..id.len() - 1].to_vec()
1076 } else {
1077 vec![]
1078 }
1079 }),
1080 location: Some(table_uri.clone()),
1081 table_uri: Some(table_uri),
1082 storage_options,
1083 ..Default::default()
1084 });
1085 }
1086
1087 match Dataset::open(&table_uri).await {
1089 Ok(mut dataset) => {
1090 if let Some(requested_version) = request.version {
1092 dataset = dataset.checkout_version(requested_version as u64).await?;
1093 }
1094
1095 let version_info = dataset.version();
1096 let lance_schema = dataset.schema();
1097 let arrow_schema: arrow_schema::Schema = lance_schema.into();
1098 let json_schema = arrow_schema_to_json(&arrow_schema)?;
1099 let storage_options = if vend_credentials {
1100 self.get_storage_options_for_table(&table_uri, identity)
1101 .await?
1102 } else {
1103 None
1104 };
1105
1106 let metadata: std::collections::HashMap<String, String> =
1108 version_info.metadata.into_iter().collect();
1109
1110 Ok(DescribeTableResponse {
1111 table: Some(table_name),
1112 namespace: request.id.as_ref().map(|id| {
1113 if id.len() > 1 {
1114 id[..id.len() - 1].to_vec()
1115 } else {
1116 vec![]
1117 }
1118 }),
1119 version: Some(version_info.version as i64),
1120 location: Some(table_uri.clone()),
1121 table_uri: Some(table_uri),
1122 schema: Some(Box::new(json_schema)),
1123 storage_options,
1124 metadata: Some(metadata),
1125 ..Default::default()
1126 })
1127 }
1128 Err(err) => {
1129 if status.has_reserved_file {
1131 let storage_options = if vend_credentials {
1132 self.get_storage_options_for_table(&table_uri, identity)
1133 .await?
1134 } else {
1135 None
1136 };
1137 Ok(DescribeTableResponse {
1138 table: Some(table_name),
1139 namespace: request.id.as_ref().map(|id| {
1140 if id.len() > 1 {
1141 id[..id.len() - 1].to_vec()
1142 } else {
1143 vec![]
1144 }
1145 }),
1146 location: Some(table_uri.clone()),
1147 table_uri: Some(table_uri),
1148 storage_options,
1149 ..Default::default()
1150 })
1151 } else {
1152 Err(Error::Namespace {
1153 source: format!(
1154 "Table directory exists but cannot load dataset {}: {:?}",
1155 table_name, err
1156 )
1157 .into(),
1158 location: snafu::location!(),
1159 })
1160 }
1161 }
1162 }
1163 }
1164
1165 async fn table_exists(&self, request: TableExistsRequest) -> Result<()> {
1166 if let Some(ref manifest_ns) = self.manifest_ns {
1167 match manifest_ns.table_exists(request.clone()).await {
1168 Ok(()) => return Ok(()),
1169 Err(_) if self.dir_listing_enabled => {
1170 }
1172 Err(e) => return Err(e),
1173 }
1174 }
1175
1176 let table_name = Self::table_name_from_id(&request.id)?;
1177
1178 let status = self.check_table_status(&table_name).await;
1180
1181 if !status.exists {
1182 return Err(Error::Namespace {
1183 source: format!("Table does not exist: {}", table_name).into(),
1184 location: snafu::location!(),
1185 });
1186 }
1187
1188 if status.is_deregistered {
1189 return Err(Error::Namespace {
1190 source: format!("Table is deregistered: {}", table_name).into(),
1191 location: snafu::location!(),
1192 });
1193 }
1194
1195 Ok(())
1196 }
1197
1198 async fn drop_table(&self, request: DropTableRequest) -> Result<DropTableResponse> {
1199 if let Some(ref manifest_ns) = self.manifest_ns {
1200 return manifest_ns.drop_table(request).await;
1201 }
1202
1203 let table_name = Self::table_name_from_id(&request.id)?;
1204 let table_uri = self.table_full_uri(&table_name);
1205 let table_path = self.table_path(&table_name);
1206
1207 self.object_store
1208 .remove_dir_all(table_path)
1209 .await
1210 .map_err(|e| Error::Namespace {
1211 source: format!("Failed to drop table {}: {}", table_name, e).into(),
1212 location: snafu::location!(),
1213 })?;
1214
1215 Ok(DropTableResponse {
1216 id: request.id,
1217 location: Some(table_uri),
1218 ..Default::default()
1219 })
1220 }
1221
1222 async fn create_table(
1223 &self,
1224 request: CreateTableRequest,
1225 request_data: Bytes,
1226 ) -> Result<CreateTableResponse> {
1227 if let Some(ref manifest_ns) = self.manifest_ns {
1228 return manifest_ns.create_table(request, request_data).await;
1229 }
1230
1231 let table_name = Self::table_name_from_id(&request.id)?;
1232 let table_uri = self.table_full_uri(&table_name);
1233 if request_data.is_empty() {
1234 return Err(Error::Namespace {
1235 source: "Request data (Arrow IPC stream) is required for create_table".into(),
1236 location: snafu::location!(),
1237 });
1238 }
1239
1240 let cursor = Cursor::new(request_data.to_vec());
1242 let stream_reader = StreamReader::try_new(cursor, None).map_err(|e| Error::Namespace {
1243 source: format!("Invalid Arrow IPC stream: {}", e).into(),
1244 location: snafu::location!(),
1245 })?;
1246 let arrow_schema = stream_reader.schema();
1247
1248 let mut batches = Vec::new();
1250 for batch_result in stream_reader {
1251 batches.push(batch_result.map_err(|e| Error::Namespace {
1252 source: format!("Failed to read batch from IPC stream: {}", e).into(),
1253 location: snafu::location!(),
1254 })?);
1255 }
1256
1257 let reader = if batches.is_empty() {
1259 let batch = arrow::record_batch::RecordBatch::new_empty(arrow_schema.clone());
1260 let batches = vec![Ok(batch)];
1261 RecordBatchIterator::new(batches, arrow_schema.clone())
1262 } else {
1263 let batch_results: Vec<_> = batches.into_iter().map(Ok).collect();
1264 RecordBatchIterator::new(batch_results, arrow_schema)
1265 };
1266
1267 let store_params = self.storage_options.as_ref().map(|opts| ObjectStoreParams {
1268 storage_options_accessor: Some(Arc::new(
1269 lance_io::object_store::StorageOptionsAccessor::with_static_options(opts.clone()),
1270 )),
1271 ..Default::default()
1272 });
1273
1274 let write_params = WriteParams {
1275 mode: lance::dataset::WriteMode::Create,
1276 store_params,
1277 ..Default::default()
1278 };
1279
1280 Dataset::write(reader, &table_uri, Some(write_params))
1282 .await
1283 .map_err(|e| Error::Namespace {
1284 source: format!("Failed to create Lance dataset: {}", e).into(),
1285 location: snafu::location!(),
1286 })?;
1287
1288 Ok(CreateTableResponse {
1289 version: Some(1),
1290 location: Some(table_uri),
1291 storage_options: self.storage_options.clone(),
1292 ..Default::default()
1293 })
1294 }
1295
1296 async fn create_empty_table(
1297 &self,
1298 request: CreateEmptyTableRequest,
1299 ) -> Result<CreateEmptyTableResponse> {
1300 if let Some(ref manifest_ns) = self.manifest_ns {
1301 #[allow(deprecated)]
1302 let mut response = manifest_ns.create_empty_table(request.clone()).await?;
1303 if request.vend_credentials == Some(true) && self.credential_vendor.is_some() {
1305 if let Some(ref location) = response.location {
1306 let identity = request.identity.as_deref();
1307 response.storage_options = self
1308 .get_storage_options_for_table(location, identity)
1309 .await?;
1310 }
1311 } else if request.vend_credentials == Some(false) {
1312 response.storage_options = None;
1313 }
1314 return Ok(response);
1315 }
1316
1317 let table_name = Self::table_name_from_id(&request.id)?;
1318 let table_uri = self.table_full_uri(&table_name);
1319
1320 if let Some(location) = &request.location {
1322 let location = location.trim_end_matches('/');
1323 if location != table_uri {
1324 return Err(Error::Namespace {
1325 source: format!(
1326 "Cannot create table {} at location {}, must be at location {}",
1327 table_name, location, table_uri
1328 )
1329 .into(),
1330 location: snafu::location!(),
1331 });
1332 }
1333 }
1334
1335 let reserved_file_path = self.table_reserved_file_path(&table_name);
1338
1339 self.put_marker_file_atomic(&reserved_file_path, &format!("table {}", table_name))
1340 .await
1341 .map_err(|e| Error::Namespace {
1342 source: e.into(),
1343 location: snafu::location!(),
1344 })?;
1345
1346 let vend_credentials = request.vend_credentials.unwrap_or(true);
1348 let identity = request.identity.as_deref();
1349 let storage_options = if vend_credentials {
1350 self.get_storage_options_for_table(&table_uri, identity)
1351 .await?
1352 } else {
1353 None
1354 };
1355
1356 Ok(CreateEmptyTableResponse {
1357 location: Some(table_uri),
1358 storage_options,
1359 ..Default::default()
1360 })
1361 }
1362
1363 async fn declare_table(&self, request: DeclareTableRequest) -> Result<DeclareTableResponse> {
1364 if let Some(ref manifest_ns) = self.manifest_ns {
1365 let mut response = manifest_ns.declare_table(request.clone()).await?;
1366 if request.vend_credentials == Some(true) && self.credential_vendor.is_some() {
1368 if let Some(ref location) = response.location {
1369 let identity = request.identity.as_deref();
1370 response.storage_options = self
1371 .get_storage_options_for_table(location, identity)
1372 .await?;
1373 }
1374 } else if request.vend_credentials == Some(false) {
1375 response.storage_options = None;
1376 }
1377 return Ok(response);
1378 }
1379
1380 let table_name = Self::table_name_from_id(&request.id)?;
1381 let table_uri = self.table_full_uri(&table_name);
1382
1383 if let Some(location) = &request.location {
1385 let location = location.trim_end_matches('/');
1386 if location != table_uri {
1387 return Err(Error::Namespace {
1388 source: format!(
1389 "Cannot declare table {} at location {}, must be at location {}",
1390 table_name, location, table_uri
1391 )
1392 .into(),
1393 location: snafu::location!(),
1394 });
1395 }
1396 }
1397
1398 let status = self.check_table_status(&table_name).await;
1402 if status.exists && !status.has_reserved_file {
1403 return Err(Error::Namespace {
1405 source: format!("Table already exists: {}", table_name).into(),
1406 location: snafu::location!(),
1407 });
1408 }
1409
1410 let reserved_file_path = self.table_reserved_file_path(&table_name);
1414
1415 self.put_marker_file_atomic(&reserved_file_path, &format!("table {}", table_name))
1416 .await
1417 .map_err(|e| Error::Namespace {
1418 source: e.into(),
1419 location: snafu::location!(),
1420 })?;
1421
1422 let vend_credentials = request.vend_credentials.unwrap_or(true);
1424 let identity = request.identity.as_deref();
1425 let storage_options = if vend_credentials {
1426 self.get_storage_options_for_table(&table_uri, identity)
1427 .await?
1428 } else {
1429 None
1430 };
1431
1432 Ok(DeclareTableResponse {
1433 location: Some(table_uri),
1434 storage_options,
1435 ..Default::default()
1436 })
1437 }
1438
1439 async fn register_table(
1440 &self,
1441 request: lance_namespace::models::RegisterTableRequest,
1442 ) -> Result<lance_namespace::models::RegisterTableResponse> {
1443 if let Some(ref manifest_ns) = self.manifest_ns {
1445 return LanceNamespace::register_table(manifest_ns.as_ref(), request).await;
1446 }
1447
1448 Err(Error::NotSupported {
1450 source: "register_table is only supported when manifest mode is enabled".into(),
1451 location: snafu::location!(),
1452 })
1453 }
1454
1455 async fn deregister_table(
1456 &self,
1457 request: lance_namespace::models::DeregisterTableRequest,
1458 ) -> Result<lance_namespace::models::DeregisterTableResponse> {
1459 if let Some(ref manifest_ns) = self.manifest_ns {
1461 return LanceNamespace::deregister_table(manifest_ns.as_ref(), request).await;
1462 }
1463
1464 let table_name = Self::table_name_from_id(&request.id)?;
1466 let table_uri = self.table_full_uri(&table_name);
1467
1468 let status = self.check_table_status(&table_name).await;
1471
1472 if !status.exists {
1473 return Err(Error::Namespace {
1474 source: format!("Table does not exist: {}", table_name).into(),
1475 location: snafu::location!(),
1476 });
1477 }
1478
1479 if status.is_deregistered {
1480 return Err(Error::Namespace {
1481 source: format!("Table is already deregistered: {}", table_name).into(),
1482 location: snafu::location!(),
1483 });
1484 }
1485
1486 let deregistered_path = self.table_deregistered_file_path(&table_name);
1492 self.put_marker_file_atomic(
1493 &deregistered_path,
1494 &format!("deregistration marker for table {}", table_name),
1495 )
1496 .await
1497 .map_err(|e| {
1498 let message = if e.contains("already exists") {
1500 format!("Table is already deregistered: {}", table_name)
1501 } else {
1502 e
1503 };
1504 Error::Namespace {
1505 source: message.into(),
1506 location: snafu::location!(),
1507 }
1508 })?;
1509
1510 Ok(lance_namespace::models::DeregisterTableResponse {
1511 id: request.id,
1512 location: Some(table_uri),
1513 ..Default::default()
1514 })
1515 }
1516
1517 fn namespace_id(&self) -> String {
1518 format!("DirectoryNamespace {{ root: {:?} }}", self.root)
1519 }
1520}
1521
1522#[cfg(test)]
1523mod tests {
1524 use super::*;
1525 use arrow_ipc::reader::StreamReader;
1526 use lance::dataset::Dataset;
1527 use lance_core::utils::tempfile::TempStdDir;
1528 use lance_namespace::models::{
1529 CreateTableRequest, JsonArrowDataType, JsonArrowField, JsonArrowSchema, ListTablesRequest,
1530 };
1531 use lance_namespace::schema::convert_json_arrow_schema;
1532 use std::io::Cursor;
1533 use std::sync::Arc;
1534
1535 async fn create_test_namespace() -> (DirectoryNamespace, TempStdDir) {
1537 let temp_dir = TempStdDir::default();
1538
1539 let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
1540 .build()
1541 .await
1542 .unwrap();
1543 (namespace, temp_dir)
1544 }
1545
1546 fn create_test_ipc_data(schema: &JsonArrowSchema) -> Vec<u8> {
1548 use arrow::ipc::writer::StreamWriter;
1549
1550 let arrow_schema = convert_json_arrow_schema(schema).unwrap();
1551 let arrow_schema = Arc::new(arrow_schema);
1552 let batch = arrow::record_batch::RecordBatch::new_empty(arrow_schema.clone());
1553 let mut buffer = Vec::new();
1554 {
1555 let mut writer = StreamWriter::try_new(&mut buffer, &arrow_schema).unwrap();
1556 writer.write(&batch).unwrap();
1557 writer.finish().unwrap();
1558 }
1559 buffer
1560 }
1561
1562 fn create_test_schema() -> JsonArrowSchema {
1564 let int_type = JsonArrowDataType::new("int32".to_string());
1565 let string_type = JsonArrowDataType::new("utf8".to_string());
1566
1567 let id_field = JsonArrowField {
1568 name: "id".to_string(),
1569 r#type: Box::new(int_type),
1570 nullable: false,
1571 metadata: None,
1572 };
1573
1574 let name_field = JsonArrowField {
1575 name: "name".to_string(),
1576 r#type: Box::new(string_type),
1577 nullable: true,
1578 metadata: None,
1579 };
1580
1581 JsonArrowSchema {
1582 fields: vec![id_field, name_field],
1583 metadata: None,
1584 }
1585 }
1586
1587 #[tokio::test]
1588 async fn test_create_table() {
1589 let (namespace, _temp_dir) = create_test_namespace().await;
1590
1591 let schema = create_test_schema();
1593 let ipc_data = create_test_ipc_data(&schema);
1594
1595 let mut request = CreateTableRequest::new();
1596 request.id = Some(vec!["test_table".to_string()]);
1597
1598 let response = namespace
1599 .create_table(request, bytes::Bytes::from(ipc_data))
1600 .await
1601 .unwrap();
1602
1603 assert!(response.location.is_some());
1604 assert!(response.location.unwrap().ends_with("test_table.lance"));
1605 assert_eq!(response.version, Some(1));
1606 }
1607
1608 #[tokio::test]
1609 async fn test_create_table_without_data() {
1610 let (namespace, _temp_dir) = create_test_namespace().await;
1611
1612 let mut request = CreateTableRequest::new();
1613 request.id = Some(vec!["test_table".to_string()]);
1614
1615 let result = namespace.create_table(request, bytes::Bytes::new()).await;
1616 assert!(result.is_err());
1617 assert!(result
1618 .unwrap_err()
1619 .to_string()
1620 .contains("Arrow IPC stream) is required"));
1621 }
1622
1623 #[tokio::test]
1624 async fn test_create_table_with_invalid_id() {
1625 let (namespace, _temp_dir) = create_test_namespace().await;
1626
1627 let schema = create_test_schema();
1629 let ipc_data = create_test_ipc_data(&schema);
1630
1631 let mut request = CreateTableRequest::new();
1633 request.id = Some(vec![]);
1634
1635 let result = namespace
1636 .create_table(request, bytes::Bytes::from(ipc_data.clone()))
1637 .await;
1638 assert!(result.is_err());
1639
1640 let mut create_ns_req = CreateNamespaceRequest::new();
1643 create_ns_req.id = Some(vec!["test_namespace".to_string()]);
1644 namespace.create_namespace(create_ns_req).await.unwrap();
1645
1646 let mut request = CreateTableRequest::new();
1648 request.id = Some(vec!["test_namespace".to_string(), "table".to_string()]);
1649
1650 let result = namespace
1651 .create_table(request, bytes::Bytes::from(ipc_data))
1652 .await;
1653 assert!(
1655 result.is_ok(),
1656 "Multi-level table IDs should work with manifest enabled"
1657 );
1658 }
1659
1660 #[tokio::test]
1661 async fn test_list_tables() {
1662 let (namespace, _temp_dir) = create_test_namespace().await;
1663
1664 let mut request = ListTablesRequest::new();
1666 request.id = Some(vec![]);
1667 let response = namespace.list_tables(request).await.unwrap();
1668 assert_eq!(response.tables.len(), 0);
1669
1670 let schema = create_test_schema();
1672 let ipc_data = create_test_ipc_data(&schema);
1673
1674 let mut create_request = CreateTableRequest::new();
1676 create_request.id = Some(vec!["table1".to_string()]);
1677 namespace
1678 .create_table(create_request, bytes::Bytes::from(ipc_data.clone()))
1679 .await
1680 .unwrap();
1681
1682 let mut create_request = CreateTableRequest::new();
1684 create_request.id = Some(vec!["table2".to_string()]);
1685 namespace
1686 .create_table(create_request, bytes::Bytes::from(ipc_data))
1687 .await
1688 .unwrap();
1689
1690 let mut request = ListTablesRequest::new();
1692 request.id = Some(vec![]);
1693 let response = namespace.list_tables(request).await.unwrap();
1694 let tables = response.tables;
1695 assert_eq!(tables.len(), 2);
1696 assert!(tables.contains(&"table1".to_string()));
1697 assert!(tables.contains(&"table2".to_string()));
1698 }
1699
1700 #[tokio::test]
1701 async fn test_list_tables_with_namespace_id() {
1702 let (namespace, _temp_dir) = create_test_namespace().await;
1703
1704 let mut create_ns_req = CreateNamespaceRequest::new();
1706 create_ns_req.id = Some(vec!["test_namespace".to_string()]);
1707 namespace.create_namespace(create_ns_req).await.unwrap();
1708
1709 let mut request = ListTablesRequest::new();
1711 request.id = Some(vec!["test_namespace".to_string()]);
1712
1713 let result = namespace.list_tables(request).await;
1714 assert!(
1716 result.is_ok(),
1717 "list_tables should work with child namespace when manifest is enabled"
1718 );
1719 let response = result.unwrap();
1720 assert_eq!(
1721 response.tables.len(),
1722 0,
1723 "Namespace should have no tables yet"
1724 );
1725 }
1726
1727 #[tokio::test]
1728 async fn test_describe_table() {
1729 let (namespace, _temp_dir) = create_test_namespace().await;
1730
1731 let schema = create_test_schema();
1733 let ipc_data = create_test_ipc_data(&schema);
1734
1735 let mut create_request = CreateTableRequest::new();
1736 create_request.id = Some(vec!["test_table".to_string()]);
1737 namespace
1738 .create_table(create_request, bytes::Bytes::from(ipc_data))
1739 .await
1740 .unwrap();
1741
1742 let mut request = DescribeTableRequest::new();
1744 request.id = Some(vec!["test_table".to_string()]);
1745 let response = namespace.describe_table(request).await.unwrap();
1746
1747 assert!(response.location.is_some());
1748 assert!(response.location.unwrap().ends_with("test_table.lance"));
1749 }
1750
1751 #[tokio::test]
1752 async fn test_describe_nonexistent_table() {
1753 let (namespace, _temp_dir) = create_test_namespace().await;
1754
1755 let mut request = DescribeTableRequest::new();
1756 request.id = Some(vec!["nonexistent".to_string()]);
1757
1758 let result = namespace.describe_table(request).await;
1759 assert!(result.is_err());
1760 assert!(result
1761 .unwrap_err()
1762 .to_string()
1763 .contains("Table does not exist"));
1764 }
1765
1766 #[tokio::test]
1767 async fn test_table_exists() {
1768 let (namespace, _temp_dir) = create_test_namespace().await;
1769
1770 let schema = create_test_schema();
1772 let ipc_data = create_test_ipc_data(&schema);
1773
1774 let mut create_request = CreateTableRequest::new();
1775 create_request.id = Some(vec!["existing_table".to_string()]);
1776 namespace
1777 .create_table(create_request, bytes::Bytes::from(ipc_data))
1778 .await
1779 .unwrap();
1780
1781 let mut request = TableExistsRequest::new();
1783 request.id = Some(vec!["existing_table".to_string()]);
1784 let result = namespace.table_exists(request).await;
1785 assert!(result.is_ok());
1786
1787 let mut request = TableExistsRequest::new();
1789 request.id = Some(vec!["nonexistent".to_string()]);
1790 let result = namespace.table_exists(request).await;
1791 assert!(result.is_err());
1792 assert!(result
1793 .unwrap_err()
1794 .to_string()
1795 .contains("Table does not exist"));
1796 }
1797
1798 #[tokio::test]
1799 async fn test_drop_table() {
1800 let (namespace, _temp_dir) = create_test_namespace().await;
1801
1802 let schema = create_test_schema();
1804 let ipc_data = create_test_ipc_data(&schema);
1805
1806 let mut create_request = CreateTableRequest::new();
1807 create_request.id = Some(vec!["table_to_drop".to_string()]);
1808 namespace
1809 .create_table(create_request, bytes::Bytes::from(ipc_data))
1810 .await
1811 .unwrap();
1812
1813 let mut exists_request = TableExistsRequest::new();
1815 exists_request.id = Some(vec!["table_to_drop".to_string()]);
1816 assert!(namespace.table_exists(exists_request.clone()).await.is_ok());
1817
1818 let mut drop_request = DropTableRequest::new();
1820 drop_request.id = Some(vec!["table_to_drop".to_string()]);
1821 let response = namespace.drop_table(drop_request).await.unwrap();
1822 assert!(response.location.is_some());
1823
1824 assert!(namespace.table_exists(exists_request).await.is_err());
1826 }
1827
1828 #[tokio::test]
1829 async fn test_drop_nonexistent_table() {
1830 let (namespace, _temp_dir) = create_test_namespace().await;
1831
1832 let mut request = DropTableRequest::new();
1833 request.id = Some(vec!["nonexistent".to_string()]);
1834
1835 let result = namespace.drop_table(request).await;
1837 let _ = result;
1840 }
1841
1842 #[tokio::test]
1843 async fn test_root_namespace_operations() {
1844 let (namespace, _temp_dir) = create_test_namespace().await;
1845
1846 let mut request = ListNamespacesRequest::new();
1848 request.id = Some(vec![]);
1849 let result = namespace.list_namespaces(request).await;
1850 assert!(result.is_ok());
1851 assert_eq!(result.unwrap().namespaces.len(), 0);
1852
1853 let mut request = DescribeNamespaceRequest::new();
1855 request.id = Some(vec![]);
1856 let result = namespace.describe_namespace(request).await;
1857 assert!(result.is_ok());
1858
1859 let mut request = NamespaceExistsRequest::new();
1861 request.id = Some(vec![]);
1862 let result = namespace.namespace_exists(request).await;
1863 assert!(result.is_ok());
1864
1865 let mut request = CreateNamespaceRequest::new();
1867 request.id = Some(vec![]);
1868 let result = namespace.create_namespace(request).await;
1869 assert!(result.is_err());
1870 assert!(result.unwrap_err().to_string().contains("already exists"));
1871
1872 let mut request = DropNamespaceRequest::new();
1874 request.id = Some(vec![]);
1875 let result = namespace.drop_namespace(request).await;
1876 assert!(result.is_err());
1877 assert!(result
1878 .unwrap_err()
1879 .to_string()
1880 .contains("cannot be dropped"));
1881 }
1882
1883 #[tokio::test]
1884 async fn test_non_root_namespace_operations() {
1885 let (namespace, _temp_dir) = create_test_namespace().await;
1886
1887 let mut request = CreateNamespaceRequest::new();
1890 request.id = Some(vec!["child".to_string()]);
1891 let result = namespace.create_namespace(request).await;
1892 assert!(
1893 result.is_ok(),
1894 "Child namespace creation should succeed with manifest enabled"
1895 );
1896
1897 let mut request = NamespaceExistsRequest::new();
1899 request.id = Some(vec!["child".to_string()]);
1900 let result = namespace.namespace_exists(request).await;
1901 assert!(
1902 result.is_ok(),
1903 "Child namespace should exist after creation"
1904 );
1905
1906 let mut request = DropNamespaceRequest::new();
1908 request.id = Some(vec!["child".to_string()]);
1909 let result = namespace.drop_namespace(request).await;
1910 assert!(
1911 result.is_ok(),
1912 "Child namespace drop should succeed with manifest enabled"
1913 );
1914
1915 let mut request = NamespaceExistsRequest::new();
1917 request.id = Some(vec!["child".to_string()]);
1918 let result = namespace.namespace_exists(request).await;
1919 assert!(
1920 result.is_err(),
1921 "Child namespace should not exist after drop"
1922 );
1923 }
1924
1925 #[tokio::test]
1926 async fn test_config_custom_root() {
1927 let temp_dir = TempStdDir::default();
1928 let custom_path = temp_dir.join("custom");
1929 std::fs::create_dir(&custom_path).unwrap();
1930
1931 let namespace = DirectoryNamespaceBuilder::new(custom_path.to_string_lossy().to_string())
1932 .build()
1933 .await
1934 .unwrap();
1935
1936 let schema = create_test_schema();
1938 let ipc_data = create_test_ipc_data(&schema);
1939
1940 let mut request = CreateTableRequest::new();
1942 request.id = Some(vec!["test_table".to_string()]);
1943
1944 let response = namespace
1945 .create_table(request, bytes::Bytes::from(ipc_data))
1946 .await
1947 .unwrap();
1948
1949 assert!(response.location.unwrap().contains("custom"));
1950 }
1951
1952 #[tokio::test]
1953 async fn test_config_storage_options() {
1954 let temp_dir = TempStdDir::default();
1955
1956 let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
1957 .storage_option("option1", "value1")
1958 .storage_option("option2", "value2")
1959 .build()
1960 .await
1961 .unwrap();
1962
1963 let schema = create_test_schema();
1965 let ipc_data = create_test_ipc_data(&schema);
1966
1967 let mut request = CreateTableRequest::new();
1969 request.id = Some(vec!["test_table".to_string()]);
1970
1971 let response = namespace
1972 .create_table(request, bytes::Bytes::from(ipc_data))
1973 .await
1974 .unwrap();
1975
1976 let storage_options = response.storage_options.unwrap();
1977 assert_eq!(storage_options.get("option1"), Some(&"value1".to_string()));
1978 assert_eq!(storage_options.get("option2"), Some(&"value2".to_string()));
1979 }
1980
1981 #[tokio::test]
1982 async fn test_from_properties_manifest_enabled() {
1983 let temp_dir = TempStdDir::default();
1984
1985 let mut properties = HashMap::new();
1986 properties.insert("root".to_string(), temp_dir.to_str().unwrap().to_string());
1987 properties.insert("manifest_enabled".to_string(), "true".to_string());
1988 properties.insert("dir_listing_enabled".to_string(), "false".to_string());
1989
1990 let builder = DirectoryNamespaceBuilder::from_properties(properties, None).unwrap();
1991 assert!(builder.manifest_enabled);
1992 assert!(!builder.dir_listing_enabled);
1993
1994 let namespace = builder.build().await.unwrap();
1995
1996 let schema = create_test_schema();
1998 let ipc_data = create_test_ipc_data(&schema);
1999
2000 let mut request = CreateTableRequest::new();
2002 request.id = Some(vec!["test_table".to_string()]);
2003
2004 let response = namespace
2005 .create_table(request, bytes::Bytes::from(ipc_data))
2006 .await
2007 .unwrap();
2008
2009 assert!(response.location.is_some());
2010 }
2011
2012 #[tokio::test]
2013 async fn test_from_properties_dir_listing_enabled() {
2014 let temp_dir = TempStdDir::default();
2015
2016 let mut properties = HashMap::new();
2017 properties.insert("root".to_string(), temp_dir.to_str().unwrap().to_string());
2018 properties.insert("manifest_enabled".to_string(), "false".to_string());
2019 properties.insert("dir_listing_enabled".to_string(), "true".to_string());
2020
2021 let builder = DirectoryNamespaceBuilder::from_properties(properties, None).unwrap();
2022 assert!(!builder.manifest_enabled);
2023 assert!(builder.dir_listing_enabled);
2024
2025 let namespace = builder.build().await.unwrap();
2026
2027 let schema = create_test_schema();
2029 let ipc_data = create_test_ipc_data(&schema);
2030
2031 let mut request = CreateTableRequest::new();
2033 request.id = Some(vec!["test_table".to_string()]);
2034
2035 let response = namespace
2036 .create_table(request, bytes::Bytes::from(ipc_data))
2037 .await
2038 .unwrap();
2039
2040 assert!(response.location.is_some());
2041 }
2042
2043 #[tokio::test]
2044 async fn test_from_properties_defaults() {
2045 let temp_dir = TempStdDir::default();
2046
2047 let mut properties = HashMap::new();
2048 properties.insert("root".to_string(), temp_dir.to_str().unwrap().to_string());
2049
2050 let builder = DirectoryNamespaceBuilder::from_properties(properties, None).unwrap();
2051 assert!(builder.manifest_enabled);
2053 assert!(builder.dir_listing_enabled);
2054 }
2055
2056 #[tokio::test]
2057 async fn test_from_properties_with_storage_options() {
2058 let temp_dir = TempStdDir::default();
2059
2060 let mut properties = HashMap::new();
2061 properties.insert("root".to_string(), temp_dir.to_str().unwrap().to_string());
2062 properties.insert("manifest_enabled".to_string(), "true".to_string());
2063 properties.insert("storage.region".to_string(), "us-west-2".to_string());
2064 properties.insert("storage.bucket".to_string(), "my-bucket".to_string());
2065
2066 let builder = DirectoryNamespaceBuilder::from_properties(properties, None).unwrap();
2067 assert!(builder.manifest_enabled);
2068 assert!(builder.storage_options.is_some());
2069
2070 let storage_options = builder.storage_options.unwrap();
2071 assert_eq!(
2072 storage_options.get("region"),
2073 Some(&"us-west-2".to_string())
2074 );
2075 assert_eq!(
2076 storage_options.get("bucket"),
2077 Some(&"my-bucket".to_string())
2078 );
2079 }
2080
2081 #[tokio::test]
2082 async fn test_various_arrow_types() {
2083 let (namespace, _temp_dir) = create_test_namespace().await;
2084
2085 let fields = vec![
2087 JsonArrowField {
2088 name: "bool_col".to_string(),
2089 r#type: Box::new(JsonArrowDataType::new("bool".to_string())),
2090 nullable: true,
2091 metadata: None,
2092 },
2093 JsonArrowField {
2094 name: "int8_col".to_string(),
2095 r#type: Box::new(JsonArrowDataType::new("int8".to_string())),
2096 nullable: true,
2097 metadata: None,
2098 },
2099 JsonArrowField {
2100 name: "float64_col".to_string(),
2101 r#type: Box::new(JsonArrowDataType::new("float64".to_string())),
2102 nullable: true,
2103 metadata: None,
2104 },
2105 JsonArrowField {
2106 name: "binary_col".to_string(),
2107 r#type: Box::new(JsonArrowDataType::new("binary".to_string())),
2108 nullable: true,
2109 metadata: None,
2110 },
2111 ];
2112
2113 let schema = JsonArrowSchema {
2114 fields,
2115 metadata: None,
2116 };
2117
2118 let ipc_data = create_test_ipc_data(&schema);
2120
2121 let mut request = CreateTableRequest::new();
2122 request.id = Some(vec!["complex_table".to_string()]);
2123
2124 let response = namespace
2125 .create_table(request, bytes::Bytes::from(ipc_data))
2126 .await
2127 .unwrap();
2128
2129 assert!(response.location.is_some());
2130 }
2131
2132 #[tokio::test]
2133 async fn test_connect_dir() {
2134 let temp_dir = TempStdDir::default();
2135
2136 let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
2137 .build()
2138 .await
2139 .unwrap();
2140
2141 let mut request = ListTablesRequest::new();
2143 request.id = Some(vec![]);
2144 let response = namespace.list_tables(request).await.unwrap();
2145 assert_eq!(response.tables.len(), 0);
2146 }
2147
2148 #[tokio::test]
2149 async fn test_create_table_with_ipc_data() {
2150 use arrow::array::{Int32Array, StringArray};
2151 use arrow::ipc::writer::StreamWriter;
2152
2153 let (namespace, _temp_dir) = create_test_namespace().await;
2154
2155 let schema = create_test_schema();
2157
2158 let arrow_schema = convert_json_arrow_schema(&schema).unwrap();
2160 let arrow_schema = Arc::new(arrow_schema);
2161
2162 let id_array = Int32Array::from(vec![1, 2, 3]);
2164 let name_array = StringArray::from(vec!["Alice", "Bob", "Charlie"]);
2165 let batch = arrow::record_batch::RecordBatch::try_new(
2166 arrow_schema.clone(),
2167 vec![Arc::new(id_array), Arc::new(name_array)],
2168 )
2169 .unwrap();
2170
2171 let mut buffer = Vec::new();
2173 {
2174 let mut writer = StreamWriter::try_new(&mut buffer, &arrow_schema).unwrap();
2175 writer.write(&batch).unwrap();
2176 writer.finish().unwrap();
2177 }
2178
2179 let mut request = CreateTableRequest::new();
2181 request.id = Some(vec!["test_table_with_data".to_string()]);
2182
2183 let response = namespace
2184 .create_table(request, Bytes::from(buffer))
2185 .await
2186 .unwrap();
2187
2188 assert_eq!(response.version, Some(1));
2189 assert!(response
2190 .location
2191 .unwrap()
2192 .contains("test_table_with_data.lance"));
2193
2194 let mut exists_request = TableExistsRequest::new();
2196 exists_request.id = Some(vec!["test_table_with_data".to_string()]);
2197 namespace.table_exists(exists_request).await.unwrap();
2198 }
2199
2200 #[tokio::test]
2201 #[allow(deprecated)]
2202 async fn test_create_empty_table() {
2203 let (namespace, temp_dir) = create_test_namespace().await;
2204
2205 let mut request = CreateEmptyTableRequest::new();
2206 request.id = Some(vec!["empty_table".to_string()]);
2207
2208 let response = namespace.create_empty_table(request).await.unwrap();
2209
2210 assert!(response.location.is_some());
2211 assert!(response.location.unwrap().ends_with("empty_table.lance"));
2212
2213 let table_dir = temp_dir.join("empty_table.lance");
2215 assert!(table_dir.exists());
2216 assert!(table_dir.is_dir());
2217
2218 let reserved_file = table_dir.join(".lance-reserved");
2219 assert!(reserved_file.exists());
2220 assert!(reserved_file.is_file());
2221
2222 let metadata = std::fs::metadata(&reserved_file).unwrap();
2224 assert_eq!(metadata.len(), 0);
2225
2226 let mut exists_request = TableExistsRequest::new();
2228 exists_request.id = Some(vec!["empty_table".to_string()]);
2229 namespace.table_exists(exists_request).await.unwrap();
2230
2231 let mut list_request = ListTablesRequest::new();
2233 list_request.id = Some(vec![]);
2234 let list_response = namespace.list_tables(list_request).await.unwrap();
2235 assert!(list_response.tables.contains(&"empty_table".to_string()));
2236
2237 let mut describe_request = DescribeTableRequest::new();
2239 describe_request.id = Some(vec!["empty_table".to_string()]);
2240 let describe_response = namespace.describe_table(describe_request).await.unwrap();
2241 assert!(describe_response.location.is_some());
2242 assert!(describe_response.location.unwrap().contains("empty_table"));
2243 }
2244
2245 #[tokio::test]
2246 #[allow(deprecated)]
2247 async fn test_create_empty_table_with_wrong_location() {
2248 let (namespace, _temp_dir) = create_test_namespace().await;
2249
2250 let mut request = CreateEmptyTableRequest::new();
2251 request.id = Some(vec!["test_table".to_string()]);
2252 request.location = Some("/wrong/path/table.lance".to_string());
2253
2254 let result = namespace.create_empty_table(request).await;
2255 assert!(result.is_err());
2256 assert!(result
2257 .unwrap_err()
2258 .to_string()
2259 .contains("must be at location"));
2260 }
2261
2262 #[tokio::test]
2263 #[allow(deprecated)]
2264 async fn test_create_empty_table_then_drop() {
2265 let (namespace, temp_dir) = create_test_namespace().await;
2266
2267 let mut create_request = CreateEmptyTableRequest::new();
2269 create_request.id = Some(vec!["empty_table_to_drop".to_string()]);
2270
2271 let create_response = namespace.create_empty_table(create_request).await.unwrap();
2272 assert!(create_response.location.is_some());
2273
2274 let table_dir = temp_dir.join("empty_table_to_drop.lance");
2276 assert!(table_dir.exists());
2277 let reserved_file = table_dir.join(".lance-reserved");
2278 assert!(reserved_file.exists());
2279
2280 let mut drop_request = DropTableRequest::new();
2282 drop_request.id = Some(vec!["empty_table_to_drop".to_string()]);
2283 let drop_response = namespace.drop_table(drop_request).await.unwrap();
2284 assert!(drop_response.location.is_some());
2285
2286 assert!(!table_dir.exists());
2288 assert!(!reserved_file.exists());
2289
2290 let mut exists_request = TableExistsRequest::new();
2292 exists_request.id = Some(vec!["empty_table_to_drop".to_string()]);
2293 let exists_result = namespace.table_exists(exists_request).await;
2294 assert!(exists_result.is_err());
2295 }
2296
2297 #[tokio::test]
2298 async fn test_child_namespace_create_and_list() {
2299 let (namespace, _temp_dir) = create_test_namespace().await;
2300
2301 for i in 1..=3 {
2303 let mut create_req = CreateNamespaceRequest::new();
2304 create_req.id = Some(vec![format!("ns{}", i)]);
2305 let result = namespace.create_namespace(create_req).await;
2306 assert!(result.is_ok(), "Failed to create child namespace ns{}", i);
2307 }
2308
2309 let list_req = ListNamespacesRequest {
2311 id: Some(vec![]),
2312 ..Default::default()
2313 };
2314 let result = namespace.list_namespaces(list_req).await;
2315 assert!(result.is_ok());
2316 let namespaces = result.unwrap().namespaces;
2317 assert_eq!(namespaces.len(), 3);
2318 assert!(namespaces.contains(&"ns1".to_string()));
2319 assert!(namespaces.contains(&"ns2".to_string()));
2320 assert!(namespaces.contains(&"ns3".to_string()));
2321 }
2322
2323 #[tokio::test]
2324 async fn test_nested_namespace_hierarchy() {
2325 let (namespace, _temp_dir) = create_test_namespace().await;
2326
2327 let mut create_req = CreateNamespaceRequest::new();
2329 create_req.id = Some(vec!["parent".to_string()]);
2330 namespace.create_namespace(create_req).await.unwrap();
2331
2332 let mut create_req = CreateNamespaceRequest::new();
2334 create_req.id = Some(vec!["parent".to_string(), "child1".to_string()]);
2335 namespace.create_namespace(create_req).await.unwrap();
2336
2337 let mut create_req = CreateNamespaceRequest::new();
2338 create_req.id = Some(vec!["parent".to_string(), "child2".to_string()]);
2339 namespace.create_namespace(create_req).await.unwrap();
2340
2341 let list_req = ListNamespacesRequest {
2343 id: Some(vec!["parent".to_string()]),
2344 ..Default::default()
2345 };
2346 let result = namespace.list_namespaces(list_req).await;
2347 assert!(result.is_ok());
2348 let children = result.unwrap().namespaces;
2349 assert_eq!(children.len(), 2);
2350 assert!(children.contains(&"child1".to_string()));
2351 assert!(children.contains(&"child2".to_string()));
2352
2353 let list_req = ListNamespacesRequest {
2355 id: Some(vec![]),
2356 ..Default::default()
2357 };
2358 let result = namespace.list_namespaces(list_req).await;
2359 assert!(result.is_ok());
2360 let root_namespaces = result.unwrap().namespaces;
2361 assert_eq!(root_namespaces.len(), 1);
2362 assert_eq!(root_namespaces[0], "parent");
2363 }
2364
2365 #[tokio::test]
2366 async fn test_table_in_child_namespace() {
2367 let (namespace, _temp_dir) = create_test_namespace().await;
2368
2369 let mut create_ns_req = CreateNamespaceRequest::new();
2371 create_ns_req.id = Some(vec!["test_ns".to_string()]);
2372 namespace.create_namespace(create_ns_req).await.unwrap();
2373
2374 let schema = create_test_schema();
2376 let ipc_data = create_test_ipc_data(&schema);
2377 let mut create_table_req = CreateTableRequest::new();
2378 create_table_req.id = Some(vec!["test_ns".to_string(), "table1".to_string()]);
2379 let result = namespace
2380 .create_table(create_table_req, bytes::Bytes::from(ipc_data))
2381 .await;
2382 assert!(result.is_ok(), "Failed to create table in child namespace");
2383
2384 let list_req = ListTablesRequest {
2386 id: Some(vec!["test_ns".to_string()]),
2387 ..Default::default()
2388 };
2389 let result = namespace.list_tables(list_req).await;
2390 assert!(result.is_ok());
2391 let tables = result.unwrap().tables;
2392 assert_eq!(tables.len(), 1);
2393 assert_eq!(tables[0], "table1");
2394
2395 let mut exists_req = TableExistsRequest::new();
2397 exists_req.id = Some(vec!["test_ns".to_string(), "table1".to_string()]);
2398 let result = namespace.table_exists(exists_req).await;
2399 assert!(result.is_ok());
2400
2401 let mut describe_req = DescribeTableRequest::new();
2403 describe_req.id = Some(vec!["test_ns".to_string(), "table1".to_string()]);
2404 let result = namespace.describe_table(describe_req).await;
2405 assert!(result.is_ok());
2406 let response = result.unwrap();
2407 assert!(response.location.is_some());
2408 }
2409
2410 #[tokio::test]
2411 async fn test_multiple_tables_in_child_namespace() {
2412 let (namespace, _temp_dir) = create_test_namespace().await;
2413
2414 let mut create_ns_req = CreateNamespaceRequest::new();
2416 create_ns_req.id = Some(vec!["test_ns".to_string()]);
2417 namespace.create_namespace(create_ns_req).await.unwrap();
2418
2419 let schema = create_test_schema();
2421 let ipc_data = create_test_ipc_data(&schema);
2422 for i in 1..=3 {
2423 let mut create_table_req = CreateTableRequest::new();
2424 create_table_req.id = Some(vec!["test_ns".to_string(), format!("table{}", i)]);
2425 namespace
2426 .create_table(create_table_req, bytes::Bytes::from(ipc_data.clone()))
2427 .await
2428 .unwrap();
2429 }
2430
2431 let list_req = ListTablesRequest {
2433 id: Some(vec!["test_ns".to_string()]),
2434 ..Default::default()
2435 };
2436 let result = namespace.list_tables(list_req).await;
2437 assert!(result.is_ok());
2438 let tables = result.unwrap().tables;
2439 assert_eq!(tables.len(), 3);
2440 assert!(tables.contains(&"table1".to_string()));
2441 assert!(tables.contains(&"table2".to_string()));
2442 assert!(tables.contains(&"table3".to_string()));
2443 }
2444
2445 #[tokio::test]
2446 async fn test_drop_table_in_child_namespace() {
2447 let (namespace, _temp_dir) = create_test_namespace().await;
2448
2449 let mut create_ns_req = CreateNamespaceRequest::new();
2451 create_ns_req.id = Some(vec!["test_ns".to_string()]);
2452 namespace.create_namespace(create_ns_req).await.unwrap();
2453
2454 let schema = create_test_schema();
2456 let ipc_data = create_test_ipc_data(&schema);
2457 let mut create_table_req = CreateTableRequest::new();
2458 create_table_req.id = Some(vec!["test_ns".to_string(), "table1".to_string()]);
2459 namespace
2460 .create_table(create_table_req, bytes::Bytes::from(ipc_data))
2461 .await
2462 .unwrap();
2463
2464 let mut drop_req = DropTableRequest::new();
2466 drop_req.id = Some(vec!["test_ns".to_string(), "table1".to_string()]);
2467 let result = namespace.drop_table(drop_req).await;
2468 assert!(result.is_ok(), "Failed to drop table in child namespace");
2469
2470 let mut exists_req = TableExistsRequest::new();
2472 exists_req.id = Some(vec!["test_ns".to_string(), "table1".to_string()]);
2473 let result = namespace.table_exists(exists_req).await;
2474 assert!(result.is_err());
2475 }
2476
2477 #[tokio::test]
2478 #[allow(deprecated)]
2479 async fn test_empty_table_in_child_namespace() {
2480 let (namespace, _temp_dir) = create_test_namespace().await;
2481
2482 let mut create_ns_req = CreateNamespaceRequest::new();
2484 create_ns_req.id = Some(vec!["test_ns".to_string()]);
2485 namespace.create_namespace(create_ns_req).await.unwrap();
2486
2487 let mut create_empty_req = CreateEmptyTableRequest::new();
2489 create_empty_req.id = Some(vec!["test_ns".to_string(), "empty_table".to_string()]);
2490 let result = namespace.create_empty_table(create_empty_req).await;
2491 assert!(
2492 result.is_ok(),
2493 "Failed to create empty table in child namespace"
2494 );
2495
2496 let mut exists_req = TableExistsRequest::new();
2498 exists_req.id = Some(vec!["test_ns".to_string(), "empty_table".to_string()]);
2499 let result = namespace.table_exists(exists_req).await;
2500 assert!(result.is_ok());
2501 }
2502
2503 #[tokio::test]
2504 async fn test_deeply_nested_namespace() {
2505 let (namespace, _temp_dir) = create_test_namespace().await;
2506
2507 let mut create_req = CreateNamespaceRequest::new();
2509 create_req.id = Some(vec!["level1".to_string()]);
2510 namespace.create_namespace(create_req).await.unwrap();
2511
2512 let mut create_req = CreateNamespaceRequest::new();
2513 create_req.id = Some(vec!["level1".to_string(), "level2".to_string()]);
2514 namespace.create_namespace(create_req).await.unwrap();
2515
2516 let mut create_req = CreateNamespaceRequest::new();
2517 create_req.id = Some(vec![
2518 "level1".to_string(),
2519 "level2".to_string(),
2520 "level3".to_string(),
2521 ]);
2522 namespace.create_namespace(create_req).await.unwrap();
2523
2524 let schema = create_test_schema();
2526 let ipc_data = create_test_ipc_data(&schema);
2527 let mut create_table_req = CreateTableRequest::new();
2528 create_table_req.id = Some(vec![
2529 "level1".to_string(),
2530 "level2".to_string(),
2531 "level3".to_string(),
2532 "table1".to_string(),
2533 ]);
2534 let result = namespace
2535 .create_table(create_table_req, bytes::Bytes::from(ipc_data))
2536 .await;
2537 assert!(
2538 result.is_ok(),
2539 "Failed to create table in deeply nested namespace"
2540 );
2541
2542 let mut exists_req = TableExistsRequest::new();
2544 exists_req.id = Some(vec![
2545 "level1".to_string(),
2546 "level2".to_string(),
2547 "level3".to_string(),
2548 "table1".to_string(),
2549 ]);
2550 let result = namespace.table_exists(exists_req).await;
2551 assert!(result.is_ok());
2552 }
2553
2554 #[tokio::test]
2555 async fn test_namespace_with_properties() {
2556 let (namespace, _temp_dir) = create_test_namespace().await;
2557
2558 let mut properties = HashMap::new();
2560 properties.insert("owner".to_string(), "test_user".to_string());
2561 properties.insert("description".to_string(), "Test namespace".to_string());
2562
2563 let mut create_req = CreateNamespaceRequest::new();
2564 create_req.id = Some(vec!["test_ns".to_string()]);
2565 create_req.properties = Some(properties.clone());
2566 namespace.create_namespace(create_req).await.unwrap();
2567
2568 let describe_req = DescribeNamespaceRequest {
2570 id: Some(vec!["test_ns".to_string()]),
2571 ..Default::default()
2572 };
2573 let result = namespace.describe_namespace(describe_req).await;
2574 assert!(result.is_ok());
2575 let response = result.unwrap();
2576 assert!(response.properties.is_some());
2577 let props = response.properties.unwrap();
2578 assert_eq!(props.get("owner"), Some(&"test_user".to_string()));
2579 assert_eq!(
2580 props.get("description"),
2581 Some(&"Test namespace".to_string())
2582 );
2583 }
2584
2585 #[tokio::test]
2586 async fn test_cannot_drop_namespace_with_tables() {
2587 let (namespace, _temp_dir) = create_test_namespace().await;
2588
2589 let mut create_ns_req = CreateNamespaceRequest::new();
2591 create_ns_req.id = Some(vec!["test_ns".to_string()]);
2592 namespace.create_namespace(create_ns_req).await.unwrap();
2593
2594 let schema = create_test_schema();
2596 let ipc_data = create_test_ipc_data(&schema);
2597 let mut create_table_req = CreateTableRequest::new();
2598 create_table_req.id = Some(vec!["test_ns".to_string(), "table1".to_string()]);
2599 namespace
2600 .create_table(create_table_req, bytes::Bytes::from(ipc_data))
2601 .await
2602 .unwrap();
2603
2604 let mut drop_req = DropNamespaceRequest::new();
2606 drop_req.id = Some(vec!["test_ns".to_string()]);
2607 let result = namespace.drop_namespace(drop_req).await;
2608 assert!(
2609 result.is_err(),
2610 "Should not be able to drop namespace with tables"
2611 );
2612 }
2613
2614 #[tokio::test]
2615 async fn test_isolation_between_namespaces() {
2616 let (namespace, _temp_dir) = create_test_namespace().await;
2617
2618 let mut create_req = CreateNamespaceRequest::new();
2620 create_req.id = Some(vec!["ns1".to_string()]);
2621 namespace.create_namespace(create_req).await.unwrap();
2622
2623 let mut create_req = CreateNamespaceRequest::new();
2624 create_req.id = Some(vec!["ns2".to_string()]);
2625 namespace.create_namespace(create_req).await.unwrap();
2626
2627 let schema = create_test_schema();
2629 let ipc_data = create_test_ipc_data(&schema);
2630
2631 let mut create_table_req = CreateTableRequest::new();
2632 create_table_req.id = Some(vec!["ns1".to_string(), "table1".to_string()]);
2633 namespace
2634 .create_table(create_table_req, bytes::Bytes::from(ipc_data.clone()))
2635 .await
2636 .unwrap();
2637
2638 let mut create_table_req = CreateTableRequest::new();
2639 create_table_req.id = Some(vec!["ns2".to_string(), "table1".to_string()]);
2640 namespace
2641 .create_table(create_table_req, bytes::Bytes::from(ipc_data))
2642 .await
2643 .unwrap();
2644
2645 let list_req = ListTablesRequest {
2647 id: Some(vec!["ns1".to_string()]),
2648 page_token: None,
2649 limit: None,
2650 ..Default::default()
2651 };
2652 let result = namespace.list_tables(list_req).await.unwrap();
2653 assert_eq!(result.tables.len(), 1);
2654 assert_eq!(result.tables[0], "table1");
2655
2656 let list_req = ListTablesRequest {
2657 id: Some(vec!["ns2".to_string()]),
2658 page_token: None,
2659 limit: None,
2660 ..Default::default()
2661 };
2662 let result = namespace.list_tables(list_req).await.unwrap();
2663 assert_eq!(result.tables.len(), 1);
2664 assert_eq!(result.tables[0], "table1");
2665
2666 let mut drop_req = DropTableRequest::new();
2668 drop_req.id = Some(vec!["ns1".to_string(), "table1".to_string()]);
2669 namespace.drop_table(drop_req).await.unwrap();
2670
2671 let mut exists_req = TableExistsRequest::new();
2673 exists_req.id = Some(vec!["ns1".to_string(), "table1".to_string()]);
2674 assert!(namespace.table_exists(exists_req).await.is_err());
2675
2676 let mut exists_req = TableExistsRequest::new();
2677 exists_req.id = Some(vec!["ns2".to_string(), "table1".to_string()]);
2678 assert!(namespace.table_exists(exists_req).await.is_ok());
2679 }
2680
2681 #[tokio::test]
2682 async fn test_migrate_directory_tables() {
2683 let temp_dir = TempStdDir::default();
2684 let temp_path = temp_dir.to_str().unwrap();
2685
2686 let dir_only_ns = DirectoryNamespaceBuilder::new(temp_path)
2688 .manifest_enabled(false)
2689 .dir_listing_enabled(true)
2690 .build()
2691 .await
2692 .unwrap();
2693
2694 let schema = create_test_schema();
2696 let ipc_data = create_test_ipc_data(&schema);
2697
2698 for i in 1..=3 {
2699 let mut create_req = CreateTableRequest::new();
2700 create_req.id = Some(vec![format!("table{}", i)]);
2701 dir_only_ns
2702 .create_table(create_req, bytes::Bytes::from(ipc_data.clone()))
2703 .await
2704 .unwrap();
2705 }
2706
2707 drop(dir_only_ns);
2708
2709 let dual_mode_ns = DirectoryNamespaceBuilder::new(temp_path)
2711 .manifest_enabled(true)
2712 .dir_listing_enabled(true)
2713 .build()
2714 .await
2715 .unwrap();
2716
2717 let mut list_req = ListTablesRequest::new();
2719 list_req.id = Some(vec![]);
2720 let tables = dual_mode_ns.list_tables(list_req).await.unwrap().tables;
2721 assert_eq!(tables.len(), 3);
2722
2723 let migrated_count = dual_mode_ns.migrate().await.unwrap();
2725 assert_eq!(migrated_count, 3, "Should migrate all 3 tables");
2726
2727 let mut list_req = ListTablesRequest::new();
2729 list_req.id = Some(vec![]);
2730 let tables = dual_mode_ns.list_tables(list_req).await.unwrap().tables;
2731 assert_eq!(tables.len(), 3);
2732
2733 let migrated_count = dual_mode_ns.migrate().await.unwrap();
2735 assert_eq!(
2736 migrated_count, 0,
2737 "Should not migrate already-migrated tables"
2738 );
2739
2740 drop(dual_mode_ns);
2741
2742 let manifest_only_ns = DirectoryNamespaceBuilder::new(temp_path)
2744 .manifest_enabled(true)
2745 .dir_listing_enabled(false)
2746 .build()
2747 .await
2748 .unwrap();
2749
2750 let mut list_req = ListTablesRequest::new();
2752 list_req.id = Some(vec![]);
2753 let tables = manifest_only_ns.list_tables(list_req).await.unwrap().tables;
2754 assert_eq!(tables.len(), 3);
2755 assert!(tables.contains(&"table1".to_string()));
2756 assert!(tables.contains(&"table2".to_string()));
2757 assert!(tables.contains(&"table3".to_string()));
2758 }
2759
2760 #[tokio::test]
2761 async fn test_migrate_without_manifest() {
2762 let temp_dir = TempStdDir::default();
2763 let temp_path = temp_dir.to_str().unwrap();
2764
2765 let namespace = DirectoryNamespaceBuilder::new(temp_path)
2767 .manifest_enabled(false)
2768 .dir_listing_enabled(true)
2769 .build()
2770 .await
2771 .unwrap();
2772
2773 let migrated_count = namespace.migrate().await.unwrap();
2775 assert_eq!(migrated_count, 0);
2776 }
2777
2778 #[tokio::test]
2779 async fn test_register_table() {
2780 use lance_namespace::models::{RegisterTableRequest, TableExistsRequest};
2781
2782 let temp_dir = TempStdDir::default();
2783 let temp_path = temp_dir.to_str().unwrap();
2784
2785 let namespace = DirectoryNamespaceBuilder::new(temp_path)
2786 .build()
2787 .await
2788 .unwrap();
2789
2790 let schema = create_test_schema();
2792 let ipc_data = create_test_ipc_data(&schema);
2793
2794 let table_uri = format!("{}/external_table.lance", temp_path);
2795 let cursor = Cursor::new(ipc_data);
2796 let stream_reader = StreamReader::try_new(cursor, None).unwrap();
2797 let batches: Vec<_> = stream_reader
2798 .collect::<std::result::Result<Vec<_>, _>>()
2799 .unwrap();
2800 let schema = batches[0].schema();
2801 let batch_results: Vec<_> = batches.into_iter().map(Ok).collect();
2802 let reader = RecordBatchIterator::new(batch_results, schema);
2803 Dataset::write(Box::new(reader), &table_uri, None)
2804 .await
2805 .unwrap();
2806
2807 let mut register_req = RegisterTableRequest::new("external_table.lance".to_string());
2809 register_req.id = Some(vec!["registered_table".to_string()]);
2810
2811 let response = namespace.register_table(register_req).await.unwrap();
2812 assert_eq!(response.location, Some("external_table.lance".to_string()));
2813
2814 let mut exists_req = TableExistsRequest::new();
2816 exists_req.id = Some(vec!["registered_table".to_string()]);
2817 assert!(namespace.table_exists(exists_req).await.is_ok());
2818
2819 let mut list_req = ListTablesRequest::new();
2821 list_req.id = Some(vec![]);
2822 let tables = namespace.list_tables(list_req).await.unwrap();
2823 assert!(tables.tables.contains(&"registered_table".to_string()));
2824 }
2825
2826 #[tokio::test]
2827 async fn test_register_table_duplicate_fails() {
2828 use lance_namespace::models::RegisterTableRequest;
2829
2830 let temp_dir = TempStdDir::default();
2831 let temp_path = temp_dir.to_str().unwrap();
2832
2833 let namespace = DirectoryNamespaceBuilder::new(temp_path)
2834 .build()
2835 .await
2836 .unwrap();
2837
2838 let mut register_req = RegisterTableRequest::new("test_table.lance".to_string());
2840 register_req.id = Some(vec!["test_table".to_string()]);
2841
2842 namespace
2843 .register_table(register_req.clone())
2844 .await
2845 .unwrap();
2846
2847 let result = namespace.register_table(register_req).await;
2849 assert!(result.is_err());
2850 assert!(result.unwrap_err().to_string().contains("already exists"));
2851 }
2852
2853 #[tokio::test]
2854 async fn test_deregister_table() {
2855 use lance_namespace::models::{DeregisterTableRequest, TableExistsRequest};
2856
2857 let temp_dir = TempStdDir::default();
2858 let temp_path = temp_dir.to_str().unwrap();
2859
2860 let namespace = DirectoryNamespaceBuilder::new(temp_path)
2863 .manifest_enabled(true)
2864 .dir_listing_enabled(false)
2865 .build()
2866 .await
2867 .unwrap();
2868
2869 let schema = create_test_schema();
2871 let ipc_data = create_test_ipc_data(&schema);
2872
2873 let mut create_req = CreateTableRequest::new();
2874 create_req.id = Some(vec!["test_table".to_string()]);
2875 namespace
2876 .create_table(create_req, bytes::Bytes::from(ipc_data))
2877 .await
2878 .unwrap();
2879
2880 let mut exists_req = TableExistsRequest::new();
2882 exists_req.id = Some(vec!["test_table".to_string()]);
2883 assert!(namespace.table_exists(exists_req.clone()).await.is_ok());
2884
2885 let mut deregister_req = DeregisterTableRequest::new();
2887 deregister_req.id = Some(vec!["test_table".to_string()]);
2888 let response = namespace.deregister_table(deregister_req).await.unwrap();
2889
2890 assert!(
2892 response.location.is_some(),
2893 "Deregister should return location"
2894 );
2895 let location = response.location.as_ref().unwrap();
2896 let expected_url = lance_io::object_store::uri_to_url(temp_path)
2899 .expect("Failed to convert temp path to URL");
2900 let expected_prefix = expected_url.to_string();
2901 assert!(
2902 location.starts_with(&expected_prefix),
2903 "Location should start with '{}', got: {}",
2904 expected_prefix,
2905 location
2906 );
2907 assert!(
2908 location.contains("test_table"),
2909 "Location should contain table name: {}",
2910 location
2911 );
2912 assert_eq!(response.id, Some(vec!["test_table".to_string()]));
2913
2914 assert!(namespace.table_exists(exists_req).await.is_err());
2916
2917 let dataset = Dataset::open(location).await;
2919 assert!(
2920 dataset.is_ok(),
2921 "Physical table data should still exist at {}",
2922 location
2923 );
2924 }
2925
2926 #[tokio::test]
2927 async fn test_deregister_table_in_child_namespace() {
2928 use lance_namespace::models::{
2929 CreateNamespaceRequest, DeregisterTableRequest, TableExistsRequest,
2930 };
2931
2932 let temp_dir = TempStdDir::default();
2933 let temp_path = temp_dir.to_str().unwrap();
2934
2935 let namespace = DirectoryNamespaceBuilder::new(temp_path)
2936 .build()
2937 .await
2938 .unwrap();
2939
2940 let mut create_ns_req = CreateNamespaceRequest::new();
2942 create_ns_req.id = Some(vec!["test_ns".to_string()]);
2943 namespace.create_namespace(create_ns_req).await.unwrap();
2944
2945 let schema = create_test_schema();
2947 let ipc_data = create_test_ipc_data(&schema);
2948
2949 let mut create_req = CreateTableRequest::new();
2950 create_req.id = Some(vec!["test_ns".to_string(), "test_table".to_string()]);
2951 namespace
2952 .create_table(create_req, bytes::Bytes::from(ipc_data))
2953 .await
2954 .unwrap();
2955
2956 let mut deregister_req = DeregisterTableRequest::new();
2958 deregister_req.id = Some(vec!["test_ns".to_string(), "test_table".to_string()]);
2959 let response = namespace.deregister_table(deregister_req).await.unwrap();
2960
2961 assert!(
2963 response.location.is_some(),
2964 "Deregister should return location"
2965 );
2966 let location = response.location.as_ref().unwrap();
2967 let expected_url = lance_io::object_store::uri_to_url(temp_path)
2970 .expect("Failed to convert temp path to URL");
2971 let expected_prefix = expected_url.to_string();
2972 assert!(
2973 location.starts_with(&expected_prefix),
2974 "Location should start with '{}', got: {}",
2975 expected_prefix,
2976 location
2977 );
2978 assert!(
2979 location.contains("test_ns") && location.contains("test_table"),
2980 "Location should contain namespace and table name: {}",
2981 location
2982 );
2983 assert_eq!(
2984 response.id,
2985 Some(vec!["test_ns".to_string(), "test_table".to_string()])
2986 );
2987
2988 let mut exists_req = TableExistsRequest::new();
2990 exists_req.id = Some(vec!["test_ns".to_string(), "test_table".to_string()]);
2991 assert!(namespace.table_exists(exists_req).await.is_err());
2992 }
2993
2994 #[tokio::test]
2995 async fn test_register_without_manifest_fails() {
2996 use lance_namespace::models::RegisterTableRequest;
2997
2998 let temp_dir = TempStdDir::default();
2999 let temp_path = temp_dir.to_str().unwrap();
3000
3001 let namespace = DirectoryNamespaceBuilder::new(temp_path)
3003 .manifest_enabled(false)
3004 .build()
3005 .await
3006 .unwrap();
3007
3008 let mut register_req = RegisterTableRequest::new("test_table.lance".to_string());
3010 register_req.id = Some(vec!["test_table".to_string()]);
3011 let result = namespace.register_table(register_req).await;
3012 assert!(result.is_err());
3013 assert!(result
3014 .unwrap_err()
3015 .to_string()
3016 .contains("manifest mode is enabled"));
3017
3018 }
3021
3022 #[tokio::test]
3023 async fn test_register_table_rejects_absolute_uri() {
3024 use lance_namespace::models::RegisterTableRequest;
3025
3026 let temp_dir = TempStdDir::default();
3027 let temp_path = temp_dir.to_str().unwrap();
3028
3029 let namespace = DirectoryNamespaceBuilder::new(temp_path)
3030 .build()
3031 .await
3032 .unwrap();
3033
3034 let mut register_req = RegisterTableRequest::new("s3://bucket/table.lance".to_string());
3036 register_req.id = Some(vec!["test_table".to_string()]);
3037 let result = namespace.register_table(register_req).await;
3038 assert!(result.is_err());
3039 let err_msg = result.unwrap_err().to_string();
3040 assert!(err_msg.contains("Absolute URIs are not allowed"));
3041 }
3042
3043 #[tokio::test]
3044 async fn test_register_table_rejects_absolute_path() {
3045 use lance_namespace::models::RegisterTableRequest;
3046
3047 let temp_dir = TempStdDir::default();
3048 let temp_path = temp_dir.to_str().unwrap();
3049
3050 let namespace = DirectoryNamespaceBuilder::new(temp_path)
3051 .build()
3052 .await
3053 .unwrap();
3054
3055 let mut register_req = RegisterTableRequest::new("/tmp/table.lance".to_string());
3057 register_req.id = Some(vec!["test_table".to_string()]);
3058 let result = namespace.register_table(register_req).await;
3059 assert!(result.is_err());
3060 let err_msg = result.unwrap_err().to_string();
3061 assert!(err_msg.contains("Absolute paths are not allowed"));
3062 }
3063
3064 #[tokio::test]
3065 async fn test_register_table_rejects_path_traversal() {
3066 use lance_namespace::models::RegisterTableRequest;
3067
3068 let temp_dir = TempStdDir::default();
3069 let temp_path = temp_dir.to_str().unwrap();
3070
3071 let namespace = DirectoryNamespaceBuilder::new(temp_path)
3072 .build()
3073 .await
3074 .unwrap();
3075
3076 let mut register_req = RegisterTableRequest::new("../outside/table.lance".to_string());
3078 register_req.id = Some(vec!["test_table".to_string()]);
3079 let result = namespace.register_table(register_req).await;
3080 assert!(result.is_err());
3081 let err_msg = result.unwrap_err().to_string();
3082 assert!(err_msg.contains("Path traversal is not allowed"));
3083 }
3084
3085 #[tokio::test]
3086 async fn test_namespace_write() {
3087 use arrow::array::Int32Array;
3088 use arrow::datatypes::{DataType, Field as ArrowField, Schema as ArrowSchema};
3089 use arrow::record_batch::{RecordBatch, RecordBatchIterator};
3090 use lance::dataset::{Dataset, WriteMode, WriteParams};
3091 use lance_namespace::LanceNamespace;
3092
3093 let (namespace, _temp_dir) = create_test_namespace().await;
3094 let namespace = Arc::new(namespace) as Arc<dyn LanceNamespace>;
3095
3096 let table_id = vec!["test_ns".to_string(), "test_table".to_string()];
3098 let schema = Arc::new(ArrowSchema::new(vec![
3099 ArrowField::new("a", DataType::Int32, false),
3100 ArrowField::new("b", DataType::Int32, false),
3101 ]));
3102
3103 let data1 = RecordBatch::try_new(
3105 schema.clone(),
3106 vec![
3107 Arc::new(Int32Array::from(vec![1, 2, 3])),
3108 Arc::new(Int32Array::from(vec![10, 20, 30])),
3109 ],
3110 )
3111 .unwrap();
3112
3113 let reader1 = RecordBatchIterator::new(vec![data1].into_iter().map(Ok), schema.clone());
3114 let dataset =
3115 Dataset::write_into_namespace(reader1, namespace.clone(), table_id.clone(), None)
3116 .await
3117 .unwrap();
3118
3119 assert_eq!(dataset.count_rows(None).await.unwrap(), 3);
3120 assert_eq!(dataset.version().version, 1);
3121
3122 let data2 = RecordBatch::try_new(
3124 schema.clone(),
3125 vec![
3126 Arc::new(Int32Array::from(vec![4, 5])),
3127 Arc::new(Int32Array::from(vec![40, 50])),
3128 ],
3129 )
3130 .unwrap();
3131
3132 let params_append = WriteParams {
3133 mode: WriteMode::Append,
3134 ..Default::default()
3135 };
3136
3137 let reader2 = RecordBatchIterator::new(vec![data2].into_iter().map(Ok), schema.clone());
3138 let dataset = Dataset::write_into_namespace(
3139 reader2,
3140 namespace.clone(),
3141 table_id.clone(),
3142 Some(params_append),
3143 )
3144 .await
3145 .unwrap();
3146
3147 assert_eq!(dataset.count_rows(None).await.unwrap(), 5);
3148 assert_eq!(dataset.version().version, 2);
3149
3150 let data3 = RecordBatch::try_new(
3152 schema.clone(),
3153 vec![
3154 Arc::new(Int32Array::from(vec![100, 200])),
3155 Arc::new(Int32Array::from(vec![1000, 2000])),
3156 ],
3157 )
3158 .unwrap();
3159
3160 let params_overwrite = WriteParams {
3161 mode: WriteMode::Overwrite,
3162 ..Default::default()
3163 };
3164
3165 let reader3 = RecordBatchIterator::new(vec![data3].into_iter().map(Ok), schema.clone());
3166 let dataset = Dataset::write_into_namespace(
3167 reader3,
3168 namespace.clone(),
3169 table_id.clone(),
3170 Some(params_overwrite),
3171 )
3172 .await
3173 .unwrap();
3174
3175 assert_eq!(dataset.count_rows(None).await.unwrap(), 2);
3176 assert_eq!(dataset.version().version, 3);
3177
3178 let result = dataset.scan().try_into_batch().await.unwrap();
3180 let a_col = result
3181 .column_by_name("a")
3182 .unwrap()
3183 .as_any()
3184 .downcast_ref::<Int32Array>()
3185 .unwrap();
3186 assert_eq!(a_col.values(), &[100, 200]);
3187 }
3188
3189 #[tokio::test]
3194 async fn test_declare_table_v1_mode() {
3195 use lance_namespace::models::{
3196 DeclareTableRequest, DescribeTableRequest, TableExistsRequest,
3197 };
3198
3199 let temp_dir = TempStdDir::default();
3200 let temp_path = temp_dir.to_str().unwrap();
3201
3202 let namespace = DirectoryNamespaceBuilder::new(temp_path)
3204 .manifest_enabled(false)
3205 .build()
3206 .await
3207 .unwrap();
3208
3209 let mut declare_req = DeclareTableRequest::new();
3211 declare_req.id = Some(vec!["test_table".to_string()]);
3212 let response = namespace.declare_table(declare_req).await.unwrap();
3213
3214 assert!(response.location.is_some());
3216 let location = response.location.as_ref().unwrap();
3217 assert!(location.ends_with("test_table.lance"));
3218
3219 let mut exists_req = TableExistsRequest::new();
3221 exists_req.id = Some(vec!["test_table".to_string()]);
3222 assert!(namespace.table_exists(exists_req).await.is_ok());
3223
3224 let mut describe_req = DescribeTableRequest::new();
3226 describe_req.id = Some(vec!["test_table".to_string()]);
3227 let describe_response = namespace.describe_table(describe_req).await.unwrap();
3228 assert!(describe_response.location.is_some());
3229 assert!(describe_response.version.is_none()); assert!(describe_response.schema.is_none()); }
3232
3233 #[tokio::test]
3234 async fn test_declare_table_with_manifest() {
3235 use lance_namespace::models::{DeclareTableRequest, TableExistsRequest};
3236
3237 let temp_dir = TempStdDir::default();
3238 let temp_path = temp_dir.to_str().unwrap();
3239
3240 let namespace = DirectoryNamespaceBuilder::new(temp_path)
3242 .manifest_enabled(true)
3243 .dir_listing_enabled(false)
3244 .build()
3245 .await
3246 .unwrap();
3247
3248 let mut declare_req = DeclareTableRequest::new();
3250 declare_req.id = Some(vec!["test_table".to_string()]);
3251 let response = namespace.declare_table(declare_req).await.unwrap();
3252
3253 assert!(response.location.is_some());
3255
3256 let mut exists_req = TableExistsRequest::new();
3258 exists_req.id = Some(vec!["test_table".to_string()]);
3259 assert!(namespace.table_exists(exists_req).await.is_ok());
3260 }
3261
3262 #[tokio::test]
3263 async fn test_declare_table_when_table_exists() {
3264 use lance_namespace::models::DeclareTableRequest;
3265
3266 let temp_dir = TempStdDir::default();
3267 let temp_path = temp_dir.to_str().unwrap();
3268
3269 let namespace = DirectoryNamespaceBuilder::new(temp_path)
3270 .manifest_enabled(false)
3271 .build()
3272 .await
3273 .unwrap();
3274
3275 let schema = create_test_schema();
3277 let ipc_data = create_test_ipc_data(&schema);
3278 let mut create_req = CreateTableRequest::new();
3279 create_req.id = Some(vec!["test_table".to_string()]);
3280 namespace
3281 .create_table(create_req, bytes::Bytes::from(ipc_data))
3282 .await
3283 .unwrap();
3284
3285 let mut declare_req = DeclareTableRequest::new();
3287 declare_req.id = Some(vec!["test_table".to_string()]);
3288 let result = namespace.declare_table(declare_req).await;
3289 assert!(result.is_err());
3290 }
3291
3292 #[tokio::test]
3297 async fn test_deregister_table_v1_mode() {
3298 use lance_namespace::models::{DeregisterTableRequest, TableExistsRequest};
3299
3300 let temp_dir = TempStdDir::default();
3301 let temp_path = temp_dir.to_str().unwrap();
3302
3303 let namespace = DirectoryNamespaceBuilder::new(temp_path)
3305 .manifest_enabled(false)
3306 .dir_listing_enabled(true)
3307 .build()
3308 .await
3309 .unwrap();
3310
3311 let schema = create_test_schema();
3313 let ipc_data = create_test_ipc_data(&schema);
3314 let mut create_req = CreateTableRequest::new();
3315 create_req.id = Some(vec!["test_table".to_string()]);
3316 namespace
3317 .create_table(create_req, bytes::Bytes::from(ipc_data))
3318 .await
3319 .unwrap();
3320
3321 let mut exists_req = TableExistsRequest::new();
3323 exists_req.id = Some(vec!["test_table".to_string()]);
3324 assert!(namespace.table_exists(exists_req.clone()).await.is_ok());
3325
3326 let mut deregister_req = DeregisterTableRequest::new();
3328 deregister_req.id = Some(vec!["test_table".to_string()]);
3329 let response = namespace.deregister_table(deregister_req).await.unwrap();
3330
3331 assert!(response.location.is_some());
3333 let location = response.location.as_ref().unwrap();
3334 assert!(location.contains("test_table"));
3335
3336 let result = namespace.table_exists(exists_req).await;
3338 assert!(result.is_err());
3339 assert!(result.unwrap_err().to_string().contains("deregistered"));
3340
3341 let dataset = Dataset::open(location).await;
3343 assert!(dataset.is_ok(), "Physical table data should still exist");
3344 }
3345
3346 #[tokio::test]
3347 async fn test_deregister_table_v1_already_deregistered() {
3348 use lance_namespace::models::DeregisterTableRequest;
3349
3350 let temp_dir = TempStdDir::default();
3351 let temp_path = temp_dir.to_str().unwrap();
3352
3353 let namespace = DirectoryNamespaceBuilder::new(temp_path)
3354 .manifest_enabled(false)
3355 .dir_listing_enabled(true)
3356 .build()
3357 .await
3358 .unwrap();
3359
3360 let schema = create_test_schema();
3362 let ipc_data = create_test_ipc_data(&schema);
3363 let mut create_req = CreateTableRequest::new();
3364 create_req.id = Some(vec!["test_table".to_string()]);
3365 namespace
3366 .create_table(create_req, bytes::Bytes::from(ipc_data))
3367 .await
3368 .unwrap();
3369
3370 let mut deregister_req = DeregisterTableRequest::new();
3372 deregister_req.id = Some(vec!["test_table".to_string()]);
3373 namespace
3374 .deregister_table(deregister_req.clone())
3375 .await
3376 .unwrap();
3377
3378 let result = namespace.deregister_table(deregister_req).await;
3380 assert!(result.is_err());
3381 assert!(result
3382 .unwrap_err()
3383 .to_string()
3384 .contains("already deregistered"));
3385 }
3386
3387 #[tokio::test]
3392 async fn test_list_tables_skips_deregistered_v1() {
3393 use lance_namespace::models::DeregisterTableRequest;
3394
3395 let temp_dir = TempStdDir::default();
3396 let temp_path = temp_dir.to_str().unwrap();
3397
3398 let namespace = DirectoryNamespaceBuilder::new(temp_path)
3399 .manifest_enabled(false)
3400 .dir_listing_enabled(true)
3401 .build()
3402 .await
3403 .unwrap();
3404
3405 let schema = create_test_schema();
3407 let ipc_data = create_test_ipc_data(&schema);
3408
3409 let mut create_req1 = CreateTableRequest::new();
3410 create_req1.id = Some(vec!["table1".to_string()]);
3411 namespace
3412 .create_table(create_req1, bytes::Bytes::from(ipc_data.clone()))
3413 .await
3414 .unwrap();
3415
3416 let mut create_req2 = CreateTableRequest::new();
3417 create_req2.id = Some(vec!["table2".to_string()]);
3418 namespace
3419 .create_table(create_req2, bytes::Bytes::from(ipc_data))
3420 .await
3421 .unwrap();
3422
3423 let mut list_req = ListTablesRequest::new();
3425 list_req.id = Some(vec![]);
3426 let list_response = namespace.list_tables(list_req.clone()).await.unwrap();
3427 assert_eq!(list_response.tables.len(), 2);
3428
3429 let mut deregister_req = DeregisterTableRequest::new();
3431 deregister_req.id = Some(vec!["table1".to_string()]);
3432 namespace.deregister_table(deregister_req).await.unwrap();
3433
3434 let list_response = namespace.list_tables(list_req).await.unwrap();
3436 assert_eq!(list_response.tables.len(), 1);
3437 assert!(list_response.tables.contains(&"table2".to_string()));
3438 assert!(!list_response.tables.contains(&"table1".to_string()));
3439 }
3440
3441 #[tokio::test]
3446 async fn test_describe_table_fails_for_deregistered_v1() {
3447 use lance_namespace::models::{DeregisterTableRequest, DescribeTableRequest};
3448
3449 let temp_dir = TempStdDir::default();
3450 let temp_path = temp_dir.to_str().unwrap();
3451
3452 let namespace = DirectoryNamespaceBuilder::new(temp_path)
3453 .manifest_enabled(false)
3454 .dir_listing_enabled(true)
3455 .build()
3456 .await
3457 .unwrap();
3458
3459 let schema = create_test_schema();
3461 let ipc_data = create_test_ipc_data(&schema);
3462 let mut create_req = CreateTableRequest::new();
3463 create_req.id = Some(vec!["test_table".to_string()]);
3464 namespace
3465 .create_table(create_req, bytes::Bytes::from(ipc_data))
3466 .await
3467 .unwrap();
3468
3469 let mut describe_req = DescribeTableRequest::new();
3471 describe_req.id = Some(vec!["test_table".to_string()]);
3472 assert!(namespace.describe_table(describe_req.clone()).await.is_ok());
3473
3474 let mut deregister_req = DeregisterTableRequest::new();
3476 deregister_req.id = Some(vec!["test_table".to_string()]);
3477 namespace.deregister_table(deregister_req).await.unwrap();
3478
3479 let result = namespace.describe_table(describe_req).await;
3481 assert!(result.is_err());
3482 assert!(result.unwrap_err().to_string().contains("deregistered"));
3483 }
3484
3485 #[tokio::test]
3486 async fn test_table_exists_fails_for_deregistered_v1() {
3487 use lance_namespace::models::{DeregisterTableRequest, TableExistsRequest};
3488
3489 let temp_dir = TempStdDir::default();
3490 let temp_path = temp_dir.to_str().unwrap();
3491
3492 let namespace = DirectoryNamespaceBuilder::new(temp_path)
3493 .manifest_enabled(false)
3494 .dir_listing_enabled(true)
3495 .build()
3496 .await
3497 .unwrap();
3498
3499 let schema = create_test_schema();
3501 let ipc_data = create_test_ipc_data(&schema);
3502 let mut create_req = CreateTableRequest::new();
3503 create_req.id = Some(vec!["test_table".to_string()]);
3504 namespace
3505 .create_table(create_req, bytes::Bytes::from(ipc_data))
3506 .await
3507 .unwrap();
3508
3509 let mut exists_req = TableExistsRequest::new();
3511 exists_req.id = Some(vec!["test_table".to_string()]);
3512 assert!(namespace.table_exists(exists_req.clone()).await.is_ok());
3513
3514 let mut deregister_req = DeregisterTableRequest::new();
3516 deregister_req.id = Some(vec!["test_table".to_string()]);
3517 namespace.deregister_table(deregister_req).await.unwrap();
3518
3519 let result = namespace.table_exists(exists_req).await;
3521 assert!(result.is_err());
3522 assert!(result.unwrap_err().to_string().contains("deregistered"));
3523 }
3524
3525 #[tokio::test]
3526 async fn test_atomic_table_status_check() {
3527 let temp_dir = TempStdDir::default();
3531 let temp_path = temp_dir.to_str().unwrap();
3532
3533 let namespace = DirectoryNamespaceBuilder::new(temp_path)
3534 .manifest_enabled(false)
3535 .dir_listing_enabled(true)
3536 .build()
3537 .await
3538 .unwrap();
3539
3540 let schema = create_test_schema();
3542 let ipc_data = create_test_ipc_data(&schema);
3543 let mut create_req = CreateTableRequest::new();
3544 create_req.id = Some(vec!["test_table".to_string()]);
3545 namespace
3546 .create_table(create_req, bytes::Bytes::from(ipc_data))
3547 .await
3548 .unwrap();
3549
3550 let status = namespace.check_table_status("test_table").await;
3552 assert!(status.exists);
3553 assert!(!status.is_deregistered);
3554 assert!(!status.has_reserved_file);
3555 }
3556}