1use std::collections::HashMap;
7use std::str::FromStr;
8
9use async_trait::async_trait;
10use bytes::Bytes;
11use lance::dataset::{Dataset, WriteParams};
12use opendal::Operator;
13
14use lance_namespace_reqwest_client::models::{
15 CreateEmptyTableRequest, CreateEmptyTableResponse, CreateNamespaceRequest,
16 CreateNamespaceResponse, CreateTableRequest, CreateTableResponse, DescribeNamespaceRequest,
17 DescribeNamespaceResponse, DescribeTableRequest, DescribeTableResponse, DropNamespaceRequest,
18 DropNamespaceResponse, DropTableRequest, DropTableResponse, ListNamespacesRequest,
19 ListNamespacesResponse, ListTablesRequest, ListTablesResponse, NamespaceExistsRequest,
20 TableExistsRequest,
21};
22
23use crate::namespace::{LanceNamespace, NamespaceError, Result};
24
/// Configuration for a directory namespace, parsed from a flat property map.
#[derive(Debug, Clone)]
pub struct DirectoryNamespaceConfig {
    /// Storage root under which every table directory lives, with any
    /// trailing `/` removed.
    root: String,
    /// Properties whose keys start with [`Self::STORAGE_OPTIONS_PREFIX`],
    /// with that prefix stripped from the key.
    storage_options: HashMap<String, String>,
}

impl DirectoryNamespaceConfig {
    /// Property key holding the storage root.
    pub const ROOT: &'static str = "root";
    /// Prefix marking a property as a storage option.
    pub const STORAGE_OPTIONS_PREFIX: &'static str = "storage.";

    /// Parses `properties` into a configuration.
    ///
    /// * `root` — taken from the [`Self::ROOT`] key. When absent, the current
    ///   working directory is used, or `"."` if it cannot be determined
    ///   (previously this case panicked). Trailing `/` characters are removed.
    /// * storage options — every key starting with
    ///   [`Self::STORAGE_OPTIONS_PREFIX`], with the prefix stripped. All other
    ///   keys are ignored.
    pub fn new(properties: HashMap<String, String>) -> Self {
        let root = properties
            .get(Self::ROOT)
            .cloned()
            .unwrap_or_else(Self::default_root)
            .trim_end_matches('/')
            .to_string();

        // Consume the map so option values are moved rather than cloned.
        let storage_options: HashMap<String, String> = properties
            .into_iter()
            .filter_map(|(key, value)| {
                key.strip_prefix(Self::STORAGE_OPTIONS_PREFIX)
                    .map(|stripped| (stripped.to_string(), value))
            })
            .collect();

        Self {
            root,
            storage_options,
        }
    }

    /// Root used when no `root` property is given: the current working
    /// directory, or `"."` when it cannot be read (e.g. it was deleted).
    fn default_root() -> String {
        std::env::current_dir()
            .map(|dir| dir.to_string_lossy().into_owned())
            .unwrap_or_else(|_| ".".to_string())
    }

    /// Storage root for all tables (no trailing `/`).
    pub fn root(&self) -> &str {
        &self.root
    }

