1pub mod manifest;
10
11use arrow::record_batch::RecordBatchIterator;
12use arrow_ipc::reader::StreamReader;
13use async_trait::async_trait;
14use bytes::Bytes;
15use futures::TryStreamExt;
16use lance::dataset::builder::DatasetBuilder;
17use lance::dataset::{Dataset, WriteMode, WriteParams};
18use lance::session::Session;
19use lance_io::object_store::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry};
20use lance_table::io::commit::ManifestNamingScheme;
21use object_store::path::Path;
22use object_store::{Error as ObjectStoreError, ObjectStore as OSObjectStore, PutMode, PutOptions};
23use std::collections::HashMap;
24use std::io::Cursor;
25use std::sync::Arc;
26
27use crate::context::DynamicContextProvider;
28use lance_namespace::models::{
29 BatchDeleteTableVersionsRequest, BatchDeleteTableVersionsResponse, CreateNamespaceRequest,
30 CreateNamespaceResponse, CreateTableRequest, CreateTableResponse, CreateTableVersionRequest,
31 CreateTableVersionResponse, DeclareTableRequest, DeclareTableResponse,
32 DescribeNamespaceRequest, DescribeNamespaceResponse, DescribeTableRequest,
33 DescribeTableResponse, DescribeTableVersionRequest, DescribeTableVersionResponse,
34 DropNamespaceRequest, DropNamespaceResponse, DropTableRequest, DropTableResponse, Identity,
35 ListNamespacesRequest, ListNamespacesResponse, ListTableVersionsRequest,
36 ListTableVersionsResponse, ListTablesRequest, ListTablesResponse, NamespaceExistsRequest,
37 TableExistsRequest, TableVersion,
38};
39
40use lance_core::{Error, Result, box_error};
41use lance_namespace::LanceNamespace;
42use lance_namespace::schema::arrow_schema_to_json;
43
44use crate::credentials::{
45 CredentialVendor, create_credential_vendor_for_location, has_credential_vendor_config,
46};
47
/// Marker-file state of a table's `<name>.lance` directory.
///
/// Produced by `DirectoryNamespace::check_table_status`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub(crate) struct TableStatus {
    /// The table directory listing returned at least one entry.
    pub(crate) exists: bool,
    /// A `.lance-deregistered` marker file is present in the table directory.
    pub(crate) is_deregistered: bool,
    /// A `.lance-reserved` marker file (written by `declare_table`) is present.
    pub(crate) has_reserved_file: bool,
}
60
/// Builder for [`DirectoryNamespace`].
///
/// Start from [`DirectoryNamespaceBuilder::new`] or
/// [`DirectoryNamespaceBuilder::from_properties`], adjust settings with the
/// chained setters, then call `build().await`.
#[derive(Clone)]
pub struct DirectoryNamespaceBuilder {
    /// Root URI of the namespace, stored without trailing `/` characters.
    root: String,
    /// Storage options applied to the object store and to dataset loads/writes.
    storage_options: Option<HashMap<String, String>>,
    /// Optional shared Lance session; its object store registry is reused.
    session: Option<Arc<Session>>,
    /// Whether a manifest namespace is initialized (default `true`).
    manifest_enabled: bool,
    /// Whether tables are also discovered by listing `<name>.lance` directories (default `true`).
    dir_listing_enabled: bool,
    /// Forwarded to the manifest namespace (default `true`).
    inline_optimization_enabled: bool,
    /// When `true`, table responses report `managed_versioning = Some(true)` (default `false`).
    table_version_tracking_enabled: bool,
    /// Forwarded to the manifest namespace; requires `manifest_enabled` (default `false`).
    table_version_storage_enabled: bool,
    /// Credential vendor settings (`credential_vendor.*` properties with the prefix removed).
    credential_vendor_properties: HashMap<String, String>,
    /// Optional dynamic context provider, stored on the built namespace.
    context_provider: Option<Arc<dyn DynamicContextProvider>>,
    /// Commit retry count forwarded to the manifest namespace, if set.
    commit_retries: Option<u32>,
}
111
112impl std::fmt::Debug for DirectoryNamespaceBuilder {
113 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114 f.debug_struct("DirectoryNamespaceBuilder")
115 .field("root", &self.root)
116 .field("storage_options", &self.storage_options)
117 .field("manifest_enabled", &self.manifest_enabled)
118 .field("dir_listing_enabled", &self.dir_listing_enabled)
119 .field(
120 "inline_optimization_enabled",
121 &self.inline_optimization_enabled,
122 )
123 .field(
124 "table_version_tracking_enabled",
125 &self.table_version_tracking_enabled,
126 )
127 .field(
128 "table_version_storage_enabled",
129 &self.table_version_storage_enabled,
130 )
131 .field(
132 "context_provider",
133 &self.context_provider.as_ref().map(|_| "Some(...)"),
134 )
135 .finish()
136 }
137}
138
139impl DirectoryNamespaceBuilder {
140 pub fn new(root: impl Into<String>) -> Self {
146 Self {
147 root: root.into().trim_end_matches('/').to_string(),
148 storage_options: None,
149 session: None,
150 manifest_enabled: true,
151 dir_listing_enabled: true, inline_optimization_enabled: true,
153 table_version_tracking_enabled: false, table_version_storage_enabled: false, credential_vendor_properties: HashMap::new(),
156 context_provider: None,
157 commit_retries: None,
158 }
159 }
160
161 pub fn manifest_enabled(mut self, enabled: bool) -> Self {
166 self.manifest_enabled = enabled;
167 self
168 }
169
170 pub fn dir_listing_enabled(mut self, enabled: bool) -> Self {
175 self.dir_listing_enabled = enabled;
176 self
177 }
178
179 pub fn inline_optimization_enabled(mut self, enabled: bool) -> Self {
185 self.inline_optimization_enabled = enabled;
186 self
187 }
188
189 pub fn table_version_tracking_enabled(mut self, enabled: bool) -> Self {
197 self.table_version_tracking_enabled = enabled;
198 self
199 }
200
201 pub fn table_version_storage_enabled(mut self, enabled: bool) -> Self {
210 self.table_version_storage_enabled = enabled;
211 self
212 }
213
214 pub fn from_properties(
279 properties: HashMap<String, String>,
280 session: Option<Arc<Session>>,
281 ) -> Result<Self> {
282 let root = properties.get("root").cloned().ok_or_else(|| {
284 Error::namespace_source(
285 "Missing required property 'root' for directory namespace".into(),
286 )
287 })?;
288
289 let storage_options: HashMap<String, String> = properties
291 .iter()
292 .filter_map(|(k, v)| {
293 k.strip_prefix("storage.")
294 .map(|key| (key.to_string(), v.clone()))
295 })
296 .collect();
297
298 let storage_options = if storage_options.is_empty() {
299 None
300 } else {
301 Some(storage_options)
302 };
303
304 let manifest_enabled = properties
306 .get("manifest_enabled")
307 .and_then(|v| v.parse::<bool>().ok())
308 .unwrap_or(true);
309
310 let dir_listing_enabled = properties
312 .get("dir_listing_enabled")
313 .and_then(|v| v.parse::<bool>().ok())
314 .unwrap_or(true);
315
316 let inline_optimization_enabled = properties
318 .get("inline_optimization_enabled")
319 .and_then(|v| v.parse::<bool>().ok())
320 .unwrap_or(true);
321
322 let table_version_tracking_enabled = properties
324 .get("table_version_tracking_enabled")
325 .and_then(|v| v.parse::<bool>().ok())
326 .unwrap_or(false);
327
328 let table_version_storage_enabled = properties
330 .get("table_version_storage_enabled")
331 .and_then(|v| v.parse::<bool>().ok())
332 .unwrap_or(false);
333
334 let credential_vendor_properties: HashMap<String, String> = properties
338 .iter()
339 .filter_map(|(k, v)| {
340 k.strip_prefix("credential_vendor.")
341 .map(|key| (key.to_string(), v.clone()))
342 })
343 .collect();
344
345 let commit_retries = properties
346 .get("commit_retries")
347 .and_then(|v| v.parse::<u32>().ok());
348
349 Ok(Self {
350 root: root.trim_end_matches('/').to_string(),
351 storage_options,
352 session,
353 manifest_enabled,
354 dir_listing_enabled,
355 inline_optimization_enabled,
356 table_version_tracking_enabled,
357 table_version_storage_enabled,
358 credential_vendor_properties,
359 context_provider: None,
360 commit_retries,
361 })
362 }
363
364 pub fn storage_option(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
371 self.storage_options
372 .get_or_insert_with(HashMap::new)
373 .insert(key.into(), value.into());
374 self
375 }
376
377 pub fn storage_options(mut self, options: HashMap<String, String>) -> Self {
383 self.storage_options
384 .get_or_insert_with(HashMap::new)
385 .extend(options);
386 self
387 }
388
389 pub fn session(mut self, session: Arc<Session>) -> Self {
399 self.session = Some(session);
400 self
401 }
402
403 pub fn commit_retries(mut self, retries: u32) -> Self {
406 self.commit_retries = Some(retries);
407 self
408 }
409
410 pub fn credential_vendor_property(
438 mut self,
439 key: impl Into<String>,
440 value: impl Into<String>,
441 ) -> Self {
442 self.credential_vendor_properties
443 .insert(key.into(), value.into());
444 self
445 }
446
447 pub fn credential_vendor_properties(mut self, properties: HashMap<String, String>) -> Self {
455 self.credential_vendor_properties.extend(properties);
456 self
457 }
458
459 pub fn context_provider(mut self, provider: Arc<dyn DynamicContextProvider>) -> Self {
469 self.context_provider = Some(provider);
470 self
471 }
472
473 pub async fn build(self) -> Result<DirectoryNamespace> {
486 if self.table_version_storage_enabled && !self.manifest_enabled {
488 return Err(Error::invalid_input(
489 "table_version_storage_enabled requires manifest_enabled=true",
490 ));
491 }
492
493 let (object_store, base_path) =
494 Self::initialize_object_store(&self.root, &self.storage_options, &self.session).await?;
495
496 let manifest_ns = if self.manifest_enabled {
497 match manifest::ManifestNamespace::from_directory(
498 self.root.clone(),
499 self.storage_options.clone(),
500 self.session.clone(),
501 object_store.clone(),
502 base_path.clone(),
503 self.dir_listing_enabled,
504 self.inline_optimization_enabled,
505 self.commit_retries,
506 self.table_version_storage_enabled,
507 )
508 .await
509 {
510 Ok(ns) => Some(Arc::new(ns)),
511 Err(e) => {
512 log::warn!(
514 "Failed to initialize manifest namespace, falling back to directory listing only: {}",
515 e
516 );
517 None
518 }
519 }
520 } else {
521 None
522 };
523
524 let credential_vendor = if has_credential_vendor_config(&self.credential_vendor_properties)
526 {
527 create_credential_vendor_for_location(&self.root, &self.credential_vendor_properties)
528 .await?
529 .map(Arc::from)
530 } else {
531 None
532 };
533
534 Ok(DirectoryNamespace {
535 root: self.root,
536 storage_options: self.storage_options,
537 session: self.session,
538 object_store,
539 base_path,
540 manifest_ns,
541 dir_listing_enabled: self.dir_listing_enabled,
542 table_version_tracking_enabled: self.table_version_tracking_enabled,
543 table_version_storage_enabled: self.table_version_storage_enabled,
544 credential_vendor,
545 context_provider: self.context_provider,
546 })
547 }
548
549 async fn initialize_object_store(
551 root: &str,
552 storage_options: &Option<HashMap<String, String>>,
553 session: &Option<Arc<Session>>,
554 ) -> Result<(Arc<ObjectStore>, Path)> {
555 let accessor = storage_options.clone().map(|opts| {
557 Arc::new(lance_io::object_store::StorageOptionsAccessor::with_static_options(opts))
558 });
559 let params = ObjectStoreParams {
560 storage_options_accessor: accessor,
561 ..Default::default()
562 };
563
564 let registry = if let Some(session) = session {
566 session.store_registry()
567 } else {
568 Arc::new(ObjectStoreRegistry::default())
569 };
570
571 let (object_store, base_path) = ObjectStore::from_uri_and_params(registry, root, ¶ms)
573 .await
574 .map_err(|e| {
575 Error::namespace_source(format!("Failed to create object store: {}", e).into())
576 })?;
577
578 Ok((object_store, base_path))
579 }
580}
581
/// A [`LanceNamespace`] backed by a directory of `<name>.lance` datasets,
/// optionally combined with a manifest namespace.
///
/// Construct it with [`DirectoryNamespaceBuilder`].
pub struct DirectoryNamespace {
    /// Root URI, stored without a trailing `/`.
    root: String,
    /// Storage options used when loading and creating datasets.
    storage_options: Option<HashMap<String, String>>,
    // NOTE(review): `describe_table` reads this field when loading datasets,
    // so the `dead_code` allowance may be stale — confirm before removing it.
    #[allow(dead_code)]
    session: Option<Arc<Session>>,
    /// Object store resolved from `root`.
    object_store: Arc<ObjectStore>,
    /// Location of `root` inside `object_store`.
    base_path: Path,
    /// Manifest namespace. `None` when disabled or when it failed to initialize.
    manifest_ns: Option<Arc<manifest::ManifestNamespace>>,
    /// Whether `<name>.lance` directories are listed in addition to manifest entries.
    dir_listing_enabled: bool,
    /// When `true`, table responses set `managed_versioning = Some(true)`.
    table_version_tracking_enabled: bool,
    /// Copied from the builder, which also forwards it to the manifest namespace.
    table_version_storage_enabled: bool,
    /// Vends per-table storage credentials when configured.
    credential_vendor: Option<Arc<dyn CredentialVendor>>,
    // NOTE(review): not read in the visible part of this file; the allowance
    // presumably silences the unused-field lint until it is wired up — confirm.
    #[allow(dead_code)]
    context_provider: Option<Arc<dyn DynamicContextProvider>>,
}
627
628impl std::fmt::Debug for DirectoryNamespace {
629 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
630 write!(f, "{}", self.namespace_id())
631 }
632}
633
634impl std::fmt::Display for DirectoryNamespace {
635 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
636 write!(f, "{}", self.namespace_id())
637 }
638}
639
/// Version ranges to delete for one table.
///
/// Consumed by `DirectoryNamespace::delete_physical_version_files`.
struct TableDeleteEntry {
    /// Table identifier, resolved to a location through `describe_table`.
    table_id: Option<Vec<String>>,
    /// Inclusive `(start, end)` version ranges.
    ranges: Vec<(i64, i64)>,
}
646
647impl DirectoryNamespace {
648 fn apply_pagination(names: &mut Vec<String>, page_token: Option<String>, limit: Option<i32>) {
657 names.sort();
659
660 if let Some(start_after) = page_token {
662 if let Some(index) = names
663 .iter()
664 .position(|name| name.as_str() > start_after.as_str())
665 {
666 names.drain(0..index);
667 } else {
668 names.clear();
669 }
670 }
671
672 if let Some(limit) = limit
674 && limit >= 0
675 {
676 names.truncate(limit as usize);
677 }
678 }
679
680 async fn list_directory_tables(&self) -> Result<Vec<String>> {
682 let mut tables = Vec::new();
683 let entries = self
684 .object_store
685 .read_dir(self.base_path.clone())
686 .await
687 .map_err(|e| {
688 Error::io_source(box_error(std::io::Error::other(format!(
689 "Failed to list directory: {}",
690 e
691 ))))
692 })?;
693
694 for entry in entries {
695 let path = entry.trim_end_matches('/');
696 if !path.ends_with(".lance") {
697 continue;
698 }
699
700 let table_name = &path[..path.len() - 6];
701
702 let status = self.check_table_status(table_name).await;
704 if status.is_deregistered || status.has_reserved_file {
705 continue;
706 }
707
708 tables.push(table_name.to_string());
709 }
710
711 Ok(tables)
712 }
713
714 fn validate_root_namespace_id(id: &Option<Vec<String>>) -> Result<()> {
716 if let Some(id) = id
717 && !id.is_empty()
718 {
719 return Err(Error::namespace_source(format!(
720 "Directory namespace only supports root namespace operations, but got namespace ID: {:?}. Expected empty ID.",
721 id
722 ).into()));
723 }
724 Ok(())
725 }
726
727 fn table_name_from_id(id: &Option<Vec<String>>) -> Result<String> {
729 let id = id.as_ref().ok_or_else(|| {
730 Error::namespace_source("Directory namespace table ID cannot be empty".into())
731 })?;
732
733 if id.len() != 1 {
734 return Err(Error::namespace_source(format!(
735 "Multi-level table IDs are only supported when manifest mode is enabled, but got: {:?}",
736 id
737 )
738 .into()));
739 }
740
741 Ok(id[0].clone())
742 }
743
744 async fn resolve_table_location(&self, id: &Option<Vec<String>>) -> Result<String> {
745 let mut describe_req = DescribeTableRequest::new();
746 describe_req.id = id.clone();
747 describe_req.load_detailed_metadata = Some(false);
748
749 let describe_resp = self.describe_table(describe_req).await?;
750
751 describe_resp.location.ok_or_else(|| {
752 Error::namespace_source(format!("Table location not found for: {:?}", id).into())
753 })
754 }
755
756 fn table_full_uri(&self, table_name: &str) -> String {
757 format!("{}/{}.lance", &self.root, table_name)
758 }
759
760 fn uri_to_object_store_path(uri: &str) -> Path {
761 let path_str = if let Some(rest) = uri.strip_prefix("file://") {
762 rest
763 } else if let Some(rest) = uri.strip_prefix("s3://") {
764 rest.split_once('/').map(|(_, p)| p).unwrap_or(rest)
765 } else if let Some(rest) = uri.strip_prefix("gs://") {
766 rest.split_once('/').map(|(_, p)| p).unwrap_or(rest)
767 } else if let Some(rest) = uri.strip_prefix("az://") {
768 rest.split_once('/').map(|(_, p)| p).unwrap_or(rest)
769 } else {
770 uri
771 };
772 Path::from(path_str)
773 }
774
775 fn table_path(&self, table_name: &str) -> Path {
777 self.base_path
778 .child(format!("{}.lance", table_name).as_str())
779 }
780
781 fn table_reserved_file_path(&self, table_name: &str) -> Path {
783 self.base_path
784 .child(format!("{}.lance", table_name).as_str())
785 .child(".lance-reserved")
786 }
787
788 fn table_deregistered_file_path(&self, table_name: &str) -> Path {
790 self.base_path
791 .child(format!("{}.lance", table_name).as_str())
792 .child(".lance-deregistered")
793 }
794
795 pub(crate) async fn check_table_status(&self, table_name: &str) -> TableStatus {
801 let table_path = self.table_path(table_name);
802 match self.object_store.read_dir(table_path).await {
803 Ok(entries) => {
804 let exists = !entries.is_empty();
805 let is_deregistered = entries.iter().any(|e| e.ends_with(".lance-deregistered"));
806 let has_reserved_file = entries.iter().any(|e| e.ends_with(".lance-reserved"));
807 TableStatus {
808 exists,
809 is_deregistered,
810 has_reserved_file,
811 }
812 }
813 Err(_) => TableStatus {
814 exists: false,
815 is_deregistered: false,
816 has_reserved_file: false,
817 },
818 }
819 }
820
821 async fn put_marker_file_atomic(
822 &self,
823 path: &Path,
824 file_description: &str,
825 ) -> std::result::Result<(), String> {
826 let put_opts = PutOptions {
827 mode: PutMode::Create,
828 ..Default::default()
829 };
830
831 match self
832 .object_store
833 .inner
834 .put_opts(path, bytes::Bytes::new().into(), put_opts)
835 .await
836 {
837 Ok(_) => Ok(()),
838 Err(ObjectStoreError::AlreadyExists { .. })
839 | Err(ObjectStoreError::Precondition { .. }) => {
840 Err(format!("{} already exists", file_description))
841 }
842 Err(e) => Err(format!("Failed to create {}: {}", file_description, e)),
843 }
844 }
845
846 async fn get_storage_options_for_table(
866 &self,
867 table_uri: &str,
868 vend_credentials: bool,
869 identity: Option<&Identity>,
870 ) -> Result<Option<HashMap<String, String>>> {
871 if vend_credentials && let Some(ref vendor) = self.credential_vendor {
872 let vended = vendor.vend_credentials(table_uri, identity).await?;
873 return Ok(Some(vended.storage_options));
874 }
875 Ok(None)
878 }
879
880 pub async fn migrate(&self) -> Result<usize> {
933 let Some(ref manifest_ns) = self.manifest_ns else {
935 return Ok(0); };
937
938 let manifest_locations = manifest_ns.list_manifest_table_locations().await?;
940
941 let dir_tables = self.list_directory_tables().await?;
943
944 let mut migrated_count = 0;
949 for table_name in dir_tables {
950 let dir_name = format!("{}.lance", table_name);
952 if !manifest_locations.contains(&dir_name) {
953 manifest_ns.register_table(&table_name, dir_name).await?;
954 migrated_count += 1;
955 }
956 }
957
958 Ok(migrated_count)
959 }
960
961 async fn delete_physical_version_files(
970 &self,
971 table_entries: &[TableDeleteEntry],
972 best_effort: bool,
973 ) -> Result<i64> {
974 let mut deleted_count = 0i64;
975 for te in table_entries {
976 let table_uri = self.resolve_table_location(&te.table_id).await?;
977 let table_path = Self::uri_to_object_store_path(&table_uri);
978 let table_path_str = table_path.as_ref();
979 let versions_dir_path = Path::from(format!("{}_versions", table_path_str));
980
981 for (start, end) in &te.ranges {
982 for version in *start..=*end {
983 let version_path =
984 versions_dir_path.child(format!("{}.manifest", version as u64));
985 match self.object_store.inner.delete(&version_path).await {
986 Ok(_) => {
987 deleted_count += 1;
988 }
989 Err(object_store::Error::NotFound { .. }) => {}
990 Err(e) => {
991 if best_effort {
992 log::warn!(
993 "Failed to delete manifest file for version {} of table {:?}: {:?}",
994 version,
995 te.table_id,
996 e
997 );
998 } else {
999 return Err(Error::namespace_source(
1000 format!(
1001 "Failed to delete version {} for table at '{}': {}",
1002 version, table_uri, e
1003 )
1004 .into(),
1005 ));
1006 }
1007 }
1008 }
1009 }
1010 }
1011 }
1012 Ok(deleted_count)
1013 }
1014}
1015
1016#[async_trait]
1017impl LanceNamespace for DirectoryNamespace {
1018 async fn list_namespaces(
1019 &self,
1020 request: ListNamespacesRequest,
1021 ) -> Result<ListNamespacesResponse> {
1022 if let Some(ref manifest_ns) = self.manifest_ns {
1023 return manifest_ns.list_namespaces(request).await;
1024 }
1025
1026 Self::validate_root_namespace_id(&request.id)?;
1027 Ok(ListNamespacesResponse::new(vec![]))
1028 }
1029
1030 async fn describe_namespace(
1031 &self,
1032 request: DescribeNamespaceRequest,
1033 ) -> Result<DescribeNamespaceResponse> {
1034 if let Some(ref manifest_ns) = self.manifest_ns {
1035 return manifest_ns.describe_namespace(request).await;
1036 }
1037
1038 Self::validate_root_namespace_id(&request.id)?;
1039 #[allow(clippy::needless_update)]
1040 Ok(DescribeNamespaceResponse {
1041 properties: Some(HashMap::new()),
1042 ..Default::default()
1043 })
1044 }
1045
1046 async fn create_namespace(
1047 &self,
1048 request: CreateNamespaceRequest,
1049 ) -> Result<CreateNamespaceResponse> {
1050 if let Some(ref manifest_ns) = self.manifest_ns {
1051 return manifest_ns.create_namespace(request).await;
1052 }
1053
1054 if request.id.is_none() || request.id.as_ref().unwrap().is_empty() {
1055 return Err(Error::namespace_source(
1056 "Root namespace already exists and cannot be created".into(),
1057 ));
1058 }
1059
1060 Err(Error::not_supported_source(
1061 "Child namespaces are only supported when manifest mode is enabled".into(),
1062 ))
1063 }
1064
1065 async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<DropNamespaceResponse> {
1066 if let Some(ref manifest_ns) = self.manifest_ns {
1067 return manifest_ns.drop_namespace(request).await;
1068 }
1069
1070 if request.id.is_none() || request.id.as_ref().unwrap().is_empty() {
1071 return Err(Error::namespace_source(
1072 "Root namespace cannot be dropped".into(),
1073 ));
1074 }
1075
1076 Err(Error::not_supported_source(
1077 "Child namespaces are only supported when manifest mode is enabled".into(),
1078 ))
1079 }
1080
1081 async fn namespace_exists(&self, request: NamespaceExistsRequest) -> Result<()> {
1082 if let Some(ref manifest_ns) = self.manifest_ns {
1083 return manifest_ns.namespace_exists(request).await;
1084 }
1085
1086 if request.id.is_none() || request.id.as_ref().unwrap().is_empty() {
1087 return Ok(());
1088 }
1089
1090 Err(Error::namespace_source(
1091 "Child namespaces are only supported when manifest mode is enabled".into(),
1092 ))
1093 }
1094
1095 async fn list_tables(&self, request: ListTablesRequest) -> Result<ListTablesResponse> {
1096 let namespace_id = request
1098 .id
1099 .as_ref()
1100 .ok_or_else(|| Error::invalid_input_source("Namespace ID is required".into()))?;
1101
1102 if !namespace_id.is_empty() {
1104 if let Some(ref manifest_ns) = self.manifest_ns {
1105 return manifest_ns.list_tables(request).await;
1106 }
1107 return Err(Error::not_supported_source(
1108 "Child namespaces are only supported when manifest mode is enabled".into(),
1109 ));
1110 }
1111
1112 if let Some(ref manifest_ns) = self.manifest_ns
1114 && !self.dir_listing_enabled
1115 {
1116 return manifest_ns.list_tables(request).await;
1117 }
1118
1119 let mut tables = if self.manifest_ns.is_some() && self.dir_listing_enabled {
1121 let manifest_locations = if let Some(ref manifest_ns) = self.manifest_ns {
1123 manifest_ns.list_manifest_table_locations().await?
1124 } else {
1125 std::collections::HashSet::new()
1126 };
1127
1128 let mut manifest_request = request.clone();
1130 manifest_request.limit = None;
1131 manifest_request.page_token = None;
1132 let manifest_tables = if let Some(ref manifest_ns) = self.manifest_ns {
1133 let manifest_response = manifest_ns.list_tables(manifest_request).await?;
1134 manifest_response.tables
1135 } else {
1136 vec![]
1137 };
1138
1139 let mut all_tables: Vec<String> = manifest_tables;
1142 let dir_tables = self.list_directory_tables().await?;
1143 for table_name in dir_tables {
1144 let full_location = format!("{}/{}.lance", self.root, table_name);
1147 let relative_location = format!("{}.lance", table_name);
1148 if !manifest_locations.contains(&full_location)
1149 && !manifest_locations.contains(&relative_location)
1150 {
1151 all_tables.push(table_name);
1152 }
1153 }
1154
1155 all_tables
1156 } else {
1157 self.list_directory_tables().await?
1158 };
1159
1160 Self::apply_pagination(&mut tables, request.page_token, request.limit);
1162 let response = ListTablesResponse::new(tables);
1163 Ok(response)
1164 }
1165
1166 async fn describe_table(&self, request: DescribeTableRequest) -> Result<DescribeTableResponse> {
1167 if let Some(ref manifest_ns) = self.manifest_ns {
1168 match manifest_ns.describe_table(request.clone()).await {
1169 Ok(mut response) => {
1170 if let Some(ref table_uri) = response.table_uri {
1171 let vend = request.vend_credentials.unwrap_or(true);
1173 let identity = request.identity.as_deref();
1174 response.storage_options = self
1175 .get_storage_options_for_table(table_uri, vend, identity)
1176 .await?;
1177 }
1178 if self.table_version_tracking_enabled {
1180 response.managed_versioning = Some(true);
1181 }
1182 return Ok(response);
1183 }
1184 Err(_)
1185 if self.dir_listing_enabled
1186 && request.id.as_ref().is_some_and(|id| id.len() == 1) =>
1187 {
1188 }
1190 Err(e) => return Err(e),
1191 }
1192 }
1193
1194 let table_name = Self::table_name_from_id(&request.id)?;
1195 let table_uri = self.table_full_uri(&table_name);
1196
1197 let status = self.check_table_status(&table_name).await;
1199
1200 if !status.exists {
1201 return Err(Error::namespace_source(
1202 format!("Table does not exist: {}", table_name).into(),
1203 ));
1204 }
1205
1206 if status.is_deregistered {
1207 return Err(Error::namespace_source(
1208 format!("Table is deregistered: {}", table_name).into(),
1209 ));
1210 }
1211
1212 let load_detailed_metadata = request.load_detailed_metadata.unwrap_or(false);
1213 let vend_credentials = request.vend_credentials.unwrap_or(true);
1215 let identity = request.identity.as_deref();
1216
1217 if !load_detailed_metadata {
1219 let storage_options = self
1220 .get_storage_options_for_table(&table_uri, vend_credentials, identity)
1221 .await?;
1222 return Ok(DescribeTableResponse {
1223 table: Some(table_name),
1224 namespace: request.id.as_ref().map(|id| {
1225 if id.len() > 1 {
1226 id[..id.len() - 1].to_vec()
1227 } else {
1228 vec![]
1229 }
1230 }),
1231 location: Some(table_uri.clone()),
1232 table_uri: Some(table_uri),
1233 storage_options,
1234 managed_versioning: if self.table_version_tracking_enabled {
1235 Some(true)
1236 } else {
1237 None
1238 },
1239 ..Default::default()
1240 });
1241 }
1242
1243 let mut builder = DatasetBuilder::from_uri(&table_uri);
1246 if let Some(opts) = &self.storage_options {
1247 builder = builder.with_storage_options(opts.clone());
1248 }
1249 if let Some(sess) = &self.session {
1250 builder = builder.with_session(sess.clone());
1251 }
1252 match builder.load().await {
1253 Ok(mut dataset) => {
1254 if let Some(requested_version) = request.version {
1256 dataset = dataset.checkout_version(requested_version as u64).await?;
1257 }
1258
1259 let version_info = dataset.version();
1260 let lance_schema = dataset.schema();
1261 let arrow_schema: arrow_schema::Schema = lance_schema.into();
1262 let json_schema = arrow_schema_to_json(&arrow_schema)?;
1263 let storage_options = self
1264 .get_storage_options_for_table(&table_uri, vend_credentials, identity)
1265 .await?;
1266
1267 let metadata: std::collections::HashMap<String, String> =
1269 version_info.metadata.into_iter().collect();
1270
1271 Ok(DescribeTableResponse {
1272 table: Some(table_name),
1273 namespace: request.id.as_ref().map(|id| {
1274 if id.len() > 1 {
1275 id[..id.len() - 1].to_vec()
1276 } else {
1277 vec![]
1278 }
1279 }),
1280 version: Some(version_info.version as i64),
1281 location: Some(table_uri.clone()),
1282 table_uri: Some(table_uri),
1283 schema: Some(Box::new(json_schema)),
1284 storage_options,
1285 metadata: Some(metadata),
1286 managed_versioning: if self.table_version_tracking_enabled {
1287 Some(true)
1288 } else {
1289 None
1290 },
1291 ..Default::default()
1292 })
1293 }
1294 Err(err) => {
1295 if status.has_reserved_file {
1297 let storage_options = self
1298 .get_storage_options_for_table(&table_uri, vend_credentials, identity)
1299 .await?;
1300 Ok(DescribeTableResponse {
1301 table: Some(table_name),
1302 namespace: request.id.as_ref().map(|id| {
1303 if id.len() > 1 {
1304 id[..id.len() - 1].to_vec()
1305 } else {
1306 vec![]
1307 }
1308 }),
1309 location: Some(table_uri.clone()),
1310 table_uri: Some(table_uri),
1311 storage_options,
1312 managed_versioning: if self.table_version_tracking_enabled {
1313 Some(true)
1314 } else {
1315 None
1316 },
1317 ..Default::default()
1318 })
1319 } else {
1320 Err(Error::namespace_source(
1321 format!(
1322 "Table directory exists but cannot load dataset {}: {:?}",
1323 table_name, err
1324 )
1325 .into(),
1326 ))
1327 }
1328 }
1329 }
1330 }
1331
1332 async fn table_exists(&self, request: TableExistsRequest) -> Result<()> {
1333 if let Some(ref manifest_ns) = self.manifest_ns {
1334 match manifest_ns.table_exists(request.clone()).await {
1335 Ok(()) => return Ok(()),
1336 Err(_) if self.dir_listing_enabled => {
1337 }
1339 Err(e) => return Err(e),
1340 }
1341 }
1342
1343 let table_name = Self::table_name_from_id(&request.id)?;
1344
1345 let status = self.check_table_status(&table_name).await;
1347
1348 if !status.exists {
1349 return Err(Error::namespace_source(
1350 format!("Table does not exist: {}", table_name).into(),
1351 ));
1352 }
1353
1354 if status.is_deregistered {
1355 return Err(Error::namespace_source(
1356 format!("Table is deregistered: {}", table_name).into(),
1357 ));
1358 }
1359
1360 Ok(())
1361 }
1362
1363 async fn drop_table(&self, request: DropTableRequest) -> Result<DropTableResponse> {
1364 if let Some(ref manifest_ns) = self.manifest_ns {
1365 return manifest_ns.drop_table(request).await;
1366 }
1367
1368 let table_name = Self::table_name_from_id(&request.id)?;
1369 let table_uri = self.table_full_uri(&table_name);
1370 let table_path = self.table_path(&table_name);
1371
1372 self.object_store
1373 .remove_dir_all(table_path)
1374 .await
1375 .map_err(|e| {
1376 Error::namespace_source(
1377 format!("Failed to drop table {}: {}", table_name, e).into(),
1378 )
1379 })?;
1380
1381 Ok(DropTableResponse {
1382 id: request.id,
1383 location: Some(table_uri),
1384 ..Default::default()
1385 })
1386 }
1387
1388 async fn create_table(
1389 &self,
1390 request: CreateTableRequest,
1391 request_data: Bytes,
1392 ) -> Result<CreateTableResponse> {
1393 if let Some(ref manifest_ns) = self.manifest_ns {
1394 return manifest_ns.create_table(request, request_data).await;
1395 }
1396
1397 let table_name = Self::table_name_from_id(&request.id)?;
1398 let table_uri = self.table_full_uri(&table_name);
1399 if request_data.is_empty() {
1400 return Err(Error::namespace_source(
1401 "Request data (Arrow IPC stream) is required for create_table".into(),
1402 ));
1403 }
1404
1405 let cursor = Cursor::new(request_data.to_vec());
1407 let stream_reader = StreamReader::try_new(cursor, None).map_err(|e| {
1408 Error::namespace_source(format!("Invalid Arrow IPC stream: {}", e).into())
1409 })?;
1410 let arrow_schema = stream_reader.schema();
1411
1412 let mut batches = Vec::new();
1414 for batch_result in stream_reader {
1415 batches.push(batch_result.map_err(|e| {
1416 Error::namespace_source(
1417 format!("Failed to read batch from IPC stream: {}", e).into(),
1418 )
1419 })?);
1420 }
1421
1422 let reader = if batches.is_empty() {
1424 let batch = arrow::record_batch::RecordBatch::new_empty(arrow_schema.clone());
1425 let batches = vec![Ok(batch)];
1426 RecordBatchIterator::new(batches, arrow_schema.clone())
1427 } else {
1428 let batch_results: Vec<_> = batches.into_iter().map(Ok).collect();
1429 RecordBatchIterator::new(batch_results, arrow_schema)
1430 };
1431
1432 let store_params = self.storage_options.as_ref().map(|opts| ObjectStoreParams {
1433 storage_options_accessor: Some(Arc::new(
1434 lance_io::object_store::StorageOptionsAccessor::with_static_options(opts.clone()),
1435 )),
1436 ..Default::default()
1437 });
1438
1439 let write_params = WriteParams {
1440 mode: WriteMode::Create,
1441 store_params,
1442 ..Default::default()
1443 };
1444
1445 Dataset::write(reader, &table_uri, Some(write_params))
1447 .await
1448 .map_err(|e| {
1449 Error::namespace_source(format!("Failed to create Lance dataset: {}", e).into())
1450 })?;
1451
1452 Ok(CreateTableResponse {
1453 version: Some(1),
1454 location: Some(table_uri),
1455 storage_options: self.storage_options.clone(),
1456 ..Default::default()
1457 })
1458 }
1459
1460 async fn declare_table(&self, request: DeclareTableRequest) -> Result<DeclareTableResponse> {
1461 if let Some(ref manifest_ns) = self.manifest_ns {
1462 let mut response = manifest_ns.declare_table(request.clone()).await?;
1463 if let Some(ref location) = response.location {
1464 let vend = request.vend_credentials.unwrap_or(true);
1466 let identity = request.identity.as_deref();
1467 response.storage_options = self
1468 .get_storage_options_for_table(location, vend, identity)
1469 .await?;
1470 }
1471 if self.table_version_tracking_enabled {
1473 response.managed_versioning = Some(true);
1474 }
1475 return Ok(response);
1476 }
1477
1478 let table_name = Self::table_name_from_id(&request.id)?;
1479 let table_uri = self.table_full_uri(&table_name);
1480
1481 if let Some(location) = &request.location {
1483 let location = location.trim_end_matches('/');
1484 if location != table_uri {
1485 return Err(Error::namespace_source(
1486 format!(
1487 "Cannot declare table {} at location {}, must be at location {}",
1488 table_name, location, table_uri
1489 )
1490 .into(),
1491 ));
1492 }
1493 }
1494
1495 let status = self.check_table_status(&table_name).await;
1499 if status.exists && !status.has_reserved_file {
1500 return Err(Error::namespace_source(
1502 format!("Table already exists: {}", table_name).into(),
1503 ));
1504 }
1505
1506 let reserved_file_path = self.table_reserved_file_path(&table_name);
1510
1511 self.put_marker_file_atomic(&reserved_file_path, &format!("table {}", table_name))
1512 .await
1513 .map_err(|e| Error::namespace_source(e.into()))?;
1514
1515 let vend_credentials = request.vend_credentials.unwrap_or(true);
1517 let identity = request.identity.as_deref();
1518 let storage_options = self
1519 .get_storage_options_for_table(&table_uri, vend_credentials, identity)
1520 .await?;
1521
1522 Ok(DeclareTableResponse {
1523 location: Some(table_uri),
1524 storage_options,
1525 managed_versioning: if self.table_version_tracking_enabled {
1526 Some(true)
1527 } else {
1528 None
1529 },
1530 ..Default::default()
1531 })
1532 }
1533
1534 async fn register_table(
1535 &self,
1536 request: lance_namespace::models::RegisterTableRequest,
1537 ) -> Result<lance_namespace::models::RegisterTableResponse> {
1538 if let Some(ref manifest_ns) = self.manifest_ns {
1540 return LanceNamespace::register_table(manifest_ns.as_ref(), request).await;
1541 }
1542
1543 Err(Error::not_supported_source(
1545 "register_table is only supported when manifest mode is enabled".into(),
1546 ))
1547 }
1548
1549 async fn deregister_table(
1550 &self,
1551 request: lance_namespace::models::DeregisterTableRequest,
1552 ) -> Result<lance_namespace::models::DeregisterTableResponse> {
1553 if let Some(ref manifest_ns) = self.manifest_ns {
1555 return LanceNamespace::deregister_table(manifest_ns.as_ref(), request).await;
1556 }
1557
1558 let table_name = Self::table_name_from_id(&request.id)?;
1560 let table_uri = self.table_full_uri(&table_name);
1561
1562 let status = self.check_table_status(&table_name).await;
1565
1566 if !status.exists {
1567 return Err(Error::namespace_source(
1568 format!("Table does not exist: {}", table_name).into(),
1569 ));
1570 }
1571
1572 if status.is_deregistered {
1573 return Err(Error::namespace_source(
1574 format!("Table is already deregistered: {}", table_name).into(),
1575 ));
1576 }
1577
1578 let deregistered_path = self.table_deregistered_file_path(&table_name);
1584 self.put_marker_file_atomic(
1585 &deregistered_path,
1586 &format!("deregistration marker for table {}", table_name),
1587 )
1588 .await
1589 .map_err(|e| {
1590 let message = if e.contains("already exists") {
1592 format!("Table is already deregistered: {}", table_name)
1593 } else {
1594 e
1595 };
1596 Error::namespace_source(message.into())
1597 })?;
1598
1599 Ok(lance_namespace::models::DeregisterTableResponse {
1600 id: request.id,
1601 location: Some(table_uri),
1602 ..Default::default()
1603 })
1604 }
1605
1606 async fn list_table_versions(
1607 &self,
1608 request: ListTableVersionsRequest,
1609 ) -> Result<ListTableVersionsResponse> {
1610 if self.table_version_storage_enabled
1612 && let Some(ref manifest_ns) = self.manifest_ns
1613 {
1614 let table_id = request.id.clone().unwrap_or_default();
1615 let want_descending = request.descending == Some(true);
1616 return manifest_ns
1617 .list_table_versions(&table_id, want_descending, request.limit)
1618 .await;
1619 }
1620
1621 let table_uri = self.resolve_table_location(&request.id).await?;
1623
1624 let table_path = Self::uri_to_object_store_path(&table_uri);
1625 let versions_dir = table_path.child("_versions");
1626 let manifest_metas: Vec<_> = self
1627 .object_store
1628 .read_dir_all(&versions_dir, None)
1629 .try_collect()
1630 .await
1631 .map_err(|e| {
1632 Error::namespace_source(
1633 format!(
1634 "Failed to list manifest files for table at '{}': {}",
1635 table_uri, e
1636 )
1637 .into(),
1638 )
1639 })?;
1640
1641 let is_v2_naming = manifest_metas
1642 .first()
1643 .is_some_and(|meta| meta.location.filename().is_some_and(|f| f.len() == 29));
1644
1645 let mut table_versions: Vec<TableVersion> = manifest_metas
1646 .into_iter()
1647 .filter_map(|meta| {
1648 let filename = meta.location.filename()?;
1649 let version_str = filename.strip_suffix(".manifest")?;
1650 if version_str.starts_with('d') {
1651 return None;
1652 }
1653 let file_version: u64 = version_str.parse().ok()?;
1654
1655 let actual_version = if file_version > u64::MAX / 2 {
1656 u64::MAX - file_version
1657 } else {
1658 file_version
1659 };
1660
1661 Some(TableVersion {
1663 version: actual_version as i64,
1664 manifest_path: meta.location.to_string(),
1665 manifest_size: Some(meta.size as i64),
1666 e_tag: meta.e_tag,
1667 timestamp_millis: Some(meta.last_modified.timestamp_millis()),
1668 metadata: None,
1669 })
1670 })
1671 .collect();
1672
1673 let list_is_ordered = self.object_store.list_is_lexically_ordered;
1674 let want_descending = request.descending == Some(true);
1675
1676 let needs_sort = if list_is_ordered {
1677 if is_v2_naming {
1678 !want_descending
1679 } else {
1680 want_descending
1681 }
1682 } else {
1683 true
1684 };
1685
1686 if needs_sort {
1687 if want_descending {
1688 table_versions.sort_by(|a, b| b.version.cmp(&a.version));
1689 } else {
1690 table_versions.sort_by(|a, b| a.version.cmp(&b.version));
1691 }
1692 }
1693
1694 if let Some(limit) = request.limit {
1695 table_versions.truncate(limit as usize);
1696 }
1697
1698 Ok(ListTableVersionsResponse {
1699 versions: table_versions,
1700 page_token: None,
1701 })
1702 }
1703
1704 async fn create_table_version(
1705 &self,
1706 request: CreateTableVersionRequest,
1707 ) -> Result<CreateTableVersionResponse> {
1708 let table_uri = self.resolve_table_location(&request.id).await?;
1709
1710 let staging_manifest_path = &request.manifest_path;
1711 let version = request.version as u64;
1712
1713 let table_path = Self::uri_to_object_store_path(&table_uri);
1714
1715 let naming_scheme = match request.naming_scheme.as_deref() {
1717 Some("V1") => ManifestNamingScheme::V1,
1718 _ => ManifestNamingScheme::V2,
1719 };
1720
1721 let final_path = naming_scheme.manifest_path(&table_path, version);
1723
1724 let staging_path = Self::uri_to_object_store_path(staging_manifest_path);
1725 let manifest_data = self
1726 .object_store
1727 .inner
1728 .get(&staging_path)
1729 .await
1730 .map_err(|e| {
1731 Error::namespace_source(
1732 format!(
1733 "Failed to read staging manifest at '{}': {}",
1734 staging_manifest_path, e
1735 )
1736 .into(),
1737 )
1738 })?
1739 .bytes()
1740 .await
1741 .map_err(|e| {
1742 Error::namespace_source(
1743 format!(
1744 "Failed to read staging manifest bytes at '{}': {}",
1745 staging_manifest_path, e
1746 )
1747 .into(),
1748 )
1749 })?;
1750
1751 let manifest_size = manifest_data.len() as i64;
1752
1753 let put_result = self
1754 .object_store
1755 .inner
1756 .put_opts(
1757 &final_path,
1758 manifest_data.into(),
1759 PutOptions {
1760 mode: PutMode::Create,
1761 ..Default::default()
1762 },
1763 )
1764 .await
1765 .map_err(|e| match e {
1766 object_store::Error::AlreadyExists { .. }
1767 | object_store::Error::Precondition { .. } => Error::namespace_source(
1768 format!(
1769 "Version {} already exists for table at '{}'",
1770 version, table_uri
1771 )
1772 .into(),
1773 ),
1774 _ => Error::namespace_source(
1775 format!(
1776 "Failed to create version {} for table at '{}': {}",
1777 version, table_uri, e
1778 )
1779 .into(),
1780 ),
1781 })?;
1782
1783 if let Err(e) = self.object_store.inner.delete(&staging_path).await {
1785 log::warn!(
1786 "Failed to delete staging manifest at '{}': {:?}",
1787 staging_path,
1788 e
1789 );
1790 }
1791
1792 if self.table_version_storage_enabled
1794 && let Some(ref manifest_ns) = self.manifest_ns
1795 {
1796 let table_id_str =
1797 manifest::ManifestNamespace::str_object_id(&request.id.clone().unwrap_or_default());
1798 let object_id =
1799 manifest::ManifestNamespace::build_version_object_id(&table_id_str, version as i64);
1800 let metadata_json = serde_json::json!({
1801 "manifest_path": final_path.to_string(),
1802 "manifest_size": manifest_size,
1803 "e_tag": put_result.e_tag,
1804 "naming_scheme": request.naming_scheme.as_deref().unwrap_or("V2"),
1805 })
1806 .to_string();
1807
1808 if let Err(e) = manifest_ns
1809 .insert_into_manifest_with_metadata(
1810 vec![manifest::ManifestEntry {
1811 object_id,
1812 object_type: manifest::ObjectType::TableVersion,
1813 location: None,
1814 metadata: Some(metadata_json),
1815 }],
1816 None,
1817 )
1818 .await
1819 {
1820 log::warn!(
1821 "Failed to record table version in __manifest (best-effort): {:?}",
1822 e
1823 );
1824 }
1825 }
1826
1827 Ok(CreateTableVersionResponse {
1828 transaction_id: None,
1829 version: Some(Box::new(TableVersion {
1830 version: version as i64,
1831 manifest_path: final_path.to_string(),
1832 manifest_size: Some(manifest_size),
1833 e_tag: put_result.e_tag,
1834 timestamp_millis: None,
1835 metadata: None,
1836 })),
1837 })
1838 }
1839
1840 async fn describe_table_version(
1841 &self,
1842 request: DescribeTableVersionRequest,
1843 ) -> Result<DescribeTableVersionResponse> {
1844 if self.table_version_storage_enabled
1847 && let (Some(manifest_ns), Some(version)) = (&self.manifest_ns, request.version)
1848 {
1849 let table_id = request.id.clone().unwrap_or_default();
1850 return manifest_ns.describe_table_version(&table_id, version).await;
1851 }
1852
1853 let table_uri = self.resolve_table_location(&request.id).await?;
1855
1856 let mut builder = DatasetBuilder::from_uri(&table_uri);
1858 if let Some(opts) = &self.storage_options {
1859 builder = builder.with_storage_options(opts.clone());
1860 }
1861 if let Some(sess) = &self.session {
1862 builder = builder.with_session(sess.clone());
1863 }
1864 let mut dataset = builder.load().await.map_err(|e| {
1865 Error::namespace_source(
1866 format!("Failed to open table at '{}': {}", table_uri, e).into(),
1867 )
1868 })?;
1869
1870 if let Some(version) = request.version {
1871 dataset = dataset
1872 .checkout_version(version as u64)
1873 .await
1874 .map_err(|e| {
1875 Error::namespace_source(
1876 format!(
1877 "Failed to checkout version {} for table at '{}': {}",
1878 version, table_uri, e
1879 )
1880 .into(),
1881 )
1882 })?;
1883 }
1884
1885 let version_info = dataset.version();
1886 let manifest_location = dataset.manifest_location();
1887 let metadata: std::collections::HashMap<String, String> =
1888 version_info.metadata.into_iter().collect();
1889
1890 let table_version = TableVersion {
1891 version: version_info.version as i64,
1892 manifest_path: manifest_location.path.to_string(),
1893 manifest_size: manifest_location.size.map(|s| s as i64),
1894 e_tag: manifest_location.e_tag.clone(),
1895 timestamp_millis: Some(version_info.timestamp.timestamp_millis()),
1896 metadata: if metadata.is_empty() {
1897 None
1898 } else {
1899 Some(metadata)
1900 },
1901 };
1902
1903 Ok(DescribeTableVersionResponse {
1904 version: Box::new(table_version),
1905 })
1906 }
1907
1908 async fn batch_delete_table_versions(
1909 &self,
1910 request: BatchDeleteTableVersionsRequest,
1911 ) -> Result<BatchDeleteTableVersionsResponse> {
1912 let ranges: Vec<(i64, i64)> = request
1915 .ranges
1916 .iter()
1917 .map(|r| {
1918 let start = r.start_version;
1919 let end = if r.end_version > 0 {
1920 r.end_version
1921 } else {
1922 start
1923 };
1924 (start, end)
1925 })
1926 .collect();
1927 let table_entries = vec![TableDeleteEntry {
1928 table_id: request.id.clone(),
1929 ranges,
1930 }];
1931
1932 let mut total_deleted_count = 0i64;
1933
1934 if self.table_version_storage_enabled
1935 && let Some(ref manifest_ns) = self.manifest_ns
1936 {
1937 let mut all_object_ids: Vec<String> = Vec::new();
1944 for te in &table_entries {
1945 let table_id_str = manifest::ManifestNamespace::str_object_id(
1946 &te.table_id.clone().unwrap_or_default(),
1947 );
1948 for (start, end) in &te.ranges {
1949 for version in *start..=*end {
1950 let object_id = manifest::ManifestNamespace::build_version_object_id(
1951 &table_id_str,
1952 version,
1953 );
1954 all_object_ids.push(object_id);
1955 }
1956 }
1957 }
1958
1959 if !all_object_ids.is_empty() {
1960 total_deleted_count = manifest_ns
1961 .batch_delete_table_versions_by_object_ids(&all_object_ids)
1962 .await?;
1963 }
1964
1965 let _ = self
1970 .delete_physical_version_files(&table_entries, true)
1971 .await;
1972
1973 return Ok(BatchDeleteTableVersionsResponse {
1974 deleted_count: Some(total_deleted_count),
1975 transaction_id: None,
1976 });
1977 }
1978
1979 total_deleted_count = self
1981 .delete_physical_version_files(&table_entries, false)
1982 .await?;
1983
1984 Ok(BatchDeleteTableVersionsResponse {
1985 deleted_count: Some(total_deleted_count),
1986 transaction_id: None,
1987 })
1988 }
1989
1990 fn namespace_id(&self) -> String {
1991 format!("DirectoryNamespace {{ root: {:?} }}", self.root)
1992 }
1993}
1994
1995#[cfg(test)]
1996mod tests {
1997 use super::*;
1998 use arrow_ipc::reader::StreamReader;
1999 use lance::dataset::Dataset;
2000 use lance_core::utils::tempfile::{TempStdDir, TempStrDir};
2001 use lance_namespace::models::{
2002 CreateTableRequest, JsonArrowDataType, JsonArrowField, JsonArrowSchema, ListTablesRequest,
2003 };
2004 use lance_namespace::schema::convert_json_arrow_schema;
2005 use std::io::Cursor;
2006 use std::sync::Arc;
2007
2008 async fn create_test_namespace() -> (DirectoryNamespace, TempStdDir) {
2010 let temp_dir = TempStdDir::default();
2011
2012 let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
2013 .build()
2014 .await
2015 .unwrap();
2016 (namespace, temp_dir)
2017 }
2018
2019 fn create_test_ipc_data(schema: &JsonArrowSchema) -> Vec<u8> {
2021 use arrow::ipc::writer::StreamWriter;
2022
2023 let arrow_schema = convert_json_arrow_schema(schema).unwrap();
2024 let arrow_schema = Arc::new(arrow_schema);
2025 let batch = arrow::record_batch::RecordBatch::new_empty(arrow_schema.clone());
2026 let mut buffer = Vec::new();
2027 {
2028 let mut writer = StreamWriter::try_new(&mut buffer, &arrow_schema).unwrap();
2029 writer.write(&batch).unwrap();
2030 writer.finish().unwrap();
2031 }
2032 buffer
2033 }
2034
2035 fn create_test_schema() -> JsonArrowSchema {
2037 let int_type = JsonArrowDataType::new("int32".to_string());
2038 let string_type = JsonArrowDataType::new("utf8".to_string());
2039
2040 let id_field = JsonArrowField {
2041 name: "id".to_string(),
2042 r#type: Box::new(int_type),
2043 nullable: false,
2044 metadata: None,
2045 };
2046
2047 let name_field = JsonArrowField {
2048 name: "name".to_string(),
2049 r#type: Box::new(string_type),
2050 nullable: true,
2051 metadata: None,
2052 };
2053
2054 JsonArrowSchema {
2055 fields: vec![id_field, name_field],
2056 metadata: None,
2057 }
2058 }
2059
2060 #[tokio::test]
2061 async fn test_create_table() {
2062 let (namespace, _temp_dir) = create_test_namespace().await;
2063
2064 let schema = create_test_schema();
2066 let ipc_data = create_test_ipc_data(&schema);
2067
2068 let mut request = CreateTableRequest::new();
2069 request.id = Some(vec!["test_table".to_string()]);
2070
2071 let response = namespace
2072 .create_table(request, bytes::Bytes::from(ipc_data))
2073 .await
2074 .unwrap();
2075
2076 assert!(response.location.is_some());
2077 assert!(response.location.unwrap().ends_with("test_table.lance"));
2078 assert_eq!(response.version, Some(1));
2079 }
2080
2081 #[tokio::test]
2082 async fn test_create_table_without_data() {
2083 let (namespace, _temp_dir) = create_test_namespace().await;
2084
2085 let mut request = CreateTableRequest::new();
2086 request.id = Some(vec!["test_table".to_string()]);
2087
2088 let result = namespace.create_table(request, bytes::Bytes::new()).await;
2089 assert!(result.is_err());
2090 assert!(
2091 result
2092 .unwrap_err()
2093 .to_string()
2094 .contains("Arrow IPC stream) is required")
2095 );
2096 }
2097
2098 #[tokio::test]
2099 async fn test_create_table_with_invalid_id() {
2100 let (namespace, _temp_dir) = create_test_namespace().await;
2101
2102 let schema = create_test_schema();
2104 let ipc_data = create_test_ipc_data(&schema);
2105
2106 let mut request = CreateTableRequest::new();
2108 request.id = Some(vec![]);
2109
2110 let result = namespace
2111 .create_table(request, bytes::Bytes::from(ipc_data.clone()))
2112 .await;
2113 assert!(result.is_err());
2114
2115 let mut create_ns_req = CreateNamespaceRequest::new();
2118 create_ns_req.id = Some(vec!["test_namespace".to_string()]);
2119 namespace.create_namespace(create_ns_req).await.unwrap();
2120
2121 let mut request = CreateTableRequest::new();
2123 request.id = Some(vec!["test_namespace".to_string(), "table".to_string()]);
2124
2125 let result = namespace
2126 .create_table(request, bytes::Bytes::from(ipc_data))
2127 .await;
2128 assert!(
2130 result.is_ok(),
2131 "Multi-level table IDs should work with manifest enabled"
2132 );
2133 }
2134
2135 #[tokio::test]
2136 async fn test_list_tables() {
2137 let (namespace, _temp_dir) = create_test_namespace().await;
2138
2139 let mut request = ListTablesRequest::new();
2141 request.id = Some(vec![]);
2142 let response = namespace.list_tables(request).await.unwrap();
2143 assert_eq!(response.tables.len(), 0);
2144
2145 let schema = create_test_schema();
2147 let ipc_data = create_test_ipc_data(&schema);
2148
2149 let mut create_request = CreateTableRequest::new();
2151 create_request.id = Some(vec!["table1".to_string()]);
2152 namespace
2153 .create_table(create_request, bytes::Bytes::from(ipc_data.clone()))
2154 .await
2155 .unwrap();
2156
2157 let mut create_request = CreateTableRequest::new();
2159 create_request.id = Some(vec!["table2".to_string()]);
2160 namespace
2161 .create_table(create_request, bytes::Bytes::from(ipc_data))
2162 .await
2163 .unwrap();
2164
2165 let mut request = ListTablesRequest::new();
2167 request.id = Some(vec![]);
2168 let response = namespace.list_tables(request).await.unwrap();
2169 let tables = response.tables;
2170 assert_eq!(tables.len(), 2);
2171 assert!(tables.contains(&"table1".to_string()));
2172 assert!(tables.contains(&"table2".to_string()));
2173 }
2174
2175 #[tokio::test]
2176 async fn test_list_tables_with_namespace_id() {
2177 let (namespace, _temp_dir) = create_test_namespace().await;
2178
2179 let mut create_ns_req = CreateNamespaceRequest::new();
2181 create_ns_req.id = Some(vec!["test_namespace".to_string()]);
2182 namespace.create_namespace(create_ns_req).await.unwrap();
2183
2184 let mut request = ListTablesRequest::new();
2186 request.id = Some(vec!["test_namespace".to_string()]);
2187
2188 let result = namespace.list_tables(request).await;
2189 assert!(
2191 result.is_ok(),
2192 "list_tables should work with child namespace when manifest is enabled"
2193 );
2194 let response = result.unwrap();
2195 assert_eq!(
2196 response.tables.len(),
2197 0,
2198 "Namespace should have no tables yet"
2199 );
2200 }
2201
2202 #[tokio::test]
2203 async fn test_describe_table() {
2204 let (namespace, _temp_dir) = create_test_namespace().await;
2205
2206 let schema = create_test_schema();
2208 let ipc_data = create_test_ipc_data(&schema);
2209
2210 let mut create_request = CreateTableRequest::new();
2211 create_request.id = Some(vec!["test_table".to_string()]);
2212 namespace
2213 .create_table(create_request, bytes::Bytes::from(ipc_data))
2214 .await
2215 .unwrap();
2216
2217 let mut request = DescribeTableRequest::new();
2219 request.id = Some(vec!["test_table".to_string()]);
2220 let response = namespace.describe_table(request).await.unwrap();
2221
2222 assert!(response.location.is_some());
2223 assert!(response.location.unwrap().ends_with("test_table.lance"));
2224 }
2225
2226 #[tokio::test]
2227 async fn test_describe_nonexistent_table() {
2228 let (namespace, _temp_dir) = create_test_namespace().await;
2229
2230 let mut request = DescribeTableRequest::new();
2231 request.id = Some(vec!["nonexistent".to_string()]);
2232
2233 let result = namespace.describe_table(request).await;
2234 assert!(result.is_err());
2235 assert!(
2236 result
2237 .unwrap_err()
2238 .to_string()
2239 .contains("Table does not exist")
2240 );
2241 }
2242
2243 #[tokio::test]
2244 async fn test_table_exists() {
2245 let (namespace, _temp_dir) = create_test_namespace().await;
2246
2247 let schema = create_test_schema();
2249 let ipc_data = create_test_ipc_data(&schema);
2250
2251 let mut create_request = CreateTableRequest::new();
2252 create_request.id = Some(vec!["existing_table".to_string()]);
2253 namespace
2254 .create_table(create_request, bytes::Bytes::from(ipc_data))
2255 .await
2256 .unwrap();
2257
2258 let mut request = TableExistsRequest::new();
2260 request.id = Some(vec!["existing_table".to_string()]);
2261 let result = namespace.table_exists(request).await;
2262 assert!(result.is_ok());
2263
2264 let mut request = TableExistsRequest::new();
2266 request.id = Some(vec!["nonexistent".to_string()]);
2267 let result = namespace.table_exists(request).await;
2268 assert!(result.is_err());
2269 assert!(
2270 result
2271 .unwrap_err()
2272 .to_string()
2273 .contains("Table does not exist")
2274 );
2275 }
2276
2277 #[tokio::test]
2278 async fn test_drop_table() {
2279 let (namespace, _temp_dir) = create_test_namespace().await;
2280
2281 let schema = create_test_schema();
2283 let ipc_data = create_test_ipc_data(&schema);
2284
2285 let mut create_request = CreateTableRequest::new();
2286 create_request.id = Some(vec!["table_to_drop".to_string()]);
2287 namespace
2288 .create_table(create_request, bytes::Bytes::from(ipc_data))
2289 .await
2290 .unwrap();
2291
2292 let mut exists_request = TableExistsRequest::new();
2294 exists_request.id = Some(vec!["table_to_drop".to_string()]);
2295 assert!(namespace.table_exists(exists_request.clone()).await.is_ok());
2296
2297 let mut drop_request = DropTableRequest::new();
2299 drop_request.id = Some(vec!["table_to_drop".to_string()]);
2300 let response = namespace.drop_table(drop_request).await.unwrap();
2301 assert!(response.location.is_some());
2302
2303 assert!(namespace.table_exists(exists_request).await.is_err());
2305 }
2306
2307 #[tokio::test]
2308 async fn test_drop_nonexistent_table() {
2309 let (namespace, _temp_dir) = create_test_namespace().await;
2310
2311 let mut request = DropTableRequest::new();
2312 request.id = Some(vec!["nonexistent".to_string()]);
2313
2314 let result = namespace.drop_table(request).await;
2316 let _ = result;
2319 }
2320
2321 #[tokio::test]
2322 async fn test_root_namespace_operations() {
2323 let (namespace, _temp_dir) = create_test_namespace().await;
2324
2325 let mut request = ListNamespacesRequest::new();
2327 request.id = Some(vec![]);
2328 let result = namespace.list_namespaces(request).await;
2329 assert!(result.is_ok());
2330 assert_eq!(result.unwrap().namespaces.len(), 0);
2331
2332 let mut request = DescribeNamespaceRequest::new();
2334 request.id = Some(vec![]);
2335 let result = namespace.describe_namespace(request).await;
2336 assert!(result.is_ok());
2337
2338 let mut request = NamespaceExistsRequest::new();
2340 request.id = Some(vec![]);
2341 let result = namespace.namespace_exists(request).await;
2342 assert!(result.is_ok());
2343
2344 let mut request = CreateNamespaceRequest::new();
2346 request.id = Some(vec![]);
2347 let result = namespace.create_namespace(request).await;
2348 assert!(result.is_err());
2349 assert!(result.unwrap_err().to_string().contains("already exists"));
2350
2351 let mut request = DropNamespaceRequest::new();
2353 request.id = Some(vec![]);
2354 let result = namespace.drop_namespace(request).await;
2355 assert!(result.is_err());
2356 assert!(
2357 result
2358 .unwrap_err()
2359 .to_string()
2360 .contains("cannot be dropped")
2361 );
2362 }
2363
2364 #[tokio::test]
2365 async fn test_non_root_namespace_operations() {
2366 let (namespace, _temp_dir) = create_test_namespace().await;
2367
2368 let mut request = CreateNamespaceRequest::new();
2371 request.id = Some(vec!["child".to_string()]);
2372 let result = namespace.create_namespace(request).await;
2373 assert!(
2374 result.is_ok(),
2375 "Child namespace creation should succeed with manifest enabled"
2376 );
2377
2378 let mut request = NamespaceExistsRequest::new();
2380 request.id = Some(vec!["child".to_string()]);
2381 let result = namespace.namespace_exists(request).await;
2382 assert!(
2383 result.is_ok(),
2384 "Child namespace should exist after creation"
2385 );
2386
2387 let mut request = DropNamespaceRequest::new();
2389 request.id = Some(vec!["child".to_string()]);
2390 let result = namespace.drop_namespace(request).await;
2391 assert!(
2392 result.is_ok(),
2393 "Child namespace drop should succeed with manifest enabled"
2394 );
2395
2396 let mut request = NamespaceExistsRequest::new();
2398 request.id = Some(vec!["child".to_string()]);
2399 let result = namespace.namespace_exists(request).await;
2400 assert!(
2401 result.is_err(),
2402 "Child namespace should not exist after drop"
2403 );
2404 }
2405
2406 #[tokio::test]
2407 async fn test_config_custom_root() {
2408 let temp_dir = TempStdDir::default();
2409 let custom_path = temp_dir.join("custom");
2410 std::fs::create_dir(&custom_path).unwrap();
2411
2412 let namespace = DirectoryNamespaceBuilder::new(custom_path.to_string_lossy().to_string())
2413 .build()
2414 .await
2415 .unwrap();
2416
2417 let schema = create_test_schema();
2419 let ipc_data = create_test_ipc_data(&schema);
2420
2421 let mut request = CreateTableRequest::new();
2423 request.id = Some(vec!["test_table".to_string()]);
2424
2425 let response = namespace
2426 .create_table(request, bytes::Bytes::from(ipc_data))
2427 .await
2428 .unwrap();
2429
2430 assert!(response.location.unwrap().contains("custom"));
2431 }
2432
2433 #[tokio::test]
2434 async fn test_config_storage_options() {
2435 let temp_dir = TempStdDir::default();
2436
2437 let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
2438 .storage_option("option1", "value1")
2439 .storage_option("option2", "value2")
2440 .build()
2441 .await
2442 .unwrap();
2443
2444 let schema = create_test_schema();
2446 let ipc_data = create_test_ipc_data(&schema);
2447
2448 let mut request = CreateTableRequest::new();
2450 request.id = Some(vec!["test_table".to_string()]);
2451
2452 let response = namespace
2453 .create_table(request, bytes::Bytes::from(ipc_data))
2454 .await
2455 .unwrap();
2456
2457 let storage_options = response.storage_options.unwrap();
2458 assert_eq!(storage_options.get("option1"), Some(&"value1".to_string()));
2459 assert_eq!(storage_options.get("option2"), Some(&"value2".to_string()));
2460 }
2461
2462 #[tokio::test]
2466 async fn test_no_storage_options_without_vendor() {
2467 use lance_namespace::models::DeclareTableRequest;
2468
2469 let temp_dir = TempStdDir::default();
2470
2471 let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
2473 .manifest_enabled(false)
2474 .storage_option("aws_access_key_id", "AKID")
2475 .storage_option("aws_secret_access_key", "SECRET")
2476 .storage_option("region", "us-east-1")
2477 .build()
2478 .await
2479 .unwrap();
2480
2481 let schema = create_test_schema();
2482 let ipc_data = create_test_ipc_data(&schema);
2483
2484 let mut create_req = CreateTableRequest::new();
2486 create_req.id = Some(vec!["t1".to_string()]);
2487 namespace
2488 .create_table(create_req, bytes::Bytes::from(ipc_data))
2489 .await
2490 .unwrap();
2491
2492 let mut desc_req = DescribeTableRequest::new();
2494 desc_req.id = Some(vec!["t1".to_string()]);
2495 let resp = namespace.describe_table(desc_req).await.unwrap();
2496 assert!(resp.storage_options.is_none());
2497
2498 let mut decl_req = DeclareTableRequest::new();
2500 decl_req.id = Some(vec!["t2".to_string()]);
2501 let resp = namespace.declare_table(decl_req).await.unwrap();
2502 assert!(resp.storage_options.is_none());
2503 }
2504
2505 #[tokio::test]
2507 async fn test_no_storage_options_without_vendor_manifest() {
2508 let temp_dir = TempStdDir::default();
2509
2510 let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
2511 .storage_option("aws_access_key_id", "AKID")
2512 .storage_option("aws_secret_access_key", "SECRET")
2513 .storage_option("region", "us-east-1")
2514 .build()
2515 .await
2516 .unwrap();
2517
2518 let schema = create_test_schema();
2519 let ipc_data = create_test_ipc_data(&schema);
2520
2521 let mut create_req = CreateTableRequest::new();
2522 create_req.id = Some(vec!["t1".to_string()]);
2523 namespace
2524 .create_table(create_req, bytes::Bytes::from(ipc_data))
2525 .await
2526 .unwrap();
2527
2528 let mut desc_req = DescribeTableRequest::new();
2530 desc_req.id = Some(vec!["t1".to_string()]);
2531 let resp = namespace.describe_table(desc_req).await.unwrap();
2532 assert!(resp.storage_options.is_none());
2533 }
2534
2535 #[tokio::test]
2536 async fn test_from_properties_manifest_enabled() {
2537 let temp_dir = TempStdDir::default();
2538
2539 let mut properties = HashMap::new();
2540 properties.insert("root".to_string(), temp_dir.to_str().unwrap().to_string());
2541 properties.insert("manifest_enabled".to_string(), "true".to_string());
2542 properties.insert("dir_listing_enabled".to_string(), "false".to_string());
2543
2544 let builder = DirectoryNamespaceBuilder::from_properties(properties, None).unwrap();
2545 assert!(builder.manifest_enabled);
2546 assert!(!builder.dir_listing_enabled);
2547
2548 let namespace = builder.build().await.unwrap();
2549
2550 let schema = create_test_schema();
2552 let ipc_data = create_test_ipc_data(&schema);
2553
2554 let mut request = CreateTableRequest::new();
2556 request.id = Some(vec!["test_table".to_string()]);
2557
2558 let response = namespace
2559 .create_table(request, bytes::Bytes::from(ipc_data))
2560 .await
2561 .unwrap();
2562
2563 assert!(response.location.is_some());
2564 }
2565
2566 #[tokio::test]
2567 async fn test_from_properties_dir_listing_enabled() {
2568 let temp_dir = TempStdDir::default();
2569
2570 let mut properties = HashMap::new();
2571 properties.insert("root".to_string(), temp_dir.to_str().unwrap().to_string());
2572 properties.insert("manifest_enabled".to_string(), "false".to_string());
2573 properties.insert("dir_listing_enabled".to_string(), "true".to_string());
2574
2575 let builder = DirectoryNamespaceBuilder::from_properties(properties, None).unwrap();
2576 assert!(!builder.manifest_enabled);
2577 assert!(builder.dir_listing_enabled);
2578
2579 let namespace = builder.build().await.unwrap();
2580
2581 let schema = create_test_schema();
2583 let ipc_data = create_test_ipc_data(&schema);
2584
2585 let mut request = CreateTableRequest::new();
2587 request.id = Some(vec!["test_table".to_string()]);
2588
2589 let response = namespace
2590 .create_table(request, bytes::Bytes::from(ipc_data))
2591 .await
2592 .unwrap();
2593
2594 assert!(response.location.is_some());
2595 }
2596
2597 #[tokio::test]
2598 async fn test_from_properties_defaults() {
2599 let temp_dir = TempStdDir::default();
2600
2601 let mut properties = HashMap::new();
2602 properties.insert("root".to_string(), temp_dir.to_str().unwrap().to_string());
2603
2604 let builder = DirectoryNamespaceBuilder::from_properties(properties, None).unwrap();
2605 assert!(builder.manifest_enabled);
2607 assert!(builder.dir_listing_enabled);
2608 }
2609
2610 #[tokio::test]
2611 async fn test_from_properties_with_storage_options() {
2612 let temp_dir = TempStdDir::default();
2613
2614 let mut properties = HashMap::new();
2615 properties.insert("root".to_string(), temp_dir.to_str().unwrap().to_string());
2616 properties.insert("manifest_enabled".to_string(), "true".to_string());
2617 properties.insert("storage.region".to_string(), "us-west-2".to_string());
2618 properties.insert("storage.bucket".to_string(), "my-bucket".to_string());
2619
2620 let builder = DirectoryNamespaceBuilder::from_properties(properties, None).unwrap();
2621 assert!(builder.manifest_enabled);
2622 assert!(builder.storage_options.is_some());
2623
2624 let storage_options = builder.storage_options.unwrap();
2625 assert_eq!(
2626 storage_options.get("region"),
2627 Some(&"us-west-2".to_string())
2628 );
2629 assert_eq!(
2630 storage_options.get("bucket"),
2631 Some(&"my-bucket".to_string())
2632 );
2633 }
2634
2635 #[tokio::test]
2636 async fn test_various_arrow_types() {
2637 let (namespace, _temp_dir) = create_test_namespace().await;
2638
2639 let fields = vec![
2641 JsonArrowField {
2642 name: "bool_col".to_string(),
2643 r#type: Box::new(JsonArrowDataType::new("bool".to_string())),
2644 nullable: true,
2645 metadata: None,
2646 },
2647 JsonArrowField {
2648 name: "int8_col".to_string(),
2649 r#type: Box::new(JsonArrowDataType::new("int8".to_string())),
2650 nullable: true,
2651 metadata: None,
2652 },
2653 JsonArrowField {
2654 name: "float64_col".to_string(),
2655 r#type: Box::new(JsonArrowDataType::new("float64".to_string())),
2656 nullable: true,
2657 metadata: None,
2658 },
2659 JsonArrowField {
2660 name: "binary_col".to_string(),
2661 r#type: Box::new(JsonArrowDataType::new("binary".to_string())),
2662 nullable: true,
2663 metadata: None,
2664 },
2665 ];
2666
2667 let schema = JsonArrowSchema {
2668 fields,
2669 metadata: None,
2670 };
2671
2672 let ipc_data = create_test_ipc_data(&schema);
2674
2675 let mut request = CreateTableRequest::new();
2676 request.id = Some(vec!["complex_table".to_string()]);
2677
2678 let response = namespace
2679 .create_table(request, bytes::Bytes::from(ipc_data))
2680 .await
2681 .unwrap();
2682
2683 assert!(response.location.is_some());
2684 }
2685
2686 #[tokio::test]
2687 async fn test_connect_dir() {
2688 let temp_dir = TempStdDir::default();
2689
2690 let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
2691 .build()
2692 .await
2693 .unwrap();
2694
2695 let mut request = ListTablesRequest::new();
2697 request.id = Some(vec![]);
2698 let response = namespace.list_tables(request).await.unwrap();
2699 assert_eq!(response.tables.len(), 0);
2700 }
2701
2702 #[tokio::test]
2703 async fn test_create_table_with_ipc_data() {
2704 use arrow::array::{Int32Array, StringArray};
2705 use arrow::ipc::writer::StreamWriter;
2706
2707 let (namespace, _temp_dir) = create_test_namespace().await;
2708
2709 let schema = create_test_schema();
2711
2712 let arrow_schema = convert_json_arrow_schema(&schema).unwrap();
2714 let arrow_schema = Arc::new(arrow_schema);
2715
2716 let id_array = Int32Array::from(vec![1, 2, 3]);
2718 let name_array = StringArray::from(vec!["Alice", "Bob", "Charlie"]);
2719 let batch = arrow::record_batch::RecordBatch::try_new(
2720 arrow_schema.clone(),
2721 vec![Arc::new(id_array), Arc::new(name_array)],
2722 )
2723 .unwrap();
2724
2725 let mut buffer = Vec::new();
2727 {
2728 let mut writer = StreamWriter::try_new(&mut buffer, &arrow_schema).unwrap();
2729 writer.write(&batch).unwrap();
2730 writer.finish().unwrap();
2731 }
2732
2733 let mut request = CreateTableRequest::new();
2735 request.id = Some(vec!["test_table_with_data".to_string()]);
2736
2737 let response = namespace
2738 .create_table(request, Bytes::from(buffer))
2739 .await
2740 .unwrap();
2741
2742 assert_eq!(response.version, Some(1));
2743 assert!(
2744 response
2745 .location
2746 .unwrap()
2747 .contains("test_table_with_data.lance")
2748 );
2749
2750 let mut exists_request = TableExistsRequest::new();
2752 exists_request.id = Some(vec!["test_table_with_data".to_string()]);
2753 namespace.table_exists(exists_request).await.unwrap();
2754 }
2755
2756 #[tokio::test]
2757 async fn test_child_namespace_create_and_list() {
2758 let (namespace, _temp_dir) = create_test_namespace().await;
2759
2760 for i in 1..=3 {
2762 let mut create_req = CreateNamespaceRequest::new();
2763 create_req.id = Some(vec![format!("ns{}", i)]);
2764 let result = namespace.create_namespace(create_req).await;
2765 assert!(result.is_ok(), "Failed to create child namespace ns{}", i);
2766 }
2767
2768 let list_req = ListNamespacesRequest {
2770 id: Some(vec![]),
2771 ..Default::default()
2772 };
2773 let result = namespace.list_namespaces(list_req).await;
2774 assert!(result.is_ok());
2775 let namespaces = result.unwrap().namespaces;
2776 assert_eq!(namespaces.len(), 3);
2777 assert!(namespaces.contains(&"ns1".to_string()));
2778 assert!(namespaces.contains(&"ns2".to_string()));
2779 assert!(namespaces.contains(&"ns3".to_string()));
2780 }
2781
2782 #[tokio::test]
2783 async fn test_nested_namespace_hierarchy() {
2784 let (namespace, _temp_dir) = create_test_namespace().await;
2785
2786 let mut create_req = CreateNamespaceRequest::new();
2788 create_req.id = Some(vec!["parent".to_string()]);
2789 namespace.create_namespace(create_req).await.unwrap();
2790
2791 let mut create_req = CreateNamespaceRequest::new();
2793 create_req.id = Some(vec!["parent".to_string(), "child1".to_string()]);
2794 namespace.create_namespace(create_req).await.unwrap();
2795
2796 let mut create_req = CreateNamespaceRequest::new();
2797 create_req.id = Some(vec!["parent".to_string(), "child2".to_string()]);
2798 namespace.create_namespace(create_req).await.unwrap();
2799
2800 let list_req = ListNamespacesRequest {
2802 id: Some(vec!["parent".to_string()]),
2803 ..Default::default()
2804 };
2805 let result = namespace.list_namespaces(list_req).await;
2806 assert!(result.is_ok());
2807 let children = result.unwrap().namespaces;
2808 assert_eq!(children.len(), 2);
2809 assert!(children.contains(&"child1".to_string()));
2810 assert!(children.contains(&"child2".to_string()));
2811
2812 let list_req = ListNamespacesRequest {
2814 id: Some(vec![]),
2815 ..Default::default()
2816 };
2817 let result = namespace.list_namespaces(list_req).await;
2818 assert!(result.is_ok());
2819 let root_namespaces = result.unwrap().namespaces;
2820 assert_eq!(root_namespaces.len(), 1);
2821 assert_eq!(root_namespaces[0], "parent");
2822 }
2823
2824 #[tokio::test]
2825 async fn test_table_in_child_namespace() {
2826 let (namespace, _temp_dir) = create_test_namespace().await;
2827
2828 let mut create_ns_req = CreateNamespaceRequest::new();
2830 create_ns_req.id = Some(vec!["test_ns".to_string()]);
2831 namespace.create_namespace(create_ns_req).await.unwrap();
2832
2833 let schema = create_test_schema();
2835 let ipc_data = create_test_ipc_data(&schema);
2836 let mut create_table_req = CreateTableRequest::new();
2837 create_table_req.id = Some(vec!["test_ns".to_string(), "table1".to_string()]);
2838 let result = namespace
2839 .create_table(create_table_req, bytes::Bytes::from(ipc_data))
2840 .await;
2841 assert!(result.is_ok(), "Failed to create table in child namespace");
2842
2843 let list_req = ListTablesRequest {
2845 id: Some(vec!["test_ns".to_string()]),
2846 ..Default::default()
2847 };
2848 let result = namespace.list_tables(list_req).await;
2849 assert!(result.is_ok());
2850 let tables = result.unwrap().tables;
2851 assert_eq!(tables.len(), 1);
2852 assert_eq!(tables[0], "table1");
2853
2854 let mut exists_req = TableExistsRequest::new();
2856 exists_req.id = Some(vec!["test_ns".to_string(), "table1".to_string()]);
2857 let result = namespace.table_exists(exists_req).await;
2858 assert!(result.is_ok());
2859
2860 let mut describe_req = DescribeTableRequest::new();
2862 describe_req.id = Some(vec!["test_ns".to_string(), "table1".to_string()]);
2863 let result = namespace.describe_table(describe_req).await;
2864 assert!(result.is_ok());
2865 let response = result.unwrap();
2866 assert!(response.location.is_some());
2867 }
2868
2869 #[tokio::test]
2870 async fn test_multiple_tables_in_child_namespace() {
2871 let (namespace, _temp_dir) = create_test_namespace().await;
2872
2873 let mut create_ns_req = CreateNamespaceRequest::new();
2875 create_ns_req.id = Some(vec!["test_ns".to_string()]);
2876 namespace.create_namespace(create_ns_req).await.unwrap();
2877
2878 let schema = create_test_schema();
2880 let ipc_data = create_test_ipc_data(&schema);
2881 for i in 1..=3 {
2882 let mut create_table_req = CreateTableRequest::new();
2883 create_table_req.id = Some(vec!["test_ns".to_string(), format!("table{}", i)]);
2884 namespace
2885 .create_table(create_table_req, bytes::Bytes::from(ipc_data.clone()))
2886 .await
2887 .unwrap();
2888 }
2889
2890 let list_req = ListTablesRequest {
2892 id: Some(vec!["test_ns".to_string()]),
2893 ..Default::default()
2894 };
2895 let result = namespace.list_tables(list_req).await;
2896 assert!(result.is_ok());
2897 let tables = result.unwrap().tables;
2898 assert_eq!(tables.len(), 3);
2899 assert!(tables.contains(&"table1".to_string()));
2900 assert!(tables.contains(&"table2".to_string()));
2901 assert!(tables.contains(&"table3".to_string()));
2902 }
2903
2904 #[tokio::test]
2905 async fn test_drop_table_in_child_namespace() {
2906 let (namespace, _temp_dir) = create_test_namespace().await;
2907
2908 let mut create_ns_req = CreateNamespaceRequest::new();
2910 create_ns_req.id = Some(vec!["test_ns".to_string()]);
2911 namespace.create_namespace(create_ns_req).await.unwrap();
2912
2913 let schema = create_test_schema();
2915 let ipc_data = create_test_ipc_data(&schema);
2916 let mut create_table_req = CreateTableRequest::new();
2917 create_table_req.id = Some(vec!["test_ns".to_string(), "table1".to_string()]);
2918 namespace
2919 .create_table(create_table_req, bytes::Bytes::from(ipc_data))
2920 .await
2921 .unwrap();
2922
2923 let mut drop_req = DropTableRequest::new();
2925 drop_req.id = Some(vec!["test_ns".to_string(), "table1".to_string()]);
2926 let result = namespace.drop_table(drop_req).await;
2927 assert!(result.is_ok(), "Failed to drop table in child namespace");
2928
2929 let mut exists_req = TableExistsRequest::new();
2931 exists_req.id = Some(vec!["test_ns".to_string(), "table1".to_string()]);
2932 let result = namespace.table_exists(exists_req).await;
2933 assert!(result.is_err());
2934 }
2935
2936 #[tokio::test]
2937 async fn test_deeply_nested_namespace() {
2938 let (namespace, _temp_dir) = create_test_namespace().await;
2939
2940 let mut create_req = CreateNamespaceRequest::new();
2942 create_req.id = Some(vec!["level1".to_string()]);
2943 namespace.create_namespace(create_req).await.unwrap();
2944
2945 let mut create_req = CreateNamespaceRequest::new();
2946 create_req.id = Some(vec!["level1".to_string(), "level2".to_string()]);
2947 namespace.create_namespace(create_req).await.unwrap();
2948
2949 let mut create_req = CreateNamespaceRequest::new();
2950 create_req.id = Some(vec![
2951 "level1".to_string(),
2952 "level2".to_string(),
2953 "level3".to_string(),
2954 ]);
2955 namespace.create_namespace(create_req).await.unwrap();
2956
2957 let schema = create_test_schema();
2959 let ipc_data = create_test_ipc_data(&schema);
2960 let mut create_table_req = CreateTableRequest::new();
2961 create_table_req.id = Some(vec![
2962 "level1".to_string(),
2963 "level2".to_string(),
2964 "level3".to_string(),
2965 "table1".to_string(),
2966 ]);
2967 let result = namespace
2968 .create_table(create_table_req, bytes::Bytes::from(ipc_data))
2969 .await;
2970 assert!(
2971 result.is_ok(),
2972 "Failed to create table in deeply nested namespace"
2973 );
2974
2975 let mut exists_req = TableExistsRequest::new();
2977 exists_req.id = Some(vec![
2978 "level1".to_string(),
2979 "level2".to_string(),
2980 "level3".to_string(),
2981 "table1".to_string(),
2982 ]);
2983 let result = namespace.table_exists(exists_req).await;
2984 assert!(result.is_ok());
2985 }
2986
2987 #[tokio::test]
2988 async fn test_namespace_with_properties() {
2989 let (namespace, _temp_dir) = create_test_namespace().await;
2990
2991 let mut properties = HashMap::new();
2993 properties.insert("owner".to_string(), "test_user".to_string());
2994 properties.insert("description".to_string(), "Test namespace".to_string());
2995
2996 let mut create_req = CreateNamespaceRequest::new();
2997 create_req.id = Some(vec!["test_ns".to_string()]);
2998 create_req.properties = Some(properties.clone());
2999 namespace.create_namespace(create_req).await.unwrap();
3000
3001 let describe_req = DescribeNamespaceRequest {
3003 id: Some(vec!["test_ns".to_string()]),
3004 ..Default::default()
3005 };
3006 let result = namespace.describe_namespace(describe_req).await;
3007 assert!(result.is_ok());
3008 let response = result.unwrap();
3009 assert!(response.properties.is_some());
3010 let props = response.properties.unwrap();
3011 assert_eq!(props.get("owner"), Some(&"test_user".to_string()));
3012 assert_eq!(
3013 props.get("description"),
3014 Some(&"Test namespace".to_string())
3015 );
3016 }
3017
3018 #[tokio::test]
3019 async fn test_cannot_drop_namespace_with_tables() {
3020 let (namespace, _temp_dir) = create_test_namespace().await;
3021
3022 let mut create_ns_req = CreateNamespaceRequest::new();
3024 create_ns_req.id = Some(vec!["test_ns".to_string()]);
3025 namespace.create_namespace(create_ns_req).await.unwrap();
3026
3027 let schema = create_test_schema();
3029 let ipc_data = create_test_ipc_data(&schema);
3030 let mut create_table_req = CreateTableRequest::new();
3031 create_table_req.id = Some(vec!["test_ns".to_string(), "table1".to_string()]);
3032 namespace
3033 .create_table(create_table_req, bytes::Bytes::from(ipc_data))
3034 .await
3035 .unwrap();
3036
3037 let mut drop_req = DropNamespaceRequest::new();
3039 drop_req.id = Some(vec!["test_ns".to_string()]);
3040 let result = namespace.drop_namespace(drop_req).await;
3041 assert!(
3042 result.is_err(),
3043 "Should not be able to drop namespace with tables"
3044 );
3045 }
3046
3047 #[tokio::test]
3048 async fn test_isolation_between_namespaces() {
3049 let (namespace, _temp_dir) = create_test_namespace().await;
3050
3051 let mut create_req = CreateNamespaceRequest::new();
3053 create_req.id = Some(vec!["ns1".to_string()]);
3054 namespace.create_namespace(create_req).await.unwrap();
3055
3056 let mut create_req = CreateNamespaceRequest::new();
3057 create_req.id = Some(vec!["ns2".to_string()]);
3058 namespace.create_namespace(create_req).await.unwrap();
3059
3060 let schema = create_test_schema();
3062 let ipc_data = create_test_ipc_data(&schema);
3063
3064 let mut create_table_req = CreateTableRequest::new();
3065 create_table_req.id = Some(vec!["ns1".to_string(), "table1".to_string()]);
3066 namespace
3067 .create_table(create_table_req, bytes::Bytes::from(ipc_data.clone()))
3068 .await
3069 .unwrap();
3070
3071 let mut create_table_req = CreateTableRequest::new();
3072 create_table_req.id = Some(vec!["ns2".to_string(), "table1".to_string()]);
3073 namespace
3074 .create_table(create_table_req, bytes::Bytes::from(ipc_data))
3075 .await
3076 .unwrap();
3077
3078 let list_req = ListTablesRequest {
3080 id: Some(vec!["ns1".to_string()]),
3081 page_token: None,
3082 limit: None,
3083 ..Default::default()
3084 };
3085 let result = namespace.list_tables(list_req).await.unwrap();
3086 assert_eq!(result.tables.len(), 1);
3087 assert_eq!(result.tables[0], "table1");
3088
3089 let list_req = ListTablesRequest {
3090 id: Some(vec!["ns2".to_string()]),
3091 page_token: None,
3092 limit: None,
3093 ..Default::default()
3094 };
3095 let result = namespace.list_tables(list_req).await.unwrap();
3096 assert_eq!(result.tables.len(), 1);
3097 assert_eq!(result.tables[0], "table1");
3098
3099 let mut drop_req = DropTableRequest::new();
3101 drop_req.id = Some(vec!["ns1".to_string(), "table1".to_string()]);
3102 namespace.drop_table(drop_req).await.unwrap();
3103
3104 let mut exists_req = TableExistsRequest::new();
3106 exists_req.id = Some(vec!["ns1".to_string(), "table1".to_string()]);
3107 assert!(namespace.table_exists(exists_req).await.is_err());
3108
3109 let mut exists_req = TableExistsRequest::new();
3110 exists_req.id = Some(vec!["ns2".to_string(), "table1".to_string()]);
3111 assert!(namespace.table_exists(exists_req).await.is_ok());
3112 }
3113
3114 #[tokio::test]
3115 async fn test_migrate_directory_tables() {
3116 let temp_dir = TempStdDir::default();
3117 let temp_path = temp_dir.to_str().unwrap();
3118
3119 let dir_only_ns = DirectoryNamespaceBuilder::new(temp_path)
3121 .manifest_enabled(false)
3122 .dir_listing_enabled(true)
3123 .build()
3124 .await
3125 .unwrap();
3126
3127 let schema = create_test_schema();
3129 let ipc_data = create_test_ipc_data(&schema);
3130
3131 for i in 1..=3 {
3132 let mut create_req = CreateTableRequest::new();
3133 create_req.id = Some(vec![format!("table{}", i)]);
3134 dir_only_ns
3135 .create_table(create_req, bytes::Bytes::from(ipc_data.clone()))
3136 .await
3137 .unwrap();
3138 }
3139
3140 drop(dir_only_ns);
3141
3142 let dual_mode_ns = DirectoryNamespaceBuilder::new(temp_path)
3144 .manifest_enabled(true)
3145 .dir_listing_enabled(true)
3146 .build()
3147 .await
3148 .unwrap();
3149
3150 let mut list_req = ListTablesRequest::new();
3152 list_req.id = Some(vec![]);
3153 let tables = dual_mode_ns.list_tables(list_req).await.unwrap().tables;
3154 assert_eq!(tables.len(), 3);
3155
3156 let migrated_count = dual_mode_ns.migrate().await.unwrap();
3158 assert_eq!(migrated_count, 3, "Should migrate all 3 tables");
3159
3160 let mut list_req = ListTablesRequest::new();
3162 list_req.id = Some(vec![]);
3163 let tables = dual_mode_ns.list_tables(list_req).await.unwrap().tables;
3164 assert_eq!(tables.len(), 3);
3165
3166 let migrated_count = dual_mode_ns.migrate().await.unwrap();
3168 assert_eq!(
3169 migrated_count, 0,
3170 "Should not migrate already-migrated tables"
3171 );
3172
3173 drop(dual_mode_ns);
3174
3175 let manifest_only_ns = DirectoryNamespaceBuilder::new(temp_path)
3177 .manifest_enabled(true)
3178 .dir_listing_enabled(false)
3179 .build()
3180 .await
3181 .unwrap();
3182
3183 let mut list_req = ListTablesRequest::new();
3185 list_req.id = Some(vec![]);
3186 let tables = manifest_only_ns.list_tables(list_req).await.unwrap().tables;
3187 assert_eq!(tables.len(), 3);
3188 assert!(tables.contains(&"table1".to_string()));
3189 assert!(tables.contains(&"table2".to_string()));
3190 assert!(tables.contains(&"table3".to_string()));
3191 }
3192
3193 #[tokio::test]
3194 async fn test_migrate_without_manifest() {
3195 let temp_dir = TempStdDir::default();
3196 let temp_path = temp_dir.to_str().unwrap();
3197
3198 let namespace = DirectoryNamespaceBuilder::new(temp_path)
3200 .manifest_enabled(false)
3201 .dir_listing_enabled(true)
3202 .build()
3203 .await
3204 .unwrap();
3205
3206 let migrated_count = namespace.migrate().await.unwrap();
3208 assert_eq!(migrated_count, 0);
3209 }
3210
3211 #[tokio::test]
3212 async fn test_register_table() {
3213 use lance_namespace::models::{RegisterTableRequest, TableExistsRequest};
3214
3215 let temp_dir = TempStdDir::default();
3216 let temp_path = temp_dir.to_str().unwrap();
3217
3218 let namespace = DirectoryNamespaceBuilder::new(temp_path)
3219 .build()
3220 .await
3221 .unwrap();
3222
3223 let schema = create_test_schema();
3225 let ipc_data = create_test_ipc_data(&schema);
3226
3227 let table_uri = format!("{}/external_table.lance", temp_path);
3228 let cursor = Cursor::new(ipc_data);
3229 let stream_reader = StreamReader::try_new(cursor, None).unwrap();
3230 let batches: Vec<_> = stream_reader
3231 .collect::<std::result::Result<Vec<_>, _>>()
3232 .unwrap();
3233 let schema = batches[0].schema();
3234 let batch_results: Vec<_> = batches.into_iter().map(Ok).collect();
3235 let reader = RecordBatchIterator::new(batch_results, schema);
3236 Dataset::write(Box::new(reader), &table_uri, None)
3237 .await
3238 .unwrap();
3239
3240 let mut register_req = RegisterTableRequest::new("external_table.lance".to_string());
3242 register_req.id = Some(vec!["registered_table".to_string()]);
3243
3244 let response = namespace.register_table(register_req).await.unwrap();
3245 assert_eq!(response.location, Some("external_table.lance".to_string()));
3246
3247 let mut exists_req = TableExistsRequest::new();
3249 exists_req.id = Some(vec!["registered_table".to_string()]);
3250 assert!(namespace.table_exists(exists_req).await.is_ok());
3251
3252 let mut list_req = ListTablesRequest::new();
3254 list_req.id = Some(vec![]);
3255 let tables = namespace.list_tables(list_req).await.unwrap();
3256 assert!(tables.tables.contains(&"registered_table".to_string()));
3257 }
3258
3259 #[tokio::test]
3260 async fn test_register_table_duplicate_fails() {
3261 use lance_namespace::models::RegisterTableRequest;
3262
3263 let temp_dir = TempStdDir::default();
3264 let temp_path = temp_dir.to_str().unwrap();
3265
3266 let namespace = DirectoryNamespaceBuilder::new(temp_path)
3267 .build()
3268 .await
3269 .unwrap();
3270
3271 let mut register_req = RegisterTableRequest::new("test_table.lance".to_string());
3273 register_req.id = Some(vec!["test_table".to_string()]);
3274
3275 namespace
3276 .register_table(register_req.clone())
3277 .await
3278 .unwrap();
3279
3280 let result = namespace.register_table(register_req).await;
3282 assert!(result.is_err());
3283 assert!(result.unwrap_err().to_string().contains("already exists"));
3284 }
3285
3286 #[tokio::test]
3287 async fn test_deregister_table() {
3288 use lance_namespace::models::{DeregisterTableRequest, TableExistsRequest};
3289
3290 let temp_dir = TempStdDir::default();
3291 let temp_path = temp_dir.to_str().unwrap();
3292
3293 let namespace = DirectoryNamespaceBuilder::new(temp_path)
3296 .manifest_enabled(true)
3297 .dir_listing_enabled(false)
3298 .build()
3299 .await
3300 .unwrap();
3301
3302 let schema = create_test_schema();
3304 let ipc_data = create_test_ipc_data(&schema);
3305
3306 let mut create_req = CreateTableRequest::new();
3307 create_req.id = Some(vec!["test_table".to_string()]);
3308 namespace
3309 .create_table(create_req, bytes::Bytes::from(ipc_data))
3310 .await
3311 .unwrap();
3312
3313 let mut exists_req = TableExistsRequest::new();
3315 exists_req.id = Some(vec!["test_table".to_string()]);
3316 assert!(namespace.table_exists(exists_req.clone()).await.is_ok());
3317
3318 let mut deregister_req = DeregisterTableRequest::new();
3320 deregister_req.id = Some(vec!["test_table".to_string()]);
3321 let response = namespace.deregister_table(deregister_req).await.unwrap();
3322
3323 assert!(
3325 response.location.is_some(),
3326 "Deregister should return location"
3327 );
3328 let location = response.location.as_ref().unwrap();
3329 let expected_url = lance_io::object_store::uri_to_url(temp_path)
3332 .expect("Failed to convert temp path to URL");
3333 let expected_prefix = expected_url.to_string();
3334 assert!(
3335 location.starts_with(&expected_prefix),
3336 "Location should start with '{}', got: {}",
3337 expected_prefix,
3338 location
3339 );
3340 assert!(
3341 location.contains("test_table"),
3342 "Location should contain table name: {}",
3343 location
3344 );
3345 assert_eq!(response.id, Some(vec!["test_table".to_string()]));
3346
3347 assert!(namespace.table_exists(exists_req).await.is_err());
3349
3350 let dataset = Dataset::open(location).await;
3352 assert!(
3353 dataset.is_ok(),
3354 "Physical table data should still exist at {}",
3355 location
3356 );
3357 }
3358
3359 #[tokio::test]
3360 async fn test_deregister_table_in_child_namespace() {
3361 use lance_namespace::models::{
3362 CreateNamespaceRequest, DeregisterTableRequest, TableExistsRequest,
3363 };
3364
3365 let temp_dir = TempStdDir::default();
3366 let temp_path = temp_dir.to_str().unwrap();
3367
3368 let namespace = DirectoryNamespaceBuilder::new(temp_path)
3369 .build()
3370 .await
3371 .unwrap();
3372
3373 let mut create_ns_req = CreateNamespaceRequest::new();
3375 create_ns_req.id = Some(vec!["test_ns".to_string()]);
3376 namespace.create_namespace(create_ns_req).await.unwrap();
3377
3378 let schema = create_test_schema();
3380 let ipc_data = create_test_ipc_data(&schema);
3381
3382 let mut create_req = CreateTableRequest::new();
3383 create_req.id = Some(vec!["test_ns".to_string(), "test_table".to_string()]);
3384 namespace
3385 .create_table(create_req, bytes::Bytes::from(ipc_data))
3386 .await
3387 .unwrap();
3388
3389 let mut deregister_req = DeregisterTableRequest::new();
3391 deregister_req.id = Some(vec!["test_ns".to_string(), "test_table".to_string()]);
3392 let response = namespace.deregister_table(deregister_req).await.unwrap();
3393
3394 assert!(
3396 response.location.is_some(),
3397 "Deregister should return location"
3398 );
3399 let location = response.location.as_ref().unwrap();
3400 let expected_url = lance_io::object_store::uri_to_url(temp_path)
3403 .expect("Failed to convert temp path to URL");
3404 let expected_prefix = expected_url.to_string();
3405 assert!(
3406 location.starts_with(&expected_prefix),
3407 "Location should start with '{}', got: {}",
3408 expected_prefix,
3409 location
3410 );
3411 assert!(
3412 location.contains("test_ns") && location.contains("test_table"),
3413 "Location should contain namespace and table name: {}",
3414 location
3415 );
3416 assert_eq!(
3417 response.id,
3418 Some(vec!["test_ns".to_string(), "test_table".to_string()])
3419 );
3420
3421 let mut exists_req = TableExistsRequest::new();
3423 exists_req.id = Some(vec!["test_ns".to_string(), "test_table".to_string()]);
3424 assert!(namespace.table_exists(exists_req).await.is_err());
3425 }
3426
3427 #[tokio::test]
3428 async fn test_register_without_manifest_fails() {
3429 use lance_namespace::models::RegisterTableRequest;
3430
3431 let temp_dir = TempStdDir::default();
3432 let temp_path = temp_dir.to_str().unwrap();
3433
3434 let namespace = DirectoryNamespaceBuilder::new(temp_path)
3436 .manifest_enabled(false)
3437 .build()
3438 .await
3439 .unwrap();
3440
3441 let mut register_req = RegisterTableRequest::new("test_table.lance".to_string());
3443 register_req.id = Some(vec!["test_table".to_string()]);
3444 let result = namespace.register_table(register_req).await;
3445 assert!(result.is_err());
3446 assert!(
3447 result
3448 .unwrap_err()
3449 .to_string()
3450 .contains("manifest mode is enabled")
3451 );
3452
3453 }
3456
3457 #[tokio::test]
3458 async fn test_register_table_rejects_absolute_uri() {
3459 use lance_namespace::models::RegisterTableRequest;
3460
3461 let temp_dir = TempStdDir::default();
3462 let temp_path = temp_dir.to_str().unwrap();
3463
3464 let namespace = DirectoryNamespaceBuilder::new(temp_path)
3465 .build()
3466 .await
3467 .unwrap();
3468
3469 let mut register_req = RegisterTableRequest::new("s3://bucket/table.lance".to_string());
3471 register_req.id = Some(vec!["test_table".to_string()]);
3472 let result = namespace.register_table(register_req).await;
3473 assert!(result.is_err());
3474 let err_msg = result.unwrap_err().to_string();
3475 assert!(err_msg.contains("Absolute URIs are not allowed"));
3476 }
3477
3478 #[tokio::test]
3479 async fn test_register_table_rejects_absolute_path() {
3480 use lance_namespace::models::RegisterTableRequest;
3481
3482 let temp_dir = TempStdDir::default();
3483 let temp_path = temp_dir.to_str().unwrap();
3484
3485 let namespace = DirectoryNamespaceBuilder::new(temp_path)
3486 .build()
3487 .await
3488 .unwrap();
3489
3490 let mut register_req = RegisterTableRequest::new("/tmp/table.lance".to_string());
3492 register_req.id = Some(vec!["test_table".to_string()]);
3493 let result = namespace.register_table(register_req).await;
3494 assert!(result.is_err());
3495 let err_msg = result.unwrap_err().to_string();
3496 assert!(err_msg.contains("Absolute paths are not allowed"));
3497 }
3498
3499 #[tokio::test]
3500 async fn test_register_table_rejects_path_traversal() {
3501 use lance_namespace::models::RegisterTableRequest;
3502
3503 let temp_dir = TempStdDir::default();
3504 let temp_path = temp_dir.to_str().unwrap();
3505
3506 let namespace = DirectoryNamespaceBuilder::new(temp_path)
3507 .build()
3508 .await
3509 .unwrap();
3510
3511 let mut register_req = RegisterTableRequest::new("../outside/table.lance".to_string());
3513 register_req.id = Some(vec!["test_table".to_string()]);
3514 let result = namespace.register_table(register_req).await;
3515 assert!(result.is_err());
3516 let err_msg = result.unwrap_err().to_string();
3517 assert!(err_msg.contains("Path traversal is not allowed"));
3518 }
3519
3520 #[tokio::test]
3521 async fn test_namespace_write() {
3522 use arrow::array::Int32Array;
3523 use arrow::datatypes::{DataType, Field as ArrowField, Schema as ArrowSchema};
3524 use arrow::record_batch::{RecordBatch, RecordBatchIterator};
3525 use lance::dataset::{Dataset, WriteMode, WriteParams};
3526 use lance_namespace::LanceNamespace;
3527
3528 let (namespace, _temp_dir) = create_test_namespace().await;
3529 let namespace = Arc::new(namespace) as Arc<dyn LanceNamespace>;
3530
3531 let table_id = vec!["test_ns".to_string(), "test_table".to_string()];
3533 let schema = Arc::new(ArrowSchema::new(vec![
3534 ArrowField::new("a", DataType::Int32, false),
3535 ArrowField::new("b", DataType::Int32, false),
3536 ]));
3537
3538 let data1 = RecordBatch::try_new(
3540 schema.clone(),
3541 vec![
3542 Arc::new(Int32Array::from(vec![1, 2, 3])),
3543 Arc::new(Int32Array::from(vec![10, 20, 30])),
3544 ],
3545 )
3546 .unwrap();
3547
3548 let reader1 = RecordBatchIterator::new(vec![data1].into_iter().map(Ok), schema.clone());
3549 let dataset =
3550 Dataset::write_into_namespace(reader1, namespace.clone(), table_id.clone(), None)
3551 .await
3552 .unwrap();
3553
3554 assert_eq!(dataset.count_rows(None).await.unwrap(), 3);
3555 assert_eq!(dataset.version().version, 1);
3556
3557 let data2 = RecordBatch::try_new(
3559 schema.clone(),
3560 vec![
3561 Arc::new(Int32Array::from(vec![4, 5])),
3562 Arc::new(Int32Array::from(vec![40, 50])),
3563 ],
3564 )
3565 .unwrap();
3566
3567 let params_append = WriteParams {
3568 mode: WriteMode::Append,
3569 ..Default::default()
3570 };
3571
3572 let reader2 = RecordBatchIterator::new(vec![data2].into_iter().map(Ok), schema.clone());
3573 let dataset = Dataset::write_into_namespace(
3574 reader2,
3575 namespace.clone(),
3576 table_id.clone(),
3577 Some(params_append),
3578 )
3579 .await
3580 .unwrap();
3581
3582 assert_eq!(dataset.count_rows(None).await.unwrap(), 5);
3583 assert_eq!(dataset.version().version, 2);
3584
3585 let data3 = RecordBatch::try_new(
3587 schema.clone(),
3588 vec![
3589 Arc::new(Int32Array::from(vec![100, 200])),
3590 Arc::new(Int32Array::from(vec![1000, 2000])),
3591 ],
3592 )
3593 .unwrap();
3594
3595 let params_overwrite = WriteParams {
3596 mode: WriteMode::Overwrite,
3597 ..Default::default()
3598 };
3599
3600 let reader3 = RecordBatchIterator::new(vec![data3].into_iter().map(Ok), schema.clone());
3601 let dataset = Dataset::write_into_namespace(
3602 reader3,
3603 namespace.clone(),
3604 table_id.clone(),
3605 Some(params_overwrite),
3606 )
3607 .await
3608 .unwrap();
3609
3610 assert_eq!(dataset.count_rows(None).await.unwrap(), 2);
3611 assert_eq!(dataset.version().version, 3);
3612
3613 let result = dataset.scan().try_into_batch().await.unwrap();
3615 let a_col = result
3616 .column_by_name("a")
3617 .unwrap()
3618 .as_any()
3619 .downcast_ref::<Int32Array>()
3620 .unwrap();
3621 assert_eq!(a_col.values(), &[100, 200]);
3622 }
3623
3624 #[tokio::test]
3629 async fn test_declare_table_v1_mode() {
3630 use lance_namespace::models::{
3631 DeclareTableRequest, DescribeTableRequest, TableExistsRequest,
3632 };
3633
3634 let temp_dir = TempStdDir::default();
3635 let temp_path = temp_dir.to_str().unwrap();
3636
3637 let namespace = DirectoryNamespaceBuilder::new(temp_path)
3639 .manifest_enabled(false)
3640 .build()
3641 .await
3642 .unwrap();
3643
3644 let mut declare_req = DeclareTableRequest::new();
3646 declare_req.id = Some(vec!["test_table".to_string()]);
3647 let response = namespace.declare_table(declare_req).await.unwrap();
3648
3649 assert!(response.location.is_some());
3651 let location = response.location.as_ref().unwrap();
3652 assert!(location.ends_with("test_table.lance"));
3653
3654 let mut exists_req = TableExistsRequest::new();
3656 exists_req.id = Some(vec!["test_table".to_string()]);
3657 assert!(namespace.table_exists(exists_req).await.is_ok());
3658
3659 let mut describe_req = DescribeTableRequest::new();
3661 describe_req.id = Some(vec!["test_table".to_string()]);
3662 let describe_response = namespace.describe_table(describe_req).await.unwrap();
3663 assert!(describe_response.location.is_some());
3664 assert!(describe_response.version.is_none()); assert!(describe_response.schema.is_none()); }
3667
3668 #[tokio::test]
3669 async fn test_declare_table_with_manifest() {
3670 use lance_namespace::models::{DeclareTableRequest, TableExistsRequest};
3671
3672 let temp_dir = TempStdDir::default();
3673 let temp_path = temp_dir.to_str().unwrap();
3674
3675 let namespace = DirectoryNamespaceBuilder::new(temp_path)
3677 .manifest_enabled(true)
3678 .dir_listing_enabled(false)
3679 .build()
3680 .await
3681 .unwrap();
3682
3683 let mut declare_req = DeclareTableRequest::new();
3685 declare_req.id = Some(vec!["test_table".to_string()]);
3686 let response = namespace.declare_table(declare_req).await.unwrap();
3687
3688 assert!(response.location.is_some());
3690
3691 let mut exists_req = TableExistsRequest::new();
3693 exists_req.id = Some(vec!["test_table".to_string()]);
3694 assert!(namespace.table_exists(exists_req).await.is_ok());
3695 }
3696
3697 #[tokio::test]
3698 async fn test_declare_table_when_table_exists() {
3699 use lance_namespace::models::DeclareTableRequest;
3700
3701 let temp_dir = TempStdDir::default();
3702 let temp_path = temp_dir.to_str().unwrap();
3703
3704 let namespace = DirectoryNamespaceBuilder::new(temp_path)
3705 .manifest_enabled(false)
3706 .build()
3707 .await
3708 .unwrap();
3709
3710 let schema = create_test_schema();
3712 let ipc_data = create_test_ipc_data(&schema);
3713 let mut create_req = CreateTableRequest::new();
3714 create_req.id = Some(vec!["test_table".to_string()]);
3715 namespace
3716 .create_table(create_req, bytes::Bytes::from(ipc_data))
3717 .await
3718 .unwrap();
3719
3720 let mut declare_req = DeclareTableRequest::new();
3722 declare_req.id = Some(vec!["test_table".to_string()]);
3723 let result = namespace.declare_table(declare_req).await;
3724 assert!(result.is_err());
3725 }
3726
3727 #[tokio::test]
3732 async fn test_deregister_table_v1_mode() {
3733 use lance_namespace::models::{DeregisterTableRequest, TableExistsRequest};
3734
3735 let temp_dir = TempStdDir::default();
3736 let temp_path = temp_dir.to_str().unwrap();
3737
3738 let namespace = DirectoryNamespaceBuilder::new(temp_path)
3740 .manifest_enabled(false)
3741 .dir_listing_enabled(true)
3742 .build()
3743 .await
3744 .unwrap();
3745
3746 let schema = create_test_schema();
3748 let ipc_data = create_test_ipc_data(&schema);
3749 let mut create_req = CreateTableRequest::new();
3750 create_req.id = Some(vec!["test_table".to_string()]);
3751 namespace
3752 .create_table(create_req, bytes::Bytes::from(ipc_data))
3753 .await
3754 .unwrap();
3755
3756 let mut exists_req = TableExistsRequest::new();
3758 exists_req.id = Some(vec!["test_table".to_string()]);
3759 assert!(namespace.table_exists(exists_req.clone()).await.is_ok());
3760
3761 let mut deregister_req = DeregisterTableRequest::new();
3763 deregister_req.id = Some(vec!["test_table".to_string()]);
3764 let response = namespace.deregister_table(deregister_req).await.unwrap();
3765
3766 assert!(response.location.is_some());
3768 let location = response.location.as_ref().unwrap();
3769 assert!(location.contains("test_table"));
3770
3771 let result = namespace.table_exists(exists_req).await;
3773 assert!(result.is_err());
3774 assert!(result.unwrap_err().to_string().contains("deregistered"));
3775
3776 let dataset = Dataset::open(location).await;
3778 assert!(dataset.is_ok(), "Physical table data should still exist");
3779 }
3780
3781 #[tokio::test]
3782 async fn test_deregister_table_v1_already_deregistered() {
3783 use lance_namespace::models::DeregisterTableRequest;
3784
3785 let temp_dir = TempStdDir::default();
3786 let temp_path = temp_dir.to_str().unwrap();
3787
3788 let namespace = DirectoryNamespaceBuilder::new(temp_path)
3789 .manifest_enabled(false)
3790 .dir_listing_enabled(true)
3791 .build()
3792 .await
3793 .unwrap();
3794
3795 let schema = create_test_schema();
3797 let ipc_data = create_test_ipc_data(&schema);
3798 let mut create_req = CreateTableRequest::new();
3799 create_req.id = Some(vec!["test_table".to_string()]);
3800 namespace
3801 .create_table(create_req, bytes::Bytes::from(ipc_data))
3802 .await
3803 .unwrap();
3804
3805 let mut deregister_req = DeregisterTableRequest::new();
3807 deregister_req.id = Some(vec!["test_table".to_string()]);
3808 namespace
3809 .deregister_table(deregister_req.clone())
3810 .await
3811 .unwrap();
3812
3813 let result = namespace.deregister_table(deregister_req).await;
3815 assert!(result.is_err());
3816 assert!(
3817 result
3818 .unwrap_err()
3819 .to_string()
3820 .contains("already deregistered")
3821 );
3822 }
3823
3824 #[tokio::test]
3829 async fn test_list_tables_skips_deregistered_v1() {
3830 use lance_namespace::models::DeregisterTableRequest;
3831
3832 let temp_dir = TempStdDir::default();
3833 let temp_path = temp_dir.to_str().unwrap();
3834
3835 let namespace = DirectoryNamespaceBuilder::new(temp_path)
3836 .manifest_enabled(false)
3837 .dir_listing_enabled(true)
3838 .build()
3839 .await
3840 .unwrap();
3841
3842 let schema = create_test_schema();
3844 let ipc_data = create_test_ipc_data(&schema);
3845
3846 let mut create_req1 = CreateTableRequest::new();
3847 create_req1.id = Some(vec!["table1".to_string()]);
3848 namespace
3849 .create_table(create_req1, bytes::Bytes::from(ipc_data.clone()))
3850 .await
3851 .unwrap();
3852
3853 let mut create_req2 = CreateTableRequest::new();
3854 create_req2.id = Some(vec!["table2".to_string()]);
3855 namespace
3856 .create_table(create_req2, bytes::Bytes::from(ipc_data))
3857 .await
3858 .unwrap();
3859
3860 let mut list_req = ListTablesRequest::new();
3862 list_req.id = Some(vec![]);
3863 let list_response = namespace.list_tables(list_req.clone()).await.unwrap();
3864 assert_eq!(list_response.tables.len(), 2);
3865
3866 let mut deregister_req = DeregisterTableRequest::new();
3868 deregister_req.id = Some(vec!["table1".to_string()]);
3869 namespace.deregister_table(deregister_req).await.unwrap();
3870
3871 let list_response = namespace.list_tables(list_req).await.unwrap();
3873 assert_eq!(list_response.tables.len(), 1);
3874 assert!(list_response.tables.contains(&"table2".to_string()));
3875 assert!(!list_response.tables.contains(&"table1".to_string()));
3876 }
3877
3878 #[tokio::test]
3883 async fn test_describe_table_fails_for_deregistered_v1() {
3884 use lance_namespace::models::{DeregisterTableRequest, DescribeTableRequest};
3885
3886 let temp_dir = TempStdDir::default();
3887 let temp_path = temp_dir.to_str().unwrap();
3888
3889 let namespace = DirectoryNamespaceBuilder::new(temp_path)
3890 .manifest_enabled(false)
3891 .dir_listing_enabled(true)
3892 .build()
3893 .await
3894 .unwrap();
3895
3896 let schema = create_test_schema();
3898 let ipc_data = create_test_ipc_data(&schema);
3899 let mut create_req = CreateTableRequest::new();
3900 create_req.id = Some(vec!["test_table".to_string()]);
3901 namespace
3902 .create_table(create_req, bytes::Bytes::from(ipc_data))
3903 .await
3904 .unwrap();
3905
3906 let mut describe_req = DescribeTableRequest::new();
3908 describe_req.id = Some(vec!["test_table".to_string()]);
3909 assert!(namespace.describe_table(describe_req.clone()).await.is_ok());
3910
3911 let mut deregister_req = DeregisterTableRequest::new();
3913 deregister_req.id = Some(vec!["test_table".to_string()]);
3914 namespace.deregister_table(deregister_req).await.unwrap();
3915
3916 let result = namespace.describe_table(describe_req).await;
3918 assert!(result.is_err());
3919 assert!(result.unwrap_err().to_string().contains("deregistered"));
3920 }
3921
3922 #[tokio::test]
3923 async fn test_table_exists_fails_for_deregistered_v1() {
3924 use lance_namespace::models::{DeregisterTableRequest, TableExistsRequest};
3925
3926 let temp_dir = TempStdDir::default();
3927 let temp_path = temp_dir.to_str().unwrap();
3928
3929 let namespace = DirectoryNamespaceBuilder::new(temp_path)
3930 .manifest_enabled(false)
3931 .dir_listing_enabled(true)
3932 .build()
3933 .await
3934 .unwrap();
3935
3936 let schema = create_test_schema();
3938 let ipc_data = create_test_ipc_data(&schema);
3939 let mut create_req = CreateTableRequest::new();
3940 create_req.id = Some(vec!["test_table".to_string()]);
3941 namespace
3942 .create_table(create_req, bytes::Bytes::from(ipc_data))
3943 .await
3944 .unwrap();
3945
3946 let mut exists_req = TableExistsRequest::new();
3948 exists_req.id = Some(vec!["test_table".to_string()]);
3949 assert!(namespace.table_exists(exists_req.clone()).await.is_ok());
3950
3951 let mut deregister_req = DeregisterTableRequest::new();
3953 deregister_req.id = Some(vec!["test_table".to_string()]);
3954 namespace.deregister_table(deregister_req).await.unwrap();
3955
3956 let result = namespace.table_exists(exists_req).await;
3958 assert!(result.is_err());
3959 assert!(result.unwrap_err().to_string().contains("deregistered"));
3960 }
3961
3962 #[tokio::test]
3963 async fn test_atomic_table_status_check() {
3964 let temp_dir = TempStdDir::default();
3968 let temp_path = temp_dir.to_str().unwrap();
3969
3970 let namespace = DirectoryNamespaceBuilder::new(temp_path)
3971 .manifest_enabled(false)
3972 .dir_listing_enabled(true)
3973 .build()
3974 .await
3975 .unwrap();
3976
3977 let schema = create_test_schema();
3979 let ipc_data = create_test_ipc_data(&schema);
3980 let mut create_req = CreateTableRequest::new();
3981 create_req.id = Some(vec!["test_table".to_string()]);
3982 namespace
3983 .create_table(create_req, bytes::Bytes::from(ipc_data))
3984 .await
3985 .unwrap();
3986
3987 let status = namespace.check_table_status("test_table").await;
3989 assert!(status.exists);
3990 assert!(!status.is_deregistered);
3991 assert!(!status.has_reserved_file);
3992 }
3993
3994 #[tokio::test]
3995 async fn test_table_version_tracking_enabled_managed_versioning() {
3996 use lance_namespace::models::DescribeTableRequest;
3997
3998 let temp_dir = TempStdDir::default();
3999 let temp_path = temp_dir.to_str().unwrap();
4000
4001 let namespace = DirectoryNamespaceBuilder::new(temp_path)
4003 .table_version_tracking_enabled(true)
4004 .build()
4005 .await
4006 .unwrap();
4007
4008 let schema = create_test_schema();
4010 let ipc_data = create_test_ipc_data(&schema);
4011 let mut create_req = CreateTableRequest::new();
4012 create_req.id = Some(vec!["test_table".to_string()]);
4013 namespace
4014 .create_table(create_req, bytes::Bytes::from(ipc_data))
4015 .await
4016 .unwrap();
4017
4018 let mut describe_req = DescribeTableRequest::new();
4020 describe_req.id = Some(vec!["test_table".to_string()]);
4021 let describe_resp = namespace.describe_table(describe_req).await.unwrap();
4022
4023 assert_eq!(
4025 describe_resp.managed_versioning,
4026 Some(true),
4027 "managed_versioning should be true when table_version_tracking_enabled=true"
4028 );
4029 }
4030
4031 #[tokio::test]
4032 async fn test_table_version_tracking_disabled_no_managed_versioning() {
4033 use lance_namespace::models::DescribeTableRequest;
4034
4035 let temp_dir = TempStdDir::default();
4036 let temp_path = temp_dir.to_str().unwrap();
4037
4038 let namespace = DirectoryNamespaceBuilder::new(temp_path)
4040 .table_version_tracking_enabled(false)
4041 .build()
4042 .await
4043 .unwrap();
4044
4045 let schema = create_test_schema();
4047 let ipc_data = create_test_ipc_data(&schema);
4048 let mut create_req = CreateTableRequest::new();
4049 create_req.id = Some(vec!["test_table".to_string()]);
4050 namespace
4051 .create_table(create_req, bytes::Bytes::from(ipc_data))
4052 .await
4053 .unwrap();
4054
4055 let mut describe_req = DescribeTableRequest::new();
4057 describe_req.id = Some(vec!["test_table".to_string()]);
4058 let describe_resp = namespace.describe_table(describe_req).await.unwrap();
4059
4060 assert!(
4062 describe_resp.managed_versioning.is_none(),
4063 "managed_versioning should be None when table_version_tracking_enabled=false, got: {:?}",
4064 describe_resp.managed_versioning
4065 );
4066 }
4067
4068 #[tokio::test]
4069 #[cfg(not(windows))]
4070 async fn test_list_table_versions() {
4071 use arrow::array::{Int32Array, RecordBatchIterator};
4072 use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
4073 use arrow::record_batch::RecordBatch;
4074 use lance::dataset::{Dataset, WriteMode, WriteParams};
4075 use lance_namespace::models::{CreateNamespaceRequest, ListTableVersionsRequest};
4076
4077 let temp_dir = TempStrDir::default();
4078 let temp_path: &str = &temp_dir;
4079
4080 let namespace: Arc<dyn LanceNamespace> = Arc::new(
4081 DirectoryNamespaceBuilder::new(temp_path)
4082 .table_version_tracking_enabled(true)
4083 .build()
4084 .await
4085 .unwrap(),
4086 );
4087
4088 let mut create_ns_req = CreateNamespaceRequest::new();
4090 create_ns_req.id = Some(vec!["workspace".to_string()]);
4091 namespace.create_namespace(create_ns_req).await.unwrap();
4092
4093 let table_id = vec!["workspace".to_string(), "test_table".to_string()];
4095 let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
4096 "id",
4097 DataType::Int32,
4098 false,
4099 )]));
4100 let batch = RecordBatch::try_new(
4101 arrow_schema.clone(),
4102 vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
4103 )
4104 .unwrap();
4105 let batches = RecordBatchIterator::new(vec![Ok(batch.clone())], arrow_schema.clone());
4106 let write_params = WriteParams {
4107 mode: WriteMode::Create,
4108 ..Default::default()
4109 };
4110 let mut dataset = Dataset::write_into_namespace(
4111 batches,
4112 namespace.clone(),
4113 table_id.clone(),
4114 Some(write_params),
4115 )
4116 .await
4117 .unwrap();
4118
4119 let batch2 = RecordBatch::try_new(
4121 arrow_schema.clone(),
4122 vec![Arc::new(Int32Array::from(vec![100, 200]))],
4123 )
4124 .unwrap();
4125 let batches = RecordBatchIterator::new(vec![Ok(batch2)], arrow_schema.clone());
4126 dataset.append(batches, None).await.unwrap();
4127
4128 let batch3 = RecordBatch::try_new(
4130 arrow_schema.clone(),
4131 vec![Arc::new(Int32Array::from(vec![300, 400]))],
4132 )
4133 .unwrap();
4134 let batches = RecordBatchIterator::new(vec![Ok(batch3)], arrow_schema);
4135 dataset.append(batches, None).await.unwrap();
4136
4137 let mut list_req = ListTableVersionsRequest::new();
4139 list_req.id = Some(table_id.clone());
4140 let list_resp = namespace.list_table_versions(list_req).await.unwrap();
4141
4142 assert_eq!(
4143 list_resp.versions.len(),
4144 3,
4145 "Should have 3 versions, got: {:?}",
4146 list_resp.versions
4147 );
4148
4149 for expected_version in 1..=3 {
4151 let version = list_resp
4152 .versions
4153 .iter()
4154 .find(|v| v.version == expected_version)
4155 .unwrap_or_else(|| panic!("Expected version {}", expected_version));
4156
4157 assert!(
4158 !version.manifest_path.is_empty(),
4159 "manifest_path should be set for version {}",
4160 expected_version
4161 );
4162 assert!(
4163 version.manifest_path.contains(".manifest"),
4164 "manifest_path should contain .manifest for version {}",
4165 expected_version
4166 );
4167 assert!(
4168 version.manifest_size.is_some(),
4169 "manifest_size should be set for version {}",
4170 expected_version
4171 );
4172 assert!(
4173 version.manifest_size.unwrap() > 0,
4174 "manifest_size should be > 0 for version {}",
4175 expected_version
4176 );
4177 assert!(
4178 version.timestamp_millis.is_some(),
4179 "timestamp_millis should be set for version {}",
4180 expected_version
4181 );
4182 }
4183 }
4184
4185 #[tokio::test]
4186 #[cfg(not(windows))]
4187 async fn test_describe_table_version() {
4188 use arrow::array::{Int32Array, RecordBatchIterator};
4189 use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
4190 use arrow::record_batch::RecordBatch;
4191 use lance::dataset::{Dataset, WriteMode, WriteParams};
4192 use lance_namespace::models::{CreateNamespaceRequest, DescribeTableVersionRequest};
4193
4194 let temp_dir = TempStrDir::default();
4195 let temp_path: &str = &temp_dir;
4196
4197 let namespace: Arc<dyn LanceNamespace> = Arc::new(
4198 DirectoryNamespaceBuilder::new(temp_path)
4199 .table_version_tracking_enabled(true)
4200 .build()
4201 .await
4202 .unwrap(),
4203 );
4204
4205 let mut create_ns_req = CreateNamespaceRequest::new();
4207 create_ns_req.id = Some(vec!["workspace".to_string()]);
4208 namespace.create_namespace(create_ns_req).await.unwrap();
4209
4210 let table_id = vec!["workspace".to_string(), "test_table".to_string()];
4212 let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
4213 "id",
4214 DataType::Int32,
4215 false,
4216 )]));
4217 let batch = RecordBatch::try_new(
4218 arrow_schema.clone(),
4219 vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
4220 )
4221 .unwrap();
4222 let batches = RecordBatchIterator::new(vec![Ok(batch)], arrow_schema.clone());
4223 let write_params = WriteParams {
4224 mode: WriteMode::Create,
4225 ..Default::default()
4226 };
4227 let mut dataset = Dataset::write_into_namespace(
4228 batches,
4229 namespace.clone(),
4230 table_id.clone(),
4231 Some(write_params),
4232 )
4233 .await
4234 .unwrap();
4235
4236 let batch2 = RecordBatch::try_new(
4238 arrow_schema.clone(),
4239 vec![Arc::new(Int32Array::from(vec![100, 200]))],
4240 )
4241 .unwrap();
4242 let batches = RecordBatchIterator::new(vec![Ok(batch2)], arrow_schema);
4243 dataset.append(batches, None).await.unwrap();
4244
4245 let mut describe_req = DescribeTableVersionRequest::new();
4247 describe_req.id = Some(table_id.clone());
4248 describe_req.version = Some(1);
4249 let describe_resp = namespace
4250 .describe_table_version(describe_req)
4251 .await
4252 .unwrap();
4253
4254 let version = &describe_resp.version;
4255 assert_eq!(version.version, 1);
4256 assert!(version.timestamp_millis.is_some());
4257 assert!(
4258 !version.manifest_path.is_empty(),
4259 "manifest_path should be set"
4260 );
4261 assert!(
4262 version.manifest_path.contains(".manifest"),
4263 "manifest_path should contain .manifest"
4264 );
4265 assert!(
4266 version.manifest_size.is_some(),
4267 "manifest_size should be set"
4268 );
4269 assert!(
4270 version.manifest_size.unwrap() > 0,
4271 "manifest_size should be > 0"
4272 );
4273
4274 let mut describe_req = DescribeTableVersionRequest::new();
4276 describe_req.id = Some(table_id.clone());
4277 describe_req.version = Some(2);
4278 let describe_resp = namespace
4279 .describe_table_version(describe_req)
4280 .await
4281 .unwrap();
4282
4283 let version = &describe_resp.version;
4284 assert_eq!(version.version, 2);
4285 assert!(version.timestamp_millis.is_some());
4286 assert!(
4287 !version.manifest_path.is_empty(),
4288 "manifest_path should be set"
4289 );
4290 assert!(
4291 version.manifest_size.is_some(),
4292 "manifest_size should be set"
4293 );
4294 assert!(
4295 version.manifest_size.unwrap() > 0,
4296 "manifest_size should be > 0"
4297 );
4298 }
4299
4300 #[tokio::test]
4301 #[cfg(not(windows))]
4302 async fn test_describe_table_version_latest() {
4303 use arrow::array::{Int32Array, RecordBatchIterator};
4304 use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
4305 use arrow::record_batch::RecordBatch;
4306 use lance::dataset::{Dataset, WriteMode, WriteParams};
4307 use lance_namespace::models::{CreateNamespaceRequest, DescribeTableVersionRequest};
4308
4309 let temp_dir = TempStrDir::default();
4310 let temp_path: &str = &temp_dir;
4311
4312 let namespace: Arc<dyn LanceNamespace> = Arc::new(
4313 DirectoryNamespaceBuilder::new(temp_path)
4314 .table_version_tracking_enabled(true)
4315 .build()
4316 .await
4317 .unwrap(),
4318 );
4319
4320 let mut create_ns_req = CreateNamespaceRequest::new();
4322 create_ns_req.id = Some(vec!["workspace".to_string()]);
4323 namespace.create_namespace(create_ns_req).await.unwrap();
4324
4325 let table_id = vec!["workspace".to_string(), "test_table".to_string()];
4327 let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
4328 "id",
4329 DataType::Int32,
4330 false,
4331 )]));
4332 let batch = RecordBatch::try_new(
4333 arrow_schema.clone(),
4334 vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
4335 )
4336 .unwrap();
4337 let batches = RecordBatchIterator::new(vec![Ok(batch)], arrow_schema.clone());
4338 let write_params = WriteParams {
4339 mode: WriteMode::Create,
4340 ..Default::default()
4341 };
4342 let mut dataset = Dataset::write_into_namespace(
4343 batches,
4344 namespace.clone(),
4345 table_id.clone(),
4346 Some(write_params),
4347 )
4348 .await
4349 .unwrap();
4350
4351 let batch2 = RecordBatch::try_new(
4353 arrow_schema.clone(),
4354 vec![Arc::new(Int32Array::from(vec![100, 200]))],
4355 )
4356 .unwrap();
4357 let batches = RecordBatchIterator::new(vec![Ok(batch2)], arrow_schema.clone());
4358 dataset.append(batches, None).await.unwrap();
4359
4360 let batch3 = RecordBatch::try_new(
4362 arrow_schema.clone(),
4363 vec![Arc::new(Int32Array::from(vec![300, 400]))],
4364 )
4365 .unwrap();
4366 let batches = RecordBatchIterator::new(vec![Ok(batch3)], arrow_schema);
4367 dataset.append(batches, None).await.unwrap();
4368
4369 let mut describe_req = DescribeTableVersionRequest::new();
4371 describe_req.id = Some(table_id.clone());
4372 describe_req.version = None;
4373 let describe_resp = namespace
4374 .describe_table_version(describe_req)
4375 .await
4376 .unwrap();
4377
4378 assert_eq!(describe_resp.version.version, 3);
4380 }
4381
4382 #[tokio::test]
4383 #[cfg(not(windows))]
4384 async fn test_create_table_version() {
4385 use futures::TryStreamExt;
4386 use lance::dataset::builder::DatasetBuilder;
4387 use lance_namespace::models::CreateTableVersionRequest;
4388
4389 let temp_dir = TempStrDir::default();
4390 let temp_path: &str = &temp_dir;
4391
4392 let namespace: Arc<dyn LanceNamespace> = Arc::new(
4393 DirectoryNamespaceBuilder::new(temp_path)
4394 .table_version_tracking_enabled(true)
4395 .build()
4396 .await
4397 .unwrap(),
4398 );
4399
4400 let schema = create_test_schema();
4402 let ipc_data = create_test_ipc_data(&schema);
4403 let mut create_req = CreateTableRequest::new();
4404 create_req.id = Some(vec!["test_table".to_string()]);
4405 namespace
4406 .create_table(create_req, bytes::Bytes::from(ipc_data))
4407 .await
4408 .unwrap();
4409
4410 let table_id = vec!["test_table".to_string()];
4412 let dataset = DatasetBuilder::from_namespace(namespace.clone(), table_id.clone())
4413 .await
4414 .unwrap()
4415 .load()
4416 .await
4417 .unwrap();
4418
4419 let versions_path = dataset.versions_dir();
4421 let manifest_metas: Vec<_> = dataset
4422 .object_store()
4423 .inner
4424 .list(Some(&versions_path))
4425 .try_collect()
4426 .await
4427 .unwrap();
4428
4429 let manifest_meta = manifest_metas
4430 .iter()
4431 .find(|m| {
4432 m.location
4433 .filename()
4434 .map(|f| f.ends_with(".manifest"))
4435 .unwrap_or(false)
4436 })
4437 .expect("No manifest file found");
4438
4439 let manifest_data = dataset
4441 .object_store()
4442 .inner
4443 .get(&manifest_meta.location)
4444 .await
4445 .unwrap()
4446 .bytes()
4447 .await
4448 .unwrap();
4449
4450 let staging_path = dataset.versions_dir().child("staging_manifest");
4452 dataset
4453 .object_store()
4454 .inner
4455 .put(&staging_path, manifest_data.into())
4456 .await
4457 .unwrap();
4458
4459 let mut create_version_req = CreateTableVersionRequest::new(2, staging_path.to_string());
4462 create_version_req.id = Some(table_id.clone());
4463 create_version_req.naming_scheme = Some("V2".to_string());
4464
4465 let result = namespace.create_table_version(create_version_req).await;
4466 assert!(
4467 result.is_ok(),
4468 "create_table_version should succeed: {:?}",
4469 result
4470 );
4471
4472 let response = result.unwrap();
4474 let version_info = response
4475 .version
4476 .expect("response should contain version info");
4477 let version_2_path = Path::from(version_info.manifest_path);
4478 let head_result = dataset.object_store().inner.head(&version_2_path).await;
4479 assert!(
4480 head_result.is_ok(),
4481 "Version 2 manifest should exist at {}",
4482 version_2_path
4483 );
4484
4485 let staging_head_result = dataset.object_store().inner.head(&staging_path).await;
4487 assert!(
4488 staging_head_result.is_err(),
4489 "Staging manifest should have been deleted after create_table_version"
4490 );
4491 }
4492
4493 #[tokio::test]
4494 #[cfg(not(windows))]
4495 async fn test_create_table_version_conflict() {
4496 use futures::TryStreamExt;
4499 use lance::dataset::builder::DatasetBuilder;
4500 use lance_namespace::models::CreateTableVersionRequest;
4501
4502 let temp_dir = TempStrDir::default();
4503 let temp_path: &str = &temp_dir;
4504
4505 let namespace: Arc<dyn LanceNamespace> = Arc::new(
4506 DirectoryNamespaceBuilder::new(temp_path)
4507 .table_version_tracking_enabled(true)
4508 .build()
4509 .await
4510 .unwrap(),
4511 );
4512
4513 let schema = create_test_schema();
4515 let ipc_data = create_test_ipc_data(&schema);
4516 let mut create_req = CreateTableRequest::new();
4517 create_req.id = Some(vec!["test_table".to_string()]);
4518 namespace
4519 .create_table(create_req, bytes::Bytes::from(ipc_data))
4520 .await
4521 .unwrap();
4522
4523 let table_id = vec!["test_table".to_string()];
4525 let dataset = DatasetBuilder::from_namespace(namespace.clone(), table_id.clone())
4526 .await
4527 .unwrap()
4528 .load()
4529 .await
4530 .unwrap();
4531
4532 let versions_path = dataset.versions_dir();
4534 let manifest_metas: Vec<_> = dataset
4535 .object_store()
4536 .inner
4537 .list(Some(&versions_path))
4538 .try_collect()
4539 .await
4540 .unwrap();
4541
4542 let manifest_meta = manifest_metas
4543 .iter()
4544 .find(|m| {
4545 m.location
4546 .filename()
4547 .map(|f| f.ends_with(".manifest"))
4548 .unwrap_or(false)
4549 })
4550 .expect("No manifest file found");
4551
4552 let manifest_data = dataset
4554 .object_store()
4555 .inner
4556 .get(&manifest_meta.location)
4557 .await
4558 .unwrap()
4559 .bytes()
4560 .await
4561 .unwrap();
4562
4563 let staging_path = dataset.versions_dir().child("staging_manifest");
4565 dataset
4566 .object_store()
4567 .inner
4568 .put(&staging_path, manifest_data.into())
4569 .await
4570 .unwrap();
4571
4572 let mut create_version_req = CreateTableVersionRequest::new(2, staging_path.to_string());
4574 create_version_req.id = Some(table_id.clone());
4575 create_version_req.naming_scheme = Some("V2".to_string());
4576 let first_result = namespace.create_table_version(create_version_req).await;
4577 assert!(
4578 first_result.is_ok(),
4579 "First create_table_version for version 2 should succeed: {:?}",
4580 first_result
4581 );
4582
4583 let version_2_path = Path::from(
4585 first_result
4586 .unwrap()
4587 .version
4588 .expect("response should contain version info")
4589 .manifest_path,
4590 );
4591
4592 let mut create_version_req = CreateTableVersionRequest::new(2, staging_path.to_string());
4594 create_version_req.id = Some(table_id.clone());
4595 create_version_req.naming_scheme = Some("V2".to_string());
4596
4597 let result = namespace.create_table_version(create_version_req).await;
4598 assert!(
4599 result.is_err(),
4600 "create_table_version should fail for existing version"
4601 );
4602
4603 let head_result = dataset.object_store().inner.head(&version_2_path).await;
4605 assert!(
4606 head_result.is_ok(),
4607 "Version 2 manifest should still exist at {}",
4608 version_2_path
4609 );
4610 }
4611
4612 #[tokio::test]
4613 async fn test_create_table_version_table_not_found() {
4614 use lance_namespace::models::CreateTableVersionRequest;
4615
4616 let temp_dir = TempStdDir::default();
4617 let temp_path = temp_dir.to_str().unwrap();
4618
4619 let namespace = DirectoryNamespaceBuilder::new(temp_path)
4620 .table_version_tracking_enabled(true)
4621 .build()
4622 .await
4623 .unwrap();
4624
4625 let mut create_version_req =
4627 CreateTableVersionRequest::new(1, "/some/staging/path".to_string());
4628 create_version_req.id = Some(vec!["non_existent_table".to_string()]);
4629
4630 let result = namespace.create_table_version(create_version_req).await;
4631 assert!(
4632 result.is_err(),
4633 "create_table_version should fail for non-existent table"
4634 );
4635 let err_msg = result.unwrap_err().to_string();
4636 assert!(
4637 err_msg.contains("does not exist"),
4638 "Error should mention table does not exist, got: {}",
4639 err_msg
4640 );
4641 }
4642
4643 mod e2e_table_version_tracking {
4645 use super::*;
4646 use std::sync::atomic::{AtomicUsize, Ordering};
4647
4648 struct TrackingNamespace {
4650 inner: DirectoryNamespace,
4651 create_table_version_count: AtomicUsize,
4652 describe_table_version_count: AtomicUsize,
4653 list_table_versions_count: AtomicUsize,
4654 }
4655
4656 impl TrackingNamespace {
4657 fn new(inner: DirectoryNamespace) -> Self {
4658 Self {
4659 inner,
4660 create_table_version_count: AtomicUsize::new(0),
4661 describe_table_version_count: AtomicUsize::new(0),
4662 list_table_versions_count: AtomicUsize::new(0),
4663 }
4664 }
4665
4666 fn create_table_version_calls(&self) -> usize {
4667 self.create_table_version_count.load(Ordering::SeqCst)
4668 }
4669
4670 fn describe_table_version_calls(&self) -> usize {
4671 self.describe_table_version_count.load(Ordering::SeqCst)
4672 }
4673
4674 fn list_table_versions_calls(&self) -> usize {
4675 self.list_table_versions_count.load(Ordering::SeqCst)
4676 }
4677 }
4678
4679 impl std::fmt::Debug for TrackingNamespace {
4680 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
4681 f.debug_struct("TrackingNamespace")
4682 .field(
4683 "create_table_version_calls",
4684 &self.create_table_version_calls(),
4685 )
4686 .finish()
4687 }
4688 }
4689
4690 #[async_trait]
4691 impl LanceNamespace for TrackingNamespace {
4692 async fn create_namespace(
4693 &self,
4694 request: CreateNamespaceRequest,
4695 ) -> Result<CreateNamespaceResponse> {
4696 self.inner.create_namespace(request).await
4697 }
4698
4699 async fn describe_namespace(
4700 &self,
4701 request: DescribeNamespaceRequest,
4702 ) -> Result<DescribeNamespaceResponse> {
4703 self.inner.describe_namespace(request).await
4704 }
4705
4706 async fn namespace_exists(&self, request: NamespaceExistsRequest) -> Result<()> {
4707 self.inner.namespace_exists(request).await
4708 }
4709
4710 async fn list_namespaces(
4711 &self,
4712 request: ListNamespacesRequest,
4713 ) -> Result<ListNamespacesResponse> {
4714 self.inner.list_namespaces(request).await
4715 }
4716
4717 async fn drop_namespace(
4718 &self,
4719 request: DropNamespaceRequest,
4720 ) -> Result<DropNamespaceResponse> {
4721 self.inner.drop_namespace(request).await
4722 }
4723
4724 async fn list_tables(&self, request: ListTablesRequest) -> Result<ListTablesResponse> {
4725 self.inner.list_tables(request).await
4726 }
4727
4728 async fn describe_table(
4729 &self,
4730 request: DescribeTableRequest,
4731 ) -> Result<DescribeTableResponse> {
4732 self.inner.describe_table(request).await
4733 }
4734
4735 async fn table_exists(&self, request: TableExistsRequest) -> Result<()> {
4736 self.inner.table_exists(request).await
4737 }
4738
4739 async fn drop_table(&self, request: DropTableRequest) -> Result<DropTableResponse> {
4740 self.inner.drop_table(request).await
4741 }
4742
4743 async fn create_table(
4744 &self,
4745 request: CreateTableRequest,
4746 request_data: Bytes,
4747 ) -> Result<CreateTableResponse> {
4748 self.inner.create_table(request, request_data).await
4749 }
4750
4751 async fn declare_table(
4752 &self,
4753 request: DeclareTableRequest,
4754 ) -> Result<DeclareTableResponse> {
4755 self.inner.declare_table(request).await
4756 }
4757
4758 async fn list_table_versions(
4759 &self,
4760 request: ListTableVersionsRequest,
4761 ) -> Result<ListTableVersionsResponse> {
4762 self.list_table_versions_count
4763 .fetch_add(1, Ordering::SeqCst);
4764 self.inner.list_table_versions(request).await
4765 }
4766
4767 async fn create_table_version(
4768 &self,
4769 request: CreateTableVersionRequest,
4770 ) -> Result<CreateTableVersionResponse> {
4771 self.create_table_version_count
4772 .fetch_add(1, Ordering::SeqCst);
4773 self.inner.create_table_version(request).await
4774 }
4775
4776 async fn describe_table_version(
4777 &self,
4778 request: DescribeTableVersionRequest,
4779 ) -> Result<DescribeTableVersionResponse> {
4780 self.describe_table_version_count
4781 .fetch_add(1, Ordering::SeqCst);
4782 self.inner.describe_table_version(request).await
4783 }
4784
4785 async fn batch_delete_table_versions(
4786 &self,
4787 request: BatchDeleteTableVersionsRequest,
4788 ) -> Result<BatchDeleteTableVersionsResponse> {
4789 self.inner.batch_delete_table_versions(request).await
4790 }
4791
4792 fn namespace_id(&self) -> String {
4793 self.inner.namespace_id()
4794 }
4795 }
4796
4797 #[tokio::test]
4798 async fn test_describe_table_returns_managed_versioning() {
4799 use lance_namespace::models::{CreateNamespaceRequest, DescribeTableRequest};
4800
4801 let temp_dir = TempStdDir::default();
4802 let temp_path = temp_dir.to_str().unwrap();
4803
4804 let ns = DirectoryNamespaceBuilder::new(temp_path)
4806 .table_version_tracking_enabled(true)
4807 .manifest_enabled(true)
4808 .build()
4809 .await
4810 .unwrap();
4811
4812 let mut create_ns_req = CreateNamespaceRequest::new();
4814 create_ns_req.id = Some(vec!["workspace".to_string()]);
4815 ns.create_namespace(create_ns_req).await.unwrap();
4816
4817 let schema = create_test_schema();
4819 let ipc_data = create_test_ipc_data(&schema);
4820 let mut create_req = CreateTableRequest::new();
4821 create_req.id = Some(vec!["workspace".to_string(), "test_table".to_string()]);
4822 ns.create_table(create_req, bytes::Bytes::from(ipc_data))
4823 .await
4824 .unwrap();
4825
4826 let mut describe_req = DescribeTableRequest::new();
4828 describe_req.id = Some(vec!["workspace".to_string(), "test_table".to_string()]);
4829 let describe_resp = ns.describe_table(describe_req).await.unwrap();
4830
4831 assert_eq!(
4833 describe_resp.managed_versioning,
4834 Some(true),
4835 "managed_versioning should be true when table_version_tracking_enabled=true"
4836 );
4837 }
4838
4839 #[tokio::test]
4840 #[cfg(not(windows))]
4841 async fn test_external_manifest_store_invokes_namespace_apis() {
4842 use arrow::array::{Int32Array, StringArray};
4843 use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
4844 use arrow::record_batch::RecordBatch;
4845 use lance::Dataset;
4846 use lance::dataset::builder::DatasetBuilder;
4847 use lance::dataset::{WriteMode, WriteParams};
4848 use lance_namespace::models::CreateNamespaceRequest;
4849
4850 let temp_dir = TempStdDir::default();
4851 let temp_path = temp_dir.to_str().unwrap();
4852
4853 let inner_ns = DirectoryNamespaceBuilder::new(temp_path)
4855 .table_version_tracking_enabled(true)
4856 .manifest_enabled(true)
4857 .build()
4858 .await
4859 .unwrap();
4860
4861 let tracking_ns = Arc::new(TrackingNamespace::new(inner_ns));
4862 let ns: Arc<dyn LanceNamespace> = tracking_ns.clone();
4863
4864 let mut create_ns_req = CreateNamespaceRequest::new();
4866 create_ns_req.id = Some(vec!["workspace".to_string()]);
4867 ns.create_namespace(create_ns_req).await.unwrap();
4868
4869 let table_id = vec!["workspace".to_string(), "test_table".to_string()];
4871
4872 let arrow_schema = Arc::new(ArrowSchema::new(vec![
4874 Field::new("id", DataType::Int32, false),
4875 Field::new("name", DataType::Utf8, true),
4876 ]));
4877 let batch = RecordBatch::try_new(
4878 arrow_schema.clone(),
4879 vec![
4880 Arc::new(Int32Array::from(vec![1, 2, 3])),
4881 Arc::new(StringArray::from(vec!["a", "b", "c"])),
4882 ],
4883 )
4884 .unwrap();
4885
4886 let batches = RecordBatchIterator::new(vec![Ok(batch.clone())], arrow_schema.clone());
4888 let write_params = WriteParams {
4889 mode: WriteMode::Create,
4890 ..Default::default()
4891 };
4892 let mut dataset = Dataset::write_into_namespace(
4893 batches,
4894 ns.clone(),
4895 table_id.clone(),
4896 Some(write_params),
4897 )
4898 .await
4899 .unwrap();
4900 assert_eq!(dataset.version().version, 1);
4901
4902 assert_eq!(
4904 tracking_ns.create_table_version_calls(),
4905 1,
4906 "create_table_version should have been called once during initial write_into_namespace"
4907 );
4908
4909 let append_batch = RecordBatch::try_new(
4911 arrow_schema.clone(),
4912 vec![
4913 Arc::new(Int32Array::from(vec![4, 5, 6])),
4914 Arc::new(StringArray::from(vec!["d", "e", "f"])),
4915 ],
4916 )
4917 .unwrap();
4918 let append_batches = RecordBatchIterator::new(vec![Ok(append_batch)], arrow_schema);
4919 dataset.append(append_batches, None).await.unwrap();
4920
4921 assert_eq!(
4922 tracking_ns.create_table_version_calls(),
4923 2,
4924 "create_table_version should have been called twice (once for create, once for append)"
4925 );
4926
4927 let initial_list_calls = tracking_ns.list_table_versions_calls();
4929 let latest_dataset = DatasetBuilder::from_namespace(ns.clone(), table_id.clone())
4930 .await
4931 .unwrap()
4932 .load()
4933 .await
4934 .unwrap();
4935 assert_eq!(latest_dataset.version().version, 2);
4936 assert_eq!(
4937 tracking_ns.list_table_versions_calls(),
4938 initial_list_calls + 1,
4939 "list_table_versions should have been called exactly once during checkout_latest"
4940 );
4941
4942 let initial_describe_calls = tracking_ns.describe_table_version_calls();
4944 let v1_dataset = DatasetBuilder::from_namespace(ns.clone(), table_id.clone())
4945 .await
4946 .unwrap()
4947 .with_version(1)
4948 .load()
4949 .await
4950 .unwrap();
4951 assert_eq!(v1_dataset.version().version, 1);
4952 assert_eq!(
4953 tracking_ns.describe_table_version_calls(),
4954 initial_describe_calls + 1,
4955 "describe_table_version should have been called exactly once during checkout to version 1"
4956 );
4957 }
4958
4959 #[tokio::test]
4960 #[cfg(not(windows))]
4961 async fn test_dataset_commit_with_external_manifest_store() {
4962 use arrow::array::{Int32Array, StringArray};
4963 use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
4964 use arrow::record_batch::RecordBatch;
4965 use futures::TryStreamExt;
4966 use lance::dataset::{Dataset, WriteMode, WriteParams};
4967 use lance_namespace::models::CreateNamespaceRequest;
4968 use lance_table::io::commit::ManifestNamingScheme;
4969
4970 let temp_dir = TempStdDir::default();
4971 let temp_path = temp_dir.to_str().unwrap();
4972
4973 let inner_ns = DirectoryNamespaceBuilder::new(temp_path)
4975 .table_version_tracking_enabled(true)
4976 .manifest_enabled(true)
4977 .build()
4978 .await
4979 .unwrap();
4980
4981 let tracking_ns: Arc<dyn LanceNamespace> = Arc::new(TrackingNamespace::new(inner_ns));
4982
4983 let mut create_ns_req = CreateNamespaceRequest::new();
4985 create_ns_req.id = Some(vec!["workspace".to_string()]);
4986 tracking_ns.create_namespace(create_ns_req).await.unwrap();
4987
4988 let table_id = vec!["workspace".to_string(), "test_table".to_string()];
4990 let arrow_schema = Arc::new(ArrowSchema::new(vec![
4991 Field::new("id", DataType::Int32, false),
4992 Field::new("name", DataType::Utf8, true),
4993 ]));
4994 let batch = RecordBatch::try_new(
4995 arrow_schema.clone(),
4996 vec![
4997 Arc::new(Int32Array::from(vec![1, 2, 3])),
4998 Arc::new(StringArray::from(vec!["a", "b", "c"])),
4999 ],
5000 )
5001 .unwrap();
5002 let batches = RecordBatchIterator::new(vec![Ok(batch)], arrow_schema.clone());
5003 let write_params = WriteParams {
5004 mode: WriteMode::Create,
5005 ..Default::default()
5006 };
5007 let dataset = Dataset::write_into_namespace(
5008 batches,
5009 tracking_ns.clone(),
5010 table_id.clone(),
5011 Some(write_params),
5012 )
5013 .await
5014 .unwrap();
5015 assert_eq!(dataset.version().version, 1);
5016
5017 let batch2 = RecordBatch::try_new(
5019 arrow_schema.clone(),
5020 vec![
5021 Arc::new(Int32Array::from(vec![4, 5, 6])),
5022 Arc::new(StringArray::from(vec!["d", "e", "f"])),
5023 ],
5024 )
5025 .unwrap();
5026 let batches = RecordBatchIterator::new(vec![Ok(batch2)], arrow_schema);
5027 let write_params = WriteParams {
5028 mode: WriteMode::Append,
5029 ..Default::default()
5030 };
5031 Dataset::write_into_namespace(
5032 batches,
5033 tracking_ns.clone(),
5034 table_id.clone(),
5035 Some(write_params),
5036 )
5037 .await
5038 .unwrap();
5039
5040 let manifest_metas: Vec<_> = dataset
5043 .object_store()
5044 .inner
5045 .list(Some(&dataset.versions_dir()))
5046 .try_collect()
5047 .await
5048 .unwrap();
5049 let version_2_found = manifest_metas.iter().any(|m| {
5050 m.location
5051 .filename()
5052 .map(|f| {
5053 f.ends_with(".manifest")
5054 && ManifestNamingScheme::V2.parse_version(f) == Some(2)
5055 })
5056 .unwrap_or(false)
5057 });
5058 assert!(
5059 version_2_found,
5060 "Version 2 manifest should exist in versions directory"
5061 );
5062 }
5063 }
5064
5065 mod multi_table_transactions {
5067 use super::*;
5068 use futures::TryStreamExt;
5069 use lance::dataset::builder::DatasetBuilder;
5070 use lance_namespace::models::CreateTableVersionRequest;
5071
5072 async fn create_managed_namespace(temp_path: &str) -> Arc<DirectoryNamespace> {
5074 Arc::new(
5075 DirectoryNamespaceBuilder::new(temp_path)
5076 .table_version_tracking_enabled(true)
5077 .table_version_storage_enabled(true)
5078 .manifest_enabled(true)
5079 .build()
5080 .await
5081 .unwrap(),
5082 )
5083 }
5084
5085 async fn create_table_and_get_staging(
5087 namespace: Arc<dyn LanceNamespace>,
5088 table_name: &str,
5089 ) -> (Vec<String>, object_store::path::Path) {
5090 let schema = create_test_schema();
5091 let ipc_data = create_test_ipc_data(&schema);
5092 let mut create_req = CreateTableRequest::new();
5093 create_req.id = Some(vec![table_name.to_string()]);
5094 namespace
5095 .create_table(create_req, bytes::Bytes::from(ipc_data))
5096 .await
5097 .unwrap();
5098
5099 let table_id = vec![table_name.to_string()];
5100 let dataset = DatasetBuilder::from_namespace(namespace.clone(), table_id.clone())
5101 .await
5102 .unwrap()
5103 .load()
5104 .await
5105 .unwrap();
5106
5107 let versions_path = dataset.versions_dir();
5109 let manifest_metas: Vec<_> = dataset
5110 .object_store()
5111 .inner
5112 .list(Some(&versions_path))
5113 .try_collect()
5114 .await
5115 .unwrap();
5116
5117 let manifest_meta = manifest_metas
5118 .iter()
5119 .find(|m| {
5120 m.location
5121 .filename()
5122 .map(|f| f.ends_with(".manifest"))
5123 .unwrap_or(false)
5124 })
5125 .expect("No manifest file found");
5126
5127 let manifest_data = dataset
5128 .object_store()
5129 .inner
5130 .get(&manifest_meta.location)
5131 .await
5132 .unwrap()
5133 .bytes()
5134 .await
5135 .unwrap();
5136
5137 let staging_path = dataset
5138 .versions_dir()
5139 .child(format!("staging_{}", table_name));
5140 dataset
5141 .object_store()
5142 .inner
5143 .put(&staging_path, manifest_data.into())
5144 .await
5145 .unwrap();
5146
5147 (table_id, staging_path)
5148 }
5149
5150 #[tokio::test]
5151 async fn test_table_version_storage_enabled_requires_manifest() {
5152 let temp_dir = TempStdDir::default();
5154 let temp_path = temp_dir.to_str().unwrap();
5155
5156 let result = DirectoryNamespaceBuilder::new(temp_path)
5157 .table_version_storage_enabled(true)
5158 .manifest_enabled(false)
5159 .build()
5160 .await;
5161
5162 assert!(
5163 result.is_err(),
5164 "Should fail when table_version_storage_enabled=true but manifest_enabled=false"
5165 );
5166 }
5167
5168 #[tokio::test]
5169 #[cfg(not(windows))]
5170 async fn test_create_table_version_records_in_manifest() {
5171 let temp_dir = TempStrDir::default();
5174 let temp_path: &str = &temp_dir;
5175
5176 let namespace = create_managed_namespace(temp_path).await;
5177 let ns: Arc<dyn LanceNamespace> = namespace.clone();
5178
5179 let (table_id, staging_path) =
5180 create_table_and_get_staging(ns.clone(), "table_managed").await;
5181
5182 let mut create_req = CreateTableVersionRequest::new(2, staging_path.to_string());
5184 create_req.id = Some(table_id.clone());
5185 create_req.naming_scheme = Some("V2".to_string());
5186 let response = namespace.create_table_version(create_req).await.unwrap();
5187
5188 assert!(response.version.is_some());
5189 let version = response.version.unwrap();
5190 assert_eq!(version.version, 2);
5191
5192 let manifest_ns = namespace.manifest_ns.as_ref().unwrap();
5194 let table_id_str = manifest::ManifestNamespace::str_object_id(&table_id);
5195 let versions = manifest_ns
5196 .query_table_versions(&table_id_str, false, None)
5197 .await
5198 .unwrap();
5199
5200 assert!(
5201 !versions.is_empty(),
5202 "Version should be recorded in __manifest"
5203 );
5204 let (ver, _path) = &versions[0];
5205 assert_eq!(*ver, 2, "Recorded version should be 2");
5206 }
5207 }
5208}