1use std::collections::HashMap;
10use std::sync::Arc;
11
12use async_trait::async_trait;
13use bytes::Bytes;
14use lance::dataset::{Dataset, WriteParams};
15use lance::session::Session;
16use lance_io::object_store::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry};
17use object_store::path::Path;
18
19use lance_namespace::models::{
20 CreateEmptyTableRequest, CreateEmptyTableResponse, CreateNamespaceRequest,
21 CreateNamespaceResponse, CreateTableRequest, CreateTableResponse, DescribeNamespaceRequest,
22 DescribeNamespaceResponse, DescribeTableRequest, DescribeTableResponse, DropNamespaceRequest,
23 DropNamespaceResponse, DropTableRequest, DropTableResponse, ListNamespacesRequest,
24 ListNamespacesResponse, ListTablesRequest, ListTablesResponse, NamespaceExistsRequest,
25 TableExistsRequest,
26};
27
28use lance_core::{box_error, Error, Result};
29use lance_namespace::LanceNamespace;
30
31#[derive(Debug, Clone)]
66pub struct DirectoryNamespaceBuilder {
67 root: String,
68 storage_options: Option<HashMap<String, String>>,
69 session: Option<Arc<Session>>,
70}
71
72impl DirectoryNamespaceBuilder {
73 pub fn new(root: impl Into<String>) -> Self {
79 Self {
80 root: root.into().trim_end_matches('/').to_string(),
81 storage_options: None,
82 session: None,
83 }
84 }
85
86 pub fn from_properties(
123 properties: HashMap<String, String>,
124 session: Option<Arc<Session>>,
125 ) -> Result<Self> {
126 let root = properties
128 .get("root")
129 .cloned()
130 .ok_or_else(|| Error::Namespace {
131 source: "Missing required property 'root' for directory namespace".into(),
132 location: snafu::location!(),
133 })?;
134
135 let storage_options: HashMap<String, String> = properties
137 .iter()
138 .filter_map(|(k, v)| {
139 k.strip_prefix("storage.")
140 .map(|key| (key.to_string(), v.clone()))
141 })
142 .collect();
143
144 let storage_options = if storage_options.is_empty() {
145 None
146 } else {
147 Some(storage_options)
148 };
149
150 Ok(Self {
151 root: root.trim_end_matches('/').to_string(),
152 storage_options,
153 session,
154 })
155 }
156
157 pub fn storage_option(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
164 self.storage_options
165 .get_or_insert_with(HashMap::new)
166 .insert(key.into(), value.into());
167 self
168 }
169
170 pub fn storage_options(mut self, options: HashMap<String, String>) -> Self {
176 self.storage_options
177 .get_or_insert_with(HashMap::new)
178 .extend(options);
179 self
180 }
181
182 pub fn session(mut self, session: Arc<Session>) -> Self {
192 self.session = Some(session);
193 self
194 }
195
196 pub async fn build(self) -> Result<DirectoryNamespace> {
209 let (object_store, base_path) =
210 Self::initialize_object_store(&self.root, &self.storage_options, &self.session).await?;
211
212 Ok(DirectoryNamespace {
213 root: self.root,
214 storage_options: self.storage_options,
215 session: self.session,
216 object_store,
217 base_path,
218 })
219 }
220
221 async fn initialize_object_store(
223 root: &str,
224 storage_options: &Option<HashMap<String, String>>,
225 session: &Option<Arc<Session>>,
226 ) -> Result<(Arc<ObjectStore>, Path)> {
227 let params = ObjectStoreParams {
229 storage_options: storage_options.clone(),
230 ..Default::default()
231 };
232
233 let registry = if let Some(session) = session {
235 session.store_registry()
236 } else {
237 Arc::new(ObjectStoreRegistry::default())
238 };
239
240 let (object_store, base_path) = ObjectStore::from_uri_and_params(registry, root, ¶ms)
242 .await
243 .map_err(|e| Error::Namespace {
244 source: format!("Failed to create object store: {}", e).into(),
245 location: snafu::location!(),
246 })?;
247
248 Ok((object_store, base_path))
249 }
250}
251
252pub struct DirectoryNamespace {
257 root: String,
258 storage_options: Option<HashMap<String, String>>,
259 #[allow(dead_code)]
260 session: Option<Arc<Session>>,
261 object_store: Arc<ObjectStore>,
262 base_path: Path,
263}
264
265impl DirectoryNamespace {
266 fn validate_root_namespace_id(id: &Option<Vec<String>>) -> Result<()> {
268 if let Some(id) = id {
269 if !id.is_empty() {
270 return Err(Error::Namespace {
271 source: format!(
272 "Directory namespace only supports root namespace operations, but got namespace ID: {:?}. Expected empty ID.",
273 id
274 ).into(),
275 location: snafu::location!(),
276 });
277 }
278 }
279 Ok(())
280 }
281
282 fn table_name_from_id(id: &Option<Vec<String>>) -> Result<String> {
284 let id = id.as_ref().ok_or_else(|| Error::Namespace {
285 source: "Directory namespace table ID cannot be empty".into(),
286 location: snafu::location!(),
287 })?;
288
289 if id.len() != 1 {
290 return Err(Error::Namespace {
291 source: format!(
292 "Directory namespace only supports single-level table IDs, but got: {:?}",
293 id
294 )
295 .into(),
296 location: snafu::location!(),
297 });
298 }
299
300 Ok(id[0].clone())
301 }
302
303 fn table_full_uri(&self, table_name: &str) -> String {
305 format!("{}/{}.lance", &self.root, table_name)
306 }
307
308 fn table_path(&self, table_name: &str) -> Path {
310 self.base_path
311 .child(format!("{}.lance", table_name).as_str())
312 }
313
314 fn table_versions_path(&self, table_name: &str) -> Path {
316 self.base_path
318 .child(format!("{}.lance", table_name).as_str())
319 .child("_versions")
320 }
321
322 fn table_reserved_file_path(&self, table_name: &str) -> Path {
324 self.base_path
326 .child(format!("{}.lance", table_name).as_str())
327 .child(".lance-reserved")
328 }
329}
330
331#[async_trait]
332impl LanceNamespace for DirectoryNamespace {
333 async fn list_namespaces(
334 &self,
335 request: ListNamespacesRequest,
336 ) -> Result<ListNamespacesResponse> {
337 Self::validate_root_namespace_id(&request.id)?;
339
340 Ok(ListNamespacesResponse::new(vec![]))
342 }
343
344 async fn describe_namespace(
345 &self,
346 request: DescribeNamespaceRequest,
347 ) -> Result<DescribeNamespaceResponse> {
348 Self::validate_root_namespace_id(&request.id)?;
350
351 Ok(DescribeNamespaceResponse {
353 properties: Some(HashMap::new()),
354 })
355 }
356
357 async fn create_namespace(
358 &self,
359 request: CreateNamespaceRequest,
360 ) -> Result<CreateNamespaceResponse> {
361 if request.id.is_none() || request.id.as_ref().unwrap().is_empty() {
363 return Err(Error::Namespace {
364 source: "Root namespace already exists and cannot be created".into(),
365 location: snafu::location!(),
366 });
367 }
368
369 Err(Error::NotSupported {
371 source: "Directory namespace only supports the root namespace".into(),
372 location: snafu::location!(),
373 })
374 }
375
376 async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<DropNamespaceResponse> {
377 if request.id.is_none() || request.id.as_ref().unwrap().is_empty() {
379 return Err(Error::Namespace {
380 source: "Root namespace cannot be dropped".into(),
381 location: snafu::location!(),
382 });
383 }
384
385 Err(Error::NotSupported {
387 source: "Directory namespace only supports the root namespace".into(),
388 location: snafu::location!(),
389 })
390 }
391
392 async fn namespace_exists(&self, request: NamespaceExistsRequest) -> Result<()> {
393 if request.id.is_none() || request.id.as_ref().unwrap().is_empty() {
395 return Ok(());
396 }
397
398 Err(Error::Namespace {
400 source: "Only root namespace exists in directory namespace".into(),
401 location: snafu::location!(),
402 })
403 }
404
405 async fn list_tables(&self, request: ListTablesRequest) -> Result<ListTablesResponse> {
406 Self::validate_root_namespace_id(&request.id)?;
407
408 let mut tables = Vec::new();
409
410 let entries = self
412 .object_store
413 .read_dir(self.base_path.clone())
414 .await
415 .map_err(|e| Error::IO {
416 source: box_error(std::io::Error::other(format!(
417 "Failed to list directory: {}",
418 e
419 ))),
420 location: snafu::location!(),
421 })?;
422
423 for entry in entries {
424 let path = entry.trim_end_matches('/');
425
426 if !path.ends_with(".lance") {
428 continue;
429 }
430
431 let table_name = &path[..path.len() - 6];
433
434 let mut is_table = false;
436
437 let reserved_file_path = self.table_reserved_file_path(table_name);
439 if self
440 .object_store
441 .exists(&reserved_file_path)
442 .await
443 .unwrap_or(false)
444 {
445 is_table = true;
446 }
447
448 if !is_table {
450 let versions_path = self.table_versions_path(table_name);
451 if let Ok(version_entries) = self.object_store.read_dir(versions_path).await {
452 if !version_entries.is_empty() {
454 is_table = true;
455 }
456 }
457 }
458
459 if is_table {
460 tables.push(table_name.to_string());
461 }
462 }
463
464 let response = ListTablesResponse::new(tables);
465 Ok(response)
466 }
467
468 async fn describe_table(&self, request: DescribeTableRequest) -> Result<DescribeTableResponse> {
469 let table_name = Self::table_name_from_id(&request.id)?;
470 let table_uri = self.table_full_uri(&table_name);
471
472 let mut table_exists = false;
474
475 let reserved_file_path = self.table_reserved_file_path(&table_name);
477 if self
478 .object_store
479 .exists(&reserved_file_path)
480 .await
481 .unwrap_or(false)
482 {
483 table_exists = true;
484 }
485
486 if !table_exists {
488 let versions_path = self.table_versions_path(&table_name);
489 if let Ok(entries) = self.object_store.read_dir(versions_path).await {
490 if !entries.is_empty() {
491 table_exists = true;
492 }
493 }
494 }
495
496 if !table_exists {
497 return Err(Error::Namespace {
498 source: format!("Table does not exist: {}", table_name).into(),
499 location: snafu::location!(),
500 });
501 }
502
503 Ok(DescribeTableResponse {
504 version: None,
505 location: Some(table_uri),
506 schema: None,
507 properties: None,
508 storage_options: self.storage_options.clone(),
509 })
510 }
511
512 async fn table_exists(&self, request: TableExistsRequest) -> Result<()> {
513 let table_name = Self::table_name_from_id(&request.id)?;
514
515 let mut table_exists = false;
517
518 let reserved_file_path = self.table_reserved_file_path(&table_name);
520 if self
521 .object_store
522 .exists(&reserved_file_path)
523 .await
524 .unwrap_or(false)
525 {
526 table_exists = true;
527 }
528
529 if !table_exists {
531 let versions_path = self.table_versions_path(&table_name);
532 if let Ok(entries) = self.object_store.read_dir(versions_path).await {
533 if !entries.is_empty() {
534 table_exists = true;
535 }
536 }
537 }
538
539 if !table_exists {
540 return Err(Error::Namespace {
541 source: format!("Table does not exist: {}", table_name).into(),
542 location: snafu::location!(),
543 });
544 }
545
546 Ok(())
547 }
548
549 async fn drop_table(&self, request: DropTableRequest) -> Result<DropTableResponse> {
550 let table_name = Self::table_name_from_id(&request.id)?;
551 let table_uri = self.table_full_uri(&table_name);
552
553 let table_path = self.table_path(&table_name);
555
556 self.object_store
557 .remove_dir_all(table_path)
558 .await
559 .map_err(|e| Error::Namespace {
560 source: format!("Failed to drop table {}: {}", table_name, e).into(),
561 location: snafu::location!(),
562 })?;
563
564 Ok(DropTableResponse {
565 id: request.id,
566 location: Some(table_uri),
567 properties: None,
568 transaction_id: None,
569 })
570 }
571
572 async fn create_table(
573 &self,
574 request: CreateTableRequest,
575 request_data: Bytes,
576 ) -> Result<CreateTableResponse> {
577 let table_name = Self::table_name_from_id(&request.id)?;
578 let table_uri = self.table_full_uri(&table_name);
579
580 if request_data.is_empty() {
582 return Err(Error::Namespace {
583 source: "Request data (Arrow IPC stream) is required for create_table".into(),
584 location: snafu::location!(),
585 });
586 }
587
588 if let Some(location) = &request.location {
590 let location = location.trim_end_matches('/');
591 if location != table_uri {
592 return Err(Error::Namespace {
593 source: format!(
594 "Cannot create table {} at location {}, must be at location {}",
595 table_name, location, table_uri
596 )
597 .into(),
598 location: snafu::location!(),
599 });
600 }
601 }
602
603 use arrow::ipc::reader::StreamReader;
605 use std::io::Cursor;
606
607 let cursor = Cursor::new(request_data.to_vec());
608 let stream_reader = StreamReader::try_new(cursor, None).map_err(|e| Error::Namespace {
609 source: format!("Invalid Arrow IPC stream: {}", e).into(),
610 location: snafu::location!(),
611 })?;
612
613 let arrow_schema = stream_reader.schema();
615
616 let mut batches = Vec::new();
618 for batch_result in stream_reader {
619 batches.push(batch_result.map_err(|e| Error::Namespace {
620 source: format!("Failed to read batch from IPC stream: {}", e).into(),
621 location: snafu::location!(),
622 })?);
623 }
624
625 let reader = if batches.is_empty() {
627 let batch = arrow::record_batch::RecordBatch::new_empty(arrow_schema.clone());
629 let batches = vec![Ok(batch)];
630 arrow::record_batch::RecordBatchIterator::new(batches, arrow_schema.clone())
631 } else {
632 let batch_results: Vec<_> = batches.into_iter().map(Ok).collect();
634 arrow::record_batch::RecordBatchIterator::new(batch_results, arrow_schema)
635 };
636
637 let store_params = self.storage_options.as_ref().map(|opts| ObjectStoreParams {
640 storage_options: Some(opts.clone()),
641 ..Default::default()
642 });
643
644 let write_params = WriteParams {
645 mode: lance::dataset::WriteMode::Create,
646 store_params,
647 ..Default::default()
648 };
649
650 Dataset::write(reader, &table_uri, Some(write_params))
652 .await
653 .map_err(|e| Error::Namespace {
654 source: format!("Failed to create Lance dataset: {}", e).into(),
655 location: snafu::location!(),
656 })?;
657
658 Ok(CreateTableResponse {
659 version: Some(1),
660 location: Some(table_uri),
661 properties: None,
662 storage_options: self.storage_options.clone(),
663 })
664 }
665
666 async fn create_empty_table(
667 &self,
668 request: CreateEmptyTableRequest,
669 ) -> Result<CreateEmptyTableResponse> {
670 let table_name = Self::table_name_from_id(&request.id)?;
671 let table_uri = self.table_full_uri(&table_name);
672
673 if let Some(location) = &request.location {
675 let location = location.trim_end_matches('/');
676 if location != table_uri {
677 return Err(Error::Namespace {
678 source: format!(
679 "Cannot create table {} at location {}, must be at location {}",
680 table_name, location, table_uri
681 )
682 .into(),
683 location: snafu::location!(),
684 });
685 }
686 }
687
688 let reserved_file_path = self.table_reserved_file_path(&table_name);
690
691 self.object_store
692 .create(&reserved_file_path)
693 .await
694 .map_err(|e| Error::Namespace {
695 source: format!(
696 "Failed to create .lance-reserved file for table {}: {}",
697 table_name, e
698 )
699 .into(),
700 location: snafu::location!(),
701 })?
702 .shutdown()
703 .await
704 .map_err(|e| Error::Namespace {
705 source: format!(
706 "Failed to finalize .lance-reserved file for table {}: {}",
707 table_name, e
708 )
709 .into(),
710 location: snafu::location!(),
711 })?;
712
713 Ok(CreateEmptyTableResponse {
714 location: Some(table_uri),
715 properties: None,
716 storage_options: self.storage_options.clone(),
717 })
718 }
719}
720
721#[cfg(test)]
722mod tests {
723 use super::*;
724 use lance_core::utils::tempfile::TempStdDir;
725 use lance_namespace::models::{JsonArrowDataType, JsonArrowField, JsonArrowSchema};
726 use lance_namespace::schema::convert_json_arrow_schema;
727 use std::sync::Arc;
728
729 async fn create_test_namespace() -> (DirectoryNamespace, TempStdDir) {
731 let temp_dir = TempStdDir::default();
732
733 let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
734 .build()
735 .await
736 .unwrap();
737 (namespace, temp_dir)
738 }
739
740 fn create_test_ipc_data(schema: &JsonArrowSchema) -> Vec<u8> {
742 use arrow::ipc::writer::StreamWriter;
743
744 let arrow_schema = convert_json_arrow_schema(schema).unwrap();
745 let arrow_schema = Arc::new(arrow_schema);
746 let batch = arrow::record_batch::RecordBatch::new_empty(arrow_schema.clone());
747 let mut buffer = Vec::new();
748 {
749 let mut writer = StreamWriter::try_new(&mut buffer, &arrow_schema).unwrap();
750 writer.write(&batch).unwrap();
751 writer.finish().unwrap();
752 }
753 buffer
754 }
755
756 fn create_test_schema() -> JsonArrowSchema {
758 let int_type = JsonArrowDataType::new("int32".to_string());
759 let string_type = JsonArrowDataType::new("utf8".to_string());
760
761 let id_field = JsonArrowField {
762 name: "id".to_string(),
763 r#type: Box::new(int_type),
764 nullable: false,
765 metadata: None,
766 };
767
768 let name_field = JsonArrowField {
769 name: "name".to_string(),
770 r#type: Box::new(string_type),
771 nullable: true,
772 metadata: None,
773 };
774
775 JsonArrowSchema {
776 fields: vec![id_field, name_field],
777 metadata: None,
778 }
779 }
780
781 #[tokio::test]
782 async fn test_create_table() {
783 let (namespace, _temp_dir) = create_test_namespace().await;
784
785 let schema = create_test_schema();
787 let ipc_data = create_test_ipc_data(&schema);
788
789 let mut request = CreateTableRequest::new();
790 request.id = Some(vec!["test_table".to_string()]);
791
792 let response = namespace
793 .create_table(request, bytes::Bytes::from(ipc_data))
794 .await
795 .unwrap();
796
797 assert!(response.location.is_some());
798 assert!(response.location.unwrap().ends_with("test_table.lance"));
799 assert_eq!(response.version, Some(1));
800 }
801
802 #[tokio::test]
803 async fn test_create_table_without_data() {
804 let (namespace, _temp_dir) = create_test_namespace().await;
805
806 let mut request = CreateTableRequest::new();
807 request.id = Some(vec!["test_table".to_string()]);
808
809 let result = namespace.create_table(request, bytes::Bytes::new()).await;
810 assert!(result.is_err());
811 assert!(result
812 .unwrap_err()
813 .to_string()
814 .contains("Arrow IPC stream) is required"));
815 }
816
817 #[tokio::test]
818 async fn test_create_table_with_invalid_id() {
819 let (namespace, _temp_dir) = create_test_namespace().await;
820
821 let schema = create_test_schema();
823 let ipc_data = create_test_ipc_data(&schema);
824
825 let mut request = CreateTableRequest::new();
827 request.id = Some(vec![]);
828
829 let result = namespace
830 .create_table(request, bytes::Bytes::from(ipc_data.clone()))
831 .await;
832 assert!(result.is_err());
833
834 let mut request = CreateTableRequest::new();
836 request.id = Some(vec!["namespace".to_string(), "table".to_string()]);
837
838 let result = namespace
839 .create_table(request, bytes::Bytes::from(ipc_data))
840 .await;
841 assert!(result.is_err());
842 assert!(result
843 .unwrap_err()
844 .to_string()
845 .contains("single-level table IDs"));
846 }
847
848 #[tokio::test]
849 async fn test_create_table_with_wrong_location() {
850 let (namespace, _temp_dir) = create_test_namespace().await;
851
852 let schema = create_test_schema();
854 let ipc_data = create_test_ipc_data(&schema);
855
856 let mut request = CreateTableRequest::new();
857 request.id = Some(vec!["test_table".to_string()]);
858 request.location = Some("/wrong/path/table.lance".to_string());
859
860 let result = namespace
861 .create_table(request, bytes::Bytes::from(ipc_data))
862 .await;
863 assert!(result.is_err());
864 assert!(result
865 .unwrap_err()
866 .to_string()
867 .contains("must be at location"));
868 }
869
870 #[tokio::test]
871 async fn test_list_tables() {
872 let (namespace, _temp_dir) = create_test_namespace().await;
873
874 let request = ListTablesRequest::new();
876 let response = namespace.list_tables(request).await.unwrap();
877 assert_eq!(response.tables.len(), 0);
878
879 let schema = create_test_schema();
881 let ipc_data = create_test_ipc_data(&schema);
882
883 let mut create_request = CreateTableRequest::new();
885 create_request.id = Some(vec!["table1".to_string()]);
886 namespace
887 .create_table(create_request, bytes::Bytes::from(ipc_data.clone()))
888 .await
889 .unwrap();
890
891 let mut create_request = CreateTableRequest::new();
893 create_request.id = Some(vec!["table2".to_string()]);
894 namespace
895 .create_table(create_request, bytes::Bytes::from(ipc_data))
896 .await
897 .unwrap();
898
899 let request = ListTablesRequest::new();
901 let response = namespace.list_tables(request).await.unwrap();
902 let tables = response.tables;
903 assert_eq!(tables.len(), 2);
904 assert!(tables.contains(&"table1".to_string()));
905 assert!(tables.contains(&"table2".to_string()));
906 }
907
908 #[tokio::test]
909 async fn test_list_tables_with_namespace_id() {
910 let (namespace, _temp_dir) = create_test_namespace().await;
911
912 let mut request = ListTablesRequest::new();
913 request.id = Some(vec!["namespace".to_string()]);
914
915 let result = namespace.list_tables(request).await;
916 assert!(result.is_err());
917 assert!(result
918 .unwrap_err()
919 .to_string()
920 .contains("root namespace operations"));
921 }
922
923 #[tokio::test]
924 async fn test_describe_table() {
925 let (namespace, _temp_dir) = create_test_namespace().await;
926
927 let schema = create_test_schema();
929 let ipc_data = create_test_ipc_data(&schema);
930
931 let mut create_request = CreateTableRequest::new();
932 create_request.id = Some(vec!["test_table".to_string()]);
933 namespace
934 .create_table(create_request, bytes::Bytes::from(ipc_data))
935 .await
936 .unwrap();
937
938 let mut request = DescribeTableRequest::new();
940 request.id = Some(vec!["test_table".to_string()]);
941 let response = namespace.describe_table(request).await.unwrap();
942
943 assert!(response.location.is_some());
944 assert!(response.location.unwrap().ends_with("test_table.lance"));
945 }
946
947 #[tokio::test]
948 async fn test_describe_nonexistent_table() {
949 let (namespace, _temp_dir) = create_test_namespace().await;
950
951 let mut request = DescribeTableRequest::new();
952 request.id = Some(vec!["nonexistent".to_string()]);
953
954 let result = namespace.describe_table(request).await;
955 assert!(result.is_err());
956 assert!(result
957 .unwrap_err()
958 .to_string()
959 .contains("Table does not exist"));
960 }
961
962 #[tokio::test]
963 async fn test_table_exists() {
964 let (namespace, _temp_dir) = create_test_namespace().await;
965
966 let schema = create_test_schema();
968 let ipc_data = create_test_ipc_data(&schema);
969
970 let mut create_request = CreateTableRequest::new();
971 create_request.id = Some(vec!["existing_table".to_string()]);
972 namespace
973 .create_table(create_request, bytes::Bytes::from(ipc_data))
974 .await
975 .unwrap();
976
977 let mut request = TableExistsRequest::new();
979 request.id = Some(vec!["existing_table".to_string()]);
980 let result = namespace.table_exists(request).await;
981 assert!(result.is_ok());
982
983 let mut request = TableExistsRequest::new();
985 request.id = Some(vec!["nonexistent".to_string()]);
986 let result = namespace.table_exists(request).await;
987 assert!(result.is_err());
988 assert!(result
989 .unwrap_err()
990 .to_string()
991 .contains("Table does not exist"));
992 }
993
994 #[tokio::test]
995 async fn test_drop_table() {
996 let (namespace, _temp_dir) = create_test_namespace().await;
997
998 let schema = create_test_schema();
1000 let ipc_data = create_test_ipc_data(&schema);
1001
1002 let mut create_request = CreateTableRequest::new();
1003 create_request.id = Some(vec!["table_to_drop".to_string()]);
1004 namespace
1005 .create_table(create_request, bytes::Bytes::from(ipc_data))
1006 .await
1007 .unwrap();
1008
1009 let mut exists_request = TableExistsRequest::new();
1011 exists_request.id = Some(vec!["table_to_drop".to_string()]);
1012 assert!(namespace.table_exists(exists_request.clone()).await.is_ok());
1013
1014 let mut drop_request = DropTableRequest::new();
1016 drop_request.id = Some(vec!["table_to_drop".to_string()]);
1017 let response = namespace.drop_table(drop_request).await.unwrap();
1018 assert!(response.location.is_some());
1019
1020 assert!(namespace.table_exists(exists_request).await.is_err());
1022 }
1023
1024 #[tokio::test]
1025 async fn test_drop_nonexistent_table() {
1026 let (namespace, _temp_dir) = create_test_namespace().await;
1027
1028 let mut request = DropTableRequest::new();
1029 request.id = Some(vec!["nonexistent".to_string()]);
1030
1031 let result = namespace.drop_table(request).await;
1033 let _ = result;
1036 }
1037
1038 #[tokio::test]
1039 async fn test_root_namespace_operations() {
1040 let (namespace, _temp_dir) = create_test_namespace().await;
1041
1042 let request = ListNamespacesRequest::new();
1044 let result = namespace.list_namespaces(request).await;
1045 assert!(result.is_ok());
1046 assert_eq!(result.unwrap().namespaces.len(), 0);
1047
1048 let request = DescribeNamespaceRequest::new();
1050 let result = namespace.describe_namespace(request).await;
1051 assert!(result.is_ok());
1052
1053 let request = NamespaceExistsRequest::new();
1055 let result = namespace.namespace_exists(request).await;
1056 assert!(result.is_ok());
1057
1058 let request = CreateNamespaceRequest::new();
1060 let result = namespace.create_namespace(request).await;
1061 assert!(result.is_err());
1062 assert!(result.unwrap_err().to_string().contains("already exists"));
1063
1064 let request = DropNamespaceRequest::new();
1066 let result = namespace.drop_namespace(request).await;
1067 assert!(result.is_err());
1068 assert!(result
1069 .unwrap_err()
1070 .to_string()
1071 .contains("cannot be dropped"));
1072 }
1073
1074 #[tokio::test]
1075 async fn test_non_root_namespace_operations() {
1076 let (namespace, _temp_dir) = create_test_namespace().await;
1077
1078 let mut request = CreateNamespaceRequest::new();
1080 request.id = Some(vec!["child".to_string()]);
1081 let result = namespace.create_namespace(request).await;
1082 assert!(matches!(result, Err(Error::NotSupported { .. })));
1083
1084 let mut request = NamespaceExistsRequest::new();
1086 request.id = Some(vec!["child".to_string()]);
1087 let result = namespace.namespace_exists(request).await;
1088 assert!(result.is_err());
1089 assert!(result
1090 .unwrap_err()
1091 .to_string()
1092 .contains("Only root namespace exists"));
1093
1094 let mut request = DropNamespaceRequest::new();
1096 request.id = Some(vec!["child".to_string()]);
1097 let result = namespace.drop_namespace(request).await;
1098 assert!(matches!(result, Err(Error::NotSupported { .. })));
1099 }
1100
1101 #[tokio::test]
1102 async fn test_config_custom_root() {
1103 let temp_dir = TempStdDir::default();
1104 let custom_path = temp_dir.join("custom");
1105 std::fs::create_dir(&custom_path).unwrap();
1106
1107 let namespace = DirectoryNamespaceBuilder::new(custom_path.to_string_lossy().to_string())
1108 .build()
1109 .await
1110 .unwrap();
1111
1112 let schema = create_test_schema();
1114 let ipc_data = create_test_ipc_data(&schema);
1115
1116 let mut request = CreateTableRequest::new();
1118 request.id = Some(vec!["test_table".to_string()]);
1119
1120 let response = namespace
1121 .create_table(request, bytes::Bytes::from(ipc_data))
1122 .await
1123 .unwrap();
1124
1125 assert!(response.location.unwrap().contains("custom"));
1126 }
1127
1128 #[tokio::test]
1129 async fn test_config_storage_options() {
1130 let temp_dir = TempStdDir::default();
1131
1132 let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
1133 .storage_option("option1", "value1")
1134 .storage_option("option2", "value2")
1135 .build()
1136 .await
1137 .unwrap();
1138
1139 let schema = create_test_schema();
1141 let ipc_data = create_test_ipc_data(&schema);
1142
1143 let mut request = CreateTableRequest::new();
1145 request.id = Some(vec!["test_table".to_string()]);
1146
1147 let response = namespace
1148 .create_table(request, bytes::Bytes::from(ipc_data))
1149 .await
1150 .unwrap();
1151
1152 let storage_options = response.storage_options.unwrap();
1153 assert_eq!(storage_options.get("option1"), Some(&"value1".to_string()));
1154 assert_eq!(storage_options.get("option2"), Some(&"value2".to_string()));
1155 }
1156
1157 #[tokio::test]
1158 async fn test_various_arrow_types() {
1159 let (namespace, _temp_dir) = create_test_namespace().await;
1160
1161 let fields = vec![
1163 JsonArrowField {
1164 name: "bool_col".to_string(),
1165 r#type: Box::new(JsonArrowDataType::new("bool".to_string())),
1166 nullable: true,
1167 metadata: None,
1168 },
1169 JsonArrowField {
1170 name: "int8_col".to_string(),
1171 r#type: Box::new(JsonArrowDataType::new("int8".to_string())),
1172 nullable: true,
1173 metadata: None,
1174 },
1175 JsonArrowField {
1176 name: "float64_col".to_string(),
1177 r#type: Box::new(JsonArrowDataType::new("float64".to_string())),
1178 nullable: true,
1179 metadata: None,
1180 },
1181 JsonArrowField {
1182 name: "binary_col".to_string(),
1183 r#type: Box::new(JsonArrowDataType::new("binary".to_string())),
1184 nullable: true,
1185 metadata: None,
1186 },
1187 ];
1188
1189 let schema = JsonArrowSchema {
1190 fields,
1191 metadata: None,
1192 };
1193
1194 let ipc_data = create_test_ipc_data(&schema);
1196
1197 let mut request = CreateTableRequest::new();
1198 request.id = Some(vec!["complex_table".to_string()]);
1199
1200 let response = namespace
1201 .create_table(request, bytes::Bytes::from(ipc_data))
1202 .await
1203 .unwrap();
1204
1205 assert!(response.location.is_some());
1206 }
1207
1208 #[tokio::test]
1209 async fn test_connect_dir() {
1210 let temp_dir = TempStdDir::default();
1211
1212 let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
1213 .build()
1214 .await
1215 .unwrap();
1216
1217 let request = ListTablesRequest::new();
1219 let response = namespace.list_tables(request).await.unwrap();
1220 assert_eq!(response.tables.len(), 0);
1221 }
1222
1223 #[tokio::test]
1224 async fn test_create_table_with_ipc_data() {
1225 use arrow::array::{Int32Array, StringArray};
1226 use arrow::ipc::writer::StreamWriter;
1227
1228 let (namespace, _temp_dir) = create_test_namespace().await;
1229
1230 let schema = create_test_schema();
1232
1233 let arrow_schema = convert_json_arrow_schema(&schema).unwrap();
1235 let arrow_schema = Arc::new(arrow_schema);
1236
1237 let id_array = Int32Array::from(vec![1, 2, 3]);
1239 let name_array = StringArray::from(vec!["Alice", "Bob", "Charlie"]);
1240 let batch = arrow::record_batch::RecordBatch::try_new(
1241 arrow_schema.clone(),
1242 vec![Arc::new(id_array), Arc::new(name_array)],
1243 )
1244 .unwrap();
1245
1246 let mut buffer = Vec::new();
1248 {
1249 let mut writer = StreamWriter::try_new(&mut buffer, &arrow_schema).unwrap();
1250 writer.write(&batch).unwrap();
1251 writer.finish().unwrap();
1252 }
1253
1254 let mut request = CreateTableRequest::new();
1256 request.id = Some(vec!["test_table_with_data".to_string()]);
1257
1258 let response = namespace
1259 .create_table(request, Bytes::from(buffer))
1260 .await
1261 .unwrap();
1262
1263 assert_eq!(response.version, Some(1));
1264 assert!(response
1265 .location
1266 .unwrap()
1267 .contains("test_table_with_data.lance"));
1268
1269 let mut exists_request = TableExistsRequest::new();
1271 exists_request.id = Some(vec!["test_table_with_data".to_string()]);
1272 namespace.table_exists(exists_request).await.unwrap();
1273 }
1274
1275 #[tokio::test]
1276 async fn test_create_empty_table() {
1277 let (namespace, temp_dir) = create_test_namespace().await;
1278
1279 let mut request = CreateEmptyTableRequest::new();
1280 request.id = Some(vec!["empty_table".to_string()]);
1281
1282 let response = namespace.create_empty_table(request).await.unwrap();
1283
1284 assert!(response.location.is_some());
1285 assert!(response.location.unwrap().ends_with("empty_table.lance"));
1286
1287 let table_dir = temp_dir.join("empty_table.lance");
1289 assert!(table_dir.exists());
1290 assert!(table_dir.is_dir());
1291
1292 let reserved_file = table_dir.join(".lance-reserved");
1293 assert!(reserved_file.exists());
1294 assert!(reserved_file.is_file());
1295
1296 let metadata = std::fs::metadata(&reserved_file).unwrap();
1298 assert_eq!(metadata.len(), 0);
1299
1300 let mut exists_request = TableExistsRequest::new();
1302 exists_request.id = Some(vec!["empty_table".to_string()]);
1303 namespace.table_exists(exists_request).await.unwrap();
1304
1305 let list_request = ListTablesRequest::new();
1307 let list_response = namespace.list_tables(list_request).await.unwrap();
1308 assert!(list_response.tables.contains(&"empty_table".to_string()));
1309
1310 let mut describe_request = DescribeTableRequest::new();
1312 describe_request.id = Some(vec!["empty_table".to_string()]);
1313 let describe_response = namespace.describe_table(describe_request).await.unwrap();
1314 assert!(describe_response.location.is_some());
1315 assert!(describe_response.location.unwrap().contains("empty_table"));
1316 }
1317
1318 #[tokio::test]
1319 async fn test_create_empty_table_with_wrong_location() {
1320 let (namespace, _temp_dir) = create_test_namespace().await;
1321
1322 let mut request = CreateEmptyTableRequest::new();
1323 request.id = Some(vec!["test_table".to_string()]);
1324 request.location = Some("/wrong/path/table.lance".to_string());
1325
1326 let result = namespace.create_empty_table(request).await;
1327 assert!(result.is_err());
1328 assert!(result
1329 .unwrap_err()
1330 .to_string()
1331 .contains("must be at location"));
1332 }
1333
1334 #[tokio::test]
1335 async fn test_create_empty_table_then_drop() {
1336 let (namespace, temp_dir) = create_test_namespace().await;
1337
1338 let mut create_request = CreateEmptyTableRequest::new();
1340 create_request.id = Some(vec!["empty_table_to_drop".to_string()]);
1341
1342 let create_response = namespace.create_empty_table(create_request).await.unwrap();
1343 assert!(create_response.location.is_some());
1344
1345 let table_dir = temp_dir.join("empty_table_to_drop.lance");
1347 assert!(table_dir.exists());
1348 let reserved_file = table_dir.join(".lance-reserved");
1349 assert!(reserved_file.exists());
1350
1351 let mut drop_request = DropTableRequest::new();
1353 drop_request.id = Some(vec!["empty_table_to_drop".to_string()]);
1354 let drop_response = namespace.drop_table(drop_request).await.unwrap();
1355 assert!(drop_response.location.is_some());
1356
1357 assert!(!table_dir.exists());
1359 assert!(!reserved_file.exists());
1360
1361 let mut exists_request = TableExistsRequest::new();
1363 exists_request.id = Some(vec!["empty_table_to_drop".to_string()]);
1364 let exists_result = namespace.table_exists(exists_request).await;
1365 assert!(exists_result.is_err());
1366 }
1367}