    /// Storage options with the `storage.` prefix already stripped.
    pub fn storage_options(&self) -> &HashMap<String, String> {
        &self.storage_options
    }
}
78
79pub struct DirectoryNamespace {
84 config: DirectoryNamespaceConfig,
85 operator: Operator,
86}
87
88impl DirectoryNamespace {
89 pub fn new(properties: HashMap<String, String>) -> Result<Self> {
91 let config = DirectoryNamespaceConfig::new(properties);
92 let operator = Self::initialize_operator(&config)?;
93
94 Ok(Self { config, operator })
95 }
96
97 fn initialize_operator(config: &DirectoryNamespaceConfig) -> Result<Operator> {
99 let root = config.root();
100 let storage_options = &config.storage_options;
101
102 let (scheme, opendal_config) = Self::parse_storage_path(root, storage_options)?;
104
105 let operator = Operator::via_iter(scheme, opendal_config)
107 .map_err(|e| NamespaceError::Other(format!("Failed to create operator: {}", e)))?;
108
109 Ok(operator)
110 }
111
112 fn parse_storage_path(
114 root: &str,
115 storage_options: &HashMap<String, String>,
116 ) -> Result<(opendal::Scheme, HashMap<String, String>)> {
117 use url::Url;
118
119 let mut config = HashMap::new();
120
121 let (scheme, authority, path) = if let Ok(url) = Url::parse(root) {
123 let scheme = Self::normalize_scheme(url.scheme());
124 let authority = url.host_str().unwrap_or("");
125 let path = if scheme == "fs" || scheme == "file" {
128 url.path().to_string()
129 } else {
130 url.path().trim_start_matches('/').to_string()
131 };
132 (scheme, authority.to_string(), path)
133 } else {
134 ("fs".to_string(), String::new(), root.to_string())
136 };
137
138 let opendal_scheme = match scheme.as_str() {
140 "fs" | "file" => {
141 if authority.is_empty() {
143 config.insert("root".to_string(), path);
144 } else {
145 config.insert("root".to_string(), format!("{}/{}", authority, path));
147 }
148 opendal::Scheme::Fs
149 }
150 "s3" => {
151 config.insert("root".to_string(), path);
152 config.insert("bucket".to_string(), authority);
153 opendal::Scheme::S3
154 }
155 "gcs" => {
156 config.insert("root".to_string(), path);
157 config.insert("bucket".to_string(), authority);
158 opendal::Scheme::Gcs
159 }
160 "azblob" => {
161 config.insert("root".to_string(), path);
162 config.insert("container".to_string(), authority);
163 opendal::Scheme::Azblob
164 }
165 _ => {
166 config.insert("root".to_string(), path);
168 if !authority.is_empty() {
169 config.insert("bucket".to_string(), authority);
170 }
171 opendal::Scheme::from_str(&scheme).map_err(|_| {
172 NamespaceError::Other(format!("Unsupported storage scheme: {}", scheme))
173 })?
174 }
175 };
176
177 config.extend(storage_options.clone());
179
180 Ok((opendal_scheme, config))
181 }
182
183 fn normalize_scheme(scheme: &str) -> String {
185 match scheme.to_lowercase().as_str() {
186 "s3a" | "s3n" => "s3".to_string(),
187 "abfs" => "azblob".to_string(),
188 "file" => "fs".to_string(),
189 s => s.to_string(),
190 }
191 }
192
193 fn validate_root_namespace_id(id: &Option<Vec<String>>) -> Result<()> {
195 if let Some(id) = id {
196 if !id.is_empty() {
197 return Err(NamespaceError::Other(format!(
198 "Directory namespace only supports root namespace operations, but got namespace ID: {:?}. Expected empty ID.",
199 id
200 )));
201 }
202 }
203 Ok(())
204 }
205
206 fn table_name_from_id(id: &Option<Vec<String>>) -> Result<String> {
208 let id = id.as_ref().ok_or_else(|| {
209 NamespaceError::Other("Directory namespace table ID cannot be empty".to_string())
210 })?;
211
212 if id.len() != 1 {
213 return Err(NamespaceError::Other(format!(
214 "Directory namespace only supports single-level table IDs, but got: {:?}",
215 id
216 )));
217 }
218
219 Ok(id[0].clone())
220 }
221
222 fn table_full_path(&self, table_name: &str) -> String {
224 format!("{}/{}.lance", self.config.root(), table_name)
225 }
226
227 fn table_versions_path(&self, table_name: &str) -> String {
229 format!("{}.lance/_versions/", table_name)
230 }
231}
232
233#[async_trait]
234impl LanceNamespace for DirectoryNamespace {
235 async fn list_namespaces(
236 &self,
237 request: ListNamespacesRequest,
238 ) -> Result<ListNamespacesResponse> {
239 Self::validate_root_namespace_id(&request.id)?;
241
242 Ok(ListNamespacesResponse::new(vec![]))
244 }
245
246 async fn describe_namespace(
247 &self,
248 request: DescribeNamespaceRequest,
249 ) -> Result<DescribeNamespaceResponse> {
250 Self::validate_root_namespace_id(&request.id)?;
252
253 Ok(DescribeNamespaceResponse {
255 properties: Some(HashMap::new()),
256 })
257 }
258
259 async fn create_namespace(
260 &self,
261 request: CreateNamespaceRequest,
262 ) -> Result<CreateNamespaceResponse> {
263 if request.id.is_none() || request.id.as_ref().unwrap().is_empty() {
265 return Err(NamespaceError::Other(
266 "Root namespace already exists and cannot be created".to_string(),
267 ));
268 }
269
270 Err(NamespaceError::NotSupported(
272 "Directory namespace only supports the root namespace".to_string(),
273 ))
274 }
275
276 async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<DropNamespaceResponse> {
277 if request.id.is_none() || request.id.as_ref().unwrap().is_empty() {
279 return Err(NamespaceError::Other(
280 "Root namespace cannot be dropped".to_string(),
281 ));
282 }
283
284 Err(NamespaceError::NotSupported(
286 "Directory namespace only supports the root namespace".to_string(),
287 ))
288 }
289
290 async fn namespace_exists(&self, request: NamespaceExistsRequest) -> Result<()> {
291 if request.id.is_none() || request.id.as_ref().unwrap().is_empty() {
293 return Ok(());
294 }
295
296 Err(NamespaceError::Other(
298 "Only root namespace exists in directory namespace".to_string(),
299 ))
300 }
301
302 async fn list_tables(&self, request: ListTablesRequest) -> Result<ListTablesResponse> {
303 Self::validate_root_namespace_id(&request.id)?;
304
305 let mut tables = Vec::new();
306
307 let entries = self.operator.list("").await.map_err(|e| {
309 NamespaceError::Io(std::io::Error::new(
310 std::io::ErrorKind::Other,
311 format!("Failed to list directory: {}", e),
312 ))
313 })?;
314
315 for entry in entries {
316 let path = entry.path().trim_end_matches('/');
317
318 if !path.ends_with(".lance") {
320 continue;
321 }
322
323 let table_name = &path[..path.len() - 6];
325
326 let mut is_table = false;
328
329 let reserved_file_path = format!("{}.lance/.lance-reserved", table_name);
331 if self
332 .operator
333 .exists(&reserved_file_path)
334 .await
335 .unwrap_or(false)
336 {
337 is_table = true;
338 }
339
340 if !is_table {
342 let versions_path = self.table_versions_path(table_name);
343 if let Ok(version_entries) = self.operator.list(&versions_path).await {
344 if !version_entries.is_empty() {
346 is_table = true;
347 }
348 }
349 }
350
351 if is_table {
352 tables.push(table_name.to_string());
353 }
354 }
355
356 let response = ListTablesResponse::new(tables);
357 Ok(response)
358 }
359
360 async fn describe_table(&self, request: DescribeTableRequest) -> Result<DescribeTableResponse> {
361 let table_name = Self::table_name_from_id(&request.id)?;
362 let table_path = self.table_full_path(&table_name);
363
364 let mut table_exists = false;
366
367 let reserved_file_path = format!("{}.lance/.lance-reserved", table_name);
369 if self
370 .operator
371 .exists(&reserved_file_path)
372 .await
373 .unwrap_or(false)
374 {
375 table_exists = true;
376 }
377
378 if !table_exists {
380 let versions_path = self.table_versions_path(&table_name);
381 if let Ok(entries) = self.operator.list(&versions_path).await {
382 if !entries.is_empty() {
383 table_exists = true;
384 }
385 }
386 }
387
388 if !table_exists {
389 return Err(NamespaceError::Other(format!(
390 "Table does not exist: {}",
391 table_name
392 )));
393 }
394
395 Ok(DescribeTableResponse {
396 version: None,
397 location: Some(table_path),
398 schema: None,
399 properties: None,
400 storage_options: Some(self.config.storage_options.clone()),
401 })
402 }
403
404 async fn table_exists(&self, request: TableExistsRequest) -> Result<()> {
405 let table_name = Self::table_name_from_id(&request.id)?;
406
407 let mut table_exists = false;
409
410 let reserved_file_path = format!("{}.lance/.lance-reserved", table_name);
412 if self
413 .operator
414 .exists(&reserved_file_path)
415 .await
416 .unwrap_or(false)
417 {
418 table_exists = true;
419 }
420
421 if !table_exists {
423 let versions_path = self.table_versions_path(&table_name);
424 if let Ok(entries) = self.operator.list(&versions_path).await {
425 if !entries.is_empty() {
426 table_exists = true;
427 }
428 }
429 }
430
431 if !table_exists {
432 return Err(NamespaceError::Other(format!(
433 "Table does not exist: {}",
434 table_name
435 )));
436 }
437
438 Ok(())
439 }
440
441 async fn create_table(
442 &self,
443 request: CreateTableRequest,
444 request_data: Bytes,
445 ) -> Result<CreateTableResponse> {
446 let table_name = Self::table_name_from_id(&request.id)?;
447 let table_path = self.table_full_path(&table_name);
448
449 if request_data.is_empty() {
451 return Err(NamespaceError::Other(
452 "Request data (Arrow IPC stream) is required for create_table".to_string(),
453 ));
454 }
455
456 if let Some(location) = &request.location {
458 let location = location.trim_end_matches('/');
459 if location != table_path {
460 return Err(NamespaceError::Other(format!(
461 "Cannot create table {} at location {}, must be at location {}",
462 table_name, location, table_path
463 )));
464 }
465 }
466
467 use arrow::ipc::reader::StreamReader;
469 use std::io::Cursor;
470
471 let cursor = Cursor::new(request_data.to_vec());
472 let stream_reader = StreamReader::try_new(cursor, None)
473 .map_err(|e| NamespaceError::Other(format!("Invalid Arrow IPC stream: {}", e)))?;
474
475 let arrow_schema = stream_reader.schema();
477
478 let mut batches = Vec::new();
480 for batch_result in stream_reader {
481 batches.push(batch_result.map_err(|e| {
482 NamespaceError::Other(format!("Failed to read batch from IPC stream: {}", e))
483 })?);
484 }
485
486 let reader = if batches.is_empty() {
488 let batch = arrow::record_batch::RecordBatch::new_empty(arrow_schema.clone());
490 let batches = vec![Ok(batch)];
491 arrow::record_batch::RecordBatchIterator::new(batches, arrow_schema.clone())
492 } else {
493 let batch_results: Vec<_> = batches.into_iter().map(Ok).collect();
495 arrow::record_batch::RecordBatchIterator::new(batch_results, arrow_schema)
496 };
497
498 let write_params = WriteParams {
500 mode: lance::dataset::WriteMode::Create,
501 ..Default::default()
502 };
503
504 Dataset::write(reader, &table_path, Some(write_params))
506 .await
507 .map_err(|e| NamespaceError::Other(format!("Failed to create Lance dataset: {}", e)))?;
508
509 Ok(CreateTableResponse {
510 version: Some(1),
511 location: Some(table_path),
512 properties: None,
513 storage_options: Some(self.config.storage_options.clone()),
514 })
515 }
516
517 async fn create_empty_table(
518 &self,
519 request: CreateEmptyTableRequest,
520 ) -> Result<CreateEmptyTableResponse> {
521 let table_name = Self::table_name_from_id(&request.id)?;
522 let table_path = self.table_full_path(&table_name);
523
524 if let Some(location) = &request.location {
526 let location = location.trim_end_matches('/');
527 if location != table_path {
528 return Err(NamespaceError::Other(format!(
529 "Cannot create table {} at location {}, must be at location {}",
530 table_name, location, table_path
531 )));
532 }
533 }
534
535 let reserved_file_path = format!("{}.lance/.lance-reserved", table_name);
537 self.operator
538 .write(&reserved_file_path, Vec::<u8>::new())
539 .await
540 .map_err(|e| {
541 NamespaceError::Other(format!(
542 "Failed to create .lance-reserved file for table {}: {}",
543 table_name, e
544 ))
545 })?;
546
547 Ok(CreateEmptyTableResponse {
548 location: Some(table_path),
549 properties: None,
550 storage_options: Some(self.config.storage_options.clone()),
551 })
552 }
553
554 async fn drop_table(&self, request: DropTableRequest) -> Result<DropTableResponse> {
555 let table_name = Self::table_name_from_id(&request.id)?;
556 let table_path = self.table_full_path(&table_name);
557
558 let table_dir = format!("{}.lance/", table_name);
560 self.operator.remove_all(&table_dir).await.map_err(|e| {
561 NamespaceError::Other(format!("Failed to drop table {}: {}", table_name, e))
562 })?;
563
564 Ok(DropTableResponse {
565 id: request.id,
566 location: Some(table_path),
567 properties: None,
568 transaction_id: None,
569 })
570 }
571}
572
573#[cfg(test)]
574mod tests {
575 use super::*;
576 use crate::schema::convert_json_arrow_schema;
577 use lance_namespace_reqwest_client::models::{
578 JsonArrowDataType, JsonArrowField, JsonArrowSchema,
579 };
580 use std::collections::HashMap;
581 use std::sync::Arc;
582 use tempfile::TempDir;
583
584 async fn create_test_namespace() -> (DirectoryNamespace, TempDir) {
586 let temp_dir = TempDir::new().unwrap();
587 let mut properties = HashMap::new();
588 properties.insert(
589 "root".to_string(),
590 temp_dir.path().to_string_lossy().to_string(),
591 );
592
593 let namespace = DirectoryNamespace::new(properties).unwrap();
594 (namespace, temp_dir)
595 }
596
597 fn create_test_ipc_data(schema: &JsonArrowSchema) -> Vec<u8> {
599 use arrow::ipc::writer::StreamWriter;
600
601 let arrow_schema = convert_json_arrow_schema(schema).unwrap();
602 let arrow_schema = Arc::new(arrow_schema);
603 let batch = arrow::record_batch::RecordBatch::new_empty(arrow_schema.clone());
604 let mut buffer = Vec::new();
605 {
606 let mut writer = StreamWriter::try_new(&mut buffer, &arrow_schema).unwrap();
607 writer.write(&batch).unwrap();
608 writer.finish().unwrap();
609 }
610 buffer
611 }
612
613 fn create_test_schema() -> JsonArrowSchema {
615 let int_type = JsonArrowDataType::new("int32".to_string());
616 let string_type = JsonArrowDataType::new("utf8".to_string());
617
618 let id_field = JsonArrowField {
619 name: "id".to_string(),
620 r#type: Box::new(int_type.clone()),
621 nullable: false,
622 metadata: None,
623 };
624
625 let name_field = JsonArrowField {
626 name: "name".to_string(),
627 r#type: Box::new(string_type),
628 nullable: true,
629 metadata: None,
630 };
631
632 JsonArrowSchema {
633 fields: vec![id_field, name_field],
634 metadata: None,
635 }
636 }
637
638 #[tokio::test]
639 async fn test_create_table() {
640 let (namespace, _temp_dir) = create_test_namespace().await;
641
642 let schema = create_test_schema();
644 let ipc_data = create_test_ipc_data(&schema);
645
646 let mut request = CreateTableRequest::new();
647 request.id = Some(vec!["test_table".to_string()]);
648
649 let response = namespace
650 .create_table(request, bytes::Bytes::from(ipc_data))
651 .await
652 .unwrap();
653
654 assert!(response.location.is_some());
655 assert!(response.location.unwrap().ends_with("test_table.lance"));
656 assert_eq!(response.version, Some(1));
657 }
658
659 #[tokio::test]
660 async fn test_create_table_without_data() {
661 let (namespace, _temp_dir) = create_test_namespace().await;
662
663 let mut request = CreateTableRequest::new();
664 request.id = Some(vec!["test_table".to_string()]);
665
666 let result = namespace.create_table(request, bytes::Bytes::new()).await;
667 assert!(result.is_err());
668 assert!(result
669 .unwrap_err()
670 .to_string()
671 .contains("Arrow IPC stream) is required"));
672 }
673
674 #[tokio::test]
675 async fn test_create_table_with_invalid_id() {
676 let (namespace, _temp_dir) = create_test_namespace().await;
677
678 let schema = create_test_schema();
680 let ipc_data = create_test_ipc_data(&schema);
681
682 let mut request = CreateTableRequest::new();
684 request.id = Some(vec![]);
685
686 let result = namespace
687 .create_table(request, bytes::Bytes::from(ipc_data.clone()))
688 .await;
689 assert!(result.is_err());
690
691 let mut request = CreateTableRequest::new();
693 request.id = Some(vec!["namespace".to_string(), "table".to_string()]);
694
695 let result = namespace
696 .create_table(request, bytes::Bytes::from(ipc_data))
697 .await;
698 assert!(result.is_err());
699 assert!(result
700 .unwrap_err()
701 .to_string()
702 .contains("single-level table IDs"));
703 }
704
705 #[tokio::test]
706 async fn test_create_table_with_wrong_location() {
707 let (namespace, _temp_dir) = create_test_namespace().await;
708
709 let schema = create_test_schema();
711 let ipc_data = create_test_ipc_data(&schema);
712
713 let mut request = CreateTableRequest::new();
714 request.id = Some(vec!["test_table".to_string()]);
715 request.location = Some("/wrong/path/table.lance".to_string());
716
717 let result = namespace
718 .create_table(request, bytes::Bytes::from(ipc_data))
719 .await;
720 assert!(result.is_err());
721 assert!(result
722 .unwrap_err()
723 .to_string()
724 .contains("must be at location"));
725 }
726
727 #[tokio::test]
728 async fn test_list_tables() {
729 let (namespace, _temp_dir) = create_test_namespace().await;
730
731 let request = ListTablesRequest::new();
733 let response = namespace.list_tables(request).await.unwrap();
734 assert_eq!(response.tables.len(), 0);
735
736 let schema = create_test_schema();
738 let ipc_data = create_test_ipc_data(&schema);
739
740 let mut create_request = CreateTableRequest::new();
742 create_request.id = Some(vec!["table1".to_string()]);
743 namespace
744 .create_table(create_request, bytes::Bytes::from(ipc_data.clone()))
745 .await
746 .unwrap();
747
748 let mut create_request = CreateTableRequest::new();
750 create_request.id = Some(vec!["table2".to_string()]);
751 namespace
752 .create_table(create_request, bytes::Bytes::from(ipc_data))
753 .await
754 .unwrap();
755
756 let request = ListTablesRequest::new();
758 let response = namespace.list_tables(request).await.unwrap();
759 let tables = response.tables;
760 assert_eq!(tables.len(), 2);
761 assert!(tables.contains(&"table1".to_string()));
762 assert!(tables.contains(&"table2".to_string()));
763 }
764
765 #[tokio::test]
766 async fn test_list_tables_with_namespace_id() {
767 let (namespace, _temp_dir) = create_test_namespace().await;
768
769 let mut request = ListTablesRequest::new();
770 request.id = Some(vec!["namespace".to_string()]);
771
772 let result = namespace.list_tables(request).await;
773 assert!(result.is_err());
774 assert!(result
775 .unwrap_err()
776 .to_string()
777 .contains("root namespace operations"));
778 }
779
780 #[tokio::test]
781 async fn test_describe_table() {
782 let (namespace, _temp_dir) = create_test_namespace().await;
783
784 let schema = create_test_schema();
786 let ipc_data = create_test_ipc_data(&schema);
787
788 let mut create_request = CreateTableRequest::new();
789 create_request.id = Some(vec!["test_table".to_string()]);
790 namespace
791 .create_table(create_request, bytes::Bytes::from(ipc_data))
792 .await
793 .unwrap();
794
795 let mut request = DescribeTableRequest::new();
797 request.id = Some(vec!["test_table".to_string()]);
798 let response = namespace.describe_table(request).await.unwrap();
799
800 assert!(response.location.is_some());
801 assert!(response.location.unwrap().ends_with("test_table.lance"));
802 }
803
804 #[tokio::test]
805 async fn test_describe_nonexistent_table() {
806 let (namespace, _temp_dir) = create_test_namespace().await;
807
808 let mut request = DescribeTableRequest::new();
809 request.id = Some(vec!["nonexistent".to_string()]);
810
811 let result = namespace.describe_table(request).await;
812 assert!(result.is_err());
813 assert!(result
814 .unwrap_err()
815 .to_string()
816 .contains("Table does not exist"));
817 }
818
819 #[tokio::test]
820 async fn test_table_exists() {
821 let (namespace, _temp_dir) = create_test_namespace().await;
822
823 let schema = create_test_schema();
825 let ipc_data = create_test_ipc_data(&schema);
826
827 let mut create_request = CreateTableRequest::new();
828 create_request.id = Some(vec!["existing_table".to_string()]);
829 namespace
830 .create_table(create_request, bytes::Bytes::from(ipc_data))
831 .await
832 .unwrap();
833
834 let mut request = TableExistsRequest::new();
836 request.id = Some(vec!["existing_table".to_string()]);
837 let result = namespace.table_exists(request).await;
838 assert!(result.is_ok());
839
840 let mut request = TableExistsRequest::new();
842 request.id = Some(vec!["nonexistent".to_string()]);
843 let result = namespace.table_exists(request).await;
844 assert!(result.is_err());
845 assert!(result
846 .unwrap_err()
847 .to_string()
848 .contains("Table does not exist"));
849 }
850
851 #[tokio::test]
852 async fn test_drop_table() {
853 let (namespace, _temp_dir) = create_test_namespace().await;
854
855 let schema = create_test_schema();
857 let ipc_data = create_test_ipc_data(&schema);
858
859 let mut create_request = CreateTableRequest::new();
860 create_request.id = Some(vec!["table_to_drop".to_string()]);
861 namespace
862 .create_table(create_request, bytes::Bytes::from(ipc_data))
863 .await
864 .unwrap();
865
866 let mut exists_request = TableExistsRequest::new();
868 exists_request.id = Some(vec!["table_to_drop".to_string()]);
869 assert!(namespace.table_exists(exists_request.clone()).await.is_ok());
870
871 let mut drop_request = DropTableRequest::new();
873 drop_request.id = Some(vec!["table_to_drop".to_string()]);
874 let response = namespace.drop_table(drop_request).await.unwrap();
875 assert!(response.location.is_some());
876
877 assert!(namespace.table_exists(exists_request).await.is_err());
879 }
880
881 #[tokio::test]
882 async fn test_drop_nonexistent_table() {
883 let (namespace, _temp_dir) = create_test_namespace().await;
884
885 let mut request = DropTableRequest::new();
886 request.id = Some(vec!["nonexistent".to_string()]);
887
888 let result = namespace.drop_table(request).await;
890 let _ = result;
893 }
894
895 #[tokio::test]
896 async fn test_root_namespace_operations() {
897 let (namespace, _temp_dir) = create_test_namespace().await;
898
899 let request = ListNamespacesRequest::new();
901 let result = namespace.list_namespaces(request).await;
902 assert!(result.is_ok());
903 assert_eq!(result.unwrap().namespaces.len(), 0);
904
905 let request = DescribeNamespaceRequest::new();
907 let result = namespace.describe_namespace(request).await;
908 assert!(result.is_ok());
909
910 let request = NamespaceExistsRequest::new();
912 let result = namespace.namespace_exists(request).await;
913 assert!(result.is_ok());
914
915 let request = CreateNamespaceRequest::new();
917 let result = namespace.create_namespace(request).await;
918 assert!(result.is_err());
919 assert!(result.unwrap_err().to_string().contains("already exists"));
920
921 let request = DropNamespaceRequest::new();
923 let result = namespace.drop_namespace(request).await;
924 assert!(result.is_err());
925 assert!(result
926 .unwrap_err()
927 .to_string()
928 .contains("cannot be dropped"));
929 }
930
931 #[tokio::test]
932 async fn test_non_root_namespace_operations() {
933 let (namespace, _temp_dir) = create_test_namespace().await;
934
935 let mut request = CreateNamespaceRequest::new();
937 request.id = Some(vec!["child".to_string()]);
938 let result = namespace.create_namespace(request).await;
939 assert!(matches!(result, Err(NamespaceError::NotSupported(_))));
940
941 let mut request = NamespaceExistsRequest::new();
943 request.id = Some(vec!["child".to_string()]);
944 let result = namespace.namespace_exists(request).await;
945 assert!(result.is_err());
946 assert!(result
947 .unwrap_err()
948 .to_string()
949 .contains("Only root namespace exists"));
950
951 let mut request = DropNamespaceRequest::new();
953 request.id = Some(vec!["child".to_string()]);
954 let result = namespace.drop_namespace(request).await;
955 assert!(matches!(result, Err(NamespaceError::NotSupported(_))));
956 }
957
958 #[tokio::test]
959 async fn test_config_custom_root() {
960 let temp_dir = TempDir::new().unwrap();
961 let custom_path = temp_dir.path().join("custom");
962 std::fs::create_dir(&custom_path).unwrap();
963
964 let mut properties = HashMap::new();
965 properties.insert(
966 "root".to_string(),
967 custom_path.to_string_lossy().to_string(),
968 );
969
970 let namespace = DirectoryNamespace::new(properties).unwrap();
971
972 let schema = create_test_schema();
974 let ipc_data = create_test_ipc_data(&schema);
975
976 let mut request = CreateTableRequest::new();
978 request.id = Some(vec!["test_table".to_string()]);
979
980 let response = namespace
981 .create_table(request, bytes::Bytes::from(ipc_data))
982 .await
983 .unwrap();
984
985 assert!(response.location.unwrap().contains("custom"));
986 }
987
988 #[tokio::test]
989 async fn test_config_storage_options() {
990 let temp_dir = TempDir::new().unwrap();
991 let mut properties = HashMap::new();
992 properties.insert(
993 "root".to_string(),
994 temp_dir.path().to_string_lossy().to_string(),
995 );
996 properties.insert("storage.option1".to_string(), "value1".to_string());
997 properties.insert("storage.option2".to_string(), "value2".to_string());
998
999 let namespace = DirectoryNamespace::new(properties).unwrap();
1000
1001 let schema = create_test_schema();
1003 let ipc_data = create_test_ipc_data(&schema);
1004
1005 let mut request = CreateTableRequest::new();
1007 request.id = Some(vec!["test_table".to_string()]);
1008
1009 let response = namespace
1010 .create_table(request, bytes::Bytes::from(ipc_data))
1011 .await
1012 .unwrap();
1013
1014 let storage_options = response.storage_options.unwrap();
1015 assert_eq!(storage_options.get("option1"), Some(&"value1".to_string()));
1016 assert_eq!(storage_options.get("option2"), Some(&"value2".to_string()));
1017 }
1018
1019 #[tokio::test]
1020 async fn test_various_arrow_types() {
1021 let (namespace, _temp_dir) = create_test_namespace().await;
1022
1023 let fields = vec![
1025 JsonArrowField {
1026 name: "bool_col".to_string(),
1027 r#type: Box::new(JsonArrowDataType::new("bool".to_string())),
1028 nullable: true,
1029 metadata: None,
1030 },
1031 JsonArrowField {
1032 name: "int8_col".to_string(),
1033 r#type: Box::new(JsonArrowDataType::new("int8".to_string())),
1034 nullable: true,
1035 metadata: None,
1036 },
1037 JsonArrowField {
1038 name: "float64_col".to_string(),
1039 r#type: Box::new(JsonArrowDataType::new("float64".to_string())),
1040 nullable: true,
1041 metadata: None,
1042 },
1043 JsonArrowField {
1044 name: "binary_col".to_string(),
1045 r#type: Box::new(JsonArrowDataType::new("binary".to_string())),
1046 nullable: true,
1047 metadata: None,
1048 },
1049 ];
1050
1051 let schema = JsonArrowSchema {
1052 fields,
1053 metadata: None,
1054 };
1055
1056 let ipc_data = create_test_ipc_data(&schema);
1058
1059 let mut request = CreateTableRequest::new();
1060 request.id = Some(vec!["complex_table".to_string()]);
1061
1062 let response = namespace
1063 .create_table(request, bytes::Bytes::from(ipc_data))
1064 .await
1065 .unwrap();
1066
1067 assert!(response.location.is_some());
1068 }
1069
1070 #[tokio::test]
1071 async fn test_connect_dir() {
1072 let temp_dir = TempDir::new().unwrap();
1073 let mut properties = HashMap::new();
1074 properties.insert(
1075 "root".to_string(),
1076 temp_dir.path().to_string_lossy().to_string(),
1077 );
1078
1079 let namespace = crate::connect("dir", properties).await.unwrap();
1080
1081 let request = ListTablesRequest::new();
1083 let response = namespace.list_tables(request).await.unwrap();
1084 assert_eq!(response.tables.len(), 0);
1085 }
1086
1087 #[test]
1088 fn test_parse_storage_path_local() {
1089 let storage_options = HashMap::new();
1090
1091 let (scheme, config) =
1093 DirectoryNamespace::parse_storage_path("/path/to/data", &storage_options).unwrap();
1094 assert!(matches!(scheme, opendal::Scheme::Fs));
1095 assert_eq!(config.get("root").unwrap(), "/path/to/data");
1096
1097 let (scheme, config) =
1099 DirectoryNamespace::parse_storage_path("./data", &storage_options).unwrap();
1100 assert!(matches!(scheme, opendal::Scheme::Fs));
1101 assert_eq!(config.get("root").unwrap(), "./data");
1102 }
1103
1104 #[test]
1105 fn test_parse_storage_path_s3() {
1106 let storage_options = HashMap::new();
1107
1108 let (scheme, config) =
1110 DirectoryNamespace::parse_storage_path("s3://my-bucket/path/to/data", &storage_options)
1111 .unwrap();
1112 assert!(matches!(scheme, opendal::Scheme::S3));
1113 assert_eq!(config.get("bucket").unwrap(), "my-bucket");
1114 assert_eq!(config.get("root").unwrap(), "path/to/data");
1115
1116 let (scheme, config) =
1118 DirectoryNamespace::parse_storage_path("s3://my-bucket", &storage_options).unwrap();
1119 assert!(matches!(scheme, opendal::Scheme::S3));
1120 assert_eq!(config.get("bucket").unwrap(), "my-bucket");
1121 assert_eq!(config.get("root").unwrap(), "");
1122 }
1123
1124 #[test]
1125 fn test_parse_storage_path_gcs() {
1126 let storage_options = HashMap::new();
1127
1128 let (scheme, config) = DirectoryNamespace::parse_storage_path(
1130 "gcs://my-bucket/path/to/data",
1131 &storage_options,
1132 )
1133 .unwrap();
1134 assert!(matches!(scheme, opendal::Scheme::Gcs));
1135 assert_eq!(config.get("bucket").unwrap(), "my-bucket");
1136 assert_eq!(config.get("root").unwrap(), "path/to/data");
1137 }
1138
1139 #[test]
1140 fn test_parse_storage_path_azblob() {
1141 let storage_options = HashMap::new();
1142
1143 let (scheme, config) = DirectoryNamespace::parse_storage_path(
1145 "azblob://my-container/path/to/data",
1146 &storage_options,
1147 )
1148 .unwrap();
1149 assert!(matches!(scheme, opendal::Scheme::Azblob));
1150 assert_eq!(config.get("container").unwrap(), "my-container");
1151 assert_eq!(config.get("root").unwrap(), "path/to/data");
1152
1153 let (scheme, config) =
1155 DirectoryNamespace::parse_storage_path("abfs://my-container/path", &storage_options)
1156 .unwrap();
1157 assert!(matches!(scheme, opendal::Scheme::Azblob));
1158 assert_eq!(config.get("container").unwrap(), "my-container");
1159 assert_eq!(config.get("root").unwrap(), "path");
1160 }
1161
/// `normalize_scheme` canonicalizes scheme aliases, ignoring case.
///
/// `s3a`/`s3n` become `s3`, `abfs` becomes `azblob` and `file` becomes `fs`.
/// The other lowercase schemes exercised here (`gcs`, `random`) come back
/// unchanged.
#[test]
fn test_normalize_scheme() {
    let expectations = [
        ("s3a", "s3"),
        ("s3n", "s3"),
        ("S3A", "s3"),
        ("abfs", "azblob"),
        ("ABFS", "azblob"),
        ("file", "fs"),
        ("FILE", "fs"),
        ("gcs", "gcs"),
        ("random", "random"),
    ];

    for &(input, expected) in expectations.iter() {
        assert_eq!(DirectoryNamespace::normalize_scheme(input), expected);
    }
}
1175
/// `file://` and `fs://` URLs carrying an absolute path both select the OpenDAL
/// filesystem backend.
///
/// The absolute path, leading slash included, is kept as the root.
#[test]
fn test_fs_scheme_url() {
    let storage_options = HashMap::new();

    for &url in &["file:///absolute/path", "fs:///absolute/path"] {
        let (scheme, config) =
            DirectoryNamespace::parse_storage_path(url, &storage_options).unwrap();
        assert!(matches!(scheme, opendal::Scheme::Fs));
        assert_eq!(config.get("root").unwrap(), "/absolute/path");
    }
}
1194
/// Caller-supplied storage options end up in the backend config returned by
/// `parse_storage_path`.
///
/// This is checked for both an object-store URL (`s3://`) and a plain local
/// path.
#[test]
fn test_storage_options_passed_through() {
    let storage_options: HashMap<String, String> = [
        ("aws_access_key_id", "test_key"),
        ("aws_secret_access_key", "test_secret"),
        ("region", "us-west-2"),
    ]
    .iter()
    .map(|&(key, value)| (key.to_string(), value.to_string()))
    .collect();

    // S3: bucket and root are derived from the URL; the remaining entries are
    // the storage options copied through verbatim.
    let (s3_scheme, s3_config) =
        DirectoryNamespace::parse_storage_path("s3://my-bucket/path", &storage_options).unwrap();
    assert!(matches!(s3_scheme, opendal::Scheme::S3));
    for &(key, expected) in &[
        ("bucket", "my-bucket"),
        ("root", "path"),
        ("aws_access_key_id", "test_key"),
        ("aws_secret_access_key", "test_secret"),
        ("region", "us-west-2"),
    ] {
        assert_eq!(s3_config.get(key).unwrap(), expected);
    }

    // Local path: the options are forwarded to the filesystem backend too.
    let (local_scheme, local_config) =
        DirectoryNamespace::parse_storage_path("/local/path", &storage_options).unwrap();
    assert!(matches!(local_scheme, opendal::Scheme::Fs));
    assert_eq!(local_config.get("root").unwrap(), "/local/path");
    assert_eq!(local_config.get("aws_access_key_id").unwrap(), "test_key");
}
1225
1226 #[tokio::test]
1227 async fn test_create_table_with_ipc_data() {
1228 use arrow::array::{Int32Array, StringArray};
1229 use arrow::ipc::writer::StreamWriter;
1230
1231 let (namespace, _temp_dir) = create_test_namespace().await;
1232
1233 let schema = create_test_schema();
1235
1236 let arrow_schema = convert_json_arrow_schema(&schema).unwrap();
1238 let arrow_schema = Arc::new(arrow_schema);
1239
1240 let id_array = Int32Array::from(vec![1, 2, 3]);
1242 let name_array = StringArray::from(vec!["Alice", "Bob", "Charlie"]);
1243 let batch = arrow::record_batch::RecordBatch::try_new(
1244 arrow_schema.clone(),
1245 vec![Arc::new(id_array), Arc::new(name_array)],
1246 )
1247 .unwrap();
1248
1249 let mut buffer = Vec::new();
1251 {
1252 let mut writer = StreamWriter::try_new(&mut buffer, &arrow_schema).unwrap();
1253 writer.write(&batch).unwrap();
1254 writer.finish().unwrap();
1255 }
1256
1257 let mut request = CreateTableRequest::new();
1259 request.id = Some(vec!["test_table_with_data".to_string()]);
1260
1261 let response = namespace
1262 .create_table(request, Bytes::from(buffer))
1263 .await
1264 .unwrap();
1265
1266 assert_eq!(response.version, Some(1));
1267 assert!(response
1268 .location
1269 .unwrap()
1270 .contains("test_table_with_data.lance"));
1271
1272 let mut exists_request = TableExistsRequest::new();
1274 exists_request.id = Some(vec!["test_table_with_data".to_string()]);
1275 namespace.table_exists(exists_request).await.unwrap();
1276 }
1277
1278 #[tokio::test]
1279 async fn test_create_empty_table() {
1280 let (namespace, _temp_dir) = create_test_namespace().await;
1281
1282 let mut request = CreateEmptyTableRequest::new();
1283 request.id = Some(vec!["empty_table".to_string()]);
1284
1285 let response = namespace.create_empty_table(request).await.unwrap();
1286
1287 assert!(response.location.is_some());
1288 assert!(response.location.unwrap().ends_with("empty_table.lance"));
1289
1290 let mut exists_request = TableExistsRequest::new();
1292 exists_request.id = Some(vec!["empty_table".to_string()]);
1293 namespace.table_exists(exists_request).await.unwrap();
1294
1295 let list_request = ListTablesRequest::new();
1297 let list_response = namespace.list_tables(list_request).await.unwrap();
1298 assert!(list_response.tables.contains(&"empty_table".to_string()));
1299 }
1300
1301 #[tokio::test]
1302 async fn test_create_empty_table_with_wrong_location() {
1303 let (namespace, _temp_dir) = create_test_namespace().await;
1304
1305 let mut request = CreateEmptyTableRequest::new();
1306 request.id = Some(vec!["test_table".to_string()]);
1307 request.location = Some("/wrong/path/table.lance".to_string());
1308
1309 let result = namespace.create_empty_table(request).await;
1310 assert!(result.is_err());
1311 assert!(result
1312 .unwrap_err()
1313 .to_string()
1314 .contains("must be at location"));
1315 }
1316}