1pub mod manifest;
10
11use arrow::record_batch::RecordBatchIterator;
12use arrow_ipc::reader::StreamReader;
13use async_trait::async_trait;
14use bytes::Bytes;
15use lance::dataset::{Dataset, WriteParams};
16use lance::session::Session;
17use lance_io::object_store::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry};
18use object_store::path::Path;
19use std::collections::HashMap;
20use std::io::Cursor;
21use std::sync::Arc;
22
23use lance_namespace::models::{
24 CreateEmptyTableRequest, CreateEmptyTableResponse, CreateNamespaceRequest,
25 CreateNamespaceResponse, CreateTableRequest, CreateTableResponse, DescribeNamespaceRequest,
26 DescribeNamespaceResponse, DescribeTableRequest, DescribeTableResponse, DropNamespaceRequest,
27 DropNamespaceResponse, DropTableRequest, DropTableResponse, ListNamespacesRequest,
28 ListNamespacesResponse, ListTablesRequest, ListTablesResponse, NamespaceExistsRequest,
29 TableExistsRequest,
30};
31
32use lance_core::{box_error, Error, Result};
33use lance_namespace::schema::arrow_schema_to_json;
34use lance_namespace::LanceNamespace;
35
/// Builder for [`DirectoryNamespace`].
///
/// Collects the root location, optional storage options and session, and the
/// feature flags that `build` consumes.
#[derive(Debug, Clone)]
pub struct DirectoryNamespaceBuilder {
    /// Root location of the namespace, stored with trailing `/` characters removed.
    root: String,
    /// Options handed to the object store; `None` when none were supplied.
    storage_options: Option<HashMap<String, String>>,
    /// Optional session; when present its store registry is used to create the object store.
    session: Option<Arc<Session>>,
    /// Whether `build` attempts to initialize the manifest-backed namespace.
    manifest_enabled: bool,
    /// Directory-listing flag; forwarded to the manifest namespace and kept on the built namespace.
    dir_listing_enabled: bool,
    /// Forwarded unchanged to `ManifestNamespace::from_directory`.
    inline_optimization_enabled: bool,
}
79
80impl DirectoryNamespaceBuilder {
81 pub fn new(root: impl Into<String>) -> Self {
87 Self {
88 root: root.into().trim_end_matches('/').to_string(),
89 storage_options: None,
90 session: None,
91 manifest_enabled: true,
92 dir_listing_enabled: true, inline_optimization_enabled: true,
94 }
95 }
96
97 pub fn manifest_enabled(mut self, enabled: bool) -> Self {
102 self.manifest_enabled = enabled;
103 self
104 }
105
106 pub fn dir_listing_enabled(mut self, enabled: bool) -> Self {
111 self.dir_listing_enabled = enabled;
112 self
113 }
114
115 pub fn inline_optimization_enabled(mut self, enabled: bool) -> Self {
121 self.inline_optimization_enabled = enabled;
122 self
123 }
124
125 pub fn from_properties(
167 properties: HashMap<String, String>,
168 session: Option<Arc<Session>>,
169 ) -> Result<Self> {
170 let root = properties
172 .get("root")
173 .cloned()
174 .ok_or_else(|| Error::Namespace {
175 source: "Missing required property 'root' for directory namespace".into(),
176 location: snafu::location!(),
177 })?;
178
179 let storage_options: HashMap<String, String> = properties
181 .iter()
182 .filter_map(|(k, v)| {
183 k.strip_prefix("storage.")
184 .map(|key| (key.to_string(), v.clone()))
185 })
186 .collect();
187
188 let storage_options = if storage_options.is_empty() {
189 None
190 } else {
191 Some(storage_options)
192 };
193
194 let manifest_enabled = properties
196 .get("manifest_enabled")
197 .and_then(|v| v.parse::<bool>().ok())
198 .unwrap_or(true);
199
200 let dir_listing_enabled = properties
202 .get("dir_listing_enabled")
203 .and_then(|v| v.parse::<bool>().ok())
204 .unwrap_or(true);
205
206 let inline_optimization_enabled = properties
208 .get("inline_optimization_enabled")
209 .and_then(|v| v.parse::<bool>().ok())
210 .unwrap_or(true);
211
212 Ok(Self {
213 root: root.trim_end_matches('/').to_string(),
214 storage_options,
215 session,
216 manifest_enabled,
217 dir_listing_enabled,
218 inline_optimization_enabled,
219 })
220 }
221
222 pub fn storage_option(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
229 self.storage_options
230 .get_or_insert_with(HashMap::new)
231 .insert(key.into(), value.into());
232 self
233 }
234
235 pub fn storage_options(mut self, options: HashMap<String, String>) -> Self {
241 self.storage_options
242 .get_or_insert_with(HashMap::new)
243 .extend(options);
244 self
245 }
246
247 pub fn session(mut self, session: Arc<Session>) -> Self {
257 self.session = Some(session);
258 self
259 }
260
261 pub async fn build(self) -> Result<DirectoryNamespace> {
274 let (object_store, base_path) =
275 Self::initialize_object_store(&self.root, &self.storage_options, &self.session).await?;
276
277 let manifest_ns = if self.manifest_enabled {
278 match manifest::ManifestNamespace::from_directory(
279 self.root.clone(),
280 self.storage_options.clone(),
281 self.session.clone(),
282 object_store.clone(),
283 base_path.clone(),
284 self.dir_listing_enabled,
285 self.inline_optimization_enabled,
286 )
287 .await
288 {
289 Ok(ns) => Some(Arc::new(ns)),
290 Err(e) => {
291 log::warn!(
293 "Failed to initialize manifest namespace, falling back to directory listing only: {}",
294 e
295 );
296 None
297 }
298 }
299 } else {
300 None
301 };
302
303 Ok(DirectoryNamespace {
304 root: self.root,
305 storage_options: self.storage_options,
306 session: self.session,
307 object_store,
308 base_path,
309 manifest_ns,
310 dir_listing_enabled: self.dir_listing_enabled,
311 })
312 }
313
314 async fn initialize_object_store(
316 root: &str,
317 storage_options: &Option<HashMap<String, String>>,
318 session: &Option<Arc<Session>>,
319 ) -> Result<(Arc<ObjectStore>, Path)> {
320 let params = ObjectStoreParams {
322 storage_options: storage_options.clone(),
323 ..Default::default()
324 };
325
326 let registry = if let Some(session) = session {
328 session.store_registry()
329 } else {
330 Arc::new(ObjectStoreRegistry::default())
331 };
332
333 let (object_store, base_path) = ObjectStore::from_uri_and_params(registry, root, ¶ms)
335 .await
336 .map_err(|e| Error::Namespace {
337 source: format!("Failed to create object store: {}", e).into(),
338 location: snafu::location!(),
339 })?;
340
341 Ok((object_store, base_path))
342 }
343}
344
/// Namespace backed by a directory of `<name>.lance` tables in an object store.
///
/// Constructed through [`DirectoryNamespaceBuilder`]. When a manifest namespace
/// is present most operations are delegated to it.
pub struct DirectoryNamespace {
    /// Root location; table URIs are formed as `<root>/<name>.lance`.
    root: String,
    /// Storage options returned in table responses and used when writing datasets.
    storage_options: Option<HashMap<String, String>>,
    // Stored by the builder; not read by the code in this part of the file.
    #[allow(dead_code)]
    session: Option<Arc<Session>>,
    /// Object store used for listing, existence checks, creation and removal.
    object_store: Arc<ObjectStore>,
    /// Path inside the object store that corresponds to `root`.
    base_path: Path,
    /// Manifest-backed namespace, present when it was enabled and initialized successfully.
    manifest_ns: Option<Arc<manifest::ManifestNamespace>>,
    /// Whether directory contents are consulted alongside the manifest.
    dir_listing_enabled: bool,
}
370
371impl std::fmt::Debug for DirectoryNamespace {
372 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
373 write!(f, "{}", self.namespace_id())
374 }
375}
376
377impl std::fmt::Display for DirectoryNamespace {
378 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
379 write!(f, "{}", self.namespace_id())
380 }
381}
382
383impl DirectoryNamespace {
384 fn apply_pagination(names: &mut Vec<String>, page_token: Option<String>, limit: Option<i32>) {
393 names.sort();
395
396 if let Some(start_after) = page_token {
398 if let Some(index) = names
399 .iter()
400 .position(|name| name.as_str() > start_after.as_str())
401 {
402 names.drain(0..index);
403 } else {
404 names.clear();
405 }
406 }
407
408 if let Some(limit) = limit {
410 if limit >= 0 {
411 names.truncate(limit as usize);
412 }
413 }
414 }
415
416 async fn list_directory_tables(&self) -> Result<Vec<String>> {
418 let mut tables = Vec::new();
419 let entries = self
420 .object_store
421 .read_dir(self.base_path.clone())
422 .await
423 .map_err(|e| Error::IO {
424 source: box_error(std::io::Error::other(format!(
425 "Failed to list directory: {}",
426 e
427 ))),
428 location: snafu::location!(),
429 })?;
430
431 for entry in entries {
432 let path = entry.trim_end_matches('/');
433 if !path.ends_with(".lance") {
434 continue;
435 }
436
437 let table_name = &path[..path.len() - 6];
438 tables.push(table_name.to_string());
439 }
440
441 Ok(tables)
442 }
443
444 fn validate_root_namespace_id(id: &Option<Vec<String>>) -> Result<()> {
446 if let Some(id) = id {
447 if !id.is_empty() {
448 return Err(Error::Namespace {
449 source: format!(
450 "Directory namespace only supports root namespace operations, but got namespace ID: {:?}. Expected empty ID.",
451 id
452 ).into(),
453 location: snafu::location!(),
454 });
455 }
456 }
457 Ok(())
458 }
459
460 fn table_name_from_id(id: &Option<Vec<String>>) -> Result<String> {
462 let id = id.as_ref().ok_or_else(|| Error::Namespace {
463 source: "Directory namespace table ID cannot be empty".into(),
464 location: snafu::location!(),
465 })?;
466
467 if id.len() != 1 {
468 return Err(Error::Namespace {
469 source: format!(
470 "Multi-level table IDs are only supported when manifest mode is enabled, but got: {:?}",
471 id
472 )
473 .into(),
474 location: snafu::location!(),
475 });
476 }
477
478 Ok(id[0].clone())
479 }
480
481 fn table_full_uri(&self, table_name: &str) -> String {
483 format!("{}/{}.lance", &self.root, table_name)
484 }
485
486 fn table_path(&self, table_name: &str) -> Path {
488 self.base_path
489 .child(format!("{}.lance", table_name).as_str())
490 }
491
492 fn table_reserved_file_path(&self, table_name: &str) -> Path {
494 self.base_path
495 .child(format!("{}.lance", table_name).as_str())
496 .child(".lance-reserved")
497 }
498
499 pub async fn migrate(&self) -> Result<usize> {
552 let Some(ref manifest_ns) = self.manifest_ns else {
554 return Ok(0); };
556
557 let manifest_locations = manifest_ns.list_manifest_table_locations().await?;
559
560 let dir_tables = self.list_directory_tables().await?;
562
563 let mut migrated_count = 0;
568 for table_name in dir_tables {
569 let dir_name = format!("{}.lance", table_name);
571 if !manifest_locations.contains(&dir_name) {
572 manifest_ns.register_table(&table_name, dir_name).await?;
573 migrated_count += 1;
574 }
575 }
576
577 Ok(migrated_count)
578 }
579}
580
581#[async_trait]
582impl LanceNamespace for DirectoryNamespace {
583 async fn list_namespaces(
584 &self,
585 request: ListNamespacesRequest,
586 ) -> Result<ListNamespacesResponse> {
587 if let Some(ref manifest_ns) = self.manifest_ns {
588 return manifest_ns.list_namespaces(request).await;
589 }
590
591 Self::validate_root_namespace_id(&request.id)?;
592 Ok(ListNamespacesResponse::new(vec![]))
593 }
594
595 async fn describe_namespace(
596 &self,
597 request: DescribeNamespaceRequest,
598 ) -> Result<DescribeNamespaceResponse> {
599 if let Some(ref manifest_ns) = self.manifest_ns {
600 return manifest_ns.describe_namespace(request).await;
601 }
602
603 Self::validate_root_namespace_id(&request.id)?;
604 Ok(DescribeNamespaceResponse {
605 properties: Some(HashMap::new()),
606 })
607 }
608
609 async fn create_namespace(
610 &self,
611 request: CreateNamespaceRequest,
612 ) -> Result<CreateNamespaceResponse> {
613 if let Some(ref manifest_ns) = self.manifest_ns {
614 return manifest_ns.create_namespace(request).await;
615 }
616
617 if request.id.is_none() || request.id.as_ref().unwrap().is_empty() {
618 return Err(Error::Namespace {
619 source: "Root namespace already exists and cannot be created".into(),
620 location: snafu::location!(),
621 });
622 }
623
624 Err(Error::NotSupported {
625 source: "Child namespaces are only supported when manifest mode is enabled".into(),
626 location: snafu::location!(),
627 })
628 }
629
630 async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<DropNamespaceResponse> {
631 if let Some(ref manifest_ns) = self.manifest_ns {
632 return manifest_ns.drop_namespace(request).await;
633 }
634
635 if request.id.is_none() || request.id.as_ref().unwrap().is_empty() {
636 return Err(Error::Namespace {
637 source: "Root namespace cannot be dropped".into(),
638 location: snafu::location!(),
639 });
640 }
641
642 Err(Error::NotSupported {
643 source: "Child namespaces are only supported when manifest mode is enabled".into(),
644 location: snafu::location!(),
645 })
646 }
647
648 async fn namespace_exists(&self, request: NamespaceExistsRequest) -> Result<()> {
649 if let Some(ref manifest_ns) = self.manifest_ns {
650 return manifest_ns.namespace_exists(request).await;
651 }
652
653 if request.id.is_none() || request.id.as_ref().unwrap().is_empty() {
654 return Ok(());
655 }
656
657 Err(Error::Namespace {
658 source: "Child namespaces are only supported when manifest mode is enabled".into(),
659 location: snafu::location!(),
660 })
661 }
662
663 async fn list_tables(&self, request: ListTablesRequest) -> Result<ListTablesResponse> {
664 let namespace_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
666 source: "Namespace ID is required".into(),
667 location: snafu::location!(),
668 })?;
669
670 if !namespace_id.is_empty() {
672 if let Some(ref manifest_ns) = self.manifest_ns {
673 return manifest_ns.list_tables(request).await;
674 }
675 return Err(Error::NotSupported {
676 source: "Child namespaces are only supported when manifest mode is enabled".into(),
677 location: snafu::location!(),
678 });
679 }
680
681 if let Some(ref manifest_ns) = self.manifest_ns {
683 if !self.dir_listing_enabled {
684 return manifest_ns.list_tables(request).await;
685 }
686 }
687
688 let mut tables = if self.manifest_ns.is_some() && self.dir_listing_enabled {
690 let manifest_locations = if let Some(ref manifest_ns) = self.manifest_ns {
692 manifest_ns.list_manifest_table_locations().await?
693 } else {
694 std::collections::HashSet::new()
695 };
696
697 let mut manifest_request = request.clone();
699 manifest_request.limit = None;
700 manifest_request.page_token = None;
701 let manifest_tables = if let Some(ref manifest_ns) = self.manifest_ns {
702 let manifest_response = manifest_ns.list_tables(manifest_request).await?;
703 manifest_response.tables
704 } else {
705 vec![]
706 };
707
708 let mut all_tables: Vec<String> = manifest_tables;
711 let dir_tables = self.list_directory_tables().await?;
712 for table_name in dir_tables {
713 let full_location = format!("{}/{}.lance", self.root, table_name);
716 let relative_location = format!("{}.lance", table_name);
717 if !manifest_locations.contains(&full_location)
718 && !manifest_locations.contains(&relative_location)
719 {
720 all_tables.push(table_name);
721 }
722 }
723
724 all_tables
725 } else {
726 self.list_directory_tables().await?
727 };
728
729 Self::apply_pagination(&mut tables, request.page_token, request.limit);
731 let response = ListTablesResponse::new(tables);
732 Ok(response)
733 }
734
735 async fn describe_table(&self, request: DescribeTableRequest) -> Result<DescribeTableResponse> {
736 if let Some(ref manifest_ns) = self.manifest_ns {
737 match manifest_ns.describe_table(request.clone()).await {
738 Ok(response) => return Ok(response),
739 Err(_)
740 if self.dir_listing_enabled
741 && request.id.as_ref().is_some_and(|id| id.len() == 1) =>
742 {
743 }
745 Err(e) => return Err(e),
746 }
747 }
748
749 let table_name = Self::table_name_from_id(&request.id)?;
750 let table_uri = self.table_full_uri(&table_name);
751
752 let table_path = self.table_path(&table_name);
753 let dir_exists = self
754 .object_store
755 .read_dir(table_path)
756 .await
757 .map(|entries| !entries.is_empty())
758 .unwrap_or(false);
759
760 if !dir_exists {
761 return Err(Error::Namespace {
762 source: format!("Table does not exist: {}", table_name).into(),
763 location: snafu::location!(),
764 });
765 }
766
767 match Dataset::open(&table_uri).await {
769 Ok(mut dataset) => {
770 if let Some(requested_version) = request.version {
772 dataset = dataset.checkout_version(requested_version as u64).await?;
773 }
774
775 let version = dataset.version().version;
776 let lance_schema = dataset.schema();
777 let arrow_schema: arrow_schema::Schema = lance_schema.into();
778 let json_schema = arrow_schema_to_json(&arrow_schema)?;
779 Ok(DescribeTableResponse {
780 version: Some(version as i64),
781 location: Some(table_uri),
782 schema: Some(Box::new(json_schema)),
783 properties: None,
784 storage_options: self.storage_options.clone(),
785 })
786 }
787 Err(err) => {
788 let reserved_file_path = self.table_reserved_file_path(&table_name);
789 if self
790 .object_store
791 .exists(&reserved_file_path)
792 .await
793 .unwrap_or(false)
794 {
795 Ok(DescribeTableResponse {
796 version: None,
797 location: Some(table_uri),
798 schema: None,
799 properties: None,
800 storage_options: self.storage_options.clone(),
801 })
802 } else {
803 Err(Error::Namespace {
804 source: format!(
805 "Table directory exists but cannot load dataset {}: {:?}",
806 table_name, err
807 )
808 .into(),
809 location: snafu::location!(),
810 })
811 }
812 }
813 }
814 }
815
816 async fn table_exists(&self, request: TableExistsRequest) -> Result<()> {
817 if let Some(ref manifest_ns) = self.manifest_ns {
818 match manifest_ns.table_exists(request.clone()).await {
819 Ok(()) => return Ok(()),
820 Err(_) if self.dir_listing_enabled => {
821 }
823 Err(e) => return Err(e),
824 }
825 }
826
827 let table_name = Self::table_name_from_id(&request.id)?;
828 let table_path = self.table_path(&table_name);
829 let table_exists = self
830 .object_store
831 .read_dir(table_path)
832 .await
833 .map(|entries| !entries.is_empty())
834 .unwrap_or(false);
835
836 if !table_exists {
837 return Err(Error::Namespace {
838 source: format!("Table does not exist: {}", table_name).into(),
839 location: snafu::location!(),
840 });
841 }
842
843 Ok(())
844 }
845
846 async fn drop_table(&self, request: DropTableRequest) -> Result<DropTableResponse> {
847 if let Some(ref manifest_ns) = self.manifest_ns {
848 return manifest_ns.drop_table(request).await;
849 }
850
851 let table_name = Self::table_name_from_id(&request.id)?;
852 let table_uri = self.table_full_uri(&table_name);
853 let table_path = self.table_path(&table_name);
854
855 self.object_store
856 .remove_dir_all(table_path)
857 .await
858 .map_err(|e| Error::Namespace {
859 source: format!("Failed to drop table {}: {}", table_name, e).into(),
860 location: snafu::location!(),
861 })?;
862
863 Ok(DropTableResponse {
864 id: request.id,
865 location: Some(table_uri),
866 properties: None,
867 transaction_id: None,
868 })
869 }
870
871 async fn create_table(
872 &self,
873 request: CreateTableRequest,
874 request_data: Bytes,
875 ) -> Result<CreateTableResponse> {
876 if let Some(ref manifest_ns) = self.manifest_ns {
877 return manifest_ns.create_table(request, request_data).await;
878 }
879
880 let table_name = Self::table_name_from_id(&request.id)?;
881 let table_uri = self.table_full_uri(&table_name);
882 if request_data.is_empty() {
883 return Err(Error::Namespace {
884 source: "Request data (Arrow IPC stream) is required for create_table".into(),
885 location: snafu::location!(),
886 });
887 }
888
889 if let Some(location) = &request.location {
891 let location = location.trim_end_matches('/');
892 if location != table_uri {
893 return Err(Error::Namespace {
894 source: format!(
895 "Cannot create table {} at location {}, must be at location {}",
896 table_name, location, table_uri
897 )
898 .into(),
899 location: snafu::location!(),
900 });
901 }
902 }
903
904 let cursor = Cursor::new(request_data.to_vec());
906 let stream_reader = StreamReader::try_new(cursor, None).map_err(|e| Error::Namespace {
907 source: format!("Invalid Arrow IPC stream: {}", e).into(),
908 location: snafu::location!(),
909 })?;
910 let arrow_schema = stream_reader.schema();
911
912 let mut batches = Vec::new();
914 for batch_result in stream_reader {
915 batches.push(batch_result.map_err(|e| Error::Namespace {
916 source: format!("Failed to read batch from IPC stream: {}", e).into(),
917 location: snafu::location!(),
918 })?);
919 }
920
921 let reader = if batches.is_empty() {
923 let batch = arrow::record_batch::RecordBatch::new_empty(arrow_schema.clone());
924 let batches = vec![Ok(batch)];
925 RecordBatchIterator::new(batches, arrow_schema.clone())
926 } else {
927 let batch_results: Vec<_> = batches.into_iter().map(Ok).collect();
928 RecordBatchIterator::new(batch_results, arrow_schema)
929 };
930
931 let store_params = self.storage_options.as_ref().map(|opts| ObjectStoreParams {
932 storage_options: Some(opts.clone()),
933 ..Default::default()
934 });
935
936 let write_params = WriteParams {
937 mode: lance::dataset::WriteMode::Create,
938 store_params,
939 ..Default::default()
940 };
941
942 Dataset::write(reader, &table_uri, Some(write_params))
944 .await
945 .map_err(|e| Error::Namespace {
946 source: format!("Failed to create Lance dataset: {}", e).into(),
947 location: snafu::location!(),
948 })?;
949
950 Ok(CreateTableResponse {
951 version: Some(1),
952 location: Some(table_uri),
953 properties: None,
954 storage_options: self.storage_options.clone(),
955 })
956 }
957
958 async fn create_empty_table(
959 &self,
960 request: CreateEmptyTableRequest,
961 ) -> Result<CreateEmptyTableResponse> {
962 if let Some(ref manifest_ns) = self.manifest_ns {
963 return manifest_ns.create_empty_table(request).await;
964 }
965
966 let table_name = Self::table_name_from_id(&request.id)?;
967 let table_uri = self.table_full_uri(&table_name);
968
969 if let Some(location) = &request.location {
971 let location = location.trim_end_matches('/');
972 if location != table_uri {
973 return Err(Error::Namespace {
974 source: format!(
975 "Cannot create table {} at location {}, must be at location {}",
976 table_name, location, table_uri
977 )
978 .into(),
979 location: snafu::location!(),
980 });
981 }
982 }
983
984 let reserved_file_path = self.table_reserved_file_path(&table_name);
986
987 self.object_store
988 .create(&reserved_file_path)
989 .await
990 .map_err(|e| Error::Namespace {
991 source: format!(
992 "Failed to create .lance-reserved file for table {}: {}",
993 table_name, e
994 )
995 .into(),
996 location: snafu::location!(),
997 })?
998 .shutdown()
999 .await
1000 .map_err(|e| Error::Namespace {
1001 source: format!(
1002 "Failed to finalize .lance-reserved file for table {}: {}",
1003 table_name, e
1004 )
1005 .into(),
1006 location: snafu::location!(),
1007 })?;
1008
1009 Ok(CreateEmptyTableResponse {
1010 location: Some(table_uri),
1011 properties: None,
1012 storage_options: self.storage_options.clone(),
1013 })
1014 }
1015
1016 async fn register_table(
1017 &self,
1018 request: lance_namespace::models::RegisterTableRequest,
1019 ) -> Result<lance_namespace::models::RegisterTableResponse> {
1020 if let Some(ref manifest_ns) = self.manifest_ns {
1022 return LanceNamespace::register_table(manifest_ns.as_ref(), request).await;
1023 }
1024
1025 Err(Error::NotSupported {
1027 source: "register_table is only supported when manifest mode is enabled".into(),
1028 location: snafu::location!(),
1029 })
1030 }
1031
1032 async fn deregister_table(
1033 &self,
1034 request: lance_namespace::models::DeregisterTableRequest,
1035 ) -> Result<lance_namespace::models::DeregisterTableResponse> {
1036 if let Some(ref manifest_ns) = self.manifest_ns {
1038 return LanceNamespace::deregister_table(manifest_ns.as_ref(), request).await;
1039 }
1040
1041 Err(Error::NotSupported {
1043 source: "deregister_table is only supported when manifest mode is enabled".into(),
1044 location: snafu::location!(),
1045 })
1046 }
1047
1048 fn namespace_id(&self) -> String {
1049 format!("DirectoryNamespace {{ root: {:?} }}", self.root)
1050 }
1051}
1052
1053#[cfg(test)]
1054mod tests {
1055 use super::*;
1056 use arrow_ipc::reader::StreamReader;
1057 use lance::dataset::Dataset;
1058 use lance_core::utils::tempfile::TempStdDir;
1059 use lance_namespace::models::{
1060 CreateTableRequest, JsonArrowDataType, JsonArrowField, JsonArrowSchema, ListTablesRequest,
1061 };
1062 use lance_namespace::schema::convert_json_arrow_schema;
1063 use std::io::Cursor;
1064 use std::sync::Arc;
1065
1066 async fn create_test_namespace() -> (DirectoryNamespace, TempStdDir) {
1068 let temp_dir = TempStdDir::default();
1069
1070 let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
1071 .build()
1072 .await
1073 .unwrap();
1074 (namespace, temp_dir)
1075 }
1076
1077 fn create_test_ipc_data(schema: &JsonArrowSchema) -> Vec<u8> {
1079 use arrow::ipc::writer::StreamWriter;
1080
1081 let arrow_schema = convert_json_arrow_schema(schema).unwrap();
1082 let arrow_schema = Arc::new(arrow_schema);
1083 let batch = arrow::record_batch::RecordBatch::new_empty(arrow_schema.clone());
1084 let mut buffer = Vec::new();
1085 {
1086 let mut writer = StreamWriter::try_new(&mut buffer, &arrow_schema).unwrap();
1087 writer.write(&batch).unwrap();
1088 writer.finish().unwrap();
1089 }
1090 buffer
1091 }
1092
1093 fn create_test_schema() -> JsonArrowSchema {
1095 let int_type = JsonArrowDataType::new("int32".to_string());
1096 let string_type = JsonArrowDataType::new("utf8".to_string());
1097
1098 let id_field = JsonArrowField {
1099 name: "id".to_string(),
1100 r#type: Box::new(int_type),
1101 nullable: false,
1102 metadata: None,
1103 };
1104
1105 let name_field = JsonArrowField {
1106 name: "name".to_string(),
1107 r#type: Box::new(string_type),
1108 nullable: true,
1109 metadata: None,
1110 };
1111
1112 JsonArrowSchema {
1113 fields: vec![id_field, name_field],
1114 metadata: None,
1115 }
1116 }
1117
1118 #[tokio::test]
1119 async fn test_create_table() {
1120 let (namespace, _temp_dir) = create_test_namespace().await;
1121
1122 let schema = create_test_schema();
1124 let ipc_data = create_test_ipc_data(&schema);
1125
1126 let mut request = CreateTableRequest::new();
1127 request.id = Some(vec!["test_table".to_string()]);
1128
1129 let response = namespace
1130 .create_table(request, bytes::Bytes::from(ipc_data))
1131 .await
1132 .unwrap();
1133
1134 assert!(response.location.is_some());
1135 assert!(response.location.unwrap().ends_with("test_table.lance"));
1136 assert_eq!(response.version, Some(1));
1137 }
1138
1139 #[tokio::test]
1140 async fn test_create_table_without_data() {
1141 let (namespace, _temp_dir) = create_test_namespace().await;
1142
1143 let mut request = CreateTableRequest::new();
1144 request.id = Some(vec!["test_table".to_string()]);
1145
1146 let result = namespace.create_table(request, bytes::Bytes::new()).await;
1147 assert!(result.is_err());
1148 assert!(result
1149 .unwrap_err()
1150 .to_string()
1151 .contains("Arrow IPC stream) is required"));
1152 }
1153
1154 #[tokio::test]
1155 async fn test_create_table_with_invalid_id() {
1156 let (namespace, _temp_dir) = create_test_namespace().await;
1157
1158 let schema = create_test_schema();
1160 let ipc_data = create_test_ipc_data(&schema);
1161
1162 let mut request = CreateTableRequest::new();
1164 request.id = Some(vec![]);
1165
1166 let result = namespace
1167 .create_table(request, bytes::Bytes::from(ipc_data.clone()))
1168 .await;
1169 assert!(result.is_err());
1170
1171 let mut create_ns_req = CreateNamespaceRequest::new();
1174 create_ns_req.id = Some(vec!["test_namespace".to_string()]);
1175 namespace.create_namespace(create_ns_req).await.unwrap();
1176
1177 let mut request = CreateTableRequest::new();
1179 request.id = Some(vec!["test_namespace".to_string(), "table".to_string()]);
1180
1181 let result = namespace
1182 .create_table(request, bytes::Bytes::from(ipc_data))
1183 .await;
1184 assert!(
1186 result.is_ok(),
1187 "Multi-level table IDs should work with manifest enabled"
1188 );
1189 }
1190
1191 #[tokio::test]
1192 async fn test_create_table_with_wrong_location() {
1193 let (namespace, _temp_dir) = create_test_namespace().await;
1194
1195 let schema = create_test_schema();
1197 let ipc_data = create_test_ipc_data(&schema);
1198
1199 let mut request = CreateTableRequest::new();
1200 request.id = Some(vec!["test_table".to_string()]);
1201 request.location = Some("/wrong/path/table.lance".to_string());
1202
1203 let result = namespace
1204 .create_table(request, bytes::Bytes::from(ipc_data))
1205 .await;
1206 assert!(result.is_err());
1207 assert!(result
1208 .unwrap_err()
1209 .to_string()
1210 .contains("must be at location"));
1211 }
1212
1213 #[tokio::test]
1214 async fn test_list_tables() {
1215 let (namespace, _temp_dir) = create_test_namespace().await;
1216
1217 let mut request = ListTablesRequest::new();
1219 request.id = Some(vec![]);
1220 let response = namespace.list_tables(request).await.unwrap();
1221 assert_eq!(response.tables.len(), 0);
1222
1223 let schema = create_test_schema();
1225 let ipc_data = create_test_ipc_data(&schema);
1226
1227 let mut create_request = CreateTableRequest::new();
1229 create_request.id = Some(vec!["table1".to_string()]);
1230 namespace
1231 .create_table(create_request, bytes::Bytes::from(ipc_data.clone()))
1232 .await
1233 .unwrap();
1234
1235 let mut create_request = CreateTableRequest::new();
1237 create_request.id = Some(vec!["table2".to_string()]);
1238 namespace
1239 .create_table(create_request, bytes::Bytes::from(ipc_data))
1240 .await
1241 .unwrap();
1242
1243 let mut request = ListTablesRequest::new();
1245 request.id = Some(vec![]);
1246 let response = namespace.list_tables(request).await.unwrap();
1247 let tables = response.tables;
1248 assert_eq!(tables.len(), 2);
1249 assert!(tables.contains(&"table1".to_string()));
1250 assert!(tables.contains(&"table2".to_string()));
1251 }
1252
1253 #[tokio::test]
1254 async fn test_list_tables_with_namespace_id() {
1255 let (namespace, _temp_dir) = create_test_namespace().await;
1256
1257 let mut create_ns_req = CreateNamespaceRequest::new();
1259 create_ns_req.id = Some(vec!["test_namespace".to_string()]);
1260 namespace.create_namespace(create_ns_req).await.unwrap();
1261
1262 let mut request = ListTablesRequest::new();
1264 request.id = Some(vec!["test_namespace".to_string()]);
1265
1266 let result = namespace.list_tables(request).await;
1267 assert!(
1269 result.is_ok(),
1270 "list_tables should work with child namespace when manifest is enabled"
1271 );
1272 let response = result.unwrap();
1273 assert_eq!(
1274 response.tables.len(),
1275 0,
1276 "Namespace should have no tables yet"
1277 );
1278 }
1279
1280 #[tokio::test]
1281 async fn test_describe_table() {
1282 let (namespace, _temp_dir) = create_test_namespace().await;
1283
1284 let schema = create_test_schema();
1286 let ipc_data = create_test_ipc_data(&schema);
1287
1288 let mut create_request = CreateTableRequest::new();
1289 create_request.id = Some(vec!["test_table".to_string()]);
1290 namespace
1291 .create_table(create_request, bytes::Bytes::from(ipc_data))
1292 .await
1293 .unwrap();
1294
1295 let mut request = DescribeTableRequest::new();
1297 request.id = Some(vec!["test_table".to_string()]);
1298 let response = namespace.describe_table(request).await.unwrap();
1299
1300 assert!(response.location.is_some());
1301 assert!(response.location.unwrap().ends_with("test_table.lance"));
1302 }
1303
1304 #[tokio::test]
1305 async fn test_describe_nonexistent_table() {
1306 let (namespace, _temp_dir) = create_test_namespace().await;
1307
1308 let mut request = DescribeTableRequest::new();
1309 request.id = Some(vec!["nonexistent".to_string()]);
1310
1311 let result = namespace.describe_table(request).await;
1312 assert!(result.is_err());
1313 assert!(result
1314 .unwrap_err()
1315 .to_string()
1316 .contains("Table does not exist"));
1317 }
1318
1319 #[tokio::test]
1320 async fn test_table_exists() {
1321 let (namespace, _temp_dir) = create_test_namespace().await;
1322
1323 let schema = create_test_schema();
1325 let ipc_data = create_test_ipc_data(&schema);
1326
1327 let mut create_request = CreateTableRequest::new();
1328 create_request.id = Some(vec!["existing_table".to_string()]);
1329 namespace
1330 .create_table(create_request, bytes::Bytes::from(ipc_data))
1331 .await
1332 .unwrap();
1333
1334 let mut request = TableExistsRequest::new();
1336 request.id = Some(vec!["existing_table".to_string()]);
1337 let result = namespace.table_exists(request).await;
1338 assert!(result.is_ok());
1339
1340 let mut request = TableExistsRequest::new();
1342 request.id = Some(vec!["nonexistent".to_string()]);
1343 let result = namespace.table_exists(request).await;
1344 assert!(result.is_err());
1345 assert!(result
1346 .unwrap_err()
1347 .to_string()
1348 .contains("Table does not exist"));
1349 }
1350
1351 #[tokio::test]
1352 async fn test_drop_table() {
1353 let (namespace, _temp_dir) = create_test_namespace().await;
1354
1355 let schema = create_test_schema();
1357 let ipc_data = create_test_ipc_data(&schema);
1358
1359 let mut create_request = CreateTableRequest::new();
1360 create_request.id = Some(vec!["table_to_drop".to_string()]);
1361 namespace
1362 .create_table(create_request, bytes::Bytes::from(ipc_data))
1363 .await
1364 .unwrap();
1365
1366 let mut exists_request = TableExistsRequest::new();
1368 exists_request.id = Some(vec!["table_to_drop".to_string()]);
1369 assert!(namespace.table_exists(exists_request.clone()).await.is_ok());
1370
1371 let mut drop_request = DropTableRequest::new();
1373 drop_request.id = Some(vec!["table_to_drop".to_string()]);
1374 let response = namespace.drop_table(drop_request).await.unwrap();
1375 assert!(response.location.is_some());
1376
1377 assert!(namespace.table_exists(exists_request).await.is_err());
1379 }
1380
1381 #[tokio::test]
1382 async fn test_drop_nonexistent_table() {
1383 let (namespace, _temp_dir) = create_test_namespace().await;
1384
1385 let mut request = DropTableRequest::new();
1386 request.id = Some(vec!["nonexistent".to_string()]);
1387
1388 let result = namespace.drop_table(request).await;
1390 let _ = result;
1393 }
1394
1395 #[tokio::test]
1396 async fn test_root_namespace_operations() {
1397 let (namespace, _temp_dir) = create_test_namespace().await;
1398
1399 let mut request = ListNamespacesRequest::new();
1401 request.id = Some(vec![]);
1402 let result = namespace.list_namespaces(request).await;
1403 assert!(result.is_ok());
1404 assert_eq!(result.unwrap().namespaces.len(), 0);
1405
1406 let mut request = DescribeNamespaceRequest::new();
1408 request.id = Some(vec![]);
1409 let result = namespace.describe_namespace(request).await;
1410 assert!(result.is_ok());
1411
1412 let mut request = NamespaceExistsRequest::new();
1414 request.id = Some(vec![]);
1415 let result = namespace.namespace_exists(request).await;
1416 assert!(result.is_ok());
1417
1418 let mut request = CreateNamespaceRequest::new();
1420 request.id = Some(vec![]);
1421 let result = namespace.create_namespace(request).await;
1422 assert!(result.is_err());
1423 assert!(result.unwrap_err().to_string().contains("already exists"));
1424
1425 let mut request = DropNamespaceRequest::new();
1427 request.id = Some(vec![]);
1428 let result = namespace.drop_namespace(request).await;
1429 assert!(result.is_err());
1430 assert!(result
1431 .unwrap_err()
1432 .to_string()
1433 .contains("cannot be dropped"));
1434 }
1435
1436 #[tokio::test]
1437 async fn test_non_root_namespace_operations() {
1438 let (namespace, _temp_dir) = create_test_namespace().await;
1439
1440 let mut request = CreateNamespaceRequest::new();
1443 request.id = Some(vec!["child".to_string()]);
1444 let result = namespace.create_namespace(request).await;
1445 assert!(
1446 result.is_ok(),
1447 "Child namespace creation should succeed with manifest enabled"
1448 );
1449
1450 let mut request = NamespaceExistsRequest::new();
1452 request.id = Some(vec!["child".to_string()]);
1453 let result = namespace.namespace_exists(request).await;
1454 assert!(
1455 result.is_ok(),
1456 "Child namespace should exist after creation"
1457 );
1458
1459 let mut request = DropNamespaceRequest::new();
1461 request.id = Some(vec!["child".to_string()]);
1462 let result = namespace.drop_namespace(request).await;
1463 assert!(
1464 result.is_ok(),
1465 "Child namespace drop should succeed with manifest enabled"
1466 );
1467
1468 let mut request = NamespaceExistsRequest::new();
1470 request.id = Some(vec!["child".to_string()]);
1471 let result = namespace.namespace_exists(request).await;
1472 assert!(
1473 result.is_err(),
1474 "Child namespace should not exist after drop"
1475 );
1476 }
1477
1478 #[tokio::test]
1479 async fn test_config_custom_root() {
1480 let temp_dir = TempStdDir::default();
1481 let custom_path = temp_dir.join("custom");
1482 std::fs::create_dir(&custom_path).unwrap();
1483
1484 let namespace = DirectoryNamespaceBuilder::new(custom_path.to_string_lossy().to_string())
1485 .build()
1486 .await
1487 .unwrap();
1488
1489 let schema = create_test_schema();
1491 let ipc_data = create_test_ipc_data(&schema);
1492
1493 let mut request = CreateTableRequest::new();
1495 request.id = Some(vec!["test_table".to_string()]);
1496
1497 let response = namespace
1498 .create_table(request, bytes::Bytes::from(ipc_data))
1499 .await
1500 .unwrap();
1501
1502 assert!(response.location.unwrap().contains("custom"));
1503 }
1504
1505 #[tokio::test]
1506 async fn test_config_storage_options() {
1507 let temp_dir = TempStdDir::default();
1508
1509 let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
1510 .storage_option("option1", "value1")
1511 .storage_option("option2", "value2")
1512 .build()
1513 .await
1514 .unwrap();
1515
1516 let schema = create_test_schema();
1518 let ipc_data = create_test_ipc_data(&schema);
1519
1520 let mut request = CreateTableRequest::new();
1522 request.id = Some(vec!["test_table".to_string()]);
1523
1524 let response = namespace
1525 .create_table(request, bytes::Bytes::from(ipc_data))
1526 .await
1527 .unwrap();
1528
1529 let storage_options = response.storage_options.unwrap();
1530 assert_eq!(storage_options.get("option1"), Some(&"value1".to_string()));
1531 assert_eq!(storage_options.get("option2"), Some(&"value2".to_string()));
1532 }
1533
1534 #[tokio::test]
1535 async fn test_from_properties_manifest_enabled() {
1536 let temp_dir = TempStdDir::default();
1537
1538 let mut properties = HashMap::new();
1539 properties.insert("root".to_string(), temp_dir.to_str().unwrap().to_string());
1540 properties.insert("manifest_enabled".to_string(), "true".to_string());
1541 properties.insert("dir_listing_enabled".to_string(), "false".to_string());
1542
1543 let builder = DirectoryNamespaceBuilder::from_properties(properties, None).unwrap();
1544 assert!(builder.manifest_enabled);
1545 assert!(!builder.dir_listing_enabled);
1546
1547 let namespace = builder.build().await.unwrap();
1548
1549 let schema = create_test_schema();
1551 let ipc_data = create_test_ipc_data(&schema);
1552
1553 let mut request = CreateTableRequest::new();
1555 request.id = Some(vec!["test_table".to_string()]);
1556
1557 let response = namespace
1558 .create_table(request, bytes::Bytes::from(ipc_data))
1559 .await
1560 .unwrap();
1561
1562 assert!(response.location.is_some());
1563 }
1564
1565 #[tokio::test]
1566 async fn test_from_properties_dir_listing_enabled() {
1567 let temp_dir = TempStdDir::default();
1568
1569 let mut properties = HashMap::new();
1570 properties.insert("root".to_string(), temp_dir.to_str().unwrap().to_string());
1571 properties.insert("manifest_enabled".to_string(), "false".to_string());
1572 properties.insert("dir_listing_enabled".to_string(), "true".to_string());
1573
1574 let builder = DirectoryNamespaceBuilder::from_properties(properties, None).unwrap();
1575 assert!(!builder.manifest_enabled);
1576 assert!(builder.dir_listing_enabled);
1577
1578 let namespace = builder.build().await.unwrap();
1579
1580 let schema = create_test_schema();
1582 let ipc_data = create_test_ipc_data(&schema);
1583
1584 let mut request = CreateTableRequest::new();
1586 request.id = Some(vec!["test_table".to_string()]);
1587
1588 let response = namespace
1589 .create_table(request, bytes::Bytes::from(ipc_data))
1590 .await
1591 .unwrap();
1592
1593 assert!(response.location.is_some());
1594 }
1595
1596 #[tokio::test]
1597 async fn test_from_properties_defaults() {
1598 let temp_dir = TempStdDir::default();
1599
1600 let mut properties = HashMap::new();
1601 properties.insert("root".to_string(), temp_dir.to_str().unwrap().to_string());
1602
1603 let builder = DirectoryNamespaceBuilder::from_properties(properties, None).unwrap();
1604 assert!(builder.manifest_enabled);
1606 assert!(builder.dir_listing_enabled);
1607 }
1608
1609 #[tokio::test]
1610 async fn test_from_properties_with_storage_options() {
1611 let temp_dir = TempStdDir::default();
1612
1613 let mut properties = HashMap::new();
1614 properties.insert("root".to_string(), temp_dir.to_str().unwrap().to_string());
1615 properties.insert("manifest_enabled".to_string(), "true".to_string());
1616 properties.insert("storage.region".to_string(), "us-west-2".to_string());
1617 properties.insert("storage.bucket".to_string(), "my-bucket".to_string());
1618
1619 let builder = DirectoryNamespaceBuilder::from_properties(properties, None).unwrap();
1620 assert!(builder.manifest_enabled);
1621 assert!(builder.storage_options.is_some());
1622
1623 let storage_options = builder.storage_options.unwrap();
1624 assert_eq!(
1625 storage_options.get("region"),
1626 Some(&"us-west-2".to_string())
1627 );
1628 assert_eq!(
1629 storage_options.get("bucket"),
1630 Some(&"my-bucket".to_string())
1631 );
1632 }
1633
1634 #[tokio::test]
1635 async fn test_various_arrow_types() {
1636 let (namespace, _temp_dir) = create_test_namespace().await;
1637
1638 let fields = vec![
1640 JsonArrowField {
1641 name: "bool_col".to_string(),
1642 r#type: Box::new(JsonArrowDataType::new("bool".to_string())),
1643 nullable: true,
1644 metadata: None,
1645 },
1646 JsonArrowField {
1647 name: "int8_col".to_string(),
1648 r#type: Box::new(JsonArrowDataType::new("int8".to_string())),
1649 nullable: true,
1650 metadata: None,
1651 },
1652 JsonArrowField {
1653 name: "float64_col".to_string(),
1654 r#type: Box::new(JsonArrowDataType::new("float64".to_string())),
1655 nullable: true,
1656 metadata: None,
1657 },
1658 JsonArrowField {
1659 name: "binary_col".to_string(),
1660 r#type: Box::new(JsonArrowDataType::new("binary".to_string())),
1661 nullable: true,
1662 metadata: None,
1663 },
1664 ];
1665
1666 let schema = JsonArrowSchema {
1667 fields,
1668 metadata: None,
1669 };
1670
1671 let ipc_data = create_test_ipc_data(&schema);
1673
1674 let mut request = CreateTableRequest::new();
1675 request.id = Some(vec!["complex_table".to_string()]);
1676
1677 let response = namespace
1678 .create_table(request, bytes::Bytes::from(ipc_data))
1679 .await
1680 .unwrap();
1681
1682 assert!(response.location.is_some());
1683 }
1684
1685 #[tokio::test]
1686 async fn test_connect_dir() {
1687 let temp_dir = TempStdDir::default();
1688
1689 let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
1690 .build()
1691 .await
1692 .unwrap();
1693
1694 let mut request = ListTablesRequest::new();
1696 request.id = Some(vec![]);
1697 let response = namespace.list_tables(request).await.unwrap();
1698 assert_eq!(response.tables.len(), 0);
1699 }
1700
1701 #[tokio::test]
1702 async fn test_create_table_with_ipc_data() {
1703 use arrow::array::{Int32Array, StringArray};
1704 use arrow::ipc::writer::StreamWriter;
1705
1706 let (namespace, _temp_dir) = create_test_namespace().await;
1707
1708 let schema = create_test_schema();
1710
1711 let arrow_schema = convert_json_arrow_schema(&schema).unwrap();
1713 let arrow_schema = Arc::new(arrow_schema);
1714
1715 let id_array = Int32Array::from(vec![1, 2, 3]);
1717 let name_array = StringArray::from(vec!["Alice", "Bob", "Charlie"]);
1718 let batch = arrow::record_batch::RecordBatch::try_new(
1719 arrow_schema.clone(),
1720 vec![Arc::new(id_array), Arc::new(name_array)],
1721 )
1722 .unwrap();
1723
1724 let mut buffer = Vec::new();
1726 {
1727 let mut writer = StreamWriter::try_new(&mut buffer, &arrow_schema).unwrap();
1728 writer.write(&batch).unwrap();
1729 writer.finish().unwrap();
1730 }
1731
1732 let mut request = CreateTableRequest::new();
1734 request.id = Some(vec!["test_table_with_data".to_string()]);
1735
1736 let response = namespace
1737 .create_table(request, Bytes::from(buffer))
1738 .await
1739 .unwrap();
1740
1741 assert_eq!(response.version, Some(1));
1742 assert!(response
1743 .location
1744 .unwrap()
1745 .contains("test_table_with_data.lance"));
1746
1747 let mut exists_request = TableExistsRequest::new();
1749 exists_request.id = Some(vec!["test_table_with_data".to_string()]);
1750 namespace.table_exists(exists_request).await.unwrap();
1751 }
1752
1753 #[tokio::test]
1754 async fn test_create_empty_table() {
1755 let (namespace, temp_dir) = create_test_namespace().await;
1756
1757 let mut request = CreateEmptyTableRequest::new();
1758 request.id = Some(vec!["empty_table".to_string()]);
1759
1760 let response = namespace.create_empty_table(request).await.unwrap();
1761
1762 assert!(response.location.is_some());
1763 assert!(response.location.unwrap().ends_with("empty_table.lance"));
1764
1765 let table_dir = temp_dir.join("empty_table.lance");
1767 assert!(table_dir.exists());
1768 assert!(table_dir.is_dir());
1769
1770 let reserved_file = table_dir.join(".lance-reserved");
1771 assert!(reserved_file.exists());
1772 assert!(reserved_file.is_file());
1773
1774 let metadata = std::fs::metadata(&reserved_file).unwrap();
1776 assert_eq!(metadata.len(), 0);
1777
1778 let mut exists_request = TableExistsRequest::new();
1780 exists_request.id = Some(vec!["empty_table".to_string()]);
1781 namespace.table_exists(exists_request).await.unwrap();
1782
1783 let mut list_request = ListTablesRequest::new();
1785 list_request.id = Some(vec![]);
1786 let list_response = namespace.list_tables(list_request).await.unwrap();
1787 assert!(list_response.tables.contains(&"empty_table".to_string()));
1788
1789 let mut describe_request = DescribeTableRequest::new();
1791 describe_request.id = Some(vec!["empty_table".to_string()]);
1792 let describe_response = namespace.describe_table(describe_request).await.unwrap();
1793 assert!(describe_response.location.is_some());
1794 assert!(describe_response.location.unwrap().contains("empty_table"));
1795 }
1796
1797 #[tokio::test]
1798 async fn test_create_empty_table_with_wrong_location() {
1799 let (namespace, _temp_dir) = create_test_namespace().await;
1800
1801 let mut request = CreateEmptyTableRequest::new();
1802 request.id = Some(vec!["test_table".to_string()]);
1803 request.location = Some("/wrong/path/table.lance".to_string());
1804
1805 let result = namespace.create_empty_table(request).await;
1806 assert!(result.is_err());
1807 assert!(result
1808 .unwrap_err()
1809 .to_string()
1810 .contains("must be at location"));
1811 }
1812
1813 #[tokio::test]
1814 async fn test_create_empty_table_then_drop() {
1815 let (namespace, temp_dir) = create_test_namespace().await;
1816
1817 let mut create_request = CreateEmptyTableRequest::new();
1819 create_request.id = Some(vec!["empty_table_to_drop".to_string()]);
1820
1821 let create_response = namespace.create_empty_table(create_request).await.unwrap();
1822 assert!(create_response.location.is_some());
1823
1824 let table_dir = temp_dir.join("empty_table_to_drop.lance");
1826 assert!(table_dir.exists());
1827 let reserved_file = table_dir.join(".lance-reserved");
1828 assert!(reserved_file.exists());
1829
1830 let mut drop_request = DropTableRequest::new();
1832 drop_request.id = Some(vec!["empty_table_to_drop".to_string()]);
1833 let drop_response = namespace.drop_table(drop_request).await.unwrap();
1834 assert!(drop_response.location.is_some());
1835
1836 assert!(!table_dir.exists());
1838 assert!(!reserved_file.exists());
1839
1840 let mut exists_request = TableExistsRequest::new();
1842 exists_request.id = Some(vec!["empty_table_to_drop".to_string()]);
1843 let exists_result = namespace.table_exists(exists_request).await;
1844 assert!(exists_result.is_err());
1845 }
1846
1847 #[tokio::test]
1848 async fn test_child_namespace_create_and_list() {
1849 let (namespace, _temp_dir) = create_test_namespace().await;
1850
1851 for i in 1..=3 {
1853 let mut create_req = CreateNamespaceRequest::new();
1854 create_req.id = Some(vec![format!("ns{}", i)]);
1855 let result = namespace.create_namespace(create_req).await;
1856 assert!(result.is_ok(), "Failed to create child namespace ns{}", i);
1857 }
1858
1859 let list_req = ListNamespacesRequest {
1861 id: Some(vec![]),
1862 page_token: None,
1863 limit: None,
1864 };
1865 let result = namespace.list_namespaces(list_req).await;
1866 assert!(result.is_ok());
1867 let namespaces = result.unwrap().namespaces;
1868 assert_eq!(namespaces.len(), 3);
1869 assert!(namespaces.contains(&"ns1".to_string()));
1870 assert!(namespaces.contains(&"ns2".to_string()));
1871 assert!(namespaces.contains(&"ns3".to_string()));
1872 }
1873
1874 #[tokio::test]
1875 async fn test_nested_namespace_hierarchy() {
1876 let (namespace, _temp_dir) = create_test_namespace().await;
1877
1878 let mut create_req = CreateNamespaceRequest::new();
1880 create_req.id = Some(vec!["parent".to_string()]);
1881 namespace.create_namespace(create_req).await.unwrap();
1882
1883 let mut create_req = CreateNamespaceRequest::new();
1885 create_req.id = Some(vec!["parent".to_string(), "child1".to_string()]);
1886 namespace.create_namespace(create_req).await.unwrap();
1887
1888 let mut create_req = CreateNamespaceRequest::new();
1889 create_req.id = Some(vec!["parent".to_string(), "child2".to_string()]);
1890 namespace.create_namespace(create_req).await.unwrap();
1891
1892 let list_req = ListNamespacesRequest {
1894 id: Some(vec!["parent".to_string()]),
1895 page_token: None,
1896 limit: None,
1897 };
1898 let result = namespace.list_namespaces(list_req).await;
1899 assert!(result.is_ok());
1900 let children = result.unwrap().namespaces;
1901 assert_eq!(children.len(), 2);
1902 assert!(children.contains(&"child1".to_string()));
1903 assert!(children.contains(&"child2".to_string()));
1904
1905 let list_req = ListNamespacesRequest {
1907 id: Some(vec![]),
1908 page_token: None,
1909 limit: None,
1910 };
1911 let result = namespace.list_namespaces(list_req).await;
1912 assert!(result.is_ok());
1913 let root_namespaces = result.unwrap().namespaces;
1914 assert_eq!(root_namespaces.len(), 1);
1915 assert_eq!(root_namespaces[0], "parent");
1916 }
1917
1918 #[tokio::test]
1919 async fn test_table_in_child_namespace() {
1920 let (namespace, _temp_dir) = create_test_namespace().await;
1921
1922 let mut create_ns_req = CreateNamespaceRequest::new();
1924 create_ns_req.id = Some(vec!["test_ns".to_string()]);
1925 namespace.create_namespace(create_ns_req).await.unwrap();
1926
1927 let schema = create_test_schema();
1929 let ipc_data = create_test_ipc_data(&schema);
1930 let mut create_table_req = CreateTableRequest::new();
1931 create_table_req.id = Some(vec!["test_ns".to_string(), "table1".to_string()]);
1932 let result = namespace
1933 .create_table(create_table_req, bytes::Bytes::from(ipc_data))
1934 .await;
1935 assert!(result.is_ok(), "Failed to create table in child namespace");
1936
1937 let list_req = ListTablesRequest {
1939 id: Some(vec!["test_ns".to_string()]),
1940 page_token: None,
1941 limit: None,
1942 };
1943 let result = namespace.list_tables(list_req).await;
1944 assert!(result.is_ok());
1945 let tables = result.unwrap().tables;
1946 assert_eq!(tables.len(), 1);
1947 assert_eq!(tables[0], "table1");
1948
1949 let mut exists_req = TableExistsRequest::new();
1951 exists_req.id = Some(vec!["test_ns".to_string(), "table1".to_string()]);
1952 let result = namespace.table_exists(exists_req).await;
1953 assert!(result.is_ok());
1954
1955 let mut describe_req = DescribeTableRequest::new();
1957 describe_req.id = Some(vec!["test_ns".to_string(), "table1".to_string()]);
1958 let result = namespace.describe_table(describe_req).await;
1959 assert!(result.is_ok());
1960 let response = result.unwrap();
1961 assert!(response.location.is_some());
1962 }
1963
1964 #[tokio::test]
1965 async fn test_multiple_tables_in_child_namespace() {
1966 let (namespace, _temp_dir) = create_test_namespace().await;
1967
1968 let mut create_ns_req = CreateNamespaceRequest::new();
1970 create_ns_req.id = Some(vec!["test_ns".to_string()]);
1971 namespace.create_namespace(create_ns_req).await.unwrap();
1972
1973 let schema = create_test_schema();
1975 let ipc_data = create_test_ipc_data(&schema);
1976 for i in 1..=3 {
1977 let mut create_table_req = CreateTableRequest::new();
1978 create_table_req.id = Some(vec!["test_ns".to_string(), format!("table{}", i)]);
1979 namespace
1980 .create_table(create_table_req, bytes::Bytes::from(ipc_data.clone()))
1981 .await
1982 .unwrap();
1983 }
1984
1985 let list_req = ListTablesRequest {
1987 id: Some(vec!["test_ns".to_string()]),
1988 page_token: None,
1989 limit: None,
1990 };
1991 let result = namespace.list_tables(list_req).await;
1992 assert!(result.is_ok());
1993 let tables = result.unwrap().tables;
1994 assert_eq!(tables.len(), 3);
1995 assert!(tables.contains(&"table1".to_string()));
1996 assert!(tables.contains(&"table2".to_string()));
1997 assert!(tables.contains(&"table3".to_string()));
1998 }
1999
2000 #[tokio::test]
2001 async fn test_drop_table_in_child_namespace() {
2002 let (namespace, _temp_dir) = create_test_namespace().await;
2003
2004 let mut create_ns_req = CreateNamespaceRequest::new();
2006 create_ns_req.id = Some(vec!["test_ns".to_string()]);
2007 namespace.create_namespace(create_ns_req).await.unwrap();
2008
2009 let schema = create_test_schema();
2011 let ipc_data = create_test_ipc_data(&schema);
2012 let mut create_table_req = CreateTableRequest::new();
2013 create_table_req.id = Some(vec!["test_ns".to_string(), "table1".to_string()]);
2014 namespace
2015 .create_table(create_table_req, bytes::Bytes::from(ipc_data))
2016 .await
2017 .unwrap();
2018
2019 let mut drop_req = DropTableRequest::new();
2021 drop_req.id = Some(vec!["test_ns".to_string(), "table1".to_string()]);
2022 let result = namespace.drop_table(drop_req).await;
2023 assert!(result.is_ok(), "Failed to drop table in child namespace");
2024
2025 let mut exists_req = TableExistsRequest::new();
2027 exists_req.id = Some(vec!["test_ns".to_string(), "table1".to_string()]);
2028 let result = namespace.table_exists(exists_req).await;
2029 assert!(result.is_err());
2030 }
2031
2032 #[tokio::test]
2033 async fn test_empty_table_in_child_namespace() {
2034 let (namespace, _temp_dir) = create_test_namespace().await;
2035
2036 let mut create_ns_req = CreateNamespaceRequest::new();
2038 create_ns_req.id = Some(vec!["test_ns".to_string()]);
2039 namespace.create_namespace(create_ns_req).await.unwrap();
2040
2041 let mut create_empty_req = CreateEmptyTableRequest::new();
2043 create_empty_req.id = Some(vec!["test_ns".to_string(), "empty_table".to_string()]);
2044 let result = namespace.create_empty_table(create_empty_req).await;
2045 assert!(
2046 result.is_ok(),
2047 "Failed to create empty table in child namespace"
2048 );
2049
2050 let mut exists_req = TableExistsRequest::new();
2052 exists_req.id = Some(vec!["test_ns".to_string(), "empty_table".to_string()]);
2053 let result = namespace.table_exists(exists_req).await;
2054 assert!(result.is_ok());
2055 }
2056
2057 #[tokio::test]
2058 async fn test_deeply_nested_namespace() {
2059 let (namespace, _temp_dir) = create_test_namespace().await;
2060
2061 let mut create_req = CreateNamespaceRequest::new();
2063 create_req.id = Some(vec!["level1".to_string()]);
2064 namespace.create_namespace(create_req).await.unwrap();
2065
2066 let mut create_req = CreateNamespaceRequest::new();
2067 create_req.id = Some(vec!["level1".to_string(), "level2".to_string()]);
2068 namespace.create_namespace(create_req).await.unwrap();
2069
2070 let mut create_req = CreateNamespaceRequest::new();
2071 create_req.id = Some(vec![
2072 "level1".to_string(),
2073 "level2".to_string(),
2074 "level3".to_string(),
2075 ]);
2076 namespace.create_namespace(create_req).await.unwrap();
2077
2078 let schema = create_test_schema();
2080 let ipc_data = create_test_ipc_data(&schema);
2081 let mut create_table_req = CreateTableRequest::new();
2082 create_table_req.id = Some(vec![
2083 "level1".to_string(),
2084 "level2".to_string(),
2085 "level3".to_string(),
2086 "table1".to_string(),
2087 ]);
2088 let result = namespace
2089 .create_table(create_table_req, bytes::Bytes::from(ipc_data))
2090 .await;
2091 assert!(
2092 result.is_ok(),
2093 "Failed to create table in deeply nested namespace"
2094 );
2095
2096 let mut exists_req = TableExistsRequest::new();
2098 exists_req.id = Some(vec![
2099 "level1".to_string(),
2100 "level2".to_string(),
2101 "level3".to_string(),
2102 "table1".to_string(),
2103 ]);
2104 let result = namespace.table_exists(exists_req).await;
2105 assert!(result.is_ok());
2106 }
2107
2108 #[tokio::test]
2109 async fn test_namespace_with_properties() {
2110 let (namespace, _temp_dir) = create_test_namespace().await;
2111
2112 let mut properties = HashMap::new();
2114 properties.insert("owner".to_string(), "test_user".to_string());
2115 properties.insert("description".to_string(), "Test namespace".to_string());
2116
2117 let mut create_req = CreateNamespaceRequest::new();
2118 create_req.id = Some(vec!["test_ns".to_string()]);
2119 create_req.properties = Some(properties.clone());
2120 namespace.create_namespace(create_req).await.unwrap();
2121
2122 let describe_req = DescribeNamespaceRequest {
2124 id: Some(vec!["test_ns".to_string()]),
2125 };
2126 let result = namespace.describe_namespace(describe_req).await;
2127 assert!(result.is_ok());
2128 let response = result.unwrap();
2129 assert!(response.properties.is_some());
2130 let props = response.properties.unwrap();
2131 assert_eq!(props.get("owner"), Some(&"test_user".to_string()));
2132 assert_eq!(
2133 props.get("description"),
2134 Some(&"Test namespace".to_string())
2135 );
2136 }
2137
2138 #[tokio::test]
2139 async fn test_cannot_drop_namespace_with_tables() {
2140 let (namespace, _temp_dir) = create_test_namespace().await;
2141
2142 let mut create_ns_req = CreateNamespaceRequest::new();
2144 create_ns_req.id = Some(vec!["test_ns".to_string()]);
2145 namespace.create_namespace(create_ns_req).await.unwrap();
2146
2147 let schema = create_test_schema();
2149 let ipc_data = create_test_ipc_data(&schema);
2150 let mut create_table_req = CreateTableRequest::new();
2151 create_table_req.id = Some(vec!["test_ns".to_string(), "table1".to_string()]);
2152 namespace
2153 .create_table(create_table_req, bytes::Bytes::from(ipc_data))
2154 .await
2155 .unwrap();
2156
2157 let mut drop_req = DropNamespaceRequest::new();
2159 drop_req.id = Some(vec!["test_ns".to_string()]);
2160 let result = namespace.drop_namespace(drop_req).await;
2161 assert!(
2162 result.is_err(),
2163 "Should not be able to drop namespace with tables"
2164 );
2165 }
2166
2167 #[tokio::test]
2168 async fn test_isolation_between_namespaces() {
2169 let (namespace, _temp_dir) = create_test_namespace().await;
2170
2171 let mut create_req = CreateNamespaceRequest::new();
2173 create_req.id = Some(vec!["ns1".to_string()]);
2174 namespace.create_namespace(create_req).await.unwrap();
2175
2176 let mut create_req = CreateNamespaceRequest::new();
2177 create_req.id = Some(vec!["ns2".to_string()]);
2178 namespace.create_namespace(create_req).await.unwrap();
2179
2180 let schema = create_test_schema();
2182 let ipc_data = create_test_ipc_data(&schema);
2183
2184 let mut create_table_req = CreateTableRequest::new();
2185 create_table_req.id = Some(vec!["ns1".to_string(), "table1".to_string()]);
2186 namespace
2187 .create_table(create_table_req, bytes::Bytes::from(ipc_data.clone()))
2188 .await
2189 .unwrap();
2190
2191 let mut create_table_req = CreateTableRequest::new();
2192 create_table_req.id = Some(vec!["ns2".to_string(), "table1".to_string()]);
2193 namespace
2194 .create_table(create_table_req, bytes::Bytes::from(ipc_data))
2195 .await
2196 .unwrap();
2197
2198 let list_req = ListTablesRequest {
2200 id: Some(vec!["ns1".to_string()]),
2201 page_token: None,
2202 limit: None,
2203 };
2204 let result = namespace.list_tables(list_req).await.unwrap();
2205 assert_eq!(result.tables.len(), 1);
2206 assert_eq!(result.tables[0], "table1");
2207
2208 let list_req = ListTablesRequest {
2209 id: Some(vec!["ns2".to_string()]),
2210 page_token: None,
2211 limit: None,
2212 };
2213 let result = namespace.list_tables(list_req).await.unwrap();
2214 assert_eq!(result.tables.len(), 1);
2215 assert_eq!(result.tables[0], "table1");
2216
2217 let mut drop_req = DropTableRequest::new();
2219 drop_req.id = Some(vec!["ns1".to_string(), "table1".to_string()]);
2220 namespace.drop_table(drop_req).await.unwrap();
2221
2222 let mut exists_req = TableExistsRequest::new();
2224 exists_req.id = Some(vec!["ns1".to_string(), "table1".to_string()]);
2225 assert!(namespace.table_exists(exists_req).await.is_err());
2226
2227 let mut exists_req = TableExistsRequest::new();
2228 exists_req.id = Some(vec!["ns2".to_string(), "table1".to_string()]);
2229 assert!(namespace.table_exists(exists_req).await.is_ok());
2230 }
2231
2232 #[tokio::test]
2233 async fn test_migrate_directory_tables() {
2234 let temp_dir = TempStdDir::default();
2235 let temp_path = temp_dir.to_str().unwrap();
2236
2237 let dir_only_ns = DirectoryNamespaceBuilder::new(temp_path)
2239 .manifest_enabled(false)
2240 .dir_listing_enabled(true)
2241 .build()
2242 .await
2243 .unwrap();
2244
2245 let schema = create_test_schema();
2247 let ipc_data = create_test_ipc_data(&schema);
2248
2249 for i in 1..=3 {
2250 let mut create_req = CreateTableRequest::new();
2251 create_req.id = Some(vec![format!("table{}", i)]);
2252 dir_only_ns
2253 .create_table(create_req, bytes::Bytes::from(ipc_data.clone()))
2254 .await
2255 .unwrap();
2256 }
2257
2258 drop(dir_only_ns);
2259
2260 let dual_mode_ns = DirectoryNamespaceBuilder::new(temp_path)
2262 .manifest_enabled(true)
2263 .dir_listing_enabled(true)
2264 .build()
2265 .await
2266 .unwrap();
2267
2268 let mut list_req = ListTablesRequest::new();
2270 list_req.id = Some(vec![]);
2271 let tables = dual_mode_ns.list_tables(list_req).await.unwrap().tables;
2272 assert_eq!(tables.len(), 3);
2273
2274 let migrated_count = dual_mode_ns.migrate().await.unwrap();
2276 assert_eq!(migrated_count, 3, "Should migrate all 3 tables");
2277
2278 let mut list_req = ListTablesRequest::new();
2280 list_req.id = Some(vec![]);
2281 let tables = dual_mode_ns.list_tables(list_req).await.unwrap().tables;
2282 assert_eq!(tables.len(), 3);
2283
2284 let migrated_count = dual_mode_ns.migrate().await.unwrap();
2286 assert_eq!(
2287 migrated_count, 0,
2288 "Should not migrate already-migrated tables"
2289 );
2290
2291 drop(dual_mode_ns);
2292
2293 let manifest_only_ns = DirectoryNamespaceBuilder::new(temp_path)
2295 .manifest_enabled(true)
2296 .dir_listing_enabled(false)
2297 .build()
2298 .await
2299 .unwrap();
2300
2301 let mut list_req = ListTablesRequest::new();
2303 list_req.id = Some(vec![]);
2304 let tables = manifest_only_ns.list_tables(list_req).await.unwrap().tables;
2305 assert_eq!(tables.len(), 3);
2306 assert!(tables.contains(&"table1".to_string()));
2307 assert!(tables.contains(&"table2".to_string()));
2308 assert!(tables.contains(&"table3".to_string()));
2309 }
2310
2311 #[tokio::test]
2312 async fn test_migrate_without_manifest() {
2313 let temp_dir = TempStdDir::default();
2314 let temp_path = temp_dir.to_str().unwrap();
2315
2316 let namespace = DirectoryNamespaceBuilder::new(temp_path)
2318 .manifest_enabled(false)
2319 .dir_listing_enabled(true)
2320 .build()
2321 .await
2322 .unwrap();
2323
2324 let migrated_count = namespace.migrate().await.unwrap();
2326 assert_eq!(migrated_count, 0);
2327 }
2328
2329 #[tokio::test]
2330 async fn test_register_table() {
2331 use lance_namespace::models::{RegisterTableRequest, TableExistsRequest};
2332
2333 let temp_dir = TempStdDir::default();
2334 let temp_path = temp_dir.to_str().unwrap();
2335
2336 let namespace = DirectoryNamespaceBuilder::new(temp_path)
2337 .build()
2338 .await
2339 .unwrap();
2340
2341 let schema = create_test_schema();
2343 let ipc_data = create_test_ipc_data(&schema);
2344
2345 let table_uri = format!("{}/external_table.lance", temp_path);
2346 let cursor = Cursor::new(ipc_data);
2347 let stream_reader = StreamReader::try_new(cursor, None).unwrap();
2348 let batches: Vec<_> = stream_reader
2349 .collect::<std::result::Result<Vec<_>, _>>()
2350 .unwrap();
2351 let schema = batches[0].schema();
2352 let batch_results: Vec<_> = batches.into_iter().map(Ok).collect();
2353 let reader = RecordBatchIterator::new(batch_results, schema);
2354 Dataset::write(Box::new(reader), &table_uri, None)
2355 .await
2356 .unwrap();
2357
2358 let mut register_req = RegisterTableRequest::new("external_table.lance".to_string());
2360 register_req.id = Some(vec!["registered_table".to_string()]);
2361
2362 let response = namespace.register_table(register_req).await.unwrap();
2363 assert_eq!(response.location, "external_table.lance");
2364
2365 let mut exists_req = TableExistsRequest::new();
2367 exists_req.id = Some(vec!["registered_table".to_string()]);
2368 assert!(namespace.table_exists(exists_req).await.is_ok());
2369
2370 let mut list_req = ListTablesRequest::new();
2372 list_req.id = Some(vec![]);
2373 let tables = namespace.list_tables(list_req).await.unwrap();
2374 assert!(tables.tables.contains(&"registered_table".to_string()));
2375 }
2376
2377 #[tokio::test]
2378 async fn test_register_table_duplicate_fails() {
2379 use lance_namespace::models::RegisterTableRequest;
2380
2381 let temp_dir = TempStdDir::default();
2382 let temp_path = temp_dir.to_str().unwrap();
2383
2384 let namespace = DirectoryNamespaceBuilder::new(temp_path)
2385 .build()
2386 .await
2387 .unwrap();
2388
2389 let mut register_req = RegisterTableRequest::new("test_table.lance".to_string());
2391 register_req.id = Some(vec!["test_table".to_string()]);
2392
2393 namespace
2394 .register_table(register_req.clone())
2395 .await
2396 .unwrap();
2397
2398 let result = namespace.register_table(register_req).await;
2400 assert!(result.is_err());
2401 assert!(result.unwrap_err().to_string().contains("already exists"));
2402 }
2403
2404 #[tokio::test]
2405 async fn test_deregister_table() {
2406 use lance_namespace::models::{DeregisterTableRequest, TableExistsRequest};
2407
2408 let temp_dir = TempStdDir::default();
2409 let temp_path = temp_dir.to_str().unwrap();
2410
2411 let namespace = DirectoryNamespaceBuilder::new(temp_path)
2414 .manifest_enabled(true)
2415 .dir_listing_enabled(false)
2416 .build()
2417 .await
2418 .unwrap();
2419
2420 let schema = create_test_schema();
2422 let ipc_data = create_test_ipc_data(&schema);
2423
2424 let mut create_req = CreateTableRequest::new();
2425 create_req.id = Some(vec!["test_table".to_string()]);
2426 namespace
2427 .create_table(create_req, bytes::Bytes::from(ipc_data))
2428 .await
2429 .unwrap();
2430
2431 let mut exists_req = TableExistsRequest::new();
2433 exists_req.id = Some(vec!["test_table".to_string()]);
2434 assert!(namespace.table_exists(exists_req.clone()).await.is_ok());
2435
2436 let mut deregister_req = DeregisterTableRequest::new();
2438 deregister_req.id = Some(vec!["test_table".to_string()]);
2439 let response = namespace.deregister_table(deregister_req).await.unwrap();
2440
2441 assert!(
2443 response.location.is_some(),
2444 "Deregister should return location"
2445 );
2446 let location = response.location.as_ref().unwrap();
2447 let expected_url = lance_io::object_store::uri_to_url(temp_path)
2450 .expect("Failed to convert temp path to URL");
2451 let expected_prefix = expected_url.to_string();
2452 assert!(
2453 location.starts_with(&expected_prefix),
2454 "Location should start with '{}', got: {}",
2455 expected_prefix,
2456 location
2457 );
2458 assert!(
2459 location.contains("test_table"),
2460 "Location should contain table name: {}",
2461 location
2462 );
2463 assert_eq!(response.id, Some(vec!["test_table".to_string()]));
2464
2465 assert!(namespace.table_exists(exists_req).await.is_err());
2467
2468 let dataset = Dataset::open(location).await;
2470 assert!(
2471 dataset.is_ok(),
2472 "Physical table data should still exist at {}",
2473 location
2474 );
2475 }
2476
2477 #[tokio::test]
2478 async fn test_deregister_table_in_child_namespace() {
2479 use lance_namespace::models::{
2480 CreateNamespaceRequest, DeregisterTableRequest, TableExistsRequest,
2481 };
2482
2483 let temp_dir = TempStdDir::default();
2484 let temp_path = temp_dir.to_str().unwrap();
2485
2486 let namespace = DirectoryNamespaceBuilder::new(temp_path)
2487 .build()
2488 .await
2489 .unwrap();
2490
2491 let mut create_ns_req = CreateNamespaceRequest::new();
2493 create_ns_req.id = Some(vec!["test_ns".to_string()]);
2494 namespace.create_namespace(create_ns_req).await.unwrap();
2495
2496 let schema = create_test_schema();
2498 let ipc_data = create_test_ipc_data(&schema);
2499
2500 let mut create_req = CreateTableRequest::new();
2501 create_req.id = Some(vec!["test_ns".to_string(), "test_table".to_string()]);
2502 namespace
2503 .create_table(create_req, bytes::Bytes::from(ipc_data))
2504 .await
2505 .unwrap();
2506
2507 let mut deregister_req = DeregisterTableRequest::new();
2509 deregister_req.id = Some(vec!["test_ns".to_string(), "test_table".to_string()]);
2510 let response = namespace.deregister_table(deregister_req).await.unwrap();
2511
2512 assert!(
2514 response.location.is_some(),
2515 "Deregister should return location"
2516 );
2517 let location = response.location.as_ref().unwrap();
2518 let expected_url = lance_io::object_store::uri_to_url(temp_path)
2521 .expect("Failed to convert temp path to URL");
2522 let expected_prefix = expected_url.to_string();
2523 assert!(
2524 location.starts_with(&expected_prefix),
2525 "Location should start with '{}', got: {}",
2526 expected_prefix,
2527 location
2528 );
2529 assert!(
2530 location.contains("test_ns") && location.contains("test_table"),
2531 "Location should contain namespace and table name: {}",
2532 location
2533 );
2534 assert_eq!(
2535 response.id,
2536 Some(vec!["test_ns".to_string(), "test_table".to_string()])
2537 );
2538
2539 let mut exists_req = TableExistsRequest::new();
2541 exists_req.id = Some(vec!["test_ns".to_string(), "test_table".to_string()]);
2542 assert!(namespace.table_exists(exists_req).await.is_err());
2543 }
2544
2545 #[tokio::test]
2546 async fn test_register_deregister_without_manifest_fails() {
2547 use lance_namespace::models::{DeregisterTableRequest, RegisterTableRequest};
2548
2549 let temp_dir = TempStdDir::default();
2550 let temp_path = temp_dir.to_str().unwrap();
2551
2552 let namespace = DirectoryNamespaceBuilder::new(temp_path)
2554 .manifest_enabled(false)
2555 .build()
2556 .await
2557 .unwrap();
2558
2559 let mut register_req = RegisterTableRequest::new("test_table.lance".to_string());
2561 register_req.id = Some(vec!["test_table".to_string()]);
2562 let result = namespace.register_table(register_req).await;
2563 assert!(result.is_err());
2564 assert!(result
2565 .unwrap_err()
2566 .to_string()
2567 .contains("manifest mode is enabled"));
2568
2569 let mut deregister_req = DeregisterTableRequest::new();
2571 deregister_req.id = Some(vec!["test_table".to_string()]);
2572 let result = namespace.deregister_table(deregister_req).await;
2573 assert!(result.is_err());
2574 assert!(result
2575 .unwrap_err()
2576 .to_string()
2577 .contains("manifest mode is enabled"));
2578 }
2579
2580 #[tokio::test]
2581 async fn test_register_table_rejects_absolute_uri() {
2582 use lance_namespace::models::RegisterTableRequest;
2583
2584 let temp_dir = TempStdDir::default();
2585 let temp_path = temp_dir.to_str().unwrap();
2586
2587 let namespace = DirectoryNamespaceBuilder::new(temp_path)
2588 .build()
2589 .await
2590 .unwrap();
2591
2592 let mut register_req = RegisterTableRequest::new("s3://bucket/table.lance".to_string());
2594 register_req.id = Some(vec!["test_table".to_string()]);
2595 let result = namespace.register_table(register_req).await;
2596 assert!(result.is_err());
2597 let err_msg = result.unwrap_err().to_string();
2598 assert!(err_msg.contains("Absolute URIs are not allowed"));
2599 }
2600
2601 #[tokio::test]
2602 async fn test_register_table_rejects_absolute_path() {
2603 use lance_namespace::models::RegisterTableRequest;
2604
2605 let temp_dir = TempStdDir::default();
2606 let temp_path = temp_dir.to_str().unwrap();
2607
2608 let namespace = DirectoryNamespaceBuilder::new(temp_path)
2609 .build()
2610 .await
2611 .unwrap();
2612
2613 let mut register_req = RegisterTableRequest::new("/tmp/table.lance".to_string());
2615 register_req.id = Some(vec!["test_table".to_string()]);
2616 let result = namespace.register_table(register_req).await;
2617 assert!(result.is_err());
2618 let err_msg = result.unwrap_err().to_string();
2619 assert!(err_msg.contains("Absolute paths are not allowed"));
2620 }
2621
2622 #[tokio::test]
2623 async fn test_register_table_rejects_path_traversal() {
2624 use lance_namespace::models::RegisterTableRequest;
2625
2626 let temp_dir = TempStdDir::default();
2627 let temp_path = temp_dir.to_str().unwrap();
2628
2629 let namespace = DirectoryNamespaceBuilder::new(temp_path)
2630 .build()
2631 .await
2632 .unwrap();
2633
2634 let mut register_req = RegisterTableRequest::new("../outside/table.lance".to_string());
2636 register_req.id = Some(vec!["test_table".to_string()]);
2637 let result = namespace.register_table(register_req).await;
2638 assert!(result.is_err());
2639 let err_msg = result.unwrap_err().to_string();
2640 assert!(err_msg.contains("Path traversal is not allowed"));
2641 }
2642
2643 #[tokio::test]
2644 async fn test_namespace_write() {
2645 use arrow::array::Int32Array;
2646 use arrow::datatypes::{DataType, Field as ArrowField, Schema as ArrowSchema};
2647 use arrow::record_batch::{RecordBatch, RecordBatchIterator};
2648 use lance::dataset::{Dataset, WriteMode, WriteParams};
2649 use lance_namespace::LanceNamespace;
2650
2651 let (namespace, _temp_dir) = create_test_namespace().await;
2652 let namespace = Arc::new(namespace) as Arc<dyn LanceNamespace>;
2653
2654 let table_id = vec!["test_ns".to_string(), "test_table".to_string()];
2656 let schema = Arc::new(ArrowSchema::new(vec![
2657 ArrowField::new("a", DataType::Int32, false),
2658 ArrowField::new("b", DataType::Int32, false),
2659 ]));
2660
2661 let data1 = RecordBatch::try_new(
2663 schema.clone(),
2664 vec![
2665 Arc::new(Int32Array::from(vec![1, 2, 3])),
2666 Arc::new(Int32Array::from(vec![10, 20, 30])),
2667 ],
2668 )
2669 .unwrap();
2670
2671 let reader1 = RecordBatchIterator::new(vec![data1].into_iter().map(Ok), schema.clone());
2672 let dataset = Dataset::write_into_namespace(
2673 reader1,
2674 namespace.clone(),
2675 table_id.clone(),
2676 None,
2677 false,
2678 )
2679 .await
2680 .unwrap();
2681
2682 assert_eq!(dataset.count_rows(None).await.unwrap(), 3);
2683 assert_eq!(dataset.version().version, 1);
2684
2685 let data2 = RecordBatch::try_new(
2687 schema.clone(),
2688 vec![
2689 Arc::new(Int32Array::from(vec![4, 5])),
2690 Arc::new(Int32Array::from(vec![40, 50])),
2691 ],
2692 )
2693 .unwrap();
2694
2695 let params_append = WriteParams {
2696 mode: WriteMode::Append,
2697 ..Default::default()
2698 };
2699
2700 let reader2 = RecordBatchIterator::new(vec![data2].into_iter().map(Ok), schema.clone());
2701 let dataset = Dataset::write_into_namespace(
2702 reader2,
2703 namespace.clone(),
2704 table_id.clone(),
2705 Some(params_append),
2706 false,
2707 )
2708 .await
2709 .unwrap();
2710
2711 assert_eq!(dataset.count_rows(None).await.unwrap(), 5);
2712 assert_eq!(dataset.version().version, 2);
2713
2714 let data3 = RecordBatch::try_new(
2716 schema.clone(),
2717 vec![
2718 Arc::new(Int32Array::from(vec![100, 200])),
2719 Arc::new(Int32Array::from(vec![1000, 2000])),
2720 ],
2721 )
2722 .unwrap();
2723
2724 let params_overwrite = WriteParams {
2725 mode: WriteMode::Overwrite,
2726 ..Default::default()
2727 };
2728
2729 let reader3 = RecordBatchIterator::new(vec![data3].into_iter().map(Ok), schema.clone());
2730 let dataset = Dataset::write_into_namespace(
2731 reader3,
2732 namespace.clone(),
2733 table_id.clone(),
2734 Some(params_overwrite),
2735 false,
2736 )
2737 .await
2738 .unwrap();
2739
2740 assert_eq!(dataset.count_rows(None).await.unwrap(), 2);
2741 assert_eq!(dataset.version().version, 3);
2742
2743 let result = dataset.scan().try_into_batch().await.unwrap();
2745 let a_col = result
2746 .column_by_name("a")
2747 .unwrap()
2748 .as_any()
2749 .downcast_ref::<Int32Array>()
2750 .unwrap();
2751 assert_eq!(a_col.values(), &[100, 200]);
2752 }
2753}