1use std::collections::HashMap;
10use std::sync::Arc;
11
12use async_trait::async_trait;
13use bytes::Bytes;
14use lance::dataset::{Dataset, WriteParams};
15use lance::session::Session;
16use lance_io::object_store::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry};
17use object_store::path::Path;
18
19use lance_namespace::models::{
20 CreateEmptyTableRequest, CreateEmptyTableResponse, CreateNamespaceRequest,
21 CreateNamespaceResponse, CreateTableRequest, CreateTableResponse, DescribeNamespaceRequest,
22 DescribeNamespaceResponse, DescribeTableRequest, DescribeTableResponse, DropNamespaceRequest,
23 DropNamespaceResponse, DropTableRequest, DropTableResponse, ListNamespacesRequest,
24 ListNamespacesResponse, ListTablesRequest, ListTablesResponse, NamespaceExistsRequest,
25 TableExistsRequest,
26};
27
28use lance_core::{box_error, Error, Result};
29use lance_namespace::LanceNamespace;
30
/// Builder for a [`DirectoryNamespace`].
///
/// Collects the root location, optional object-store storage options and an
/// optional shared [`Session`] before the namespace is constructed.
#[derive(Debug, Clone)]
pub struct DirectoryNamespaceBuilder {
    /// Root location of the namespace, stored without trailing `/` characters.
    root: String,
    /// Options forwarded to the object store; `None` when none were supplied.
    storage_options: Option<HashMap<String, String>>,
    /// Optional session whose object-store registry is reused when building.
    session: Option<Arc<Session>>,
}
71
72impl DirectoryNamespaceBuilder {
73 pub fn new(root: impl Into<String>) -> Self {
79 Self {
80 root: root.into().trim_end_matches('/').to_string(),
81 storage_options: None,
82 session: None,
83 }
84 }
85
86 pub fn from_properties(
123 properties: HashMap<String, String>,
124 session: Option<Arc<Session>>,
125 ) -> Result<Self> {
126 let root = properties
128 .get("root")
129 .cloned()
130 .ok_or_else(|| Error::Namespace {
131 source: "Missing required property 'root' for directory namespace".into(),
132 location: snafu::location!(),
133 })?;
134
135 let storage_options: HashMap<String, String> = properties
137 .iter()
138 .filter_map(|(k, v)| {
139 k.strip_prefix("storage.")
140 .map(|key| (key.to_string(), v.clone()))
141 })
142 .collect();
143
144 let storage_options = if storage_options.is_empty() {
145 None
146 } else {
147 Some(storage_options)
148 };
149
150 Ok(Self {
151 root: root.trim_end_matches('/').to_string(),
152 storage_options,
153 session,
154 })
155 }
156
157 pub fn storage_option(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
164 self.storage_options
165 .get_or_insert_with(HashMap::new)
166 .insert(key.into(), value.into());
167 self
168 }
169
170 pub fn storage_options(mut self, options: HashMap<String, String>) -> Self {
176 self.storage_options
177 .get_or_insert_with(HashMap::new)
178 .extend(options);
179 self
180 }
181
182 pub fn session(mut self, session: Arc<Session>) -> Self {
192 self.session = Some(session);
193 self
194 }
195
196 pub async fn build(self) -> Result<DirectoryNamespace> {
209 let (object_store, base_path) =
210 Self::initialize_object_store(&self.root, &self.storage_options, &self.session).await?;
211
212 Ok(DirectoryNamespace {
213 root: self.root,
214 storage_options: self.storage_options,
215 session: self.session,
216 object_store,
217 base_path,
218 })
219 }
220
221 async fn initialize_object_store(
223 root: &str,
224 storage_options: &Option<HashMap<String, String>>,
225 session: &Option<Arc<Session>>,
226 ) -> Result<(Arc<ObjectStore>, Path)> {
227 let params = ObjectStoreParams {
229 storage_options: storage_options.clone(),
230 ..Default::default()
231 };
232
233 let registry = if let Some(session) = session {
235 session.store_registry()
236 } else {
237 Arc::new(ObjectStoreRegistry::default())
238 };
239
240 let (object_store, base_path) = ObjectStore::from_uri_and_params(registry, root, ¶ms)
242 .await
243 .map_err(|e| Error::Namespace {
244 source: format!("Failed to create object store: {}", e).into(),
245 location: snafu::location!(),
246 })?;
247
248 Ok((object_store, base_path))
249 }
250}
251
/// A namespace backed by a single directory in an object store.
///
/// Each table is addressed as `<root>/<table_name>.lance`.
pub struct DirectoryNamespace {
    /// Root location of the namespace.
    root: String,
    /// Storage options used for the object store and echoed in table responses.
    storage_options: Option<HashMap<String, String>>,
    // The session is stored but not read by any code visible in this file,
    // hence the lint suppression.
    #[allow(dead_code)]
    session: Option<Arc<Session>>,
    /// Object store handle created from `root`.
    object_store: Arc<ObjectStore>,
    /// Path corresponding to `root` inside `object_store`.
    base_path: Path,
}
264
265impl std::fmt::Debug for DirectoryNamespace {
266 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
267 write!(f, "{}", self.namespace_id())
268 }
269}
270
271impl std::fmt::Display for DirectoryNamespace {
272 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
273 write!(f, "{}", self.namespace_id())
274 }
275}
276
277impl DirectoryNamespace {
278 fn validate_root_namespace_id(id: &Option<Vec<String>>) -> Result<()> {
280 if let Some(id) = id {
281 if !id.is_empty() {
282 return Err(Error::Namespace {
283 source: format!(
284 "Directory namespace only supports root namespace operations, but got namespace ID: {:?}. Expected empty ID.",
285 id
286 ).into(),
287 location: snafu::location!(),
288 });
289 }
290 }
291 Ok(())
292 }
293
294 fn table_name_from_id(id: &Option<Vec<String>>) -> Result<String> {
296 let id = id.as_ref().ok_or_else(|| Error::Namespace {
297 source: "Directory namespace table ID cannot be empty".into(),
298 location: snafu::location!(),
299 })?;
300
301 if id.len() != 1 {
302 return Err(Error::Namespace {
303 source: format!(
304 "Directory namespace only supports single-level table IDs, but got: {:?}",
305 id
306 )
307 .into(),
308 location: snafu::location!(),
309 });
310 }
311
312 Ok(id[0].clone())
313 }
314
315 fn table_full_uri(&self, table_name: &str) -> String {
317 format!("{}/{}.lance", &self.root, table_name)
318 }
319
320 fn table_path(&self, table_name: &str) -> Path {
322 self.base_path
323 .child(format!("{}.lance", table_name).as_str())
324 }
325
326 fn table_versions_path(&self, table_name: &str) -> Path {
328 self.base_path
330 .child(format!("{}.lance", table_name).as_str())
331 .child("_versions")
332 }
333
334 fn table_reserved_file_path(&self, table_name: &str) -> Path {
336 self.base_path
338 .child(format!("{}.lance", table_name).as_str())
339 .child(".lance-reserved")
340 }
341}
342
343#[async_trait]
344impl LanceNamespace for DirectoryNamespace {
345 async fn list_namespaces(
346 &self,
347 request: ListNamespacesRequest,
348 ) -> Result<ListNamespacesResponse> {
349 Self::validate_root_namespace_id(&request.id)?;
351
352 Ok(ListNamespacesResponse::new(vec![]))
354 }
355
356 async fn describe_namespace(
357 &self,
358 request: DescribeNamespaceRequest,
359 ) -> Result<DescribeNamespaceResponse> {
360 Self::validate_root_namespace_id(&request.id)?;
362
363 Ok(DescribeNamespaceResponse {
365 properties: Some(HashMap::new()),
366 })
367 }
368
369 async fn create_namespace(
370 &self,
371 request: CreateNamespaceRequest,
372 ) -> Result<CreateNamespaceResponse> {
373 if request.id.is_none() || request.id.as_ref().unwrap().is_empty() {
375 return Err(Error::Namespace {
376 source: "Root namespace already exists and cannot be created".into(),
377 location: snafu::location!(),
378 });
379 }
380
381 Err(Error::NotSupported {
383 source: "Directory namespace only supports the root namespace".into(),
384 location: snafu::location!(),
385 })
386 }
387
388 async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<DropNamespaceResponse> {
389 if request.id.is_none() || request.id.as_ref().unwrap().is_empty() {
391 return Err(Error::Namespace {
392 source: "Root namespace cannot be dropped".into(),
393 location: snafu::location!(),
394 });
395 }
396
397 Err(Error::NotSupported {
399 source: "Directory namespace only supports the root namespace".into(),
400 location: snafu::location!(),
401 })
402 }
403
404 async fn namespace_exists(&self, request: NamespaceExistsRequest) -> Result<()> {
405 if request.id.is_none() || request.id.as_ref().unwrap().is_empty() {
407 return Ok(());
408 }
409
410 Err(Error::Namespace {
412 source: "Only root namespace exists in directory namespace".into(),
413 location: snafu::location!(),
414 })
415 }
416
417 async fn list_tables(&self, request: ListTablesRequest) -> Result<ListTablesResponse> {
418 Self::validate_root_namespace_id(&request.id)?;
419
420 let mut tables = Vec::new();
421
422 let entries = self
424 .object_store
425 .read_dir(self.base_path.clone())
426 .await
427 .map_err(|e| Error::IO {
428 source: box_error(std::io::Error::other(format!(
429 "Failed to list directory: {}",
430 e
431 ))),
432 location: snafu::location!(),
433 })?;
434
435 for entry in entries {
436 let path = entry.trim_end_matches('/');
437
438 if !path.ends_with(".lance") {
440 continue;
441 }
442
443 let table_name = &path[..path.len() - 6];
445
446 let mut is_table = false;
448
449 let reserved_file_path = self.table_reserved_file_path(table_name);
451 if self
452 .object_store
453 .exists(&reserved_file_path)
454 .await
455 .unwrap_or(false)
456 {
457 is_table = true;
458 }
459
460 if !is_table {
462 let versions_path = self.table_versions_path(table_name);
463 if let Ok(version_entries) = self.object_store.read_dir(versions_path).await {
464 if !version_entries.is_empty() {
466 is_table = true;
467 }
468 }
469 }
470
471 if is_table {
472 tables.push(table_name.to_string());
473 }
474 }
475
476 let response = ListTablesResponse::new(tables);
477 Ok(response)
478 }
479
480 async fn describe_table(&self, request: DescribeTableRequest) -> Result<DescribeTableResponse> {
481 let table_name = Self::table_name_from_id(&request.id)?;
482 let table_uri = self.table_full_uri(&table_name);
483
484 let mut table_exists = false;
486
487 let reserved_file_path = self.table_reserved_file_path(&table_name);
489 if self
490 .object_store
491 .exists(&reserved_file_path)
492 .await
493 .unwrap_or(false)
494 {
495 table_exists = true;
496 }
497
498 if !table_exists {
500 let versions_path = self.table_versions_path(&table_name);
501 if let Ok(entries) = self.object_store.read_dir(versions_path).await {
502 if !entries.is_empty() {
503 table_exists = true;
504 }
505 }
506 }
507
508 if !table_exists {
509 return Err(Error::Namespace {
510 source: format!("Table does not exist: {}", table_name).into(),
511 location: snafu::location!(),
512 });
513 }
514
515 Ok(DescribeTableResponse {
516 version: None,
517 location: Some(table_uri),
518 schema: None,
519 properties: None,
520 storage_options: self.storage_options.clone(),
521 })
522 }
523
524 async fn table_exists(&self, request: TableExistsRequest) -> Result<()> {
525 let table_name = Self::table_name_from_id(&request.id)?;
526
527 let mut table_exists = false;
529
530 let reserved_file_path = self.table_reserved_file_path(&table_name);
532 if self
533 .object_store
534 .exists(&reserved_file_path)
535 .await
536 .unwrap_or(false)
537 {
538 table_exists = true;
539 }
540
541 if !table_exists {
543 let versions_path = self.table_versions_path(&table_name);
544 if let Ok(entries) = self.object_store.read_dir(versions_path).await {
545 if !entries.is_empty() {
546 table_exists = true;
547 }
548 }
549 }
550
551 if !table_exists {
552 return Err(Error::Namespace {
553 source: format!("Table does not exist: {}", table_name).into(),
554 location: snafu::location!(),
555 });
556 }
557
558 Ok(())
559 }
560
561 async fn drop_table(&self, request: DropTableRequest) -> Result<DropTableResponse> {
562 let table_name = Self::table_name_from_id(&request.id)?;
563 let table_uri = self.table_full_uri(&table_name);
564
565 let table_path = self.table_path(&table_name);
567
568 self.object_store
569 .remove_dir_all(table_path)
570 .await
571 .map_err(|e| Error::Namespace {
572 source: format!("Failed to drop table {}: {}", table_name, e).into(),
573 location: snafu::location!(),
574 })?;
575
576 Ok(DropTableResponse {
577 id: request.id,
578 location: Some(table_uri),
579 properties: None,
580 transaction_id: None,
581 })
582 }
583
584 async fn create_table(
585 &self,
586 request: CreateTableRequest,
587 request_data: Bytes,
588 ) -> Result<CreateTableResponse> {
589 let table_name = Self::table_name_from_id(&request.id)?;
590 let table_uri = self.table_full_uri(&table_name);
591
592 if request_data.is_empty() {
594 return Err(Error::Namespace {
595 source: "Request data (Arrow IPC stream) is required for create_table".into(),
596 location: snafu::location!(),
597 });
598 }
599
600 if let Some(location) = &request.location {
602 let location = location.trim_end_matches('/');
603 if location != table_uri {
604 return Err(Error::Namespace {
605 source: format!(
606 "Cannot create table {} at location {}, must be at location {}",
607 table_name, location, table_uri
608 )
609 .into(),
610 location: snafu::location!(),
611 });
612 }
613 }
614
615 use arrow::ipc::reader::StreamReader;
617 use std::io::Cursor;
618
619 let cursor = Cursor::new(request_data.to_vec());
620 let stream_reader = StreamReader::try_new(cursor, None).map_err(|e| Error::Namespace {
621 source: format!("Invalid Arrow IPC stream: {}", e).into(),
622 location: snafu::location!(),
623 })?;
624
625 let arrow_schema = stream_reader.schema();
627
628 let mut batches = Vec::new();
630 for batch_result in stream_reader {
631 batches.push(batch_result.map_err(|e| Error::Namespace {
632 source: format!("Failed to read batch from IPC stream: {}", e).into(),
633 location: snafu::location!(),
634 })?);
635 }
636
637 let reader = if batches.is_empty() {
639 let batch = arrow::record_batch::RecordBatch::new_empty(arrow_schema.clone());
641 let batches = vec![Ok(batch)];
642 arrow::record_batch::RecordBatchIterator::new(batches, arrow_schema.clone())
643 } else {
644 let batch_results: Vec<_> = batches.into_iter().map(Ok).collect();
646 arrow::record_batch::RecordBatchIterator::new(batch_results, arrow_schema)
647 };
648
649 let store_params = self.storage_options.as_ref().map(|opts| ObjectStoreParams {
652 storage_options: Some(opts.clone()),
653 ..Default::default()
654 });
655
656 let write_params = WriteParams {
657 mode: lance::dataset::WriteMode::Create,
658 store_params,
659 ..Default::default()
660 };
661
662 Dataset::write(reader, &table_uri, Some(write_params))
664 .await
665 .map_err(|e| Error::Namespace {
666 source: format!("Failed to create Lance dataset: {}", e).into(),
667 location: snafu::location!(),
668 })?;
669
670 Ok(CreateTableResponse {
671 version: Some(1),
672 location: Some(table_uri),
673 properties: None,
674 storage_options: self.storage_options.clone(),
675 })
676 }
677
678 async fn create_empty_table(
679 &self,
680 request: CreateEmptyTableRequest,
681 ) -> Result<CreateEmptyTableResponse> {
682 let table_name = Self::table_name_from_id(&request.id)?;
683 let table_uri = self.table_full_uri(&table_name);
684
685 if let Some(location) = &request.location {
687 let location = location.trim_end_matches('/');
688 if location != table_uri {
689 return Err(Error::Namespace {
690 source: format!(
691 "Cannot create table {} at location {}, must be at location {}",
692 table_name, location, table_uri
693 )
694 .into(),
695 location: snafu::location!(),
696 });
697 }
698 }
699
700 let reserved_file_path = self.table_reserved_file_path(&table_name);
702
703 self.object_store
704 .create(&reserved_file_path)
705 .await
706 .map_err(|e| Error::Namespace {
707 source: format!(
708 "Failed to create .lance-reserved file for table {}: {}",
709 table_name, e
710 )
711 .into(),
712 location: snafu::location!(),
713 })?
714 .shutdown()
715 .await
716 .map_err(|e| Error::Namespace {
717 source: format!(
718 "Failed to finalize .lance-reserved file for table {}: {}",
719 table_name, e
720 )
721 .into(),
722 location: snafu::location!(),
723 })?;
724
725 Ok(CreateEmptyTableResponse {
726 location: Some(table_uri),
727 properties: None,
728 storage_options: self.storage_options.clone(),
729 })
730 }
731
732 fn namespace_id(&self) -> String {
733 format!("DirectoryNamespace {{ root: {:?} }}", self.root)
734 }
735}
736
737#[cfg(test)]
738mod tests {
739 use super::*;
740 use lance_core::utils::tempfile::TempStdDir;
741 use lance_namespace::models::{JsonArrowDataType, JsonArrowField, JsonArrowSchema};
742 use lance_namespace::schema::convert_json_arrow_schema;
743 use std::sync::Arc;
744
745 async fn create_test_namespace() -> (DirectoryNamespace, TempStdDir) {
747 let temp_dir = TempStdDir::default();
748
749 let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
750 .build()
751 .await
752 .unwrap();
753 (namespace, temp_dir)
754 }
755
756 fn create_test_ipc_data(schema: &JsonArrowSchema) -> Vec<u8> {
758 use arrow::ipc::writer::StreamWriter;
759
760 let arrow_schema = convert_json_arrow_schema(schema).unwrap();
761 let arrow_schema = Arc::new(arrow_schema);
762 let batch = arrow::record_batch::RecordBatch::new_empty(arrow_schema.clone());
763 let mut buffer = Vec::new();
764 {
765 let mut writer = StreamWriter::try_new(&mut buffer, &arrow_schema).unwrap();
766 writer.write(&batch).unwrap();
767 writer.finish().unwrap();
768 }
769 buffer
770 }
771
772 fn create_test_schema() -> JsonArrowSchema {
774 let int_type = JsonArrowDataType::new("int32".to_string());
775 let string_type = JsonArrowDataType::new("utf8".to_string());
776
777 let id_field = JsonArrowField {
778 name: "id".to_string(),
779 r#type: Box::new(int_type),
780 nullable: false,
781 metadata: None,
782 };
783
784 let name_field = JsonArrowField {
785 name: "name".to_string(),
786 r#type: Box::new(string_type),
787 nullable: true,
788 metadata: None,
789 };
790
791 JsonArrowSchema {
792 fields: vec![id_field, name_field],
793 metadata: None,
794 }
795 }
796
797 #[tokio::test]
798 async fn test_create_table() {
799 let (namespace, _temp_dir) = create_test_namespace().await;
800
801 let schema = create_test_schema();
803 let ipc_data = create_test_ipc_data(&schema);
804
805 let mut request = CreateTableRequest::new();
806 request.id = Some(vec!["test_table".to_string()]);
807
808 let response = namespace
809 .create_table(request, bytes::Bytes::from(ipc_data))
810 .await
811 .unwrap();
812
813 assert!(response.location.is_some());
814 assert!(response.location.unwrap().ends_with("test_table.lance"));
815 assert_eq!(response.version, Some(1));
816 }
817
818 #[tokio::test]
819 async fn test_create_table_without_data() {
820 let (namespace, _temp_dir) = create_test_namespace().await;
821
822 let mut request = CreateTableRequest::new();
823 request.id = Some(vec!["test_table".to_string()]);
824
825 let result = namespace.create_table(request, bytes::Bytes::new()).await;
826 assert!(result.is_err());
827 assert!(result
828 .unwrap_err()
829 .to_string()
830 .contains("Arrow IPC stream) is required"));
831 }
832
833 #[tokio::test]
834 async fn test_create_table_with_invalid_id() {
835 let (namespace, _temp_dir) = create_test_namespace().await;
836
837 let schema = create_test_schema();
839 let ipc_data = create_test_ipc_data(&schema);
840
841 let mut request = CreateTableRequest::new();
843 request.id = Some(vec![]);
844
845 let result = namespace
846 .create_table(request, bytes::Bytes::from(ipc_data.clone()))
847 .await;
848 assert!(result.is_err());
849
850 let mut request = CreateTableRequest::new();
852 request.id = Some(vec!["namespace".to_string(), "table".to_string()]);
853
854 let result = namespace
855 .create_table(request, bytes::Bytes::from(ipc_data))
856 .await;
857 assert!(result.is_err());
858 assert!(result
859 .unwrap_err()
860 .to_string()
861 .contains("single-level table IDs"));
862 }
863
864 #[tokio::test]
865 async fn test_create_table_with_wrong_location() {
866 let (namespace, _temp_dir) = create_test_namespace().await;
867
868 let schema = create_test_schema();
870 let ipc_data = create_test_ipc_data(&schema);
871
872 let mut request = CreateTableRequest::new();
873 request.id = Some(vec!["test_table".to_string()]);
874 request.location = Some("/wrong/path/table.lance".to_string());
875
876 let result = namespace
877 .create_table(request, bytes::Bytes::from(ipc_data))
878 .await;
879 assert!(result.is_err());
880 assert!(result
881 .unwrap_err()
882 .to_string()
883 .contains("must be at location"));
884 }
885
886 #[tokio::test]
887 async fn test_list_tables() {
888 let (namespace, _temp_dir) = create_test_namespace().await;
889
890 let request = ListTablesRequest::new();
892 let response = namespace.list_tables(request).await.unwrap();
893 assert_eq!(response.tables.len(), 0);
894
895 let schema = create_test_schema();
897 let ipc_data = create_test_ipc_data(&schema);
898
899 let mut create_request = CreateTableRequest::new();
901 create_request.id = Some(vec!["table1".to_string()]);
902 namespace
903 .create_table(create_request, bytes::Bytes::from(ipc_data.clone()))
904 .await
905 .unwrap();
906
907 let mut create_request = CreateTableRequest::new();
909 create_request.id = Some(vec!["table2".to_string()]);
910 namespace
911 .create_table(create_request, bytes::Bytes::from(ipc_data))
912 .await
913 .unwrap();
914
915 let request = ListTablesRequest::new();
917 let response = namespace.list_tables(request).await.unwrap();
918 let tables = response.tables;
919 assert_eq!(tables.len(), 2);
920 assert!(tables.contains(&"table1".to_string()));
921 assert!(tables.contains(&"table2".to_string()));
922 }
923
924 #[tokio::test]
925 async fn test_list_tables_with_namespace_id() {
926 let (namespace, _temp_dir) = create_test_namespace().await;
927
928 let mut request = ListTablesRequest::new();
929 request.id = Some(vec!["namespace".to_string()]);
930
931 let result = namespace.list_tables(request).await;
932 assert!(result.is_err());
933 assert!(result
934 .unwrap_err()
935 .to_string()
936 .contains("root namespace operations"));
937 }
938
939 #[tokio::test]
940 async fn test_describe_table() {
941 let (namespace, _temp_dir) = create_test_namespace().await;
942
943 let schema = create_test_schema();
945 let ipc_data = create_test_ipc_data(&schema);
946
947 let mut create_request = CreateTableRequest::new();
948 create_request.id = Some(vec!["test_table".to_string()]);
949 namespace
950 .create_table(create_request, bytes::Bytes::from(ipc_data))
951 .await
952 .unwrap();
953
954 let mut request = DescribeTableRequest::new();
956 request.id = Some(vec!["test_table".to_string()]);
957 let response = namespace.describe_table(request).await.unwrap();
958
959 assert!(response.location.is_some());
960 assert!(response.location.unwrap().ends_with("test_table.lance"));
961 }
962
963 #[tokio::test]
964 async fn test_describe_nonexistent_table() {
965 let (namespace, _temp_dir) = create_test_namespace().await;
966
967 let mut request = DescribeTableRequest::new();
968 request.id = Some(vec!["nonexistent".to_string()]);
969
970 let result = namespace.describe_table(request).await;
971 assert!(result.is_err());
972 assert!(result
973 .unwrap_err()
974 .to_string()
975 .contains("Table does not exist"));
976 }
977
978 #[tokio::test]
979 async fn test_table_exists() {
980 let (namespace, _temp_dir) = create_test_namespace().await;
981
982 let schema = create_test_schema();
984 let ipc_data = create_test_ipc_data(&schema);
985
986 let mut create_request = CreateTableRequest::new();
987 create_request.id = Some(vec!["existing_table".to_string()]);
988 namespace
989 .create_table(create_request, bytes::Bytes::from(ipc_data))
990 .await
991 .unwrap();
992
993 let mut request = TableExistsRequest::new();
995 request.id = Some(vec!["existing_table".to_string()]);
996 let result = namespace.table_exists(request).await;
997 assert!(result.is_ok());
998
999 let mut request = TableExistsRequest::new();
1001 request.id = Some(vec!["nonexistent".to_string()]);
1002 let result = namespace.table_exists(request).await;
1003 assert!(result.is_err());
1004 assert!(result
1005 .unwrap_err()
1006 .to_string()
1007 .contains("Table does not exist"));
1008 }
1009
1010 #[tokio::test]
1011 async fn test_drop_table() {
1012 let (namespace, _temp_dir) = create_test_namespace().await;
1013
1014 let schema = create_test_schema();
1016 let ipc_data = create_test_ipc_data(&schema);
1017
1018 let mut create_request = CreateTableRequest::new();
1019 create_request.id = Some(vec!["table_to_drop".to_string()]);
1020 namespace
1021 .create_table(create_request, bytes::Bytes::from(ipc_data))
1022 .await
1023 .unwrap();
1024
1025 let mut exists_request = TableExistsRequest::new();
1027 exists_request.id = Some(vec!["table_to_drop".to_string()]);
1028 assert!(namespace.table_exists(exists_request.clone()).await.is_ok());
1029
1030 let mut drop_request = DropTableRequest::new();
1032 drop_request.id = Some(vec!["table_to_drop".to_string()]);
1033 let response = namespace.drop_table(drop_request).await.unwrap();
1034 assert!(response.location.is_some());
1035
1036 assert!(namespace.table_exists(exists_request).await.is_err());
1038 }
1039
1040 #[tokio::test]
1041 async fn test_drop_nonexistent_table() {
1042 let (namespace, _temp_dir) = create_test_namespace().await;
1043
1044 let mut request = DropTableRequest::new();
1045 request.id = Some(vec!["nonexistent".to_string()]);
1046
1047 let result = namespace.drop_table(request).await;
1049 let _ = result;
1052 }
1053
1054 #[tokio::test]
1055 async fn test_root_namespace_operations() {
1056 let (namespace, _temp_dir) = create_test_namespace().await;
1057
1058 let request = ListNamespacesRequest::new();
1060 let result = namespace.list_namespaces(request).await;
1061 assert!(result.is_ok());
1062 assert_eq!(result.unwrap().namespaces.len(), 0);
1063
1064 let request = DescribeNamespaceRequest::new();
1066 let result = namespace.describe_namespace(request).await;
1067 assert!(result.is_ok());
1068
1069 let request = NamespaceExistsRequest::new();
1071 let result = namespace.namespace_exists(request).await;
1072 assert!(result.is_ok());
1073
1074 let request = CreateNamespaceRequest::new();
1076 let result = namespace.create_namespace(request).await;
1077 assert!(result.is_err());
1078 assert!(result.unwrap_err().to_string().contains("already exists"));
1079
1080 let request = DropNamespaceRequest::new();
1082 let result = namespace.drop_namespace(request).await;
1083 assert!(result.is_err());
1084 assert!(result
1085 .unwrap_err()
1086 .to_string()
1087 .contains("cannot be dropped"));
1088 }
1089
1090 #[tokio::test]
1091 async fn test_non_root_namespace_operations() {
1092 let (namespace, _temp_dir) = create_test_namespace().await;
1093
1094 let mut request = CreateNamespaceRequest::new();
1096 request.id = Some(vec!["child".to_string()]);
1097 let result = namespace.create_namespace(request).await;
1098 assert!(matches!(result, Err(Error::NotSupported { .. })));
1099
1100 let mut request = NamespaceExistsRequest::new();
1102 request.id = Some(vec!["child".to_string()]);
1103 let result = namespace.namespace_exists(request).await;
1104 assert!(result.is_err());
1105 assert!(result
1106 .unwrap_err()
1107 .to_string()
1108 .contains("Only root namespace exists"));
1109
1110 let mut request = DropNamespaceRequest::new();
1112 request.id = Some(vec!["child".to_string()]);
1113 let result = namespace.drop_namespace(request).await;
1114 assert!(matches!(result, Err(Error::NotSupported { .. })));
1115 }
1116
1117 #[tokio::test]
1118 async fn test_config_custom_root() {
1119 let temp_dir = TempStdDir::default();
1120 let custom_path = temp_dir.join("custom");
1121 std::fs::create_dir(&custom_path).unwrap();
1122
1123 let namespace = DirectoryNamespaceBuilder::new(custom_path.to_string_lossy().to_string())
1124 .build()
1125 .await
1126 .unwrap();
1127
1128 let schema = create_test_schema();
1130 let ipc_data = create_test_ipc_data(&schema);
1131
1132 let mut request = CreateTableRequest::new();
1134 request.id = Some(vec!["test_table".to_string()]);
1135
1136 let response = namespace
1137 .create_table(request, bytes::Bytes::from(ipc_data))
1138 .await
1139 .unwrap();
1140
1141 assert!(response.location.unwrap().contains("custom"));
1142 }
1143
1144 #[tokio::test]
1145 async fn test_config_storage_options() {
1146 let temp_dir = TempStdDir::default();
1147
1148 let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
1149 .storage_option("option1", "value1")
1150 .storage_option("option2", "value2")
1151 .build()
1152 .await
1153 .unwrap();
1154
1155 let schema = create_test_schema();
1157 let ipc_data = create_test_ipc_data(&schema);
1158
1159 let mut request = CreateTableRequest::new();
1161 request.id = Some(vec!["test_table".to_string()]);
1162
1163 let response = namespace
1164 .create_table(request, bytes::Bytes::from(ipc_data))
1165 .await
1166 .unwrap();
1167
1168 let storage_options = response.storage_options.unwrap();
1169 assert_eq!(storage_options.get("option1"), Some(&"value1".to_string()));
1170 assert_eq!(storage_options.get("option2"), Some(&"value2".to_string()));
1171 }
1172
1173 #[tokio::test]
1174 async fn test_various_arrow_types() {
1175 let (namespace, _temp_dir) = create_test_namespace().await;
1176
1177 let fields = vec![
1179 JsonArrowField {
1180 name: "bool_col".to_string(),
1181 r#type: Box::new(JsonArrowDataType::new("bool".to_string())),
1182 nullable: true,
1183 metadata: None,
1184 },
1185 JsonArrowField {
1186 name: "int8_col".to_string(),
1187 r#type: Box::new(JsonArrowDataType::new("int8".to_string())),
1188 nullable: true,
1189 metadata: None,
1190 },
1191 JsonArrowField {
1192 name: "float64_col".to_string(),
1193 r#type: Box::new(JsonArrowDataType::new("float64".to_string())),
1194 nullable: true,
1195 metadata: None,
1196 },
1197 JsonArrowField {
1198 name: "binary_col".to_string(),
1199 r#type: Box::new(JsonArrowDataType::new("binary".to_string())),
1200 nullable: true,
1201 metadata: None,
1202 },
1203 ];
1204
1205 let schema = JsonArrowSchema {
1206 fields,
1207 metadata: None,
1208 };
1209
1210 let ipc_data = create_test_ipc_data(&schema);
1212
1213 let mut request = CreateTableRequest::new();
1214 request.id = Some(vec!["complex_table".to_string()]);
1215
1216 let response = namespace
1217 .create_table(request, bytes::Bytes::from(ipc_data))
1218 .await
1219 .unwrap();
1220
1221 assert!(response.location.is_some());
1222 }
1223
1224 #[tokio::test]
1225 async fn test_connect_dir() {
1226 let temp_dir = TempStdDir::default();
1227
1228 let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
1229 .build()
1230 .await
1231 .unwrap();
1232
1233 let request = ListTablesRequest::new();
1235 let response = namespace.list_tables(request).await.unwrap();
1236 assert_eq!(response.tables.len(), 0);
1237 }
1238
1239 #[tokio::test]
1240 async fn test_create_table_with_ipc_data() {
1241 use arrow::array::{Int32Array, StringArray};
1242 use arrow::ipc::writer::StreamWriter;
1243
1244 let (namespace, _temp_dir) = create_test_namespace().await;
1245
1246 let schema = create_test_schema();
1248
1249 let arrow_schema = convert_json_arrow_schema(&schema).unwrap();
1251 let arrow_schema = Arc::new(arrow_schema);
1252
1253 let id_array = Int32Array::from(vec![1, 2, 3]);
1255 let name_array = StringArray::from(vec!["Alice", "Bob", "Charlie"]);
1256 let batch = arrow::record_batch::RecordBatch::try_new(
1257 arrow_schema.clone(),
1258 vec![Arc::new(id_array), Arc::new(name_array)],
1259 )
1260 .unwrap();
1261
1262 let mut buffer = Vec::new();
1264 {
1265 let mut writer = StreamWriter::try_new(&mut buffer, &arrow_schema).unwrap();
1266 writer.write(&batch).unwrap();
1267 writer.finish().unwrap();
1268 }
1269
1270 let mut request = CreateTableRequest::new();
1272 request.id = Some(vec!["test_table_with_data".to_string()]);
1273
1274 let response = namespace
1275 .create_table(request, Bytes::from(buffer))
1276 .await
1277 .unwrap();
1278
1279 assert_eq!(response.version, Some(1));
1280 assert!(response
1281 .location
1282 .unwrap()
1283 .contains("test_table_with_data.lance"));
1284
1285 let mut exists_request = TableExistsRequest::new();
1287 exists_request.id = Some(vec!["test_table_with_data".to_string()]);
1288 namespace.table_exists(exists_request).await.unwrap();
1289 }
1290
1291 #[tokio::test]
1292 async fn test_create_empty_table() {
1293 let (namespace, temp_dir) = create_test_namespace().await;
1294
1295 let mut request = CreateEmptyTableRequest::new();
1296 request.id = Some(vec!["empty_table".to_string()]);
1297
1298 let response = namespace.create_empty_table(request).await.unwrap();
1299
1300 assert!(response.location.is_some());
1301 assert!(response.location.unwrap().ends_with("empty_table.lance"));
1302
1303 let table_dir = temp_dir.join("empty_table.lance");
1305 assert!(table_dir.exists());
1306 assert!(table_dir.is_dir());
1307
1308 let reserved_file = table_dir.join(".lance-reserved");
1309 assert!(reserved_file.exists());
1310 assert!(reserved_file.is_file());
1311
1312 let metadata = std::fs::metadata(&reserved_file).unwrap();
1314 assert_eq!(metadata.len(), 0);
1315
1316 let mut exists_request = TableExistsRequest::new();
1318 exists_request.id = Some(vec!["empty_table".to_string()]);
1319 namespace.table_exists(exists_request).await.unwrap();
1320
1321 let list_request = ListTablesRequest::new();
1323 let list_response = namespace.list_tables(list_request).await.unwrap();
1324 assert!(list_response.tables.contains(&"empty_table".to_string()));
1325
1326 let mut describe_request = DescribeTableRequest::new();
1328 describe_request.id = Some(vec!["empty_table".to_string()]);
1329 let describe_response = namespace.describe_table(describe_request).await.unwrap();
1330 assert!(describe_response.location.is_some());
1331 assert!(describe_response.location.unwrap().contains("empty_table"));
1332 }
1333
1334 #[tokio::test]
1335 async fn test_create_empty_table_with_wrong_location() {
1336 let (namespace, _temp_dir) = create_test_namespace().await;
1337
1338 let mut request = CreateEmptyTableRequest::new();
1339 request.id = Some(vec!["test_table".to_string()]);
1340 request.location = Some("/wrong/path/table.lance".to_string());
1341
1342 let result = namespace.create_empty_table(request).await;
1343 assert!(result.is_err());
1344 assert!(result
1345 .unwrap_err()
1346 .to_string()
1347 .contains("must be at location"));
1348 }
1349
1350 #[tokio::test]
1351 async fn test_create_empty_table_then_drop() {
1352 let (namespace, temp_dir) = create_test_namespace().await;
1353
1354 let mut create_request = CreateEmptyTableRequest::new();
1356 create_request.id = Some(vec!["empty_table_to_drop".to_string()]);
1357
1358 let create_response = namespace.create_empty_table(create_request).await.unwrap();
1359 assert!(create_response.location.is_some());
1360
1361 let table_dir = temp_dir.join("empty_table_to_drop.lance");
1363 assert!(table_dir.exists());
1364 let reserved_file = table_dir.join(".lance-reserved");
1365 assert!(reserved_file.exists());
1366
1367 let mut drop_request = DropTableRequest::new();
1369 drop_request.id = Some(vec!["empty_table_to_drop".to_string()]);
1370 let drop_response = namespace.drop_table(drop_request).await.unwrap();
1371 assert!(drop_response.location.is_some());
1372
1373 assert!(!table_dir.exists());
1375 assert!(!reserved_file.exists());
1376
1377 let mut exists_request = TableExistsRequest::new();
1379 exists_request.id = Some(vec!["empty_table_to_drop".to_string()]);
1380 let exists_result = namespace.table_exists(exists_request).await;
1381 assert!(exists_result.is_err());
1382 }
1383}