1pub mod manifest;
10
11use arrow::record_batch::RecordBatchIterator;
12use arrow_ipc::reader::StreamReader;
13use async_trait::async_trait;
14use bytes::Bytes;
15use futures::TryStreamExt;
16use lance::dataset::builder::DatasetBuilder;
17use lance::dataset::{Dataset, WriteParams};
18use lance::session::Session;
19use lance_io::object_store::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry};
20use lance_table::io::commit::ManifestNamingScheme;
21use object_store::path::Path;
22use object_store::{Error as ObjectStoreError, ObjectStore as OSObjectStore, PutMode, PutOptions};
23use std::collections::HashMap;
24use std::io::Cursor;
25use std::sync::Arc;
26
27use crate::context::DynamicContextProvider;
28use lance_namespace::models::{
29 BatchDeleteTableVersionsRequest, BatchDeleteTableVersionsResponse, CreateNamespaceRequest,
30 CreateNamespaceResponse, CreateTableRequest, CreateTableResponse, CreateTableVersionRequest,
31 CreateTableVersionResponse, DeclareTableRequest, DeclareTableResponse,
32 DescribeNamespaceRequest, DescribeNamespaceResponse, DescribeTableRequest,
33 DescribeTableResponse, DescribeTableVersionRequest, DescribeTableVersionResponse,
34 DropNamespaceRequest, DropNamespaceResponse, DropTableRequest, DropTableResponse, Identity,
35 ListNamespacesRequest, ListNamespacesResponse, ListTableVersionsRequest,
36 ListTableVersionsResponse, ListTablesRequest, ListTablesResponse, NamespaceExistsRequest,
37 TableExistsRequest, TableVersion,
38};
39
40use lance_core::{Error, Result, box_error};
41use lance_namespace::LanceNamespace;
42use lance_namespace::schema::arrow_schema_to_json;
43
44use crate::credentials::{
45 CredentialVendor, create_credential_vendor_for_location, has_credential_vendor_config,
46};
47
/// Result of probing a table directory (`<root>/<name>.lance`) for existence and marker files.
///
/// The all-`false` value (also what `Default` produces) describes a table directory that
/// does not exist or could not be listed.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub(crate) struct TableStatus {
    /// The table directory listing returned at least one entry.
    pub(crate) exists: bool,
    /// A `.lance-deregistered` marker file is present in the table directory.
    pub(crate) is_deregistered: bool,
    /// A `.lance-reserved` marker file is present in the table directory.
    pub(crate) has_reserved_file: bool,
}
60
/// Builder for [`DirectoryNamespace`].
///
/// Created with [`DirectoryNamespaceBuilder::new`] or
/// [`DirectoryNamespaceBuilder::from_properties`] and finished with
/// [`DirectoryNamespaceBuilder::build`].
#[derive(Clone)]
pub struct DirectoryNamespaceBuilder {
    /// Root URI of the namespace, stored without trailing `/`.
    root: String,
    /// Storage options used for the object store and for dataset reads/writes.
    storage_options: Option<HashMap<String, String>>,
    /// Optional shared Lance session; its object-store registry is reused when present.
    session: Option<Arc<Session>>,
    /// Whether `build` tries to initialize the manifest-backed namespace.
    manifest_enabled: bool,
    /// Whether `<name>.lance` directories under the root are discovered as tables.
    dir_listing_enabled: bool,
    /// Passed through to the manifest namespace on initialization.
    inline_optimization_enabled: bool,
    /// Carried into the built namespace; controls `managed_versioning` in table responses.
    table_version_tracking_enabled: bool,
    /// Properties used to configure a credential vendor (`from_properties` stores them with
    /// the `credential_vendor.` prefix removed).
    credential_vendor_properties: HashMap<String, String>,
    /// Optional dynamic context provider, carried into the built namespace.
    context_provider: Option<Arc<dyn DynamicContextProvider>>,
    /// Optional commit retry count passed to the manifest namespace.
    commit_retries: Option<u32>,
}
108
109impl std::fmt::Debug for DirectoryNamespaceBuilder {
110 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
111 f.debug_struct("DirectoryNamespaceBuilder")
112 .field("root", &self.root)
113 .field("storage_options", &self.storage_options)
114 .field("manifest_enabled", &self.manifest_enabled)
115 .field("dir_listing_enabled", &self.dir_listing_enabled)
116 .field(
117 "inline_optimization_enabled",
118 &self.inline_optimization_enabled,
119 )
120 .field(
121 "table_version_tracking_enabled",
122 &self.table_version_tracking_enabled,
123 )
124 .field(
125 "context_provider",
126 &self.context_provider.as_ref().map(|_| "Some(...)"),
127 )
128 .finish()
129 }
130}
131
132impl DirectoryNamespaceBuilder {
133 pub fn new(root: impl Into<String>) -> Self {
139 Self {
140 root: root.into().trim_end_matches('/').to_string(),
141 storage_options: None,
142 session: None,
143 manifest_enabled: true,
144 dir_listing_enabled: true, inline_optimization_enabled: true,
146 table_version_tracking_enabled: false, credential_vendor_properties: HashMap::new(),
148 context_provider: None,
149 commit_retries: None,
150 }
151 }
152
153 pub fn manifest_enabled(mut self, enabled: bool) -> Self {
158 self.manifest_enabled = enabled;
159 self
160 }
161
162 pub fn dir_listing_enabled(mut self, enabled: bool) -> Self {
167 self.dir_listing_enabled = enabled;
168 self
169 }
170
171 pub fn inline_optimization_enabled(mut self, enabled: bool) -> Self {
177 self.inline_optimization_enabled = enabled;
178 self
179 }
180
181 pub fn table_version_tracking_enabled(mut self, enabled: bool) -> Self {
189 self.table_version_tracking_enabled = enabled;
190 self
191 }
192
193 pub fn from_properties(
258 properties: HashMap<String, String>,
259 session: Option<Arc<Session>>,
260 ) -> Result<Self> {
261 let root = properties.get("root").cloned().ok_or_else(|| {
263 Error::namespace_source(
264 "Missing required property 'root' for directory namespace".into(),
265 )
266 })?;
267
268 let storage_options: HashMap<String, String> = properties
270 .iter()
271 .filter_map(|(k, v)| {
272 k.strip_prefix("storage.")
273 .map(|key| (key.to_string(), v.clone()))
274 })
275 .collect();
276
277 let storage_options = if storage_options.is_empty() {
278 None
279 } else {
280 Some(storage_options)
281 };
282
283 let manifest_enabled = properties
285 .get("manifest_enabled")
286 .and_then(|v| v.parse::<bool>().ok())
287 .unwrap_or(true);
288
289 let dir_listing_enabled = properties
291 .get("dir_listing_enabled")
292 .and_then(|v| v.parse::<bool>().ok())
293 .unwrap_or(true);
294
295 let inline_optimization_enabled = properties
297 .get("inline_optimization_enabled")
298 .and_then(|v| v.parse::<bool>().ok())
299 .unwrap_or(true);
300
301 let table_version_tracking_enabled = properties
303 .get("table_version_tracking_enabled")
304 .and_then(|v| v.parse::<bool>().ok())
305 .unwrap_or(false);
306
307 let credential_vendor_properties: HashMap<String, String> = properties
311 .iter()
312 .filter_map(|(k, v)| {
313 k.strip_prefix("credential_vendor.")
314 .map(|key| (key.to_string(), v.clone()))
315 })
316 .collect();
317
318 let commit_retries = properties
319 .get("commit_retries")
320 .and_then(|v| v.parse::<u32>().ok());
321
322 Ok(Self {
323 root: root.trim_end_matches('/').to_string(),
324 storage_options,
325 session,
326 manifest_enabled,
327 dir_listing_enabled,
328 inline_optimization_enabled,
329 table_version_tracking_enabled,
330 credential_vendor_properties,
331 context_provider: None,
332 commit_retries,
333 })
334 }
335
336 pub fn storage_option(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
343 self.storage_options
344 .get_or_insert_with(HashMap::new)
345 .insert(key.into(), value.into());
346 self
347 }
348
349 pub fn storage_options(mut self, options: HashMap<String, String>) -> Self {
355 self.storage_options
356 .get_or_insert_with(HashMap::new)
357 .extend(options);
358 self
359 }
360
361 pub fn session(mut self, session: Arc<Session>) -> Self {
371 self.session = Some(session);
372 self
373 }
374
375 pub fn commit_retries(mut self, retries: u32) -> Self {
378 self.commit_retries = Some(retries);
379 self
380 }
381
382 pub fn credential_vendor_property(
410 mut self,
411 key: impl Into<String>,
412 value: impl Into<String>,
413 ) -> Self {
414 self.credential_vendor_properties
415 .insert(key.into(), value.into());
416 self
417 }
418
419 pub fn credential_vendor_properties(mut self, properties: HashMap<String, String>) -> Self {
427 self.credential_vendor_properties.extend(properties);
428 self
429 }
430
431 pub fn context_provider(mut self, provider: Arc<dyn DynamicContextProvider>) -> Self {
441 self.context_provider = Some(provider);
442 self
443 }
444
445 pub async fn build(self) -> Result<DirectoryNamespace> {
458 let (object_store, base_path) =
459 Self::initialize_object_store(&self.root, &self.storage_options, &self.session).await?;
460
461 let manifest_ns = if self.manifest_enabled {
462 match manifest::ManifestNamespace::from_directory(
463 self.root.clone(),
464 self.storage_options.clone(),
465 self.session.clone(),
466 object_store.clone(),
467 base_path.clone(),
468 self.dir_listing_enabled,
469 self.inline_optimization_enabled,
470 self.commit_retries,
471 )
472 .await
473 {
474 Ok(ns) => Some(Arc::new(ns)),
475 Err(e) => {
476 log::warn!(
478 "Failed to initialize manifest namespace, falling back to directory listing only: {}",
479 e
480 );
481 None
482 }
483 }
484 } else {
485 None
486 };
487
488 let credential_vendor = if has_credential_vendor_config(&self.credential_vendor_properties)
490 {
491 create_credential_vendor_for_location(&self.root, &self.credential_vendor_properties)
492 .await?
493 .map(Arc::from)
494 } else {
495 None
496 };
497
498 Ok(DirectoryNamespace {
499 root: self.root,
500 storage_options: self.storage_options,
501 session: self.session,
502 object_store,
503 base_path,
504 manifest_ns,
505 dir_listing_enabled: self.dir_listing_enabled,
506 table_version_tracking_enabled: self.table_version_tracking_enabled,
507 credential_vendor,
508 context_provider: self.context_provider,
509 })
510 }
511
512 async fn initialize_object_store(
514 root: &str,
515 storage_options: &Option<HashMap<String, String>>,
516 session: &Option<Arc<Session>>,
517 ) -> Result<(Arc<ObjectStore>, Path)> {
518 let accessor = storage_options.clone().map(|opts| {
520 Arc::new(lance_io::object_store::StorageOptionsAccessor::with_static_options(opts))
521 });
522 let params = ObjectStoreParams {
523 storage_options_accessor: accessor,
524 ..Default::default()
525 };
526
527 let registry = if let Some(session) = session {
529 session.store_registry()
530 } else {
531 Arc::new(ObjectStoreRegistry::default())
532 };
533
534 let (object_store, base_path) = ObjectStore::from_uri_and_params(registry, root, ¶ms)
536 .await
537 .map_err(|e| {
538 Error::namespace_source(format!("Failed to create object store: {}", e).into())
539 })?;
540
541 Ok((object_store, base_path))
542 }
543}
544
/// Lance namespace backed by a directory of `<name>.lance` table directories, optionally
/// combined with a manifest-backed namespace.
///
/// Constructed via [`DirectoryNamespaceBuilder`].
pub struct DirectoryNamespace {
    /// Root URI without trailing `/`; directory-mode table URIs are `<root>/<name>.lance`.
    root: String,
    /// Static storage options: returned as table storage options when no credential vendor
    /// is configured, and used when loading or writing datasets.
    storage_options: Option<HashMap<String, String>>,
    // NOTE(review): `session` is read when loading datasets in `describe_table`, so this
    // `dead_code` allowance looks unnecessary — confirm before removing.
    #[allow(dead_code)]
    session: Option<Arc<Session>>,
    /// Object store for the root location.
    object_store: Arc<ObjectStore>,
    /// Path of the root inside `object_store`; table directories are its children.
    base_path: Path,
    /// Manifest-backed namespace; `None` when disabled or when its initialization failed.
    manifest_ns: Option<Arc<manifest::ManifestNamespace>>,
    /// Whether `<name>.lance` directories are treated as tables in addition to manifest
    /// entries.
    dir_listing_enabled: bool,
    /// When true, table responses carry `managed_versioning = Some(true)`.
    table_version_tracking_enabled: bool,
    /// Optional vendor that produces per-table storage options (credentials).
    credential_vendor: Option<Arc<dyn CredentialVendor>>,
    // Not read in the code shown here; presumably kept for future use, hence the
    // `dead_code` allowance.
    #[allow(dead_code)]
    context_provider: Option<Arc<dyn DynamicContextProvider>>,
}
588
589impl std::fmt::Debug for DirectoryNamespace {
590 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
591 write!(f, "{}", self.namespace_id())
592 }
593}
594
595impl std::fmt::Display for DirectoryNamespace {
596 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
597 write!(f, "{}", self.namespace_id())
598 }
599}
600
601impl DirectoryNamespace {
602 fn apply_pagination(names: &mut Vec<String>, page_token: Option<String>, limit: Option<i32>) {
611 names.sort();
613
614 if let Some(start_after) = page_token {
616 if let Some(index) = names
617 .iter()
618 .position(|name| name.as_str() > start_after.as_str())
619 {
620 names.drain(0..index);
621 } else {
622 names.clear();
623 }
624 }
625
626 if let Some(limit) = limit
628 && limit >= 0
629 {
630 names.truncate(limit as usize);
631 }
632 }
633
634 async fn list_directory_tables(&self) -> Result<Vec<String>> {
636 let mut tables = Vec::new();
637 let entries = self
638 .object_store
639 .read_dir(self.base_path.clone())
640 .await
641 .map_err(|e| {
642 Error::io_source(box_error(std::io::Error::other(format!(
643 "Failed to list directory: {}",
644 e
645 ))))
646 })?;
647
648 for entry in entries {
649 let path = entry.trim_end_matches('/');
650 if !path.ends_with(".lance") {
651 continue;
652 }
653
654 let table_name = &path[..path.len() - 6];
655
656 let status = self.check_table_status(table_name).await;
658 if status.is_deregistered || status.has_reserved_file {
659 continue;
660 }
661
662 tables.push(table_name.to_string());
663 }
664
665 Ok(tables)
666 }
667
668 fn validate_root_namespace_id(id: &Option<Vec<String>>) -> Result<()> {
670 if let Some(id) = id
671 && !id.is_empty()
672 {
673 return Err(Error::namespace_source(format!(
674 "Directory namespace only supports root namespace operations, but got namespace ID: {:?}. Expected empty ID.",
675 id
676 ).into()));
677 }
678 Ok(())
679 }
680
681 fn table_name_from_id(id: &Option<Vec<String>>) -> Result<String> {
683 let id = id.as_ref().ok_or_else(|| {
684 Error::namespace_source("Directory namespace table ID cannot be empty".into())
685 })?;
686
687 if id.len() != 1 {
688 return Err(Error::namespace_source(format!(
689 "Multi-level table IDs are only supported when manifest mode is enabled, but got: {:?}",
690 id
691 )
692 .into()));
693 }
694
695 Ok(id[0].clone())
696 }
697
698 async fn resolve_table_location(&self, id: &Option<Vec<String>>) -> Result<String> {
699 let mut describe_req = DescribeTableRequest::new();
700 describe_req.id = id.clone();
701 describe_req.load_detailed_metadata = Some(false);
702
703 let describe_resp = self.describe_table(describe_req).await?;
704
705 describe_resp.location.ok_or_else(|| {
706 Error::namespace_source(format!("Table location not found for: {:?}", id).into())
707 })
708 }
709
710 fn table_full_uri(&self, table_name: &str) -> String {
711 format!("{}/{}.lance", &self.root, table_name)
712 }
713
714 fn uri_to_object_store_path(uri: &str) -> Path {
715 let path_str = if let Some(rest) = uri.strip_prefix("file://") {
716 rest
717 } else if let Some(rest) = uri.strip_prefix("s3://") {
718 rest.split_once('/').map(|(_, p)| p).unwrap_or(rest)
719 } else if let Some(rest) = uri.strip_prefix("gs://") {
720 rest.split_once('/').map(|(_, p)| p).unwrap_or(rest)
721 } else if let Some(rest) = uri.strip_prefix("az://") {
722 rest.split_once('/').map(|(_, p)| p).unwrap_or(rest)
723 } else {
724 uri
725 };
726 Path::from(path_str)
727 }
728
729 fn table_path(&self, table_name: &str) -> Path {
731 self.base_path
732 .child(format!("{}.lance", table_name).as_str())
733 }
734
735 fn table_reserved_file_path(&self, table_name: &str) -> Path {
737 self.base_path
738 .child(format!("{}.lance", table_name).as_str())
739 .child(".lance-reserved")
740 }
741
742 fn table_deregistered_file_path(&self, table_name: &str) -> Path {
744 self.base_path
745 .child(format!("{}.lance", table_name).as_str())
746 .child(".lance-deregistered")
747 }
748
749 pub(crate) async fn check_table_status(&self, table_name: &str) -> TableStatus {
755 let table_path = self.table_path(table_name);
756 match self.object_store.read_dir(table_path).await {
757 Ok(entries) => {
758 let exists = !entries.is_empty();
759 let is_deregistered = entries.iter().any(|e| e.ends_with(".lance-deregistered"));
760 let has_reserved_file = entries.iter().any(|e| e.ends_with(".lance-reserved"));
761 TableStatus {
762 exists,
763 is_deregistered,
764 has_reserved_file,
765 }
766 }
767 Err(_) => TableStatus {
768 exists: false,
769 is_deregistered: false,
770 has_reserved_file: false,
771 },
772 }
773 }
774
775 async fn put_marker_file_atomic(
776 &self,
777 path: &Path,
778 file_description: &str,
779 ) -> std::result::Result<(), String> {
780 let put_opts = PutOptions {
781 mode: PutMode::Create,
782 ..Default::default()
783 };
784
785 match self
786 .object_store
787 .inner
788 .put_opts(path, bytes::Bytes::new().into(), put_opts)
789 .await
790 {
791 Ok(_) => Ok(()),
792 Err(ObjectStoreError::AlreadyExists { .. })
793 | Err(ObjectStoreError::Precondition { .. }) => {
794 Err(format!("{} already exists", file_description))
795 }
796 Err(e) => Err(format!("Failed to create {}: {}", file_description, e)),
797 }
798 }
799
800 async fn get_storage_options_for_table(
820 &self,
821 table_uri: &str,
822 identity: Option<&Identity>,
823 ) -> Result<Option<HashMap<String, String>>> {
824 if let Some(ref vendor) = self.credential_vendor {
825 let vended = vendor.vend_credentials(table_uri, identity).await?;
826 return Ok(Some(vended.storage_options));
827 }
828 Ok(self.storage_options.clone())
829 }
830
831 pub async fn migrate(&self) -> Result<usize> {
884 let Some(ref manifest_ns) = self.manifest_ns else {
886 return Ok(0); };
888
889 let manifest_locations = manifest_ns.list_manifest_table_locations().await?;
891
892 let dir_tables = self.list_directory_tables().await?;
894
895 let mut migrated_count = 0;
900 for table_name in dir_tables {
901 let dir_name = format!("{}.lance", table_name);
903 if !manifest_locations.contains(&dir_name) {
904 manifest_ns.register_table(&table_name, dir_name).await?;
905 migrated_count += 1;
906 }
907 }
908
909 Ok(migrated_count)
910 }
911}
912
913#[async_trait]
914impl LanceNamespace for DirectoryNamespace {
915 async fn list_namespaces(
916 &self,
917 request: ListNamespacesRequest,
918 ) -> Result<ListNamespacesResponse> {
919 if let Some(ref manifest_ns) = self.manifest_ns {
920 return manifest_ns.list_namespaces(request).await;
921 }
922
923 Self::validate_root_namespace_id(&request.id)?;
924 Ok(ListNamespacesResponse::new(vec![]))
925 }
926
927 async fn describe_namespace(
928 &self,
929 request: DescribeNamespaceRequest,
930 ) -> Result<DescribeNamespaceResponse> {
931 if let Some(ref manifest_ns) = self.manifest_ns {
932 return manifest_ns.describe_namespace(request).await;
933 }
934
935 Self::validate_root_namespace_id(&request.id)?;
936 #[allow(clippy::needless_update)]
937 Ok(DescribeNamespaceResponse {
938 properties: Some(HashMap::new()),
939 ..Default::default()
940 })
941 }
942
943 async fn create_namespace(
944 &self,
945 request: CreateNamespaceRequest,
946 ) -> Result<CreateNamespaceResponse> {
947 if let Some(ref manifest_ns) = self.manifest_ns {
948 return manifest_ns.create_namespace(request).await;
949 }
950
951 if request.id.is_none() || request.id.as_ref().unwrap().is_empty() {
952 return Err(Error::namespace_source(
953 "Root namespace already exists and cannot be created".into(),
954 ));
955 }
956
957 Err(Error::not_supported_source(
958 "Child namespaces are only supported when manifest mode is enabled".into(),
959 ))
960 }
961
962 async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<DropNamespaceResponse> {
963 if let Some(ref manifest_ns) = self.manifest_ns {
964 return manifest_ns.drop_namespace(request).await;
965 }
966
967 if request.id.is_none() || request.id.as_ref().unwrap().is_empty() {
968 return Err(Error::namespace_source(
969 "Root namespace cannot be dropped".into(),
970 ));
971 }
972
973 Err(Error::not_supported_source(
974 "Child namespaces are only supported when manifest mode is enabled".into(),
975 ))
976 }
977
978 async fn namespace_exists(&self, request: NamespaceExistsRequest) -> Result<()> {
979 if let Some(ref manifest_ns) = self.manifest_ns {
980 return manifest_ns.namespace_exists(request).await;
981 }
982
983 if request.id.is_none() || request.id.as_ref().unwrap().is_empty() {
984 return Ok(());
985 }
986
987 Err(Error::namespace_source(
988 "Child namespaces are only supported when manifest mode is enabled".into(),
989 ))
990 }
991
992 async fn list_tables(&self, request: ListTablesRequest) -> Result<ListTablesResponse> {
993 let namespace_id = request
995 .id
996 .as_ref()
997 .ok_or_else(|| Error::invalid_input_source("Namespace ID is required".into()))?;
998
999 if !namespace_id.is_empty() {
1001 if let Some(ref manifest_ns) = self.manifest_ns {
1002 return manifest_ns.list_tables(request).await;
1003 }
1004 return Err(Error::not_supported_source(
1005 "Child namespaces are only supported when manifest mode is enabled".into(),
1006 ));
1007 }
1008
1009 if let Some(ref manifest_ns) = self.manifest_ns
1011 && !self.dir_listing_enabled
1012 {
1013 return manifest_ns.list_tables(request).await;
1014 }
1015
1016 let mut tables = if self.manifest_ns.is_some() && self.dir_listing_enabled {
1018 let manifest_locations = if let Some(ref manifest_ns) = self.manifest_ns {
1020 manifest_ns.list_manifest_table_locations().await?
1021 } else {
1022 std::collections::HashSet::new()
1023 };
1024
1025 let mut manifest_request = request.clone();
1027 manifest_request.limit = None;
1028 manifest_request.page_token = None;
1029 let manifest_tables = if let Some(ref manifest_ns) = self.manifest_ns {
1030 let manifest_response = manifest_ns.list_tables(manifest_request).await?;
1031 manifest_response.tables
1032 } else {
1033 vec![]
1034 };
1035
1036 let mut all_tables: Vec<String> = manifest_tables;
1039 let dir_tables = self.list_directory_tables().await?;
1040 for table_name in dir_tables {
1041 let full_location = format!("{}/{}.lance", self.root, table_name);
1044 let relative_location = format!("{}.lance", table_name);
1045 if !manifest_locations.contains(&full_location)
1046 && !manifest_locations.contains(&relative_location)
1047 {
1048 all_tables.push(table_name);
1049 }
1050 }
1051
1052 all_tables
1053 } else {
1054 self.list_directory_tables().await?
1055 };
1056
1057 Self::apply_pagination(&mut tables, request.page_token, request.limit);
1059 let response = ListTablesResponse::new(tables);
1060 Ok(response)
1061 }
1062
1063 async fn describe_table(&self, request: DescribeTableRequest) -> Result<DescribeTableResponse> {
1064 if let Some(ref manifest_ns) = self.manifest_ns {
1065 match manifest_ns.describe_table(request.clone()).await {
1066 Ok(mut response) => {
1067 if request.vend_credentials == Some(true) && self.credential_vendor.is_some() {
1069 if let Some(ref table_uri) = response.table_uri {
1070 let identity = request.identity.as_deref();
1071 response.storage_options = self
1072 .get_storage_options_for_table(table_uri, identity)
1073 .await?;
1074 }
1075 } else if request.vend_credentials == Some(false) {
1076 response.storage_options = None;
1077 }
1078 if self.table_version_tracking_enabled {
1080 response.managed_versioning = Some(true);
1081 }
1082 return Ok(response);
1083 }
1084 Err(_)
1085 if self.dir_listing_enabled
1086 && request.id.as_ref().is_some_and(|id| id.len() == 1) =>
1087 {
1088 }
1090 Err(e) => return Err(e),
1091 }
1092 }
1093
1094 let table_name = Self::table_name_from_id(&request.id)?;
1095 let table_uri = self.table_full_uri(&table_name);
1096
1097 let status = self.check_table_status(&table_name).await;
1099
1100 if !status.exists {
1101 return Err(Error::namespace_source(
1102 format!("Table does not exist: {}", table_name).into(),
1103 ));
1104 }
1105
1106 if status.is_deregistered {
1107 return Err(Error::namespace_source(
1108 format!("Table is deregistered: {}", table_name).into(),
1109 ));
1110 }
1111
1112 let load_detailed_metadata = request.load_detailed_metadata.unwrap_or(false);
1113 let vend_credentials = request.vend_credentials.unwrap_or(true);
1115 let identity = request.identity.as_deref();
1116
1117 if !load_detailed_metadata {
1119 let storage_options = if vend_credentials {
1120 self.get_storage_options_for_table(&table_uri, identity)
1121 .await?
1122 } else {
1123 None
1124 };
1125 return Ok(DescribeTableResponse {
1126 table: Some(table_name),
1127 namespace: request.id.as_ref().map(|id| {
1128 if id.len() > 1 {
1129 id[..id.len() - 1].to_vec()
1130 } else {
1131 vec![]
1132 }
1133 }),
1134 location: Some(table_uri.clone()),
1135 table_uri: Some(table_uri),
1136 storage_options,
1137 managed_versioning: if self.table_version_tracking_enabled {
1138 Some(true)
1139 } else {
1140 None
1141 },
1142 ..Default::default()
1143 });
1144 }
1145
1146 let mut builder = DatasetBuilder::from_uri(&table_uri);
1149 if let Some(opts) = &self.storage_options {
1150 builder = builder.with_storage_options(opts.clone());
1151 }
1152 if let Some(sess) = &self.session {
1153 builder = builder.with_session(sess.clone());
1154 }
1155 match builder.load().await {
1156 Ok(mut dataset) => {
1157 if let Some(requested_version) = request.version {
1159 dataset = dataset.checkout_version(requested_version as u64).await?;
1160 }
1161
1162 let version_info = dataset.version();
1163 let lance_schema = dataset.schema();
1164 let arrow_schema: arrow_schema::Schema = lance_schema.into();
1165 let json_schema = arrow_schema_to_json(&arrow_schema)?;
1166 let storage_options = if vend_credentials {
1167 self.get_storage_options_for_table(&table_uri, identity)
1168 .await?
1169 } else {
1170 None
1171 };
1172
1173 let metadata: std::collections::HashMap<String, String> =
1175 version_info.metadata.into_iter().collect();
1176
1177 Ok(DescribeTableResponse {
1178 table: Some(table_name),
1179 namespace: request.id.as_ref().map(|id| {
1180 if id.len() > 1 {
1181 id[..id.len() - 1].to_vec()
1182 } else {
1183 vec![]
1184 }
1185 }),
1186 version: Some(version_info.version as i64),
1187 location: Some(table_uri.clone()),
1188 table_uri: Some(table_uri),
1189 schema: Some(Box::new(json_schema)),
1190 storage_options,
1191 metadata: Some(metadata),
1192 managed_versioning: if self.table_version_tracking_enabled {
1193 Some(true)
1194 } else {
1195 None
1196 },
1197 ..Default::default()
1198 })
1199 }
1200 Err(err) => {
1201 if status.has_reserved_file {
1203 let storage_options = if vend_credentials {
1204 self.get_storage_options_for_table(&table_uri, identity)
1205 .await?
1206 } else {
1207 None
1208 };
1209 Ok(DescribeTableResponse {
1210 table: Some(table_name),
1211 namespace: request.id.as_ref().map(|id| {
1212 if id.len() > 1 {
1213 id[..id.len() - 1].to_vec()
1214 } else {
1215 vec![]
1216 }
1217 }),
1218 location: Some(table_uri.clone()),
1219 table_uri: Some(table_uri),
1220 storage_options,
1221 managed_versioning: if self.table_version_tracking_enabled {
1222 Some(true)
1223 } else {
1224 None
1225 },
1226 ..Default::default()
1227 })
1228 } else {
1229 Err(Error::namespace_source(
1230 format!(
1231 "Table directory exists but cannot load dataset {}: {:?}",
1232 table_name, err
1233 )
1234 .into(),
1235 ))
1236 }
1237 }
1238 }
1239 }
1240
1241 async fn table_exists(&self, request: TableExistsRequest) -> Result<()> {
1242 if let Some(ref manifest_ns) = self.manifest_ns {
1243 match manifest_ns.table_exists(request.clone()).await {
1244 Ok(()) => return Ok(()),
1245 Err(_) if self.dir_listing_enabled => {
1246 }
1248 Err(e) => return Err(e),
1249 }
1250 }
1251
1252 let table_name = Self::table_name_from_id(&request.id)?;
1253
1254 let status = self.check_table_status(&table_name).await;
1256
1257 if !status.exists {
1258 return Err(Error::namespace_source(
1259 format!("Table does not exist: {}", table_name).into(),
1260 ));
1261 }
1262
1263 if status.is_deregistered {
1264 return Err(Error::namespace_source(
1265 format!("Table is deregistered: {}", table_name).into(),
1266 ));
1267 }
1268
1269 Ok(())
1270 }
1271
1272 async fn drop_table(&self, request: DropTableRequest) -> Result<DropTableResponse> {
1273 if let Some(ref manifest_ns) = self.manifest_ns {
1274 return manifest_ns.drop_table(request).await;
1275 }
1276
1277 let table_name = Self::table_name_from_id(&request.id)?;
1278 let table_uri = self.table_full_uri(&table_name);
1279 let table_path = self.table_path(&table_name);
1280
1281 self.object_store
1282 .remove_dir_all(table_path)
1283 .await
1284 .map_err(|e| {
1285 Error::namespace_source(
1286 format!("Failed to drop table {}: {}", table_name, e).into(),
1287 )
1288 })?;
1289
1290 Ok(DropTableResponse {
1291 id: request.id,
1292 location: Some(table_uri),
1293 ..Default::default()
1294 })
1295 }
1296
1297 async fn create_table(
1298 &self,
1299 request: CreateTableRequest,
1300 request_data: Bytes,
1301 ) -> Result<CreateTableResponse> {
1302 if let Some(ref manifest_ns) = self.manifest_ns {
1303 return manifest_ns.create_table(request, request_data).await;
1304 }
1305
1306 let table_name = Self::table_name_from_id(&request.id)?;
1307 let table_uri = self.table_full_uri(&table_name);
1308 if request_data.is_empty() {
1309 return Err(Error::namespace_source(
1310 "Request data (Arrow IPC stream) is required for create_table".into(),
1311 ));
1312 }
1313
1314 let cursor = Cursor::new(request_data.to_vec());
1316 let stream_reader = StreamReader::try_new(cursor, None).map_err(|e| {
1317 Error::namespace_source(format!("Invalid Arrow IPC stream: {}", e).into())
1318 })?;
1319 let arrow_schema = stream_reader.schema();
1320
1321 let mut batches = Vec::new();
1323 for batch_result in stream_reader {
1324 batches.push(batch_result.map_err(|e| {
1325 Error::namespace_source(
1326 format!("Failed to read batch from IPC stream: {}", e).into(),
1327 )
1328 })?);
1329 }
1330
1331 let reader = if batches.is_empty() {
1333 let batch = arrow::record_batch::RecordBatch::new_empty(arrow_schema.clone());
1334 let batches = vec![Ok(batch)];
1335 RecordBatchIterator::new(batches, arrow_schema.clone())
1336 } else {
1337 let batch_results: Vec<_> = batches.into_iter().map(Ok).collect();
1338 RecordBatchIterator::new(batch_results, arrow_schema)
1339 };
1340
1341 let store_params = self.storage_options.as_ref().map(|opts| ObjectStoreParams {
1342 storage_options_accessor: Some(Arc::new(
1343 lance_io::object_store::StorageOptionsAccessor::with_static_options(opts.clone()),
1344 )),
1345 ..Default::default()
1346 });
1347
1348 let write_params = WriteParams {
1349 mode: lance::dataset::WriteMode::Create,
1350 store_params,
1351 ..Default::default()
1352 };
1353
1354 Dataset::write(reader, &table_uri, Some(write_params))
1356 .await
1357 .map_err(|e| {
1358 Error::namespace_source(format!("Failed to create Lance dataset: {}", e).into())
1359 })?;
1360
1361 Ok(CreateTableResponse {
1362 version: Some(1),
1363 location: Some(table_uri),
1364 storage_options: self.storage_options.clone(),
1365 ..Default::default()
1366 })
1367 }
1368
1369 async fn declare_table(&self, request: DeclareTableRequest) -> Result<DeclareTableResponse> {
1370 if let Some(ref manifest_ns) = self.manifest_ns {
1371 let mut response = manifest_ns.declare_table(request.clone()).await?;
1372 if request.vend_credentials == Some(true) && self.credential_vendor.is_some() {
1374 if let Some(ref location) = response.location {
1375 let identity = request.identity.as_deref();
1376 response.storage_options = self
1377 .get_storage_options_for_table(location, identity)
1378 .await?;
1379 }
1380 } else if request.vend_credentials == Some(false) {
1381 response.storage_options = None;
1382 }
1383 if self.table_version_tracking_enabled {
1385 response.managed_versioning = Some(true);
1386 }
1387 return Ok(response);
1388 }
1389
1390 let table_name = Self::table_name_from_id(&request.id)?;
1391 let table_uri = self.table_full_uri(&table_name);
1392
1393 if let Some(location) = &request.location {
1395 let location = location.trim_end_matches('/');
1396 if location != table_uri {
1397 return Err(Error::namespace_source(
1398 format!(
1399 "Cannot declare table {} at location {}, must be at location {}",
1400 table_name, location, table_uri
1401 )
1402 .into(),
1403 ));
1404 }
1405 }
1406
1407 let status = self.check_table_status(&table_name).await;
1411 if status.exists && !status.has_reserved_file {
1412 return Err(Error::namespace_source(
1414 format!("Table already exists: {}", table_name).into(),
1415 ));
1416 }
1417
1418 let reserved_file_path = self.table_reserved_file_path(&table_name);
1422
1423 self.put_marker_file_atomic(&reserved_file_path, &format!("table {}", table_name))
1424 .await
1425 .map_err(|e| Error::namespace_source(e.into()))?;
1426
1427 let vend_credentials = request.vend_credentials.unwrap_or(true);
1429 let identity = request.identity.as_deref();
1430 let storage_options = if vend_credentials {
1431 self.get_storage_options_for_table(&table_uri, identity)
1432 .await?
1433 } else {
1434 None
1435 };
1436
1437 Ok(DeclareTableResponse {
1438 location: Some(table_uri),
1439 storage_options,
1440 managed_versioning: if self.table_version_tracking_enabled {
1441 Some(true)
1442 } else {
1443 None
1444 },
1445 ..Default::default()
1446 })
1447 }
1448
1449 async fn register_table(
1450 &self,
1451 request: lance_namespace::models::RegisterTableRequest,
1452 ) -> Result<lance_namespace::models::RegisterTableResponse> {
1453 if let Some(ref manifest_ns) = self.manifest_ns {
1455 return LanceNamespace::register_table(manifest_ns.as_ref(), request).await;
1456 }
1457
1458 Err(Error::not_supported_source(
1460 "register_table is only supported when manifest mode is enabled".into(),
1461 ))
1462 }
1463
1464 async fn deregister_table(
1465 &self,
1466 request: lance_namespace::models::DeregisterTableRequest,
1467 ) -> Result<lance_namespace::models::DeregisterTableResponse> {
1468 if let Some(ref manifest_ns) = self.manifest_ns {
1470 return LanceNamespace::deregister_table(manifest_ns.as_ref(), request).await;
1471 }
1472
1473 let table_name = Self::table_name_from_id(&request.id)?;
1475 let table_uri = self.table_full_uri(&table_name);
1476
1477 let status = self.check_table_status(&table_name).await;
1480
1481 if !status.exists {
1482 return Err(Error::namespace_source(
1483 format!("Table does not exist: {}", table_name).into(),
1484 ));
1485 }
1486
1487 if status.is_deregistered {
1488 return Err(Error::namespace_source(
1489 format!("Table is already deregistered: {}", table_name).into(),
1490 ));
1491 }
1492
1493 let deregistered_path = self.table_deregistered_file_path(&table_name);
1499 self.put_marker_file_atomic(
1500 &deregistered_path,
1501 &format!("deregistration marker for table {}", table_name),
1502 )
1503 .await
1504 .map_err(|e| {
1505 let message = if e.contains("already exists") {
1507 format!("Table is already deregistered: {}", table_name)
1508 } else {
1509 e
1510 };
1511 Error::namespace_source(message.into())
1512 })?;
1513
1514 Ok(lance_namespace::models::DeregisterTableResponse {
1515 id: request.id,
1516 location: Some(table_uri),
1517 ..Default::default()
1518 })
1519 }
1520
1521 async fn list_table_versions(
1522 &self,
1523 request: ListTableVersionsRequest,
1524 ) -> Result<ListTableVersionsResponse> {
1525 let table_uri = self.resolve_table_location(&request.id).await?;
1526
1527 let table_path = Self::uri_to_object_store_path(&table_uri);
1528 let versions_dir = table_path.child("_versions");
1529 let manifest_metas: Vec<_> = self
1530 .object_store
1531 .read_dir_all(&versions_dir, None)
1532 .try_collect()
1533 .await
1534 .map_err(|e| {
1535 Error::namespace_source(
1536 format!(
1537 "Failed to list manifest files for table at '{}': {}",
1538 table_uri, e
1539 )
1540 .into(),
1541 )
1542 })?;
1543
1544 let is_v2_naming = manifest_metas
1545 .first()
1546 .is_some_and(|meta| meta.location.filename().is_some_and(|f| f.len() == 29));
1547
1548 let mut table_versions: Vec<TableVersion> = manifest_metas
1549 .into_iter()
1550 .filter_map(|meta| {
1551 let filename = meta.location.filename()?;
1552 let version_str = filename.strip_suffix(".manifest")?;
1553 if version_str.starts_with('d') {
1554 return None;
1555 }
1556 let file_version: u64 = version_str.parse().ok()?;
1557
1558 let actual_version = if file_version > u64::MAX / 2 {
1559 u64::MAX - file_version
1560 } else {
1561 file_version
1562 };
1563
1564 Some(TableVersion {
1566 version: actual_version as i64,
1567 manifest_path: meta.location.to_string(),
1568 manifest_size: Some(meta.size as i64),
1569 e_tag: meta.e_tag,
1570 timestamp_millis: Some(meta.last_modified.timestamp_millis()),
1571 metadata: None,
1572 })
1573 })
1574 .collect();
1575
1576 let list_is_ordered = self.object_store.list_is_lexically_ordered;
1577 let want_descending = request.descending == Some(true);
1578
1579 let needs_sort = if list_is_ordered {
1580 if is_v2_naming {
1581 !want_descending
1582 } else {
1583 want_descending
1584 }
1585 } else {
1586 true
1587 };
1588
1589 if needs_sort {
1590 if want_descending {
1591 table_versions.sort_by(|a, b| b.version.cmp(&a.version));
1592 } else {
1593 table_versions.sort_by(|a, b| a.version.cmp(&b.version));
1594 }
1595 }
1596
1597 if let Some(limit) = request.limit {
1598 table_versions.truncate(limit as usize);
1599 }
1600
1601 Ok(ListTableVersionsResponse {
1602 versions: table_versions,
1603 page_token: None,
1604 })
1605 }
1606
1607 async fn create_table_version(
1608 &self,
1609 request: CreateTableVersionRequest,
1610 ) -> Result<CreateTableVersionResponse> {
1611 let table_uri = self.resolve_table_location(&request.id).await?;
1612
1613 let staging_manifest_path = &request.manifest_path;
1614 let version = request.version as u64;
1615
1616 let table_path = Self::uri_to_object_store_path(&table_uri);
1617
1618 let naming_scheme = match request.naming_scheme.as_deref() {
1620 Some("V1") => ManifestNamingScheme::V1,
1621 _ => ManifestNamingScheme::V2,
1622 };
1623
1624 let final_path = naming_scheme.manifest_path(&table_path, version);
1626
1627 let staging_path = Self::uri_to_object_store_path(staging_manifest_path);
1628 let manifest_data = self
1629 .object_store
1630 .inner
1631 .get(&staging_path)
1632 .await
1633 .map_err(|e| {
1634 Error::namespace_source(
1635 format!(
1636 "Failed to read staging manifest at '{}': {}",
1637 staging_manifest_path, e
1638 )
1639 .into(),
1640 )
1641 })?
1642 .bytes()
1643 .await
1644 .map_err(|e| {
1645 Error::namespace_source(
1646 format!(
1647 "Failed to read staging manifest bytes at '{}': {}",
1648 staging_manifest_path, e
1649 )
1650 .into(),
1651 )
1652 })?;
1653
1654 let manifest_size = manifest_data.len() as i64;
1655
1656 let put_result = self
1657 .object_store
1658 .inner
1659 .put_opts(
1660 &final_path,
1661 manifest_data.into(),
1662 PutOptions {
1663 mode: PutMode::Create,
1664 ..Default::default()
1665 },
1666 )
1667 .await
1668 .map_err(|e| match e {
1669 object_store::Error::AlreadyExists { .. }
1670 | object_store::Error::Precondition { .. } => Error::namespace_source(
1671 format!(
1672 "Version {} already exists for table at '{}'",
1673 version, table_uri
1674 )
1675 .into(),
1676 ),
1677 _ => Error::namespace_source(
1678 format!(
1679 "Failed to create version {} for table at '{}': {}",
1680 version, table_uri, e
1681 )
1682 .into(),
1683 ),
1684 })?;
1685
1686 if let Err(e) = self.object_store.inner.delete(&staging_path).await {
1688 log::warn!(
1689 "Failed to delete staging manifest at '{}': {:?}",
1690 staging_path,
1691 e
1692 );
1693 }
1694
1695 Ok(CreateTableVersionResponse {
1696 transaction_id: None,
1697 version: Some(Box::new(TableVersion {
1698 version: version as i64,
1699 manifest_path: final_path.to_string(),
1700 manifest_size: Some(manifest_size),
1701 e_tag: put_result.e_tag,
1702 timestamp_millis: None,
1703 metadata: None,
1704 })),
1705 })
1706 }
1707
1708 async fn describe_table_version(
1709 &self,
1710 request: DescribeTableVersionRequest,
1711 ) -> Result<DescribeTableVersionResponse> {
1712 let table_uri = self.resolve_table_location(&request.id).await?;
1713
1714 let mut builder = DatasetBuilder::from_uri(&table_uri);
1716 if let Some(opts) = &self.storage_options {
1717 builder = builder.with_storage_options(opts.clone());
1718 }
1719 if let Some(sess) = &self.session {
1720 builder = builder.with_session(sess.clone());
1721 }
1722 let mut dataset = builder.load().await.map_err(|e| {
1723 Error::namespace_source(
1724 format!("Failed to open table at '{}': {}", table_uri, e).into(),
1725 )
1726 })?;
1727
1728 if let Some(version) = request.version {
1729 dataset = dataset
1730 .checkout_version(version as u64)
1731 .await
1732 .map_err(|e| {
1733 Error::namespace_source(
1734 format!(
1735 "Failed to checkout version {} for table at '{}': {}",
1736 version, table_uri, e
1737 )
1738 .into(),
1739 )
1740 })?;
1741 }
1742
1743 let version_info = dataset.version();
1744 let manifest_location = dataset.manifest_location();
1745 let metadata: std::collections::HashMap<String, String> =
1746 version_info.metadata.into_iter().collect();
1747
1748 let table_version = TableVersion {
1749 version: version_info.version as i64,
1750 manifest_path: manifest_location.path.to_string(),
1751 manifest_size: manifest_location.size.map(|s| s as i64),
1752 e_tag: manifest_location.e_tag.clone(),
1753 timestamp_millis: Some(version_info.timestamp.timestamp_millis()),
1754 metadata: if metadata.is_empty() {
1755 None
1756 } else {
1757 Some(metadata)
1758 },
1759 };
1760
1761 Ok(DescribeTableVersionResponse {
1762 version: Box::new(table_version),
1763 })
1764 }
1765
1766 async fn batch_delete_table_versions(
1767 &self,
1768 request: BatchDeleteTableVersionsRequest,
1769 ) -> Result<BatchDeleteTableVersionsResponse> {
1770 let table_uri = self.resolve_table_location(&request.id).await?;
1771
1772 let table_path = Self::uri_to_object_store_path(&table_uri);
1773 let table_path_str = table_path.as_ref();
1774 let versions_dir_path = Path::from(format!("{}_versions", table_path_str));
1775
1776 let mut deleted_count = 0i64;
1777
1778 for range in &request.ranges {
1779 let start = range.start_version as u64;
1780 let end = if range.end_version > 0 {
1781 range.end_version as u64
1782 } else {
1783 start
1784 };
1785
1786 for version in start..=end {
1787 let version_path = versions_dir_path.child(format!("{}.manifest", version));
1788 match self.object_store.inner.delete(&version_path).await {
1789 Ok(_) => {
1790 deleted_count += 1;
1791 }
1792 Err(object_store::Error::NotFound { .. }) => {}
1793 Err(e) => {
1794 return Err(Error::namespace_source(
1795 format!(
1796 "Failed to delete version {} for table at '{}': {}",
1797 version, table_uri, e
1798 )
1799 .into(),
1800 ));
1801 }
1802 }
1803 }
1804 }
1805
1806 Ok(BatchDeleteTableVersionsResponse {
1807 deleted_count: Some(deleted_count),
1808 transaction_id: None,
1809 })
1810 }
1811
1812 fn namespace_id(&self) -> String {
1813 format!("DirectoryNamespace {{ root: {:?} }}", self.root)
1814 }
1815}
1816
1817#[cfg(test)]
1818mod tests {
1819 use super::*;
1820 use arrow_ipc::reader::StreamReader;
1821 use lance::dataset::Dataset;
1822 use lance_core::utils::tempfile::{TempStdDir, TempStrDir};
1823 use lance_namespace::models::{
1824 CreateTableRequest, JsonArrowDataType, JsonArrowField, JsonArrowSchema, ListTablesRequest,
1825 };
1826 use lance_namespace::schema::convert_json_arrow_schema;
1827 use std::io::Cursor;
1828 use std::sync::Arc;
1829
1830 async fn create_test_namespace() -> (DirectoryNamespace, TempStdDir) {
1832 let temp_dir = TempStdDir::default();
1833
1834 let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
1835 .build()
1836 .await
1837 .unwrap();
1838 (namespace, temp_dir)
1839 }
1840
1841 fn create_test_ipc_data(schema: &JsonArrowSchema) -> Vec<u8> {
1843 use arrow::ipc::writer::StreamWriter;
1844
1845 let arrow_schema = convert_json_arrow_schema(schema).unwrap();
1846 let arrow_schema = Arc::new(arrow_schema);
1847 let batch = arrow::record_batch::RecordBatch::new_empty(arrow_schema.clone());
1848 let mut buffer = Vec::new();
1849 {
1850 let mut writer = StreamWriter::try_new(&mut buffer, &arrow_schema).unwrap();
1851 writer.write(&batch).unwrap();
1852 writer.finish().unwrap();
1853 }
1854 buffer
1855 }
1856
1857 fn create_test_schema() -> JsonArrowSchema {
1859 let int_type = JsonArrowDataType::new("int32".to_string());
1860 let string_type = JsonArrowDataType::new("utf8".to_string());
1861
1862 let id_field = JsonArrowField {
1863 name: "id".to_string(),
1864 r#type: Box::new(int_type),
1865 nullable: false,
1866 metadata: None,
1867 };
1868
1869 let name_field = JsonArrowField {
1870 name: "name".to_string(),
1871 r#type: Box::new(string_type),
1872 nullable: true,
1873 metadata: None,
1874 };
1875
1876 JsonArrowSchema {
1877 fields: vec![id_field, name_field],
1878 metadata: None,
1879 }
1880 }
1881
1882 #[tokio::test]
1883 async fn test_create_table() {
1884 let (namespace, _temp_dir) = create_test_namespace().await;
1885
1886 let schema = create_test_schema();
1888 let ipc_data = create_test_ipc_data(&schema);
1889
1890 let mut request = CreateTableRequest::new();
1891 request.id = Some(vec!["test_table".to_string()]);
1892
1893 let response = namespace
1894 .create_table(request, bytes::Bytes::from(ipc_data))
1895 .await
1896 .unwrap();
1897
1898 assert!(response.location.is_some());
1899 assert!(response.location.unwrap().ends_with("test_table.lance"));
1900 assert_eq!(response.version, Some(1));
1901 }
1902
1903 #[tokio::test]
1904 async fn test_create_table_without_data() {
1905 let (namespace, _temp_dir) = create_test_namespace().await;
1906
1907 let mut request = CreateTableRequest::new();
1908 request.id = Some(vec!["test_table".to_string()]);
1909
1910 let result = namespace.create_table(request, bytes::Bytes::new()).await;
1911 assert!(result.is_err());
1912 assert!(
1913 result
1914 .unwrap_err()
1915 .to_string()
1916 .contains("Arrow IPC stream) is required")
1917 );
1918 }
1919
1920 #[tokio::test]
1921 async fn test_create_table_with_invalid_id() {
1922 let (namespace, _temp_dir) = create_test_namespace().await;
1923
1924 let schema = create_test_schema();
1926 let ipc_data = create_test_ipc_data(&schema);
1927
1928 let mut request = CreateTableRequest::new();
1930 request.id = Some(vec![]);
1931
1932 let result = namespace
1933 .create_table(request, bytes::Bytes::from(ipc_data.clone()))
1934 .await;
1935 assert!(result.is_err());
1936
1937 let mut create_ns_req = CreateNamespaceRequest::new();
1940 create_ns_req.id = Some(vec!["test_namespace".to_string()]);
1941 namespace.create_namespace(create_ns_req).await.unwrap();
1942
1943 let mut request = CreateTableRequest::new();
1945 request.id = Some(vec!["test_namespace".to_string(), "table".to_string()]);
1946
1947 let result = namespace
1948 .create_table(request, bytes::Bytes::from(ipc_data))
1949 .await;
1950 assert!(
1952 result.is_ok(),
1953 "Multi-level table IDs should work with manifest enabled"
1954 );
1955 }
1956
1957 #[tokio::test]
1958 async fn test_list_tables() {
1959 let (namespace, _temp_dir) = create_test_namespace().await;
1960
1961 let mut request = ListTablesRequest::new();
1963 request.id = Some(vec![]);
1964 let response = namespace.list_tables(request).await.unwrap();
1965 assert_eq!(response.tables.len(), 0);
1966
1967 let schema = create_test_schema();
1969 let ipc_data = create_test_ipc_data(&schema);
1970
1971 let mut create_request = CreateTableRequest::new();
1973 create_request.id = Some(vec!["table1".to_string()]);
1974 namespace
1975 .create_table(create_request, bytes::Bytes::from(ipc_data.clone()))
1976 .await
1977 .unwrap();
1978
1979 let mut create_request = CreateTableRequest::new();
1981 create_request.id = Some(vec!["table2".to_string()]);
1982 namespace
1983 .create_table(create_request, bytes::Bytes::from(ipc_data))
1984 .await
1985 .unwrap();
1986
1987 let mut request = ListTablesRequest::new();
1989 request.id = Some(vec![]);
1990 let response = namespace.list_tables(request).await.unwrap();
1991 let tables = response.tables;
1992 assert_eq!(tables.len(), 2);
1993 assert!(tables.contains(&"table1".to_string()));
1994 assert!(tables.contains(&"table2".to_string()));
1995 }
1996
1997 #[tokio::test]
1998 async fn test_list_tables_with_namespace_id() {
1999 let (namespace, _temp_dir) = create_test_namespace().await;
2000
2001 let mut create_ns_req = CreateNamespaceRequest::new();
2003 create_ns_req.id = Some(vec!["test_namespace".to_string()]);
2004 namespace.create_namespace(create_ns_req).await.unwrap();
2005
2006 let mut request = ListTablesRequest::new();
2008 request.id = Some(vec!["test_namespace".to_string()]);
2009
2010 let result = namespace.list_tables(request).await;
2011 assert!(
2013 result.is_ok(),
2014 "list_tables should work with child namespace when manifest is enabled"
2015 );
2016 let response = result.unwrap();
2017 assert_eq!(
2018 response.tables.len(),
2019 0,
2020 "Namespace should have no tables yet"
2021 );
2022 }
2023
2024 #[tokio::test]
2025 async fn test_describe_table() {
2026 let (namespace, _temp_dir) = create_test_namespace().await;
2027
2028 let schema = create_test_schema();
2030 let ipc_data = create_test_ipc_data(&schema);
2031
2032 let mut create_request = CreateTableRequest::new();
2033 create_request.id = Some(vec!["test_table".to_string()]);
2034 namespace
2035 .create_table(create_request, bytes::Bytes::from(ipc_data))
2036 .await
2037 .unwrap();
2038
2039 let mut request = DescribeTableRequest::new();
2041 request.id = Some(vec!["test_table".to_string()]);
2042 let response = namespace.describe_table(request).await.unwrap();
2043
2044 assert!(response.location.is_some());
2045 assert!(response.location.unwrap().ends_with("test_table.lance"));
2046 }
2047
2048 #[tokio::test]
2049 async fn test_describe_nonexistent_table() {
2050 let (namespace, _temp_dir) = create_test_namespace().await;
2051
2052 let mut request = DescribeTableRequest::new();
2053 request.id = Some(vec!["nonexistent".to_string()]);
2054
2055 let result = namespace.describe_table(request).await;
2056 assert!(result.is_err());
2057 assert!(
2058 result
2059 .unwrap_err()
2060 .to_string()
2061 .contains("Table does not exist")
2062 );
2063 }
2064
2065 #[tokio::test]
2066 async fn test_table_exists() {
2067 let (namespace, _temp_dir) = create_test_namespace().await;
2068
2069 let schema = create_test_schema();
2071 let ipc_data = create_test_ipc_data(&schema);
2072
2073 let mut create_request = CreateTableRequest::new();
2074 create_request.id = Some(vec!["existing_table".to_string()]);
2075 namespace
2076 .create_table(create_request, bytes::Bytes::from(ipc_data))
2077 .await
2078 .unwrap();
2079
2080 let mut request = TableExistsRequest::new();
2082 request.id = Some(vec!["existing_table".to_string()]);
2083 let result = namespace.table_exists(request).await;
2084 assert!(result.is_ok());
2085
2086 let mut request = TableExistsRequest::new();
2088 request.id = Some(vec!["nonexistent".to_string()]);
2089 let result = namespace.table_exists(request).await;
2090 assert!(result.is_err());
2091 assert!(
2092 result
2093 .unwrap_err()
2094 .to_string()
2095 .contains("Table does not exist")
2096 );
2097 }
2098
2099 #[tokio::test]
2100 async fn test_drop_table() {
2101 let (namespace, _temp_dir) = create_test_namespace().await;
2102
2103 let schema = create_test_schema();
2105 let ipc_data = create_test_ipc_data(&schema);
2106
2107 let mut create_request = CreateTableRequest::new();
2108 create_request.id = Some(vec!["table_to_drop".to_string()]);
2109 namespace
2110 .create_table(create_request, bytes::Bytes::from(ipc_data))
2111 .await
2112 .unwrap();
2113
2114 let mut exists_request = TableExistsRequest::new();
2116 exists_request.id = Some(vec!["table_to_drop".to_string()]);
2117 assert!(namespace.table_exists(exists_request.clone()).await.is_ok());
2118
2119 let mut drop_request = DropTableRequest::new();
2121 drop_request.id = Some(vec!["table_to_drop".to_string()]);
2122 let response = namespace.drop_table(drop_request).await.unwrap();
2123 assert!(response.location.is_some());
2124
2125 assert!(namespace.table_exists(exists_request).await.is_err());
2127 }
2128
2129 #[tokio::test]
2130 async fn test_drop_nonexistent_table() {
2131 let (namespace, _temp_dir) = create_test_namespace().await;
2132
2133 let mut request = DropTableRequest::new();
2134 request.id = Some(vec!["nonexistent".to_string()]);
2135
2136 let result = namespace.drop_table(request).await;
2138 let _ = result;
2141 }
2142
2143 #[tokio::test]
2144 async fn test_root_namespace_operations() {
2145 let (namespace, _temp_dir) = create_test_namespace().await;
2146
2147 let mut request = ListNamespacesRequest::new();
2149 request.id = Some(vec![]);
2150 let result = namespace.list_namespaces(request).await;
2151 assert!(result.is_ok());
2152 assert_eq!(result.unwrap().namespaces.len(), 0);
2153
2154 let mut request = DescribeNamespaceRequest::new();
2156 request.id = Some(vec![]);
2157 let result = namespace.describe_namespace(request).await;
2158 assert!(result.is_ok());
2159
2160 let mut request = NamespaceExistsRequest::new();
2162 request.id = Some(vec![]);
2163 let result = namespace.namespace_exists(request).await;
2164 assert!(result.is_ok());
2165
2166 let mut request = CreateNamespaceRequest::new();
2168 request.id = Some(vec![]);
2169 let result = namespace.create_namespace(request).await;
2170 assert!(result.is_err());
2171 assert!(result.unwrap_err().to_string().contains("already exists"));
2172
2173 let mut request = DropNamespaceRequest::new();
2175 request.id = Some(vec![]);
2176 let result = namespace.drop_namespace(request).await;
2177 assert!(result.is_err());
2178 assert!(
2179 result
2180 .unwrap_err()
2181 .to_string()
2182 .contains("cannot be dropped")
2183 );
2184 }
2185
2186 #[tokio::test]
2187 async fn test_non_root_namespace_operations() {
2188 let (namespace, _temp_dir) = create_test_namespace().await;
2189
2190 let mut request = CreateNamespaceRequest::new();
2193 request.id = Some(vec!["child".to_string()]);
2194 let result = namespace.create_namespace(request).await;
2195 assert!(
2196 result.is_ok(),
2197 "Child namespace creation should succeed with manifest enabled"
2198 );
2199
2200 let mut request = NamespaceExistsRequest::new();
2202 request.id = Some(vec!["child".to_string()]);
2203 let result = namespace.namespace_exists(request).await;
2204 assert!(
2205 result.is_ok(),
2206 "Child namespace should exist after creation"
2207 );
2208
2209 let mut request = DropNamespaceRequest::new();
2211 request.id = Some(vec!["child".to_string()]);
2212 let result = namespace.drop_namespace(request).await;
2213 assert!(
2214 result.is_ok(),
2215 "Child namespace drop should succeed with manifest enabled"
2216 );
2217
2218 let mut request = NamespaceExistsRequest::new();
2220 request.id = Some(vec!["child".to_string()]);
2221 let result = namespace.namespace_exists(request).await;
2222 assert!(
2223 result.is_err(),
2224 "Child namespace should not exist after drop"
2225 );
2226 }
2227
2228 #[tokio::test]
2229 async fn test_config_custom_root() {
2230 let temp_dir = TempStdDir::default();
2231 let custom_path = temp_dir.join("custom");
2232 std::fs::create_dir(&custom_path).unwrap();
2233
2234 let namespace = DirectoryNamespaceBuilder::new(custom_path.to_string_lossy().to_string())
2235 .build()
2236 .await
2237 .unwrap();
2238
2239 let schema = create_test_schema();
2241 let ipc_data = create_test_ipc_data(&schema);
2242
2243 let mut request = CreateTableRequest::new();
2245 request.id = Some(vec!["test_table".to_string()]);
2246
2247 let response = namespace
2248 .create_table(request, bytes::Bytes::from(ipc_data))
2249 .await
2250 .unwrap();
2251
2252 assert!(response.location.unwrap().contains("custom"));
2253 }
2254
2255 #[tokio::test]
2256 async fn test_config_storage_options() {
2257 let temp_dir = TempStdDir::default();
2258
2259 let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
2260 .storage_option("option1", "value1")
2261 .storage_option("option2", "value2")
2262 .build()
2263 .await
2264 .unwrap();
2265
2266 let schema = create_test_schema();
2268 let ipc_data = create_test_ipc_data(&schema);
2269
2270 let mut request = CreateTableRequest::new();
2272 request.id = Some(vec!["test_table".to_string()]);
2273
2274 let response = namespace
2275 .create_table(request, bytes::Bytes::from(ipc_data))
2276 .await
2277 .unwrap();
2278
2279 let storage_options = response.storage_options.unwrap();
2280 assert_eq!(storage_options.get("option1"), Some(&"value1".to_string()));
2281 assert_eq!(storage_options.get("option2"), Some(&"value2".to_string()));
2282 }
2283
2284 #[tokio::test]
2285 async fn test_from_properties_manifest_enabled() {
2286 let temp_dir = TempStdDir::default();
2287
2288 let mut properties = HashMap::new();
2289 properties.insert("root".to_string(), temp_dir.to_str().unwrap().to_string());
2290 properties.insert("manifest_enabled".to_string(), "true".to_string());
2291 properties.insert("dir_listing_enabled".to_string(), "false".to_string());
2292
2293 let builder = DirectoryNamespaceBuilder::from_properties(properties, None).unwrap();
2294 assert!(builder.manifest_enabled);
2295 assert!(!builder.dir_listing_enabled);
2296
2297 let namespace = builder.build().await.unwrap();
2298
2299 let schema = create_test_schema();
2301 let ipc_data = create_test_ipc_data(&schema);
2302
2303 let mut request = CreateTableRequest::new();
2305 request.id = Some(vec!["test_table".to_string()]);
2306
2307 let response = namespace
2308 .create_table(request, bytes::Bytes::from(ipc_data))
2309 .await
2310 .unwrap();
2311
2312 assert!(response.location.is_some());
2313 }
2314
2315 #[tokio::test]
2316 async fn test_from_properties_dir_listing_enabled() {
2317 let temp_dir = TempStdDir::default();
2318
2319 let mut properties = HashMap::new();
2320 properties.insert("root".to_string(), temp_dir.to_str().unwrap().to_string());
2321 properties.insert("manifest_enabled".to_string(), "false".to_string());
2322 properties.insert("dir_listing_enabled".to_string(), "true".to_string());
2323
2324 let builder = DirectoryNamespaceBuilder::from_properties(properties, None).unwrap();
2325 assert!(!builder.manifest_enabled);
2326 assert!(builder.dir_listing_enabled);
2327
2328 let namespace = builder.build().await.unwrap();
2329
2330 let schema = create_test_schema();
2332 let ipc_data = create_test_ipc_data(&schema);
2333
2334 let mut request = CreateTableRequest::new();
2336 request.id = Some(vec!["test_table".to_string()]);
2337
2338 let response = namespace
2339 .create_table(request, bytes::Bytes::from(ipc_data))
2340 .await
2341 .unwrap();
2342
2343 assert!(response.location.is_some());
2344 }
2345
2346 #[tokio::test]
2347 async fn test_from_properties_defaults() {
2348 let temp_dir = TempStdDir::default();
2349
2350 let mut properties = HashMap::new();
2351 properties.insert("root".to_string(), temp_dir.to_str().unwrap().to_string());
2352
2353 let builder = DirectoryNamespaceBuilder::from_properties(properties, None).unwrap();
2354 assert!(builder.manifest_enabled);
2356 assert!(builder.dir_listing_enabled);
2357 }
2358
2359 #[tokio::test]
2360 async fn test_from_properties_with_storage_options() {
2361 let temp_dir = TempStdDir::default();
2362
2363 let mut properties = HashMap::new();
2364 properties.insert("root".to_string(), temp_dir.to_str().unwrap().to_string());
2365 properties.insert("manifest_enabled".to_string(), "true".to_string());
2366 properties.insert("storage.region".to_string(), "us-west-2".to_string());
2367 properties.insert("storage.bucket".to_string(), "my-bucket".to_string());
2368
2369 let builder = DirectoryNamespaceBuilder::from_properties(properties, None).unwrap();
2370 assert!(builder.manifest_enabled);
2371 assert!(builder.storage_options.is_some());
2372
2373 let storage_options = builder.storage_options.unwrap();
2374 assert_eq!(
2375 storage_options.get("region"),
2376 Some(&"us-west-2".to_string())
2377 );
2378 assert_eq!(
2379 storage_options.get("bucket"),
2380 Some(&"my-bucket".to_string())
2381 );
2382 }
2383
2384 #[tokio::test]
2385 async fn test_various_arrow_types() {
2386 let (namespace, _temp_dir) = create_test_namespace().await;
2387
2388 let fields = vec![
2390 JsonArrowField {
2391 name: "bool_col".to_string(),
2392 r#type: Box::new(JsonArrowDataType::new("bool".to_string())),
2393 nullable: true,
2394 metadata: None,
2395 },
2396 JsonArrowField {
2397 name: "int8_col".to_string(),
2398 r#type: Box::new(JsonArrowDataType::new("int8".to_string())),
2399 nullable: true,
2400 metadata: None,
2401 },
2402 JsonArrowField {
2403 name: "float64_col".to_string(),
2404 r#type: Box::new(JsonArrowDataType::new("float64".to_string())),
2405 nullable: true,
2406 metadata: None,
2407 },
2408 JsonArrowField {
2409 name: "binary_col".to_string(),
2410 r#type: Box::new(JsonArrowDataType::new("binary".to_string())),
2411 nullable: true,
2412 metadata: None,
2413 },
2414 ];
2415
2416 let schema = JsonArrowSchema {
2417 fields,
2418 metadata: None,
2419 };
2420
2421 let ipc_data = create_test_ipc_data(&schema);
2423
2424 let mut request = CreateTableRequest::new();
2425 request.id = Some(vec!["complex_table".to_string()]);
2426
2427 let response = namespace
2428 .create_table(request, bytes::Bytes::from(ipc_data))
2429 .await
2430 .unwrap();
2431
2432 assert!(response.location.is_some());
2433 }
2434
2435 #[tokio::test]
2436 async fn test_connect_dir() {
2437 let temp_dir = TempStdDir::default();
2438
2439 let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
2440 .build()
2441 .await
2442 .unwrap();
2443
2444 let mut request = ListTablesRequest::new();
2446 request.id = Some(vec![]);
2447 let response = namespace.list_tables(request).await.unwrap();
2448 assert_eq!(response.tables.len(), 0);
2449 }
2450
2451 #[tokio::test]
2452 async fn test_create_table_with_ipc_data() {
2453 use arrow::array::{Int32Array, StringArray};
2454 use arrow::ipc::writer::StreamWriter;
2455
2456 let (namespace, _temp_dir) = create_test_namespace().await;
2457
2458 let schema = create_test_schema();
2460
2461 let arrow_schema = convert_json_arrow_schema(&schema).unwrap();
2463 let arrow_schema = Arc::new(arrow_schema);
2464
2465 let id_array = Int32Array::from(vec![1, 2, 3]);
2467 let name_array = StringArray::from(vec!["Alice", "Bob", "Charlie"]);
2468 let batch = arrow::record_batch::RecordBatch::try_new(
2469 arrow_schema.clone(),
2470 vec![Arc::new(id_array), Arc::new(name_array)],
2471 )
2472 .unwrap();
2473
2474 let mut buffer = Vec::new();
2476 {
2477 let mut writer = StreamWriter::try_new(&mut buffer, &arrow_schema).unwrap();
2478 writer.write(&batch).unwrap();
2479 writer.finish().unwrap();
2480 }
2481
2482 let mut request = CreateTableRequest::new();
2484 request.id = Some(vec!["test_table_with_data".to_string()]);
2485
2486 let response = namespace
2487 .create_table(request, Bytes::from(buffer))
2488 .await
2489 .unwrap();
2490
2491 assert_eq!(response.version, Some(1));
2492 assert!(
2493 response
2494 .location
2495 .unwrap()
2496 .contains("test_table_with_data.lance")
2497 );
2498
2499 let mut exists_request = TableExistsRequest::new();
2501 exists_request.id = Some(vec!["test_table_with_data".to_string()]);
2502 namespace.table_exists(exists_request).await.unwrap();
2503 }
2504
2505 #[tokio::test]
2506 async fn test_child_namespace_create_and_list() {
2507 let (namespace, _temp_dir) = create_test_namespace().await;
2508
2509 for i in 1..=3 {
2511 let mut create_req = CreateNamespaceRequest::new();
2512 create_req.id = Some(vec![format!("ns{}", i)]);
2513 let result = namespace.create_namespace(create_req).await;
2514 assert!(result.is_ok(), "Failed to create child namespace ns{}", i);
2515 }
2516
2517 let list_req = ListNamespacesRequest {
2519 id: Some(vec![]),
2520 ..Default::default()
2521 };
2522 let result = namespace.list_namespaces(list_req).await;
2523 assert!(result.is_ok());
2524 let namespaces = result.unwrap().namespaces;
2525 assert_eq!(namespaces.len(), 3);
2526 assert!(namespaces.contains(&"ns1".to_string()));
2527 assert!(namespaces.contains(&"ns2".to_string()));
2528 assert!(namespaces.contains(&"ns3".to_string()));
2529 }
2530
2531 #[tokio::test]
2532 async fn test_nested_namespace_hierarchy() {
2533 let (namespace, _temp_dir) = create_test_namespace().await;
2534
2535 let mut create_req = CreateNamespaceRequest::new();
2537 create_req.id = Some(vec!["parent".to_string()]);
2538 namespace.create_namespace(create_req).await.unwrap();
2539
2540 let mut create_req = CreateNamespaceRequest::new();
2542 create_req.id = Some(vec!["parent".to_string(), "child1".to_string()]);
2543 namespace.create_namespace(create_req).await.unwrap();
2544
2545 let mut create_req = CreateNamespaceRequest::new();
2546 create_req.id = Some(vec!["parent".to_string(), "child2".to_string()]);
2547 namespace.create_namespace(create_req).await.unwrap();
2548
2549 let list_req = ListNamespacesRequest {
2551 id: Some(vec!["parent".to_string()]),
2552 ..Default::default()
2553 };
2554 let result = namespace.list_namespaces(list_req).await;
2555 assert!(result.is_ok());
2556 let children = result.unwrap().namespaces;
2557 assert_eq!(children.len(), 2);
2558 assert!(children.contains(&"child1".to_string()));
2559 assert!(children.contains(&"child2".to_string()));
2560
2561 let list_req = ListNamespacesRequest {
2563 id: Some(vec![]),
2564 ..Default::default()
2565 };
2566 let result = namespace.list_namespaces(list_req).await;
2567 assert!(result.is_ok());
2568 let root_namespaces = result.unwrap().namespaces;
2569 assert_eq!(root_namespaces.len(), 1);
2570 assert_eq!(root_namespaces[0], "parent");
2571 }
2572
2573 #[tokio::test]
2574 async fn test_table_in_child_namespace() {
2575 let (namespace, _temp_dir) = create_test_namespace().await;
2576
2577 let mut create_ns_req = CreateNamespaceRequest::new();
2579 create_ns_req.id = Some(vec!["test_ns".to_string()]);
2580 namespace.create_namespace(create_ns_req).await.unwrap();
2581
2582 let schema = create_test_schema();
2584 let ipc_data = create_test_ipc_data(&schema);
2585 let mut create_table_req = CreateTableRequest::new();
2586 create_table_req.id = Some(vec!["test_ns".to_string(), "table1".to_string()]);
2587 let result = namespace
2588 .create_table(create_table_req, bytes::Bytes::from(ipc_data))
2589 .await;
2590 assert!(result.is_ok(), "Failed to create table in child namespace");
2591
2592 let list_req = ListTablesRequest {
2594 id: Some(vec!["test_ns".to_string()]),
2595 ..Default::default()
2596 };
2597 let result = namespace.list_tables(list_req).await;
2598 assert!(result.is_ok());
2599 let tables = result.unwrap().tables;
2600 assert_eq!(tables.len(), 1);
2601 assert_eq!(tables[0], "table1");
2602
2603 let mut exists_req = TableExistsRequest::new();
2605 exists_req.id = Some(vec!["test_ns".to_string(), "table1".to_string()]);
2606 let result = namespace.table_exists(exists_req).await;
2607 assert!(result.is_ok());
2608
2609 let mut describe_req = DescribeTableRequest::new();
2611 describe_req.id = Some(vec!["test_ns".to_string(), "table1".to_string()]);
2612 let result = namespace.describe_table(describe_req).await;
2613 assert!(result.is_ok());
2614 let response = result.unwrap();
2615 assert!(response.location.is_some());
2616 }
2617
2618 #[tokio::test]
2619 async fn test_multiple_tables_in_child_namespace() {
2620 let (namespace, _temp_dir) = create_test_namespace().await;
2621
2622 let mut create_ns_req = CreateNamespaceRequest::new();
2624 create_ns_req.id = Some(vec!["test_ns".to_string()]);
2625 namespace.create_namespace(create_ns_req).await.unwrap();
2626
2627 let schema = create_test_schema();
2629 let ipc_data = create_test_ipc_data(&schema);
2630 for i in 1..=3 {
2631 let mut create_table_req = CreateTableRequest::new();
2632 create_table_req.id = Some(vec!["test_ns".to_string(), format!("table{}", i)]);
2633 namespace
2634 .create_table(create_table_req, bytes::Bytes::from(ipc_data.clone()))
2635 .await
2636 .unwrap();
2637 }
2638
2639 let list_req = ListTablesRequest {
2641 id: Some(vec!["test_ns".to_string()]),
2642 ..Default::default()
2643 };
2644 let result = namespace.list_tables(list_req).await;
2645 assert!(result.is_ok());
2646 let tables = result.unwrap().tables;
2647 assert_eq!(tables.len(), 3);
2648 assert!(tables.contains(&"table1".to_string()));
2649 assert!(tables.contains(&"table2".to_string()));
2650 assert!(tables.contains(&"table3".to_string()));
2651 }
2652
2653 #[tokio::test]
2654 async fn test_drop_table_in_child_namespace() {
2655 let (namespace, _temp_dir) = create_test_namespace().await;
2656
2657 let mut create_ns_req = CreateNamespaceRequest::new();
2659 create_ns_req.id = Some(vec!["test_ns".to_string()]);
2660 namespace.create_namespace(create_ns_req).await.unwrap();
2661
2662 let schema = create_test_schema();
2664 let ipc_data = create_test_ipc_data(&schema);
2665 let mut create_table_req = CreateTableRequest::new();
2666 create_table_req.id = Some(vec!["test_ns".to_string(), "table1".to_string()]);
2667 namespace
2668 .create_table(create_table_req, bytes::Bytes::from(ipc_data))
2669 .await
2670 .unwrap();
2671
2672 let mut drop_req = DropTableRequest::new();
2674 drop_req.id = Some(vec!["test_ns".to_string(), "table1".to_string()]);
2675 let result = namespace.drop_table(drop_req).await;
2676 assert!(result.is_ok(), "Failed to drop table in child namespace");
2677
2678 let mut exists_req = TableExistsRequest::new();
2680 exists_req.id = Some(vec!["test_ns".to_string(), "table1".to_string()]);
2681 let result = namespace.table_exists(exists_req).await;
2682 assert!(result.is_err());
2683 }
2684
2685 #[tokio::test]
2686 async fn test_deeply_nested_namespace() {
2687 let (namespace, _temp_dir) = create_test_namespace().await;
2688
2689 let mut create_req = CreateNamespaceRequest::new();
2691 create_req.id = Some(vec!["level1".to_string()]);
2692 namespace.create_namespace(create_req).await.unwrap();
2693
2694 let mut create_req = CreateNamespaceRequest::new();
2695 create_req.id = Some(vec!["level1".to_string(), "level2".to_string()]);
2696 namespace.create_namespace(create_req).await.unwrap();
2697
2698 let mut create_req = CreateNamespaceRequest::new();
2699 create_req.id = Some(vec![
2700 "level1".to_string(),
2701 "level2".to_string(),
2702 "level3".to_string(),
2703 ]);
2704 namespace.create_namespace(create_req).await.unwrap();
2705
2706 let schema = create_test_schema();
2708 let ipc_data = create_test_ipc_data(&schema);
2709 let mut create_table_req = CreateTableRequest::new();
2710 create_table_req.id = Some(vec![
2711 "level1".to_string(),
2712 "level2".to_string(),
2713 "level3".to_string(),
2714 "table1".to_string(),
2715 ]);
2716 let result = namespace
2717 .create_table(create_table_req, bytes::Bytes::from(ipc_data))
2718 .await;
2719 assert!(
2720 result.is_ok(),
2721 "Failed to create table in deeply nested namespace"
2722 );
2723
2724 let mut exists_req = TableExistsRequest::new();
2726 exists_req.id = Some(vec![
2727 "level1".to_string(),
2728 "level2".to_string(),
2729 "level3".to_string(),
2730 "table1".to_string(),
2731 ]);
2732 let result = namespace.table_exists(exists_req).await;
2733 assert!(result.is_ok());
2734 }
2735
2736 #[tokio::test]
2737 async fn test_namespace_with_properties() {
2738 let (namespace, _temp_dir) = create_test_namespace().await;
2739
2740 let mut properties = HashMap::new();
2742 properties.insert("owner".to_string(), "test_user".to_string());
2743 properties.insert("description".to_string(), "Test namespace".to_string());
2744
2745 let mut create_req = CreateNamespaceRequest::new();
2746 create_req.id = Some(vec!["test_ns".to_string()]);
2747 create_req.properties = Some(properties.clone());
2748 namespace.create_namespace(create_req).await.unwrap();
2749
2750 let describe_req = DescribeNamespaceRequest {
2752 id: Some(vec!["test_ns".to_string()]),
2753 ..Default::default()
2754 };
2755 let result = namespace.describe_namespace(describe_req).await;
2756 assert!(result.is_ok());
2757 let response = result.unwrap();
2758 assert!(response.properties.is_some());
2759 let props = response.properties.unwrap();
2760 assert_eq!(props.get("owner"), Some(&"test_user".to_string()));
2761 assert_eq!(
2762 props.get("description"),
2763 Some(&"Test namespace".to_string())
2764 );
2765 }
2766
2767 #[tokio::test]
2768 async fn test_cannot_drop_namespace_with_tables() {
2769 let (namespace, _temp_dir) = create_test_namespace().await;
2770
2771 let mut create_ns_req = CreateNamespaceRequest::new();
2773 create_ns_req.id = Some(vec!["test_ns".to_string()]);
2774 namespace.create_namespace(create_ns_req).await.unwrap();
2775
2776 let schema = create_test_schema();
2778 let ipc_data = create_test_ipc_data(&schema);
2779 let mut create_table_req = CreateTableRequest::new();
2780 create_table_req.id = Some(vec!["test_ns".to_string(), "table1".to_string()]);
2781 namespace
2782 .create_table(create_table_req, bytes::Bytes::from(ipc_data))
2783 .await
2784 .unwrap();
2785
2786 let mut drop_req = DropNamespaceRequest::new();
2788 drop_req.id = Some(vec!["test_ns".to_string()]);
2789 let result = namespace.drop_namespace(drop_req).await;
2790 assert!(
2791 result.is_err(),
2792 "Should not be able to drop namespace with tables"
2793 );
2794 }
2795
2796 #[tokio::test]
2797 async fn test_isolation_between_namespaces() {
2798 let (namespace, _temp_dir) = create_test_namespace().await;
2799
2800 let mut create_req = CreateNamespaceRequest::new();
2802 create_req.id = Some(vec!["ns1".to_string()]);
2803 namespace.create_namespace(create_req).await.unwrap();
2804
2805 let mut create_req = CreateNamespaceRequest::new();
2806 create_req.id = Some(vec!["ns2".to_string()]);
2807 namespace.create_namespace(create_req).await.unwrap();
2808
2809 let schema = create_test_schema();
2811 let ipc_data = create_test_ipc_data(&schema);
2812
2813 let mut create_table_req = CreateTableRequest::new();
2814 create_table_req.id = Some(vec!["ns1".to_string(), "table1".to_string()]);
2815 namespace
2816 .create_table(create_table_req, bytes::Bytes::from(ipc_data.clone()))
2817 .await
2818 .unwrap();
2819
2820 let mut create_table_req = CreateTableRequest::new();
2821 create_table_req.id = Some(vec!["ns2".to_string(), "table1".to_string()]);
2822 namespace
2823 .create_table(create_table_req, bytes::Bytes::from(ipc_data))
2824 .await
2825 .unwrap();
2826
2827 let list_req = ListTablesRequest {
2829 id: Some(vec!["ns1".to_string()]),
2830 page_token: None,
2831 limit: None,
2832 ..Default::default()
2833 };
2834 let result = namespace.list_tables(list_req).await.unwrap();
2835 assert_eq!(result.tables.len(), 1);
2836 assert_eq!(result.tables[0], "table1");
2837
2838 let list_req = ListTablesRequest {
2839 id: Some(vec!["ns2".to_string()]),
2840 page_token: None,
2841 limit: None,
2842 ..Default::default()
2843 };
2844 let result = namespace.list_tables(list_req).await.unwrap();
2845 assert_eq!(result.tables.len(), 1);
2846 assert_eq!(result.tables[0], "table1");
2847
2848 let mut drop_req = DropTableRequest::new();
2850 drop_req.id = Some(vec!["ns1".to_string(), "table1".to_string()]);
2851 namespace.drop_table(drop_req).await.unwrap();
2852
2853 let mut exists_req = TableExistsRequest::new();
2855 exists_req.id = Some(vec!["ns1".to_string(), "table1".to_string()]);
2856 assert!(namespace.table_exists(exists_req).await.is_err());
2857
2858 let mut exists_req = TableExistsRequest::new();
2859 exists_req.id = Some(vec!["ns2".to_string(), "table1".to_string()]);
2860 assert!(namespace.table_exists(exists_req).await.is_ok());
2861 }
2862
2863 #[tokio::test]
2864 async fn test_migrate_directory_tables() {
2865 let temp_dir = TempStdDir::default();
2866 let temp_path = temp_dir.to_str().unwrap();
2867
2868 let dir_only_ns = DirectoryNamespaceBuilder::new(temp_path)
2870 .manifest_enabled(false)
2871 .dir_listing_enabled(true)
2872 .build()
2873 .await
2874 .unwrap();
2875
2876 let schema = create_test_schema();
2878 let ipc_data = create_test_ipc_data(&schema);
2879
2880 for i in 1..=3 {
2881 let mut create_req = CreateTableRequest::new();
2882 create_req.id = Some(vec![format!("table{}", i)]);
2883 dir_only_ns
2884 .create_table(create_req, bytes::Bytes::from(ipc_data.clone()))
2885 .await
2886 .unwrap();
2887 }
2888
2889 drop(dir_only_ns);
2890
2891 let dual_mode_ns = DirectoryNamespaceBuilder::new(temp_path)
2893 .manifest_enabled(true)
2894 .dir_listing_enabled(true)
2895 .build()
2896 .await
2897 .unwrap();
2898
2899 let mut list_req = ListTablesRequest::new();
2901 list_req.id = Some(vec![]);
2902 let tables = dual_mode_ns.list_tables(list_req).await.unwrap().tables;
2903 assert_eq!(tables.len(), 3);
2904
2905 let migrated_count = dual_mode_ns.migrate().await.unwrap();
2907 assert_eq!(migrated_count, 3, "Should migrate all 3 tables");
2908
2909 let mut list_req = ListTablesRequest::new();
2911 list_req.id = Some(vec![]);
2912 let tables = dual_mode_ns.list_tables(list_req).await.unwrap().tables;
2913 assert_eq!(tables.len(), 3);
2914
2915 let migrated_count = dual_mode_ns.migrate().await.unwrap();
2917 assert_eq!(
2918 migrated_count, 0,
2919 "Should not migrate already-migrated tables"
2920 );
2921
2922 drop(dual_mode_ns);
2923
2924 let manifest_only_ns = DirectoryNamespaceBuilder::new(temp_path)
2926 .manifest_enabled(true)
2927 .dir_listing_enabled(false)
2928 .build()
2929 .await
2930 .unwrap();
2931
2932 let mut list_req = ListTablesRequest::new();
2934 list_req.id = Some(vec![]);
2935 let tables = manifest_only_ns.list_tables(list_req).await.unwrap().tables;
2936 assert_eq!(tables.len(), 3);
2937 assert!(tables.contains(&"table1".to_string()));
2938 assert!(tables.contains(&"table2".to_string()));
2939 assert!(tables.contains(&"table3".to_string()));
2940 }
2941
2942 #[tokio::test]
2943 async fn test_migrate_without_manifest() {
2944 let temp_dir = TempStdDir::default();
2945 let temp_path = temp_dir.to_str().unwrap();
2946
2947 let namespace = DirectoryNamespaceBuilder::new(temp_path)
2949 .manifest_enabled(false)
2950 .dir_listing_enabled(true)
2951 .build()
2952 .await
2953 .unwrap();
2954
2955 let migrated_count = namespace.migrate().await.unwrap();
2957 assert_eq!(migrated_count, 0);
2958 }
2959
2960 #[tokio::test]
2961 async fn test_register_table() {
2962 use lance_namespace::models::{RegisterTableRequest, TableExistsRequest};
2963
2964 let temp_dir = TempStdDir::default();
2965 let temp_path = temp_dir.to_str().unwrap();
2966
2967 let namespace = DirectoryNamespaceBuilder::new(temp_path)
2968 .build()
2969 .await
2970 .unwrap();
2971
2972 let schema = create_test_schema();
2974 let ipc_data = create_test_ipc_data(&schema);
2975
2976 let table_uri = format!("{}/external_table.lance", temp_path);
2977 let cursor = Cursor::new(ipc_data);
2978 let stream_reader = StreamReader::try_new(cursor, None).unwrap();
2979 let batches: Vec<_> = stream_reader
2980 .collect::<std::result::Result<Vec<_>, _>>()
2981 .unwrap();
2982 let schema = batches[0].schema();
2983 let batch_results: Vec<_> = batches.into_iter().map(Ok).collect();
2984 let reader = RecordBatchIterator::new(batch_results, schema);
2985 Dataset::write(Box::new(reader), &table_uri, None)
2986 .await
2987 .unwrap();
2988
2989 let mut register_req = RegisterTableRequest::new("external_table.lance".to_string());
2991 register_req.id = Some(vec!["registered_table".to_string()]);
2992
2993 let response = namespace.register_table(register_req).await.unwrap();
2994 assert_eq!(response.location, Some("external_table.lance".to_string()));
2995
2996 let mut exists_req = TableExistsRequest::new();
2998 exists_req.id = Some(vec!["registered_table".to_string()]);
2999 assert!(namespace.table_exists(exists_req).await.is_ok());
3000
3001 let mut list_req = ListTablesRequest::new();
3003 list_req.id = Some(vec![]);
3004 let tables = namespace.list_tables(list_req).await.unwrap();
3005 assert!(tables.tables.contains(&"registered_table".to_string()));
3006 }
3007
3008 #[tokio::test]
3009 async fn test_register_table_duplicate_fails() {
3010 use lance_namespace::models::RegisterTableRequest;
3011
3012 let temp_dir = TempStdDir::default();
3013 let temp_path = temp_dir.to_str().unwrap();
3014
3015 let namespace = DirectoryNamespaceBuilder::new(temp_path)
3016 .build()
3017 .await
3018 .unwrap();
3019
3020 let mut register_req = RegisterTableRequest::new("test_table.lance".to_string());
3022 register_req.id = Some(vec!["test_table".to_string()]);
3023
3024 namespace
3025 .register_table(register_req.clone())
3026 .await
3027 .unwrap();
3028
3029 let result = namespace.register_table(register_req).await;
3031 assert!(result.is_err());
3032 assert!(result.unwrap_err().to_string().contains("already exists"));
3033 }
3034
3035 #[tokio::test]
3036 async fn test_deregister_table() {
3037 use lance_namespace::models::{DeregisterTableRequest, TableExistsRequest};
3038
3039 let temp_dir = TempStdDir::default();
3040 let temp_path = temp_dir.to_str().unwrap();
3041
3042 let namespace = DirectoryNamespaceBuilder::new(temp_path)
3045 .manifest_enabled(true)
3046 .dir_listing_enabled(false)
3047 .build()
3048 .await
3049 .unwrap();
3050
3051 let schema = create_test_schema();
3053 let ipc_data = create_test_ipc_data(&schema);
3054
3055 let mut create_req = CreateTableRequest::new();
3056 create_req.id = Some(vec!["test_table".to_string()]);
3057 namespace
3058 .create_table(create_req, bytes::Bytes::from(ipc_data))
3059 .await
3060 .unwrap();
3061
3062 let mut exists_req = TableExistsRequest::new();
3064 exists_req.id = Some(vec!["test_table".to_string()]);
3065 assert!(namespace.table_exists(exists_req.clone()).await.is_ok());
3066
3067 let mut deregister_req = DeregisterTableRequest::new();
3069 deregister_req.id = Some(vec!["test_table".to_string()]);
3070 let response = namespace.deregister_table(deregister_req).await.unwrap();
3071
3072 assert!(
3074 response.location.is_some(),
3075 "Deregister should return location"
3076 );
3077 let location = response.location.as_ref().unwrap();
3078 let expected_url = lance_io::object_store::uri_to_url(temp_path)
3081 .expect("Failed to convert temp path to URL");
3082 let expected_prefix = expected_url.to_string();
3083 assert!(
3084 location.starts_with(&expected_prefix),
3085 "Location should start with '{}', got: {}",
3086 expected_prefix,
3087 location
3088 );
3089 assert!(
3090 location.contains("test_table"),
3091 "Location should contain table name: {}",
3092 location
3093 );
3094 assert_eq!(response.id, Some(vec!["test_table".to_string()]));
3095
3096 assert!(namespace.table_exists(exists_req).await.is_err());
3098
3099 let dataset = Dataset::open(location).await;
3101 assert!(
3102 dataset.is_ok(),
3103 "Physical table data should still exist at {}",
3104 location
3105 );
3106 }
3107
3108 #[tokio::test]
3109 async fn test_deregister_table_in_child_namespace() {
3110 use lance_namespace::models::{
3111 CreateNamespaceRequest, DeregisterTableRequest, TableExistsRequest,
3112 };
3113
3114 let temp_dir = TempStdDir::default();
3115 let temp_path = temp_dir.to_str().unwrap();
3116
3117 let namespace = DirectoryNamespaceBuilder::new(temp_path)
3118 .build()
3119 .await
3120 .unwrap();
3121
3122 let mut create_ns_req = CreateNamespaceRequest::new();
3124 create_ns_req.id = Some(vec!["test_ns".to_string()]);
3125 namespace.create_namespace(create_ns_req).await.unwrap();
3126
3127 let schema = create_test_schema();
3129 let ipc_data = create_test_ipc_data(&schema);
3130
3131 let mut create_req = CreateTableRequest::new();
3132 create_req.id = Some(vec!["test_ns".to_string(), "test_table".to_string()]);
3133 namespace
3134 .create_table(create_req, bytes::Bytes::from(ipc_data))
3135 .await
3136 .unwrap();
3137
3138 let mut deregister_req = DeregisterTableRequest::new();
3140 deregister_req.id = Some(vec!["test_ns".to_string(), "test_table".to_string()]);
3141 let response = namespace.deregister_table(deregister_req).await.unwrap();
3142
3143 assert!(
3145 response.location.is_some(),
3146 "Deregister should return location"
3147 );
3148 let location = response.location.as_ref().unwrap();
3149 let expected_url = lance_io::object_store::uri_to_url(temp_path)
3152 .expect("Failed to convert temp path to URL");
3153 let expected_prefix = expected_url.to_string();
3154 assert!(
3155 location.starts_with(&expected_prefix),
3156 "Location should start with '{}', got: {}",
3157 expected_prefix,
3158 location
3159 );
3160 assert!(
3161 location.contains("test_ns") && location.contains("test_table"),
3162 "Location should contain namespace and table name: {}",
3163 location
3164 );
3165 assert_eq!(
3166 response.id,
3167 Some(vec!["test_ns".to_string(), "test_table".to_string()])
3168 );
3169
3170 let mut exists_req = TableExistsRequest::new();
3172 exists_req.id = Some(vec!["test_ns".to_string(), "test_table".to_string()]);
3173 assert!(namespace.table_exists(exists_req).await.is_err());
3174 }
3175
3176 #[tokio::test]
3177 async fn test_register_without_manifest_fails() {
3178 use lance_namespace::models::RegisterTableRequest;
3179
3180 let temp_dir = TempStdDir::default();
3181 let temp_path = temp_dir.to_str().unwrap();
3182
3183 let namespace = DirectoryNamespaceBuilder::new(temp_path)
3185 .manifest_enabled(false)
3186 .build()
3187 .await
3188 .unwrap();
3189
3190 let mut register_req = RegisterTableRequest::new("test_table.lance".to_string());
3192 register_req.id = Some(vec!["test_table".to_string()]);
3193 let result = namespace.register_table(register_req).await;
3194 assert!(result.is_err());
3195 assert!(
3196 result
3197 .unwrap_err()
3198 .to_string()
3199 .contains("manifest mode is enabled")
3200 );
3201
3202 }
3205
3206 #[tokio::test]
3207 async fn test_register_table_rejects_absolute_uri() {
3208 use lance_namespace::models::RegisterTableRequest;
3209
3210 let temp_dir = TempStdDir::default();
3211 let temp_path = temp_dir.to_str().unwrap();
3212
3213 let namespace = DirectoryNamespaceBuilder::new(temp_path)
3214 .build()
3215 .await
3216 .unwrap();
3217
3218 let mut register_req = RegisterTableRequest::new("s3://bucket/table.lance".to_string());
3220 register_req.id = Some(vec!["test_table".to_string()]);
3221 let result = namespace.register_table(register_req).await;
3222 assert!(result.is_err());
3223 let err_msg = result.unwrap_err().to_string();
3224 assert!(err_msg.contains("Absolute URIs are not allowed"));
3225 }
3226
3227 #[tokio::test]
3228 async fn test_register_table_rejects_absolute_path() {
3229 use lance_namespace::models::RegisterTableRequest;
3230
3231 let temp_dir = TempStdDir::default();
3232 let temp_path = temp_dir.to_str().unwrap();
3233
3234 let namespace = DirectoryNamespaceBuilder::new(temp_path)
3235 .build()
3236 .await
3237 .unwrap();
3238
3239 let mut register_req = RegisterTableRequest::new("/tmp/table.lance".to_string());
3241 register_req.id = Some(vec!["test_table".to_string()]);
3242 let result = namespace.register_table(register_req).await;
3243 assert!(result.is_err());
3244 let err_msg = result.unwrap_err().to_string();
3245 assert!(err_msg.contains("Absolute paths are not allowed"));
3246 }
3247
3248 #[tokio::test]
3249 async fn test_register_table_rejects_path_traversal() {
3250 use lance_namespace::models::RegisterTableRequest;
3251
3252 let temp_dir = TempStdDir::default();
3253 let temp_path = temp_dir.to_str().unwrap();
3254
3255 let namespace = DirectoryNamespaceBuilder::new(temp_path)
3256 .build()
3257 .await
3258 .unwrap();
3259
3260 let mut register_req = RegisterTableRequest::new("../outside/table.lance".to_string());
3262 register_req.id = Some(vec!["test_table".to_string()]);
3263 let result = namespace.register_table(register_req).await;
3264 assert!(result.is_err());
3265 let err_msg = result.unwrap_err().to_string();
3266 assert!(err_msg.contains("Path traversal is not allowed"));
3267 }
3268
3269 #[tokio::test]
3270 async fn test_namespace_write() {
3271 use arrow::array::Int32Array;
3272 use arrow::datatypes::{DataType, Field as ArrowField, Schema as ArrowSchema};
3273 use arrow::record_batch::{RecordBatch, RecordBatchIterator};
3274 use lance::dataset::{Dataset, WriteMode, WriteParams};
3275 use lance_namespace::LanceNamespace;
3276
3277 let (namespace, _temp_dir) = create_test_namespace().await;
3278 let namespace = Arc::new(namespace) as Arc<dyn LanceNamespace>;
3279
3280 let table_id = vec!["test_ns".to_string(), "test_table".to_string()];
3282 let schema = Arc::new(ArrowSchema::new(vec![
3283 ArrowField::new("a", DataType::Int32, false),
3284 ArrowField::new("b", DataType::Int32, false),
3285 ]));
3286
3287 let data1 = RecordBatch::try_new(
3289 schema.clone(),
3290 vec![
3291 Arc::new(Int32Array::from(vec![1, 2, 3])),
3292 Arc::new(Int32Array::from(vec![10, 20, 30])),
3293 ],
3294 )
3295 .unwrap();
3296
3297 let reader1 = RecordBatchIterator::new(vec![data1].into_iter().map(Ok), schema.clone());
3298 let dataset =
3299 Dataset::write_into_namespace(reader1, namespace.clone(), table_id.clone(), None)
3300 .await
3301 .unwrap();
3302
3303 assert_eq!(dataset.count_rows(None).await.unwrap(), 3);
3304 assert_eq!(dataset.version().version, 1);
3305
3306 let data2 = RecordBatch::try_new(
3308 schema.clone(),
3309 vec![
3310 Arc::new(Int32Array::from(vec![4, 5])),
3311 Arc::new(Int32Array::from(vec![40, 50])),
3312 ],
3313 )
3314 .unwrap();
3315
3316 let params_append = WriteParams {
3317 mode: WriteMode::Append,
3318 ..Default::default()
3319 };
3320
3321 let reader2 = RecordBatchIterator::new(vec![data2].into_iter().map(Ok), schema.clone());
3322 let dataset = Dataset::write_into_namespace(
3323 reader2,
3324 namespace.clone(),
3325 table_id.clone(),
3326 Some(params_append),
3327 )
3328 .await
3329 .unwrap();
3330
3331 assert_eq!(dataset.count_rows(None).await.unwrap(), 5);
3332 assert_eq!(dataset.version().version, 2);
3333
3334 let data3 = RecordBatch::try_new(
3336 schema.clone(),
3337 vec![
3338 Arc::new(Int32Array::from(vec![100, 200])),
3339 Arc::new(Int32Array::from(vec![1000, 2000])),
3340 ],
3341 )
3342 .unwrap();
3343
3344 let params_overwrite = WriteParams {
3345 mode: WriteMode::Overwrite,
3346 ..Default::default()
3347 };
3348
3349 let reader3 = RecordBatchIterator::new(vec![data3].into_iter().map(Ok), schema.clone());
3350 let dataset = Dataset::write_into_namespace(
3351 reader3,
3352 namespace.clone(),
3353 table_id.clone(),
3354 Some(params_overwrite),
3355 )
3356 .await
3357 .unwrap();
3358
3359 assert_eq!(dataset.count_rows(None).await.unwrap(), 2);
3360 assert_eq!(dataset.version().version, 3);
3361
3362 let result = dataset.scan().try_into_batch().await.unwrap();
3364 let a_col = result
3365 .column_by_name("a")
3366 .unwrap()
3367 .as_any()
3368 .downcast_ref::<Int32Array>()
3369 .unwrap();
3370 assert_eq!(a_col.values(), &[100, 200]);
3371 }
3372
3373 #[tokio::test]
3378 async fn test_declare_table_v1_mode() {
3379 use lance_namespace::models::{
3380 DeclareTableRequest, DescribeTableRequest, TableExistsRequest,
3381 };
3382
3383 let temp_dir = TempStdDir::default();
3384 let temp_path = temp_dir.to_str().unwrap();
3385
3386 let namespace = DirectoryNamespaceBuilder::new(temp_path)
3388 .manifest_enabled(false)
3389 .build()
3390 .await
3391 .unwrap();
3392
3393 let mut declare_req = DeclareTableRequest::new();
3395 declare_req.id = Some(vec!["test_table".to_string()]);
3396 let response = namespace.declare_table(declare_req).await.unwrap();
3397
3398 assert!(response.location.is_some());
3400 let location = response.location.as_ref().unwrap();
3401 assert!(location.ends_with("test_table.lance"));
3402
3403 let mut exists_req = TableExistsRequest::new();
3405 exists_req.id = Some(vec!["test_table".to_string()]);
3406 assert!(namespace.table_exists(exists_req).await.is_ok());
3407
3408 let mut describe_req = DescribeTableRequest::new();
3410 describe_req.id = Some(vec!["test_table".to_string()]);
3411 let describe_response = namespace.describe_table(describe_req).await.unwrap();
3412 assert!(describe_response.location.is_some());
3413 assert!(describe_response.version.is_none()); assert!(describe_response.schema.is_none()); }
3416
3417 #[tokio::test]
3418 async fn test_declare_table_with_manifest() {
3419 use lance_namespace::models::{DeclareTableRequest, TableExistsRequest};
3420
3421 let temp_dir = TempStdDir::default();
3422 let temp_path = temp_dir.to_str().unwrap();
3423
3424 let namespace = DirectoryNamespaceBuilder::new(temp_path)
3426 .manifest_enabled(true)
3427 .dir_listing_enabled(false)
3428 .build()
3429 .await
3430 .unwrap();
3431
3432 let mut declare_req = DeclareTableRequest::new();
3434 declare_req.id = Some(vec!["test_table".to_string()]);
3435 let response = namespace.declare_table(declare_req).await.unwrap();
3436
3437 assert!(response.location.is_some());
3439
3440 let mut exists_req = TableExistsRequest::new();
3442 exists_req.id = Some(vec!["test_table".to_string()]);
3443 assert!(namespace.table_exists(exists_req).await.is_ok());
3444 }
3445
3446 #[tokio::test]
3447 async fn test_declare_table_when_table_exists() {
3448 use lance_namespace::models::DeclareTableRequest;
3449
3450 let temp_dir = TempStdDir::default();
3451 let temp_path = temp_dir.to_str().unwrap();
3452
3453 let namespace = DirectoryNamespaceBuilder::new(temp_path)
3454 .manifest_enabled(false)
3455 .build()
3456 .await
3457 .unwrap();
3458
3459 let schema = create_test_schema();
3461 let ipc_data = create_test_ipc_data(&schema);
3462 let mut create_req = CreateTableRequest::new();
3463 create_req.id = Some(vec!["test_table".to_string()]);
3464 namespace
3465 .create_table(create_req, bytes::Bytes::from(ipc_data))
3466 .await
3467 .unwrap();
3468
3469 let mut declare_req = DeclareTableRequest::new();
3471 declare_req.id = Some(vec!["test_table".to_string()]);
3472 let result = namespace.declare_table(declare_req).await;
3473 assert!(result.is_err());
3474 }
3475
3476 #[tokio::test]
3481 async fn test_deregister_table_v1_mode() {
3482 use lance_namespace::models::{DeregisterTableRequest, TableExistsRequest};
3483
3484 let temp_dir = TempStdDir::default();
3485 let temp_path = temp_dir.to_str().unwrap();
3486
3487 let namespace = DirectoryNamespaceBuilder::new(temp_path)
3489 .manifest_enabled(false)
3490 .dir_listing_enabled(true)
3491 .build()
3492 .await
3493 .unwrap();
3494
3495 let schema = create_test_schema();
3497 let ipc_data = create_test_ipc_data(&schema);
3498 let mut create_req = CreateTableRequest::new();
3499 create_req.id = Some(vec!["test_table".to_string()]);
3500 namespace
3501 .create_table(create_req, bytes::Bytes::from(ipc_data))
3502 .await
3503 .unwrap();
3504
3505 let mut exists_req = TableExistsRequest::new();
3507 exists_req.id = Some(vec!["test_table".to_string()]);
3508 assert!(namespace.table_exists(exists_req.clone()).await.is_ok());
3509
3510 let mut deregister_req = DeregisterTableRequest::new();
3512 deregister_req.id = Some(vec!["test_table".to_string()]);
3513 let response = namespace.deregister_table(deregister_req).await.unwrap();
3514
3515 assert!(response.location.is_some());
3517 let location = response.location.as_ref().unwrap();
3518 assert!(location.contains("test_table"));
3519
3520 let result = namespace.table_exists(exists_req).await;
3522 assert!(result.is_err());
3523 assert!(result.unwrap_err().to_string().contains("deregistered"));
3524
3525 let dataset = Dataset::open(location).await;
3527 assert!(dataset.is_ok(), "Physical table data should still exist");
3528 }
3529
3530 #[tokio::test]
3531 async fn test_deregister_table_v1_already_deregistered() {
3532 use lance_namespace::models::DeregisterTableRequest;
3533
3534 let temp_dir = TempStdDir::default();
3535 let temp_path = temp_dir.to_str().unwrap();
3536
3537 let namespace = DirectoryNamespaceBuilder::new(temp_path)
3538 .manifest_enabled(false)
3539 .dir_listing_enabled(true)
3540 .build()
3541 .await
3542 .unwrap();
3543
3544 let schema = create_test_schema();
3546 let ipc_data = create_test_ipc_data(&schema);
3547 let mut create_req = CreateTableRequest::new();
3548 create_req.id = Some(vec!["test_table".to_string()]);
3549 namespace
3550 .create_table(create_req, bytes::Bytes::from(ipc_data))
3551 .await
3552 .unwrap();
3553
3554 let mut deregister_req = DeregisterTableRequest::new();
3556 deregister_req.id = Some(vec!["test_table".to_string()]);
3557 namespace
3558 .deregister_table(deregister_req.clone())
3559 .await
3560 .unwrap();
3561
3562 let result = namespace.deregister_table(deregister_req).await;
3564 assert!(result.is_err());
3565 assert!(
3566 result
3567 .unwrap_err()
3568 .to_string()
3569 .contains("already deregistered")
3570 );
3571 }
3572
3573 #[tokio::test]
3578 async fn test_list_tables_skips_deregistered_v1() {
3579 use lance_namespace::models::DeregisterTableRequest;
3580
3581 let temp_dir = TempStdDir::default();
3582 let temp_path = temp_dir.to_str().unwrap();
3583
3584 let namespace = DirectoryNamespaceBuilder::new(temp_path)
3585 .manifest_enabled(false)
3586 .dir_listing_enabled(true)
3587 .build()
3588 .await
3589 .unwrap();
3590
3591 let schema = create_test_schema();
3593 let ipc_data = create_test_ipc_data(&schema);
3594
3595 let mut create_req1 = CreateTableRequest::new();
3596 create_req1.id = Some(vec!["table1".to_string()]);
3597 namespace
3598 .create_table(create_req1, bytes::Bytes::from(ipc_data.clone()))
3599 .await
3600 .unwrap();
3601
3602 let mut create_req2 = CreateTableRequest::new();
3603 create_req2.id = Some(vec!["table2".to_string()]);
3604 namespace
3605 .create_table(create_req2, bytes::Bytes::from(ipc_data))
3606 .await
3607 .unwrap();
3608
3609 let mut list_req = ListTablesRequest::new();
3611 list_req.id = Some(vec![]);
3612 let list_response = namespace.list_tables(list_req.clone()).await.unwrap();
3613 assert_eq!(list_response.tables.len(), 2);
3614
3615 let mut deregister_req = DeregisterTableRequest::new();
3617 deregister_req.id = Some(vec!["table1".to_string()]);
3618 namespace.deregister_table(deregister_req).await.unwrap();
3619
3620 let list_response = namespace.list_tables(list_req).await.unwrap();
3622 assert_eq!(list_response.tables.len(), 1);
3623 assert!(list_response.tables.contains(&"table2".to_string()));
3624 assert!(!list_response.tables.contains(&"table1".to_string()));
3625 }
3626
3627 #[tokio::test]
3632 async fn test_describe_table_fails_for_deregistered_v1() {
3633 use lance_namespace::models::{DeregisterTableRequest, DescribeTableRequest};
3634
3635 let temp_dir = TempStdDir::default();
3636 let temp_path = temp_dir.to_str().unwrap();
3637
3638 let namespace = DirectoryNamespaceBuilder::new(temp_path)
3639 .manifest_enabled(false)
3640 .dir_listing_enabled(true)
3641 .build()
3642 .await
3643 .unwrap();
3644
3645 let schema = create_test_schema();
3647 let ipc_data = create_test_ipc_data(&schema);
3648 let mut create_req = CreateTableRequest::new();
3649 create_req.id = Some(vec!["test_table".to_string()]);
3650 namespace
3651 .create_table(create_req, bytes::Bytes::from(ipc_data))
3652 .await
3653 .unwrap();
3654
3655 let mut describe_req = DescribeTableRequest::new();
3657 describe_req.id = Some(vec!["test_table".to_string()]);
3658 assert!(namespace.describe_table(describe_req.clone()).await.is_ok());
3659
3660 let mut deregister_req = DeregisterTableRequest::new();
3662 deregister_req.id = Some(vec!["test_table".to_string()]);
3663 namespace.deregister_table(deregister_req).await.unwrap();
3664
3665 let result = namespace.describe_table(describe_req).await;
3667 assert!(result.is_err());
3668 assert!(result.unwrap_err().to_string().contains("deregistered"));
3669 }
3670
3671 #[tokio::test]
3672 async fn test_table_exists_fails_for_deregistered_v1() {
3673 use lance_namespace::models::{DeregisterTableRequest, TableExistsRequest};
3674
3675 let temp_dir = TempStdDir::default();
3676 let temp_path = temp_dir.to_str().unwrap();
3677
3678 let namespace = DirectoryNamespaceBuilder::new(temp_path)
3679 .manifest_enabled(false)
3680 .dir_listing_enabled(true)
3681 .build()
3682 .await
3683 .unwrap();
3684
3685 let schema = create_test_schema();
3687 let ipc_data = create_test_ipc_data(&schema);
3688 let mut create_req = CreateTableRequest::new();
3689 create_req.id = Some(vec!["test_table".to_string()]);
3690 namespace
3691 .create_table(create_req, bytes::Bytes::from(ipc_data))
3692 .await
3693 .unwrap();
3694
3695 let mut exists_req = TableExistsRequest::new();
3697 exists_req.id = Some(vec!["test_table".to_string()]);
3698 assert!(namespace.table_exists(exists_req.clone()).await.is_ok());
3699
3700 let mut deregister_req = DeregisterTableRequest::new();
3702 deregister_req.id = Some(vec!["test_table".to_string()]);
3703 namespace.deregister_table(deregister_req).await.unwrap();
3704
3705 let result = namespace.table_exists(exists_req).await;
3707 assert!(result.is_err());
3708 assert!(result.unwrap_err().to_string().contains("deregistered"));
3709 }
3710
3711 #[tokio::test]
3712 async fn test_atomic_table_status_check() {
3713 let temp_dir = TempStdDir::default();
3717 let temp_path = temp_dir.to_str().unwrap();
3718
3719 let namespace = DirectoryNamespaceBuilder::new(temp_path)
3720 .manifest_enabled(false)
3721 .dir_listing_enabled(true)
3722 .build()
3723 .await
3724 .unwrap();
3725
3726 let schema = create_test_schema();
3728 let ipc_data = create_test_ipc_data(&schema);
3729 let mut create_req = CreateTableRequest::new();
3730 create_req.id = Some(vec!["test_table".to_string()]);
3731 namespace
3732 .create_table(create_req, bytes::Bytes::from(ipc_data))
3733 .await
3734 .unwrap();
3735
3736 let status = namespace.check_table_status("test_table").await;
3738 assert!(status.exists);
3739 assert!(!status.is_deregistered);
3740 assert!(!status.has_reserved_file);
3741 }
3742
3743 #[tokio::test]
3744 async fn test_table_version_tracking_enabled_managed_versioning() {
3745 use lance_namespace::models::DescribeTableRequest;
3746
3747 let temp_dir = TempStdDir::default();
3748 let temp_path = temp_dir.to_str().unwrap();
3749
3750 let namespace = DirectoryNamespaceBuilder::new(temp_path)
3752 .table_version_tracking_enabled(true)
3753 .build()
3754 .await
3755 .unwrap();
3756
3757 let schema = create_test_schema();
3759 let ipc_data = create_test_ipc_data(&schema);
3760 let mut create_req = CreateTableRequest::new();
3761 create_req.id = Some(vec!["test_table".to_string()]);
3762 namespace
3763 .create_table(create_req, bytes::Bytes::from(ipc_data))
3764 .await
3765 .unwrap();
3766
3767 let mut describe_req = DescribeTableRequest::new();
3769 describe_req.id = Some(vec!["test_table".to_string()]);
3770 let describe_resp = namespace.describe_table(describe_req).await.unwrap();
3771
3772 assert_eq!(
3774 describe_resp.managed_versioning,
3775 Some(true),
3776 "managed_versioning should be true when table_version_tracking_enabled=true"
3777 );
3778 }
3779
3780 #[tokio::test]
3781 async fn test_table_version_tracking_disabled_no_managed_versioning() {
3782 use lance_namespace::models::DescribeTableRequest;
3783
3784 let temp_dir = TempStdDir::default();
3785 let temp_path = temp_dir.to_str().unwrap();
3786
3787 let namespace = DirectoryNamespaceBuilder::new(temp_path)
3789 .table_version_tracking_enabled(false)
3790 .build()
3791 .await
3792 .unwrap();
3793
3794 let schema = create_test_schema();
3796 let ipc_data = create_test_ipc_data(&schema);
3797 let mut create_req = CreateTableRequest::new();
3798 create_req.id = Some(vec!["test_table".to_string()]);
3799 namespace
3800 .create_table(create_req, bytes::Bytes::from(ipc_data))
3801 .await
3802 .unwrap();
3803
3804 let mut describe_req = DescribeTableRequest::new();
3806 describe_req.id = Some(vec!["test_table".to_string()]);
3807 let describe_resp = namespace.describe_table(describe_req).await.unwrap();
3808
3809 assert!(
3811 describe_resp.managed_versioning.is_none(),
3812 "managed_versioning should be None when table_version_tracking_enabled=false, got: {:?}",
3813 describe_resp.managed_versioning
3814 );
3815 }
3816
3817 #[tokio::test]
3818 #[cfg(not(windows))]
3819 async fn test_list_table_versions() {
3820 use arrow::array::{Int32Array, RecordBatchIterator};
3821 use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
3822 use arrow::record_batch::RecordBatch;
3823 use lance::dataset::{Dataset, WriteMode, WriteParams};
3824 use lance_namespace::models::{CreateNamespaceRequest, ListTableVersionsRequest};
3825
3826 let temp_dir = TempStrDir::default();
3827 let temp_path: &str = &temp_dir;
3828
3829 let namespace: Arc<dyn LanceNamespace> = Arc::new(
3830 DirectoryNamespaceBuilder::new(temp_path)
3831 .table_version_tracking_enabled(true)
3832 .build()
3833 .await
3834 .unwrap(),
3835 );
3836
3837 let mut create_ns_req = CreateNamespaceRequest::new();
3839 create_ns_req.id = Some(vec!["workspace".to_string()]);
3840 namespace.create_namespace(create_ns_req).await.unwrap();
3841
3842 let table_id = vec!["workspace".to_string(), "test_table".to_string()];
3844 let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
3845 "id",
3846 DataType::Int32,
3847 false,
3848 )]));
3849 let batch = RecordBatch::try_new(
3850 arrow_schema.clone(),
3851 vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
3852 )
3853 .unwrap();
3854 let batches = RecordBatchIterator::new(vec![Ok(batch.clone())], arrow_schema.clone());
3855 let write_params = WriteParams {
3856 mode: WriteMode::Create,
3857 ..Default::default()
3858 };
3859 let mut dataset = Dataset::write_into_namespace(
3860 batches,
3861 namespace.clone(),
3862 table_id.clone(),
3863 Some(write_params),
3864 )
3865 .await
3866 .unwrap();
3867
3868 let batch2 = RecordBatch::try_new(
3870 arrow_schema.clone(),
3871 vec![Arc::new(Int32Array::from(vec![100, 200]))],
3872 )
3873 .unwrap();
3874 let batches = RecordBatchIterator::new(vec![Ok(batch2)], arrow_schema.clone());
3875 dataset.append(batches, None).await.unwrap();
3876
3877 let batch3 = RecordBatch::try_new(
3879 arrow_schema.clone(),
3880 vec![Arc::new(Int32Array::from(vec![300, 400]))],
3881 )
3882 .unwrap();
3883 let batches = RecordBatchIterator::new(vec![Ok(batch3)], arrow_schema);
3884 dataset.append(batches, None).await.unwrap();
3885
3886 let mut list_req = ListTableVersionsRequest::new();
3888 list_req.id = Some(table_id.clone());
3889 let list_resp = namespace.list_table_versions(list_req).await.unwrap();
3890
3891 assert_eq!(
3892 list_resp.versions.len(),
3893 3,
3894 "Should have 3 versions, got: {:?}",
3895 list_resp.versions
3896 );
3897
3898 for expected_version in 1..=3 {
3900 let version = list_resp
3901 .versions
3902 .iter()
3903 .find(|v| v.version == expected_version)
3904 .unwrap_or_else(|| panic!("Expected version {}", expected_version));
3905
3906 assert!(
3907 !version.manifest_path.is_empty(),
3908 "manifest_path should be set for version {}",
3909 expected_version
3910 );
3911 assert!(
3912 version.manifest_path.contains(".manifest"),
3913 "manifest_path should contain .manifest for version {}",
3914 expected_version
3915 );
3916 assert!(
3917 version.manifest_size.is_some(),
3918 "manifest_size should be set for version {}",
3919 expected_version
3920 );
3921 assert!(
3922 version.manifest_size.unwrap() > 0,
3923 "manifest_size should be > 0 for version {}",
3924 expected_version
3925 );
3926 assert!(
3927 version.timestamp_millis.is_some(),
3928 "timestamp_millis should be set for version {}",
3929 expected_version
3930 );
3931 }
3932 }
3933
3934 #[tokio::test]
3935 #[cfg(not(windows))]
3936 async fn test_describe_table_version() {
3937 use arrow::array::{Int32Array, RecordBatchIterator};
3938 use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
3939 use arrow::record_batch::RecordBatch;
3940 use lance::dataset::{Dataset, WriteMode, WriteParams};
3941 use lance_namespace::models::{CreateNamespaceRequest, DescribeTableVersionRequest};
3942
3943 let temp_dir = TempStrDir::default();
3944 let temp_path: &str = &temp_dir;
3945
3946 let namespace: Arc<dyn LanceNamespace> = Arc::new(
3947 DirectoryNamespaceBuilder::new(temp_path)
3948 .table_version_tracking_enabled(true)
3949 .build()
3950 .await
3951 .unwrap(),
3952 );
3953
3954 let mut create_ns_req = CreateNamespaceRequest::new();
3956 create_ns_req.id = Some(vec!["workspace".to_string()]);
3957 namespace.create_namespace(create_ns_req).await.unwrap();
3958
3959 let table_id = vec!["workspace".to_string(), "test_table".to_string()];
3961 let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
3962 "id",
3963 DataType::Int32,
3964 false,
3965 )]));
3966 let batch = RecordBatch::try_new(
3967 arrow_schema.clone(),
3968 vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
3969 )
3970 .unwrap();
3971 let batches = RecordBatchIterator::new(vec![Ok(batch)], arrow_schema.clone());
3972 let write_params = WriteParams {
3973 mode: WriteMode::Create,
3974 ..Default::default()
3975 };
3976 let mut dataset = Dataset::write_into_namespace(
3977 batches,
3978 namespace.clone(),
3979 table_id.clone(),
3980 Some(write_params),
3981 )
3982 .await
3983 .unwrap();
3984
3985 let batch2 = RecordBatch::try_new(
3987 arrow_schema.clone(),
3988 vec![Arc::new(Int32Array::from(vec![100, 200]))],
3989 )
3990 .unwrap();
3991 let batches = RecordBatchIterator::new(vec![Ok(batch2)], arrow_schema);
3992 dataset.append(batches, None).await.unwrap();
3993
3994 let mut describe_req = DescribeTableVersionRequest::new();
3996 describe_req.id = Some(table_id.clone());
3997 describe_req.version = Some(1);
3998 let describe_resp = namespace
3999 .describe_table_version(describe_req)
4000 .await
4001 .unwrap();
4002
4003 let version = &describe_resp.version;
4004 assert_eq!(version.version, 1);
4005 assert!(version.timestamp_millis.is_some());
4006 assert!(
4007 !version.manifest_path.is_empty(),
4008 "manifest_path should be set"
4009 );
4010 assert!(
4011 version.manifest_path.contains(".manifest"),
4012 "manifest_path should contain .manifest"
4013 );
4014 assert!(
4015 version.manifest_size.is_some(),
4016 "manifest_size should be set"
4017 );
4018 assert!(
4019 version.manifest_size.unwrap() > 0,
4020 "manifest_size should be > 0"
4021 );
4022
4023 let mut describe_req = DescribeTableVersionRequest::new();
4025 describe_req.id = Some(table_id.clone());
4026 describe_req.version = Some(2);
4027 let describe_resp = namespace
4028 .describe_table_version(describe_req)
4029 .await
4030 .unwrap();
4031
4032 let version = &describe_resp.version;
4033 assert_eq!(version.version, 2);
4034 assert!(version.timestamp_millis.is_some());
4035 assert!(
4036 !version.manifest_path.is_empty(),
4037 "manifest_path should be set"
4038 );
4039 assert!(
4040 version.manifest_size.is_some(),
4041 "manifest_size should be set"
4042 );
4043 assert!(
4044 version.manifest_size.unwrap() > 0,
4045 "manifest_size should be > 0"
4046 );
4047 }
4048
4049 #[tokio::test]
4050 #[cfg(not(windows))]
4051 async fn test_describe_table_version_latest() {
4052 use arrow::array::{Int32Array, RecordBatchIterator};
4053 use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
4054 use arrow::record_batch::RecordBatch;
4055 use lance::dataset::{Dataset, WriteMode, WriteParams};
4056 use lance_namespace::models::{CreateNamespaceRequest, DescribeTableVersionRequest};
4057
4058 let temp_dir = TempStrDir::default();
4059 let temp_path: &str = &temp_dir;
4060
4061 let namespace: Arc<dyn LanceNamespace> = Arc::new(
4062 DirectoryNamespaceBuilder::new(temp_path)
4063 .table_version_tracking_enabled(true)
4064 .build()
4065 .await
4066 .unwrap(),
4067 );
4068
4069 let mut create_ns_req = CreateNamespaceRequest::new();
4071 create_ns_req.id = Some(vec!["workspace".to_string()]);
4072 namespace.create_namespace(create_ns_req).await.unwrap();
4073
4074 let table_id = vec!["workspace".to_string(), "test_table".to_string()];
4076 let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
4077 "id",
4078 DataType::Int32,
4079 false,
4080 )]));
4081 let batch = RecordBatch::try_new(
4082 arrow_schema.clone(),
4083 vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
4084 )
4085 .unwrap();
4086 let batches = RecordBatchIterator::new(vec![Ok(batch)], arrow_schema.clone());
4087 let write_params = WriteParams {
4088 mode: WriteMode::Create,
4089 ..Default::default()
4090 };
4091 let mut dataset = Dataset::write_into_namespace(
4092 batches,
4093 namespace.clone(),
4094 table_id.clone(),
4095 Some(write_params),
4096 )
4097 .await
4098 .unwrap();
4099
4100 let batch2 = RecordBatch::try_new(
4102 arrow_schema.clone(),
4103 vec![Arc::new(Int32Array::from(vec![100, 200]))],
4104 )
4105 .unwrap();
4106 let batches = RecordBatchIterator::new(vec![Ok(batch2)], arrow_schema.clone());
4107 dataset.append(batches, None).await.unwrap();
4108
4109 let batch3 = RecordBatch::try_new(
4111 arrow_schema.clone(),
4112 vec![Arc::new(Int32Array::from(vec![300, 400]))],
4113 )
4114 .unwrap();
4115 let batches = RecordBatchIterator::new(vec![Ok(batch3)], arrow_schema);
4116 dataset.append(batches, None).await.unwrap();
4117
4118 let mut describe_req = DescribeTableVersionRequest::new();
4120 describe_req.id = Some(table_id.clone());
4121 describe_req.version = None;
4122 let describe_resp = namespace
4123 .describe_table_version(describe_req)
4124 .await
4125 .unwrap();
4126
4127 assert_eq!(describe_resp.version.version, 3);
4129 }
4130
4131 #[tokio::test]
4132 #[cfg(not(windows))]
4133 async fn test_create_table_version() {
4134 use futures::TryStreamExt;
4135 use lance::dataset::builder::DatasetBuilder;
4136 use lance_namespace::models::CreateTableVersionRequest;
4137
4138 let temp_dir = TempStrDir::default();
4139 let temp_path: &str = &temp_dir;
4140
4141 let namespace: Arc<dyn LanceNamespace> = Arc::new(
4142 DirectoryNamespaceBuilder::new(temp_path)
4143 .table_version_tracking_enabled(true)
4144 .build()
4145 .await
4146 .unwrap(),
4147 );
4148
4149 let schema = create_test_schema();
4151 let ipc_data = create_test_ipc_data(&schema);
4152 let mut create_req = CreateTableRequest::new();
4153 create_req.id = Some(vec!["test_table".to_string()]);
4154 namespace
4155 .create_table(create_req, bytes::Bytes::from(ipc_data))
4156 .await
4157 .unwrap();
4158
4159 let table_id = vec!["test_table".to_string()];
4161 let dataset = DatasetBuilder::from_namespace(namespace.clone(), table_id.clone())
4162 .await
4163 .unwrap()
4164 .load()
4165 .await
4166 .unwrap();
4167
4168 let versions_path = dataset.versions_dir();
4170 let manifest_metas: Vec<_> = dataset
4171 .object_store()
4172 .inner
4173 .list(Some(&versions_path))
4174 .try_collect()
4175 .await
4176 .unwrap();
4177
4178 let manifest_meta = manifest_metas
4179 .iter()
4180 .find(|m| {
4181 m.location
4182 .filename()
4183 .map(|f| f.ends_with(".manifest"))
4184 .unwrap_or(false)
4185 })
4186 .expect("No manifest file found");
4187
4188 let manifest_data = dataset
4190 .object_store()
4191 .inner
4192 .get(&manifest_meta.location)
4193 .await
4194 .unwrap()
4195 .bytes()
4196 .await
4197 .unwrap();
4198
4199 let staging_path = dataset.versions_dir().child("staging_manifest");
4201 dataset
4202 .object_store()
4203 .inner
4204 .put(&staging_path, manifest_data.into())
4205 .await
4206 .unwrap();
4207
4208 let mut create_version_req = CreateTableVersionRequest::new(2, staging_path.to_string());
4211 create_version_req.id = Some(table_id.clone());
4212 create_version_req.naming_scheme = Some("V2".to_string());
4213
4214 let result = namespace.create_table_version(create_version_req).await;
4215 assert!(
4216 result.is_ok(),
4217 "create_table_version should succeed: {:?}",
4218 result
4219 );
4220
4221 let response = result.unwrap();
4223 let version_info = response
4224 .version
4225 .expect("response should contain version info");
4226 let version_2_path = Path::from(version_info.manifest_path);
4227 let head_result = dataset.object_store().inner.head(&version_2_path).await;
4228 assert!(
4229 head_result.is_ok(),
4230 "Version 2 manifest should exist at {}",
4231 version_2_path
4232 );
4233
4234 let staging_head_result = dataset.object_store().inner.head(&staging_path).await;
4236 assert!(
4237 staging_head_result.is_err(),
4238 "Staging manifest should have been deleted after create_table_version"
4239 );
4240 }
4241
4242 #[tokio::test]
4243 #[cfg(not(windows))]
4244 async fn test_create_table_version_conflict() {
4245 use futures::TryStreamExt;
4248 use lance::dataset::builder::DatasetBuilder;
4249 use lance_namespace::models::CreateTableVersionRequest;
4250
4251 let temp_dir = TempStrDir::default();
4252 let temp_path: &str = &temp_dir;
4253
4254 let namespace: Arc<dyn LanceNamespace> = Arc::new(
4255 DirectoryNamespaceBuilder::new(temp_path)
4256 .table_version_tracking_enabled(true)
4257 .build()
4258 .await
4259 .unwrap(),
4260 );
4261
4262 let schema = create_test_schema();
4264 let ipc_data = create_test_ipc_data(&schema);
4265 let mut create_req = CreateTableRequest::new();
4266 create_req.id = Some(vec!["test_table".to_string()]);
4267 namespace
4268 .create_table(create_req, bytes::Bytes::from(ipc_data))
4269 .await
4270 .unwrap();
4271
4272 let table_id = vec!["test_table".to_string()];
4274 let dataset = DatasetBuilder::from_namespace(namespace.clone(), table_id.clone())
4275 .await
4276 .unwrap()
4277 .load()
4278 .await
4279 .unwrap();
4280
4281 let versions_path = dataset.versions_dir();
4283 let manifest_metas: Vec<_> = dataset
4284 .object_store()
4285 .inner
4286 .list(Some(&versions_path))
4287 .try_collect()
4288 .await
4289 .unwrap();
4290
4291 let manifest_meta = manifest_metas
4292 .iter()
4293 .find(|m| {
4294 m.location
4295 .filename()
4296 .map(|f| f.ends_with(".manifest"))
4297 .unwrap_or(false)
4298 })
4299 .expect("No manifest file found");
4300
4301 let manifest_data = dataset
4303 .object_store()
4304 .inner
4305 .get(&manifest_meta.location)
4306 .await
4307 .unwrap()
4308 .bytes()
4309 .await
4310 .unwrap();
4311
4312 let staging_path = dataset.versions_dir().child("staging_manifest");
4314 dataset
4315 .object_store()
4316 .inner
4317 .put(&staging_path, manifest_data.into())
4318 .await
4319 .unwrap();
4320
4321 let mut create_version_req = CreateTableVersionRequest::new(2, staging_path.to_string());
4323 create_version_req.id = Some(table_id.clone());
4324 create_version_req.naming_scheme = Some("V2".to_string());
4325 let first_result = namespace.create_table_version(create_version_req).await;
4326 assert!(
4327 first_result.is_ok(),
4328 "First create_table_version for version 2 should succeed: {:?}",
4329 first_result
4330 );
4331
4332 let version_2_path = Path::from(
4334 first_result
4335 .unwrap()
4336 .version
4337 .expect("response should contain version info")
4338 .manifest_path,
4339 );
4340
4341 let mut create_version_req = CreateTableVersionRequest::new(2, staging_path.to_string());
4343 create_version_req.id = Some(table_id.clone());
4344 create_version_req.naming_scheme = Some("V2".to_string());
4345
4346 let result = namespace.create_table_version(create_version_req).await;
4347 assert!(
4348 result.is_err(),
4349 "create_table_version should fail for existing version"
4350 );
4351
4352 let head_result = dataset.object_store().inner.head(&version_2_path).await;
4354 assert!(
4355 head_result.is_ok(),
4356 "Version 2 manifest should still exist at {}",
4357 version_2_path
4358 );
4359 }
4360
4361 #[tokio::test]
4362 async fn test_create_table_version_table_not_found() {
4363 use lance_namespace::models::CreateTableVersionRequest;
4364
4365 let temp_dir = TempStdDir::default();
4366 let temp_path = temp_dir.to_str().unwrap();
4367
4368 let namespace = DirectoryNamespaceBuilder::new(temp_path)
4369 .table_version_tracking_enabled(true)
4370 .build()
4371 .await
4372 .unwrap();
4373
4374 let mut create_version_req =
4376 CreateTableVersionRequest::new(1, "/some/staging/path".to_string());
4377 create_version_req.id = Some(vec!["non_existent_table".to_string()]);
4378
4379 let result = namespace.create_table_version(create_version_req).await;
4380 assert!(
4381 result.is_err(),
4382 "create_table_version should fail for non-existent table"
4383 );
4384 let err_msg = result.unwrap_err().to_string();
4385 assert!(
4386 err_msg.contains("does not exist"),
4387 "Error should mention table does not exist, got: {}",
4388 err_msg
4389 );
4390 }
4391
4392 mod e2e_table_version_tracking {
4394 use super::*;
4395 use std::sync::atomic::{AtomicUsize, Ordering};
4396
4397 struct TrackingNamespace {
4399 inner: DirectoryNamespace,
4400 create_table_version_count: AtomicUsize,
4401 describe_table_version_count: AtomicUsize,
4402 list_table_versions_count: AtomicUsize,
4403 }
4404
4405 impl TrackingNamespace {
4406 fn new(inner: DirectoryNamespace) -> Self {
4407 Self {
4408 inner,
4409 create_table_version_count: AtomicUsize::new(0),
4410 describe_table_version_count: AtomicUsize::new(0),
4411 list_table_versions_count: AtomicUsize::new(0),
4412 }
4413 }
4414
4415 fn create_table_version_calls(&self) -> usize {
4416 self.create_table_version_count.load(Ordering::SeqCst)
4417 }
4418
4419 fn describe_table_version_calls(&self) -> usize {
4420 self.describe_table_version_count.load(Ordering::SeqCst)
4421 }
4422
4423 fn list_table_versions_calls(&self) -> usize {
4424 self.list_table_versions_count.load(Ordering::SeqCst)
4425 }
4426 }
4427
4428 impl std::fmt::Debug for TrackingNamespace {
4429 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
4430 f.debug_struct("TrackingNamespace")
4431 .field(
4432 "create_table_version_calls",
4433 &self.create_table_version_calls(),
4434 )
4435 .finish()
4436 }
4437 }
4438
4439 #[async_trait]
4440 impl LanceNamespace for TrackingNamespace {
4441 async fn create_namespace(
4442 &self,
4443 request: CreateNamespaceRequest,
4444 ) -> Result<CreateNamespaceResponse> {
4445 self.inner.create_namespace(request).await
4446 }
4447
4448 async fn describe_namespace(
4449 &self,
4450 request: DescribeNamespaceRequest,
4451 ) -> Result<DescribeNamespaceResponse> {
4452 self.inner.describe_namespace(request).await
4453 }
4454
4455 async fn namespace_exists(&self, request: NamespaceExistsRequest) -> Result<()> {
4456 self.inner.namespace_exists(request).await
4457 }
4458
4459 async fn list_namespaces(
4460 &self,
4461 request: ListNamespacesRequest,
4462 ) -> Result<ListNamespacesResponse> {
4463 self.inner.list_namespaces(request).await
4464 }
4465
4466 async fn drop_namespace(
4467 &self,
4468 request: DropNamespaceRequest,
4469 ) -> Result<DropNamespaceResponse> {
4470 self.inner.drop_namespace(request).await
4471 }
4472
4473 async fn list_tables(&self, request: ListTablesRequest) -> Result<ListTablesResponse> {
4474 self.inner.list_tables(request).await
4475 }
4476
4477 async fn describe_table(
4478 &self,
4479 request: DescribeTableRequest,
4480 ) -> Result<DescribeTableResponse> {
4481 self.inner.describe_table(request).await
4482 }
4483
4484 async fn table_exists(&self, request: TableExistsRequest) -> Result<()> {
4485 self.inner.table_exists(request).await
4486 }
4487
4488 async fn drop_table(&self, request: DropTableRequest) -> Result<DropTableResponse> {
4489 self.inner.drop_table(request).await
4490 }
4491
4492 async fn create_table(
4493 &self,
4494 request: CreateTableRequest,
4495 request_data: Bytes,
4496 ) -> Result<CreateTableResponse> {
4497 self.inner.create_table(request, request_data).await
4498 }
4499
4500 async fn declare_table(
4501 &self,
4502 request: DeclareTableRequest,
4503 ) -> Result<DeclareTableResponse> {
4504 self.inner.declare_table(request).await
4505 }
4506
4507 async fn list_table_versions(
4508 &self,
4509 request: ListTableVersionsRequest,
4510 ) -> Result<ListTableVersionsResponse> {
4511 self.list_table_versions_count
4512 .fetch_add(1, Ordering::SeqCst);
4513 self.inner.list_table_versions(request).await
4514 }
4515
4516 async fn create_table_version(
4517 &self,
4518 request: CreateTableVersionRequest,
4519 ) -> Result<CreateTableVersionResponse> {
4520 self.create_table_version_count
4521 .fetch_add(1, Ordering::SeqCst);
4522 self.inner.create_table_version(request).await
4523 }
4524
4525 async fn describe_table_version(
4526 &self,
4527 request: DescribeTableVersionRequest,
4528 ) -> Result<DescribeTableVersionResponse> {
4529 self.describe_table_version_count
4530 .fetch_add(1, Ordering::SeqCst);
4531 self.inner.describe_table_version(request).await
4532 }
4533
4534 async fn batch_delete_table_versions(
4535 &self,
4536 request: BatchDeleteTableVersionsRequest,
4537 ) -> Result<BatchDeleteTableVersionsResponse> {
4538 self.inner.batch_delete_table_versions(request).await
4539 }
4540
4541 fn namespace_id(&self) -> String {
4542 self.inner.namespace_id()
4543 }
4544 }
4545
4546 #[tokio::test]
4547 async fn test_describe_table_returns_managed_versioning() {
4548 use lance_namespace::models::{CreateNamespaceRequest, DescribeTableRequest};
4549
4550 let temp_dir = TempStdDir::default();
4551 let temp_path = temp_dir.to_str().unwrap();
4552
4553 let ns = DirectoryNamespaceBuilder::new(temp_path)
4555 .table_version_tracking_enabled(true)
4556 .manifest_enabled(true)
4557 .build()
4558 .await
4559 .unwrap();
4560
4561 let mut create_ns_req = CreateNamespaceRequest::new();
4563 create_ns_req.id = Some(vec!["workspace".to_string()]);
4564 ns.create_namespace(create_ns_req).await.unwrap();
4565
4566 let schema = create_test_schema();
4568 let ipc_data = create_test_ipc_data(&schema);
4569 let mut create_req = CreateTableRequest::new();
4570 create_req.id = Some(vec!["workspace".to_string(), "test_table".to_string()]);
4571 ns.create_table(create_req, bytes::Bytes::from(ipc_data))
4572 .await
4573 .unwrap();
4574
4575 let mut describe_req = DescribeTableRequest::new();
4577 describe_req.id = Some(vec!["workspace".to_string(), "test_table".to_string()]);
4578 let describe_resp = ns.describe_table(describe_req).await.unwrap();
4579
4580 assert_eq!(
4582 describe_resp.managed_versioning,
4583 Some(true),
4584 "managed_versioning should be true when table_version_tracking_enabled=true"
4585 );
4586 }
4587
4588 #[tokio::test]
4589 #[cfg(not(windows))]
4590 async fn test_external_manifest_store_invokes_namespace_apis() {
4591 use arrow::array::{Int32Array, StringArray};
4592 use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
4593 use arrow::record_batch::RecordBatch;
4594 use lance::Dataset;
4595 use lance::dataset::builder::DatasetBuilder;
4596 use lance::dataset::{WriteMode, WriteParams};
4597 use lance_namespace::models::CreateNamespaceRequest;
4598
4599 let temp_dir = TempStdDir::default();
4600 let temp_path = temp_dir.to_str().unwrap();
4601
4602 let inner_ns = DirectoryNamespaceBuilder::new(temp_path)
4604 .table_version_tracking_enabled(true)
4605 .manifest_enabled(true)
4606 .build()
4607 .await
4608 .unwrap();
4609
4610 let tracking_ns = Arc::new(TrackingNamespace::new(inner_ns));
4611 let ns: Arc<dyn LanceNamespace> = tracking_ns.clone();
4612
4613 let mut create_ns_req = CreateNamespaceRequest::new();
4615 create_ns_req.id = Some(vec!["workspace".to_string()]);
4616 ns.create_namespace(create_ns_req).await.unwrap();
4617
4618 let table_id = vec!["workspace".to_string(), "test_table".to_string()];
4620
4621 let arrow_schema = Arc::new(ArrowSchema::new(vec![
4623 Field::new("id", DataType::Int32, false),
4624 Field::new("name", DataType::Utf8, true),
4625 ]));
4626 let batch = RecordBatch::try_new(
4627 arrow_schema.clone(),
4628 vec![
4629 Arc::new(Int32Array::from(vec![1, 2, 3])),
4630 Arc::new(StringArray::from(vec!["a", "b", "c"])),
4631 ],
4632 )
4633 .unwrap();
4634
4635 let batches = RecordBatchIterator::new(vec![Ok(batch.clone())], arrow_schema.clone());
4637 let write_params = WriteParams {
4638 mode: WriteMode::Create,
4639 ..Default::default()
4640 };
4641 let mut dataset = Dataset::write_into_namespace(
4642 batches,
4643 ns.clone(),
4644 table_id.clone(),
4645 Some(write_params),
4646 )
4647 .await
4648 .unwrap();
4649 assert_eq!(dataset.version().version, 1);
4650
4651 assert_eq!(
4653 tracking_ns.create_table_version_calls(),
4654 1,
4655 "create_table_version should have been called once during initial write_into_namespace"
4656 );
4657
4658 let append_batch = RecordBatch::try_new(
4660 arrow_schema.clone(),
4661 vec![
4662 Arc::new(Int32Array::from(vec![4, 5, 6])),
4663 Arc::new(StringArray::from(vec!["d", "e", "f"])),
4664 ],
4665 )
4666 .unwrap();
4667 let append_batches = RecordBatchIterator::new(vec![Ok(append_batch)], arrow_schema);
4668 dataset.append(append_batches, None).await.unwrap();
4669
4670 assert_eq!(
4671 tracking_ns.create_table_version_calls(),
4672 2,
4673 "create_table_version should have been called twice (once for create, once for append)"
4674 );
4675
4676 let initial_list_calls = tracking_ns.list_table_versions_calls();
4678 let latest_dataset = DatasetBuilder::from_namespace(ns.clone(), table_id.clone())
4679 .await
4680 .unwrap()
4681 .load()
4682 .await
4683 .unwrap();
4684 assert_eq!(latest_dataset.version().version, 2);
4685 assert_eq!(
4686 tracking_ns.list_table_versions_calls(),
4687 initial_list_calls + 1,
4688 "list_table_versions should have been called exactly once during checkout_latest"
4689 );
4690
4691 let initial_describe_calls = tracking_ns.describe_table_version_calls();
4693 let v1_dataset = DatasetBuilder::from_namespace(ns.clone(), table_id.clone())
4694 .await
4695 .unwrap()
4696 .with_version(1)
4697 .load()
4698 .await
4699 .unwrap();
4700 assert_eq!(v1_dataset.version().version, 1);
4701 assert_eq!(
4702 tracking_ns.describe_table_version_calls(),
4703 initial_describe_calls + 1,
4704 "describe_table_version should have been called exactly once during checkout to version 1"
4705 );
4706 }
4707
4708 #[tokio::test]
4709 #[cfg(not(windows))]
4710 async fn test_dataset_commit_with_external_manifest_store() {
4711 use arrow::array::{Int32Array, StringArray};
4712 use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
4713 use arrow::record_batch::RecordBatch;
4714 use futures::TryStreamExt;
4715 use lance::dataset::{Dataset, WriteMode, WriteParams};
4716 use lance_namespace::models::CreateNamespaceRequest;
4717 use lance_table::io::commit::ManifestNamingScheme;
4718
4719 let temp_dir = TempStdDir::default();
4720 let temp_path = temp_dir.to_str().unwrap();
4721
4722 let inner_ns = DirectoryNamespaceBuilder::new(temp_path)
4724 .table_version_tracking_enabled(true)
4725 .manifest_enabled(true)
4726 .build()
4727 .await
4728 .unwrap();
4729
4730 let tracking_ns: Arc<dyn LanceNamespace> = Arc::new(TrackingNamespace::new(inner_ns));
4731
4732 let mut create_ns_req = CreateNamespaceRequest::new();
4734 create_ns_req.id = Some(vec!["workspace".to_string()]);
4735 tracking_ns.create_namespace(create_ns_req).await.unwrap();
4736
4737 let table_id = vec!["workspace".to_string(), "test_table".to_string()];
4739 let arrow_schema = Arc::new(ArrowSchema::new(vec![
4740 Field::new("id", DataType::Int32, false),
4741 Field::new("name", DataType::Utf8, true),
4742 ]));
4743 let batch = RecordBatch::try_new(
4744 arrow_schema.clone(),
4745 vec![
4746 Arc::new(Int32Array::from(vec![1, 2, 3])),
4747 Arc::new(StringArray::from(vec!["a", "b", "c"])),
4748 ],
4749 )
4750 .unwrap();
4751 let batches = RecordBatchIterator::new(vec![Ok(batch)], arrow_schema.clone());
4752 let write_params = WriteParams {
4753 mode: WriteMode::Create,
4754 ..Default::default()
4755 };
4756 let dataset = Dataset::write_into_namespace(
4757 batches,
4758 tracking_ns.clone(),
4759 table_id.clone(),
4760 Some(write_params),
4761 )
4762 .await
4763 .unwrap();
4764 assert_eq!(dataset.version().version, 1);
4765
4766 let batch2 = RecordBatch::try_new(
4768 arrow_schema.clone(),
4769 vec![
4770 Arc::new(Int32Array::from(vec![4, 5, 6])),
4771 Arc::new(StringArray::from(vec!["d", "e", "f"])),
4772 ],
4773 )
4774 .unwrap();
4775 let batches = RecordBatchIterator::new(vec![Ok(batch2)], arrow_schema);
4776 let write_params = WriteParams {
4777 mode: WriteMode::Append,
4778 ..Default::default()
4779 };
4780 Dataset::write_into_namespace(
4781 batches,
4782 tracking_ns.clone(),
4783 table_id.clone(),
4784 Some(write_params),
4785 )
4786 .await
4787 .unwrap();
4788
4789 let manifest_metas: Vec<_> = dataset
4792 .object_store()
4793 .inner
4794 .list(Some(&dataset.versions_dir()))
4795 .try_collect()
4796 .await
4797 .unwrap();
4798 let version_2_found = manifest_metas.iter().any(|m| {
4799 m.location
4800 .filename()
4801 .map(|f| {
4802 f.ends_with(".manifest")
4803 && ManifestNamingScheme::V2.parse_version(f) == Some(2)
4804 })
4805 .unwrap_or(false)
4806 });
4807 assert!(
4808 version_2_found,
4809 "Version 2 manifest should exist in versions directory"
4810 );
4811 }
4812 }
4813}