1use std::collections::HashMap;
7use std::str::FromStr;
8
9use async_trait::async_trait;
10use bytes::Bytes;
11use lance::dataset::{Dataset, WriteParams};
12use opendal::Operator;
13
14use crate::models::{
15 CreateEmptyTableRequest, CreateEmptyTableResponse, CreateNamespaceRequest,
16 CreateNamespaceResponse, CreateTableRequest, CreateTableResponse, DescribeNamespaceRequest,
17 DescribeNamespaceResponse, DescribeTableRequest, DescribeTableResponse, DropNamespaceRequest,
18 DropNamespaceResponse, DropTableRequest, DropTableResponse, ListNamespacesRequest,
19 ListNamespacesResponse, ListTablesRequest, ListTablesResponse, NamespaceExistsRequest,
20 TableExistsRequest,
21};
22
23use crate::namespace::{LanceNamespace, NamespaceError, Result};
24
/// Configuration for a directory namespace, derived from a flat property map.
#[derive(Debug, Clone)]
pub struct DirectoryNamespaceConfig {
    /// Root location of the namespace, stored without trailing `/` characters.
    root: String,
    /// Properties whose key started with `storage.`, with that prefix removed.
    storage_options: HashMap<String, String>,
}

impl DirectoryNamespaceConfig {
    /// Property key that holds the namespace root location.
    pub const ROOT: &'static str = "root";
    /// Key prefix that marks a property as a storage option.
    pub const STORAGE_OPTIONS_PREFIX: &'static str = "storage.";

    /// Builds a configuration from `properties`.
    ///
    /// * `root` is read from the [`Self::ROOT`] key. When the key is absent the
    ///   current working directory is used; if that cannot be determined the
    ///   relative path `"."` is used instead of panicking. Trailing `/`
    ///   characters are removed.
    /// * Every property whose key starts with [`Self::STORAGE_OPTIONS_PREFIX`]
    ///   becomes a storage option keyed by the remainder of the key.
    pub fn new(properties: HashMap<String, String>) -> Self {
        let root = properties
            .get(Self::ROOT)
            .cloned()
            .unwrap_or_else(|| {
                // This constructor is infallible, so an unreadable working
                // directory must not abort the process; "." names the same
                // place for any later filesystem access.
                std::env::current_dir()
                    .map(|dir| dir.to_string_lossy().into_owned())
                    .unwrap_or_else(|_| ".".to_string())
            })
            .trim_end_matches('/')
            .to_string();

        let storage_options: HashMap<String, String> = properties
            .iter()
            .filter_map(|(k, v)| {
                k.strip_prefix(Self::STORAGE_OPTIONS_PREFIX)
                    .map(|key| (key.to_string(), v.clone()))
            })
            .collect();

        Self {
            root,
            storage_options,
        }
    }

    /// Returns the namespace root location (no trailing `/`).
    pub fn root(&self) -> &str {
        &self.root
    }

    /// Returns the storage options with the `storage.` prefix already removed.
    pub fn storage_options(&self) -> &HashMap<String, String> {
        &self.storage_options
    }
}
78
79pub struct DirectoryNamespace {
84 config: DirectoryNamespaceConfig,
85 operator: Operator,
86}
87
88impl DirectoryNamespace {
89 pub fn new(properties: HashMap<String, String>) -> Result<Self> {
91 let config = DirectoryNamespaceConfig::new(properties);
92 let operator = Self::initialize_operator(&config)?;
93
94 Ok(Self { config, operator })
95 }
96
97 fn initialize_operator(config: &DirectoryNamespaceConfig) -> Result<Operator> {
99 let root = config.root();
100 let storage_options = &config.storage_options;
101
102 let (scheme, opendal_config) = Self::parse_storage_path(root, storage_options)?;
104
105 let operator = Operator::via_iter(scheme, opendal_config)
107 .map_err(|e| NamespaceError::Other(format!("Failed to create operator: {}", e)))?;
108
109 Ok(operator)
110 }
111
112 fn parse_storage_path(
114 root: &str,
115 storage_options: &HashMap<String, String>,
116 ) -> Result<(opendal::Scheme, HashMap<String, String>)> {
117 use url::Url;
118
119 let mut config = HashMap::new();
120
121 let (scheme, authority, path) = if let Ok(url) = Url::parse(root) {
123 let scheme = Self::normalize_scheme(url.scheme());
124 let authority = url.host_str().unwrap_or("");
125 let path = if scheme == "fs" || scheme == "file" {
128 url.path().to_string()
129 } else {
130 url.path().trim_start_matches('/').to_string()
131 };
132 (scheme, authority.to_string(), path)
133 } else {
134 ("fs".to_string(), String::new(), root.to_string())
136 };
137
138 let opendal_scheme = match scheme.as_str() {
140 "fs" | "file" => {
141 if authority.is_empty() {
143 config.insert("root".to_string(), path);
144 } else {
145 config.insert("root".to_string(), format!("{}/{}", authority, path));
147 }
148 opendal::Scheme::Fs
149 }
150 "s3" => {
151 config.insert("root".to_string(), path);
152 config.insert("bucket".to_string(), authority);
153 opendal::Scheme::S3
154 }
155 "gcs" => {
156 config.insert("root".to_string(), path);
157 config.insert("bucket".to_string(), authority);
158 opendal::Scheme::Gcs
159 }
160 "azblob" => {
161 config.insert("root".to_string(), path);
162 config.insert("container".to_string(), authority);
163 opendal::Scheme::Azblob
164 }
165 _ => {
166 config.insert("root".to_string(), path);
168 if !authority.is_empty() {
169 config.insert("bucket".to_string(), authority);
170 }
171 opendal::Scheme::from_str(&scheme).map_err(|_| {
172 NamespaceError::Other(format!("Unsupported storage scheme: {}", scheme))
173 })?
174 }
175 };
176
177 config.extend(storage_options.clone());
179
180 Ok((opendal_scheme, config))
181 }
182
183 fn normalize_scheme(scheme: &str) -> String {
185 match scheme.to_lowercase().as_str() {
186 "s3a" | "s3n" => "s3".to_string(),
187 "abfs" => "azblob".to_string(),
188 "file" => "fs".to_string(),
189 s => s.to_string(),
190 }
191 }
192
193 fn validate_root_namespace_id(id: &Option<Vec<String>>) -> Result<()> {
195 if let Some(id) = id {
196 if !id.is_empty() {
197 return Err(NamespaceError::Other(format!(
198 "Directory namespace only supports root namespace operations, but got namespace ID: {:?}. Expected empty ID.",
199 id
200 )));
201 }
202 }
203 Ok(())
204 }
205
206 fn table_name_from_id(id: &Option<Vec<String>>) -> Result<String> {
208 let id = id.as_ref().ok_or_else(|| {
209 NamespaceError::Other("Directory namespace table ID cannot be empty".to_string())
210 })?;
211
212 if id.len() != 1 {
213 return Err(NamespaceError::Other(format!(
214 "Directory namespace only supports single-level table IDs, but got: {:?}",
215 id
216 )));
217 }
218
219 Ok(id[0].clone())
220 }
221
222 fn table_full_path(&self, table_name: &str) -> String {
224 format!("{}/{}.lance", self.config.root(), table_name)
225 }
226
227 fn table_versions_path(&self, table_name: &str) -> String {
229 format!("{}.lance/_versions/", table_name)
230 }
231}
232
233#[async_trait]
234impl LanceNamespace for DirectoryNamespace {
235 async fn list_namespaces(
236 &self,
237 request: ListNamespacesRequest,
238 ) -> Result<ListNamespacesResponse> {
239 Self::validate_root_namespace_id(&request.id)?;
241
242 Ok(ListNamespacesResponse::new(vec![]))
244 }
245
246 async fn describe_namespace(
247 &self,
248 request: DescribeNamespaceRequest,
249 ) -> Result<DescribeNamespaceResponse> {
250 Self::validate_root_namespace_id(&request.id)?;
252
253 Ok(DescribeNamespaceResponse {
255 properties: Some(HashMap::new()),
256 })
257 }
258
259 async fn create_namespace(
260 &self,
261 request: CreateNamespaceRequest,
262 ) -> Result<CreateNamespaceResponse> {
263 if request.id.is_none() || request.id.as_ref().unwrap().is_empty() {
265 return Err(NamespaceError::Other(
266 "Root namespace already exists and cannot be created".to_string(),
267 ));
268 }
269
270 Err(NamespaceError::NotSupported(
272 "Directory namespace only supports the root namespace".to_string(),
273 ))
274 }
275
276 async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<DropNamespaceResponse> {
277 if request.id.is_none() || request.id.as_ref().unwrap().is_empty() {
279 return Err(NamespaceError::Other(
280 "Root namespace cannot be dropped".to_string(),
281 ));
282 }
283
284 Err(NamespaceError::NotSupported(
286 "Directory namespace only supports the root namespace".to_string(),
287 ))
288 }
289
290 async fn namespace_exists(&self, request: NamespaceExistsRequest) -> Result<()> {
291 if request.id.is_none() || request.id.as_ref().unwrap().is_empty() {
293 return Ok(());
294 }
295
296 Err(NamespaceError::Other(
298 "Only root namespace exists in directory namespace".to_string(),
299 ))
300 }
301
302 async fn list_tables(&self, request: ListTablesRequest) -> Result<ListTablesResponse> {
303 Self::validate_root_namespace_id(&request.id)?;
304
305 let mut tables = Vec::new();
306
307 let entries = self.operator.list("").await.map_err(|e| {
309 NamespaceError::Io(std::io::Error::new(
310 std::io::ErrorKind::Other,
311 format!("Failed to list directory: {}", e),
312 ))
313 })?;
314
315 for entry in entries {
316 let path = entry.path().trim_end_matches('/');
317
318 if !path.ends_with(".lance") {
320 continue;
321 }
322
323 let table_name = &path[..path.len() - 6];
325
326 let mut is_table = false;
328
329 let reserved_file_path = format!("{}.lance/.lance-reserved", table_name);
331 if self
332 .operator
333 .exists(&reserved_file_path)
334 .await
335 .unwrap_or(false)
336 {
337 is_table = true;
338 }
339
340 if !is_table {
342 let versions_path = self.table_versions_path(table_name);
343 if let Ok(version_entries) = self.operator.list(&versions_path).await {
344 if !version_entries.is_empty() {
346 is_table = true;
347 }
348 }
349 }
350
351 if is_table {
352 tables.push(table_name.to_string());
353 }
354 }
355
356 let response = ListTablesResponse::new(tables);
357 Ok(response)
358 }
359
360 async fn describe_table(&self, request: DescribeTableRequest) -> Result<DescribeTableResponse> {
361 let table_name = Self::table_name_from_id(&request.id)?;
362 let table_path = self.table_full_path(&table_name);
363
364 let mut table_exists = false;
366
367 let reserved_file_path = format!("{}.lance/.lance-reserved", table_name);
369 if self
370 .operator
371 .exists(&reserved_file_path)
372 .await
373 .unwrap_or(false)
374 {
375 table_exists = true;
376 }
377
378 if !table_exists {
380 let versions_path = self.table_versions_path(&table_name);
381 if let Ok(entries) = self.operator.list(&versions_path).await {
382 if !entries.is_empty() {
383 table_exists = true;
384 }
385 }
386 }
387
388 if !table_exists {
389 return Err(NamespaceError::Other(format!(
390 "Table does not exist: {}",
391 table_name
392 )));
393 }
394
395 Ok(DescribeTableResponse {
396 version: None,
397 location: Some(table_path),
398 schema: None,
399 properties: None,
400 storage_options: Some(self.config.storage_options.clone()),
401 })
402 }
403
404 async fn table_exists(&self, request: TableExistsRequest) -> Result<()> {
405 let table_name = Self::table_name_from_id(&request.id)?;
406
407 let mut table_exists = false;
409
410 let reserved_file_path = format!("{}.lance/.lance-reserved", table_name);
412 if self
413 .operator
414 .exists(&reserved_file_path)
415 .await
416 .unwrap_or(false)
417 {
418 table_exists = true;
419 }
420
421 if !table_exists {
423 let versions_path = self.table_versions_path(&table_name);
424 if let Ok(entries) = self.operator.list(&versions_path).await {
425 if !entries.is_empty() {
426 table_exists = true;
427 }
428 }
429 }
430
431 if !table_exists {
432 return Err(NamespaceError::Other(format!(
433 "Table does not exist: {}",
434 table_name
435 )));
436 }
437
438 Ok(())
439 }
440
441 async fn create_table(
442 &self,
443 request: CreateTableRequest,
444 request_data: Bytes,
445 ) -> Result<CreateTableResponse> {
446 let table_name = Self::table_name_from_id(&request.id)?;
447 let table_path = self.table_full_path(&table_name);
448
449 if request_data.is_empty() {
451 return Err(NamespaceError::Other(
452 "Request data (Arrow IPC stream) is required for create_table".to_string(),
453 ));
454 }
455
456 if let Some(location) = &request.location {
458 let location = location.trim_end_matches('/');
459 if location != table_path {
460 return Err(NamespaceError::Other(format!(
461 "Cannot create table {} at location {}, must be at location {}",
462 table_name, location, table_path
463 )));
464 }
465 }
466
467 use arrow::ipc::reader::StreamReader;
469 use std::io::Cursor;
470
471 let cursor = Cursor::new(request_data.to_vec());
472 let stream_reader = StreamReader::try_new(cursor, None)
473 .map_err(|e| NamespaceError::Other(format!("Invalid Arrow IPC stream: {}", e)))?;
474
475 let arrow_schema = stream_reader.schema();
477
478 let mut batches = Vec::new();
480 for batch_result in stream_reader {
481 batches.push(batch_result.map_err(|e| {
482 NamespaceError::Other(format!("Failed to read batch from IPC stream: {}", e))
483 })?);
484 }
485
486 let reader = if batches.is_empty() {
488 let batch = arrow::record_batch::RecordBatch::new_empty(arrow_schema.clone());
490 let batches = vec![Ok(batch)];
491 arrow::record_batch::RecordBatchIterator::new(batches, arrow_schema.clone())
492 } else {
493 let batch_results: Vec<_> = batches.into_iter().map(Ok).collect();
495 arrow::record_batch::RecordBatchIterator::new(batch_results, arrow_schema)
496 };
497
498 let write_params = WriteParams {
500 mode: lance::dataset::WriteMode::Create,
501 ..Default::default()
502 };
503
504 Dataset::write(reader, &table_path, Some(write_params))
506 .await
507 .map_err(|e| NamespaceError::Other(format!("Failed to create Lance dataset: {}", e)))?;
508
509 Ok(CreateTableResponse {
510 version: Some(1),
511 location: Some(table_path),
512 properties: None,
513 storage_options: Some(self.config.storage_options.clone()),
514 })
515 }
516
517 async fn create_empty_table(
518 &self,
519 request: CreateEmptyTableRequest,
520 ) -> Result<CreateEmptyTableResponse> {
521 let table_name = Self::table_name_from_id(&request.id)?;
522 let table_path = self.table_full_path(&table_name);
523
524 if let Some(location) = &request.location {
526 let location = location.trim_end_matches('/');
527 if location != table_path {
528 return Err(NamespaceError::Other(format!(
529 "Cannot create table {} at location {}, must be at location {}",
530 table_name, location, table_path
531 )));
532 }
533 }
534
535 let reserved_file_path = format!("{}.lance/.lance-reserved", table_name);
537 self.operator
538 .write(&reserved_file_path, Vec::<u8>::new())
539 .await
540 .map_err(|e| {
541 NamespaceError::Other(format!(
542 "Failed to create .lance-reserved file for table {}: {}",
543 table_name, e
544 ))
545 })?;
546
547 Ok(CreateEmptyTableResponse {
548 location: Some(table_path),
549 properties: None,
550 storage_options: Some(self.config.storage_options.clone()),
551 })
552 }
553
554 async fn drop_table(&self, request: DropTableRequest) -> Result<DropTableResponse> {
555 let table_name = Self::table_name_from_id(&request.id)?;
556 let table_path = self.table_full_path(&table_name);
557
558 let table_dir = format!("{}.lance/", table_name);
560 self.operator.remove_all(&table_dir).await.map_err(|e| {
561 NamespaceError::Other(format!("Failed to drop table {}: {}", table_name, e))
562 })?;
563
564 Ok(DropTableResponse {
565 id: request.id,
566 location: Some(table_path),
567 properties: None,
568 transaction_id: None,
569 })
570 }
571}
572
573#[cfg(test)]
574mod tests {
575 use super::*;
576 use crate::models::{JsonArrowDataType, JsonArrowField, JsonArrowSchema};
577 use crate::schema::convert_json_arrow_schema;
578 use std::collections::HashMap;
579 use std::sync::Arc;
580 use tempfile::TempDir;
581
582 async fn create_test_namespace() -> (DirectoryNamespace, TempDir) {
584 let temp_dir = TempDir::new().unwrap();
585 let mut properties = HashMap::new();
586 properties.insert(
587 "root".to_string(),
588 temp_dir.path().to_string_lossy().to_string(),
589 );
590
591 let namespace = DirectoryNamespace::new(properties).unwrap();
592 (namespace, temp_dir)
593 }
594
595 fn create_test_ipc_data(schema: &JsonArrowSchema) -> Vec<u8> {
597 use arrow::ipc::writer::StreamWriter;
598
599 let arrow_schema = convert_json_arrow_schema(schema).unwrap();
600 let arrow_schema = Arc::new(arrow_schema);
601 let batch = arrow::record_batch::RecordBatch::new_empty(arrow_schema.clone());
602 let mut buffer = Vec::new();
603 {
604 let mut writer = StreamWriter::try_new(&mut buffer, &arrow_schema).unwrap();
605 writer.write(&batch).unwrap();
606 writer.finish().unwrap();
607 }
608 buffer
609 }
610
611 fn create_test_schema() -> JsonArrowSchema {
613 let int_type = JsonArrowDataType::new("int32".to_string());
614 let string_type = JsonArrowDataType::new("utf8".to_string());
615
616 let id_field = JsonArrowField {
617 name: "id".to_string(),
618 r#type: Box::new(int_type.clone()),
619 nullable: false,
620 metadata: None,
621 };
622
623 let name_field = JsonArrowField {
624 name: "name".to_string(),
625 r#type: Box::new(string_type),
626 nullable: true,
627 metadata: None,
628 };
629
630 JsonArrowSchema {
631 fields: vec![id_field, name_field],
632 metadata: None,
633 }
634 }
635
636 #[tokio::test]
637 async fn test_create_table() {
638 let (namespace, _temp_dir) = create_test_namespace().await;
639
640 let schema = create_test_schema();
642 let ipc_data = create_test_ipc_data(&schema);
643
644 let mut request = CreateTableRequest::new();
645 request.id = Some(vec!["test_table".to_string()]);
646
647 let response = namespace
648 .create_table(request, bytes::Bytes::from(ipc_data))
649 .await
650 .unwrap();
651
652 assert!(response.location.is_some());
653 assert!(response.location.unwrap().ends_with("test_table.lance"));
654 assert_eq!(response.version, Some(1));
655 }
656
657 #[tokio::test]
658 async fn test_create_table_without_data() {
659 let (namespace, _temp_dir) = create_test_namespace().await;
660
661 let mut request = CreateTableRequest::new();
662 request.id = Some(vec!["test_table".to_string()]);
663
664 let result = namespace.create_table(request, bytes::Bytes::new()).await;
665 assert!(result.is_err());
666 assert!(result
667 .unwrap_err()
668 .to_string()
669 .contains("Arrow IPC stream) is required"));
670 }
671
672 #[tokio::test]
673 async fn test_create_table_with_invalid_id() {
674 let (namespace, _temp_dir) = create_test_namespace().await;
675
676 let schema = create_test_schema();
678 let ipc_data = create_test_ipc_data(&schema);
679
680 let mut request = CreateTableRequest::new();
682 request.id = Some(vec![]);
683
684 let result = namespace
685 .create_table(request, bytes::Bytes::from(ipc_data.clone()))
686 .await;
687 assert!(result.is_err());
688
689 let mut request = CreateTableRequest::new();
691 request.id = Some(vec!["namespace".to_string(), "table".to_string()]);
692
693 let result = namespace
694 .create_table(request, bytes::Bytes::from(ipc_data))
695 .await;
696 assert!(result.is_err());
697 assert!(result
698 .unwrap_err()
699 .to_string()
700 .contains("single-level table IDs"));
701 }
702
703 #[tokio::test]
704 async fn test_create_table_with_wrong_location() {
705 let (namespace, _temp_dir) = create_test_namespace().await;
706
707 let schema = create_test_schema();
709 let ipc_data = create_test_ipc_data(&schema);
710
711 let mut request = CreateTableRequest::new();
712 request.id = Some(vec!["test_table".to_string()]);
713 request.location = Some("/wrong/path/table.lance".to_string());
714
715 let result = namespace
716 .create_table(request, bytes::Bytes::from(ipc_data))
717 .await;
718 assert!(result.is_err());
719 assert!(result
720 .unwrap_err()
721 .to_string()
722 .contains("must be at location"));
723 }
724
725 #[tokio::test]
726 async fn test_list_tables() {
727 let (namespace, _temp_dir) = create_test_namespace().await;
728
729 let request = ListTablesRequest::new();
731 let response = namespace.list_tables(request).await.unwrap();
732 assert_eq!(response.tables.len(), 0);
733
734 let schema = create_test_schema();
736 let ipc_data = create_test_ipc_data(&schema);
737
738 let mut create_request = CreateTableRequest::new();
740 create_request.id = Some(vec!["table1".to_string()]);
741 namespace
742 .create_table(create_request, bytes::Bytes::from(ipc_data.clone()))
743 .await
744 .unwrap();
745
746 let mut create_request = CreateTableRequest::new();
748 create_request.id = Some(vec!["table2".to_string()]);
749 namespace
750 .create_table(create_request, bytes::Bytes::from(ipc_data))
751 .await
752 .unwrap();
753
754 let request = ListTablesRequest::new();
756 let response = namespace.list_tables(request).await.unwrap();
757 let tables = response.tables;
758 assert_eq!(tables.len(), 2);
759 assert!(tables.contains(&"table1".to_string()));
760 assert!(tables.contains(&"table2".to_string()));
761 }
762
763 #[tokio::test]
764 async fn test_list_tables_with_namespace_id() {
765 let (namespace, _temp_dir) = create_test_namespace().await;
766
767 let mut request = ListTablesRequest::new();
768 request.id = Some(vec!["namespace".to_string()]);
769
770 let result = namespace.list_tables(request).await;
771 assert!(result.is_err());
772 assert!(result
773 .unwrap_err()
774 .to_string()
775 .contains("root namespace operations"));
776 }
777
778 #[tokio::test]
779 async fn test_describe_table() {
780 let (namespace, _temp_dir) = create_test_namespace().await;
781
782 let schema = create_test_schema();
784 let ipc_data = create_test_ipc_data(&schema);
785
786 let mut create_request = CreateTableRequest::new();
787 create_request.id = Some(vec!["test_table".to_string()]);
788 namespace
789 .create_table(create_request, bytes::Bytes::from(ipc_data))
790 .await
791 .unwrap();
792
793 let mut request = DescribeTableRequest::new();
795 request.id = Some(vec!["test_table".to_string()]);
796 let response = namespace.describe_table(request).await.unwrap();
797
798 assert!(response.location.is_some());
799 assert!(response.location.unwrap().ends_with("test_table.lance"));
800 }
801
802 #[tokio::test]
803 async fn test_describe_nonexistent_table() {
804 let (namespace, _temp_dir) = create_test_namespace().await;
805
806 let mut request = DescribeTableRequest::new();
807 request.id = Some(vec!["nonexistent".to_string()]);
808
809 let result = namespace.describe_table(request).await;
810 assert!(result.is_err());
811 assert!(result
812 .unwrap_err()
813 .to_string()
814 .contains("Table does not exist"));
815 }
816
817 #[tokio::test]
818 async fn test_table_exists() {
819 let (namespace, _temp_dir) = create_test_namespace().await;
820
821 let schema = create_test_schema();
823 let ipc_data = create_test_ipc_data(&schema);
824
825 let mut create_request = CreateTableRequest::new();
826 create_request.id = Some(vec!["existing_table".to_string()]);
827 namespace
828 .create_table(create_request, bytes::Bytes::from(ipc_data))
829 .await
830 .unwrap();
831
832 let mut request = TableExistsRequest::new();
834 request.id = Some(vec!["existing_table".to_string()]);
835 let result = namespace.table_exists(request).await;
836 assert!(result.is_ok());
837
838 let mut request = TableExistsRequest::new();
840 request.id = Some(vec!["nonexistent".to_string()]);
841 let result = namespace.table_exists(request).await;
842 assert!(result.is_err());
843 assert!(result
844 .unwrap_err()
845 .to_string()
846 .contains("Table does not exist"));
847 }
848
849 #[tokio::test]
850 async fn test_drop_table() {
851 let (namespace, _temp_dir) = create_test_namespace().await;
852
853 let schema = create_test_schema();
855 let ipc_data = create_test_ipc_data(&schema);
856
857 let mut create_request = CreateTableRequest::new();
858 create_request.id = Some(vec!["table_to_drop".to_string()]);
859 namespace
860 .create_table(create_request, bytes::Bytes::from(ipc_data))
861 .await
862 .unwrap();
863
864 let mut exists_request = TableExistsRequest::new();
866 exists_request.id = Some(vec!["table_to_drop".to_string()]);
867 assert!(namespace.table_exists(exists_request.clone()).await.is_ok());
868
869 let mut drop_request = DropTableRequest::new();
871 drop_request.id = Some(vec!["table_to_drop".to_string()]);
872 let response = namespace.drop_table(drop_request).await.unwrap();
873 assert!(response.location.is_some());
874
875 assert!(namespace.table_exists(exists_request).await.is_err());
877 }
878
879 #[tokio::test]
880 async fn test_drop_nonexistent_table() {
881 let (namespace, _temp_dir) = create_test_namespace().await;
882
883 let mut request = DropTableRequest::new();
884 request.id = Some(vec!["nonexistent".to_string()]);
885
886 let result = namespace.drop_table(request).await;
888 let _ = result;
891 }
892
893 #[tokio::test]
894 async fn test_root_namespace_operations() {
895 let (namespace, _temp_dir) = create_test_namespace().await;
896
897 let request = ListNamespacesRequest::new();
899 let result = namespace.list_namespaces(request).await;
900 assert!(result.is_ok());
901 assert_eq!(result.unwrap().namespaces.len(), 0);
902
903 let request = DescribeNamespaceRequest::new();
905 let result = namespace.describe_namespace(request).await;
906 assert!(result.is_ok());
907
908 let request = NamespaceExistsRequest::new();
910 let result = namespace.namespace_exists(request).await;
911 assert!(result.is_ok());
912
913 let request = CreateNamespaceRequest::new();
915 let result = namespace.create_namespace(request).await;
916 assert!(result.is_err());
917 assert!(result.unwrap_err().to_string().contains("already exists"));
918
919 let request = DropNamespaceRequest::new();
921 let result = namespace.drop_namespace(request).await;
922 assert!(result.is_err());
923 assert!(result
924 .unwrap_err()
925 .to_string()
926 .contains("cannot be dropped"));
927 }
928
929 #[tokio::test]
930 async fn test_non_root_namespace_operations() {
931 let (namespace, _temp_dir) = create_test_namespace().await;
932
933 let mut request = CreateNamespaceRequest::new();
935 request.id = Some(vec!["child".to_string()]);
936 let result = namespace.create_namespace(request).await;
937 assert!(matches!(result, Err(NamespaceError::NotSupported(_))));
938
939 let mut request = NamespaceExistsRequest::new();
941 request.id = Some(vec!["child".to_string()]);
942 let result = namespace.namespace_exists(request).await;
943 assert!(result.is_err());
944 assert!(result
945 .unwrap_err()
946 .to_string()
947 .contains("Only root namespace exists"));
948
949 let mut request = DropNamespaceRequest::new();
951 request.id = Some(vec!["child".to_string()]);
952 let result = namespace.drop_namespace(request).await;
953 assert!(matches!(result, Err(NamespaceError::NotSupported(_))));
954 }
955
956 #[tokio::test]
957 async fn test_config_custom_root() {
958 let temp_dir = TempDir::new().unwrap();
959 let custom_path = temp_dir.path().join("custom");
960 std::fs::create_dir(&custom_path).unwrap();
961
962 let mut properties = HashMap::new();
963 properties.insert(
964 "root".to_string(),
965 custom_path.to_string_lossy().to_string(),
966 );
967
968 let namespace = DirectoryNamespace::new(properties).unwrap();
969
970 let schema = create_test_schema();
972 let ipc_data = create_test_ipc_data(&schema);
973
974 let mut request = CreateTableRequest::new();
976 request.id = Some(vec!["test_table".to_string()]);
977
978 let response = namespace
979 .create_table(request, bytes::Bytes::from(ipc_data))
980 .await
981 .unwrap();
982
983 assert!(response.location.unwrap().contains("custom"));
984 }
985
986 #[tokio::test]
987 async fn test_config_storage_options() {
988 let temp_dir = TempDir::new().unwrap();
989 let mut properties = HashMap::new();
990 properties.insert(
991 "root".to_string(),
992 temp_dir.path().to_string_lossy().to_string(),
993 );
994 properties.insert("storage.option1".to_string(), "value1".to_string());
995 properties.insert("storage.option2".to_string(), "value2".to_string());
996
997 let namespace = DirectoryNamespace::new(properties).unwrap();
998
999 let schema = create_test_schema();
1001 let ipc_data = create_test_ipc_data(&schema);
1002
1003 let mut request = CreateTableRequest::new();
1005 request.id = Some(vec!["test_table".to_string()]);
1006
1007 let response = namespace
1008 .create_table(request, bytes::Bytes::from(ipc_data))
1009 .await
1010 .unwrap();
1011
1012 let storage_options = response.storage_options.unwrap();
1013 assert_eq!(storage_options.get("option1"), Some(&"value1".to_string()));
1014 assert_eq!(storage_options.get("option2"), Some(&"value2".to_string()));
1015 }
1016
1017 #[tokio::test]
1018 async fn test_various_arrow_types() {
1019 let (namespace, _temp_dir) = create_test_namespace().await;
1020
1021 let fields = vec![
1023 JsonArrowField {
1024 name: "bool_col".to_string(),
1025 r#type: Box::new(JsonArrowDataType::new("bool".to_string())),
1026 nullable: true,
1027 metadata: None,
1028 },
1029 JsonArrowField {
1030 name: "int8_col".to_string(),
1031 r#type: Box::new(JsonArrowDataType::new("int8".to_string())),
1032 nullable: true,
1033 metadata: None,
1034 },
1035 JsonArrowField {
1036 name: "float64_col".to_string(),
1037 r#type: Box::new(JsonArrowDataType::new("float64".to_string())),
1038 nullable: true,
1039 metadata: None,
1040 },
1041 JsonArrowField {
1042 name: "binary_col".to_string(),
1043 r#type: Box::new(JsonArrowDataType::new("binary".to_string())),
1044 nullable: true,
1045 metadata: None,
1046 },
1047 ];
1048
1049 let schema = JsonArrowSchema {
1050 fields,
1051 metadata: None,
1052 };
1053
1054 let ipc_data = create_test_ipc_data(&schema);
1056
1057 let mut request = CreateTableRequest::new();
1058 request.id = Some(vec!["complex_table".to_string()]);
1059
1060 let response = namespace
1061 .create_table(request, bytes::Bytes::from(ipc_data))
1062 .await
1063 .unwrap();
1064
1065 assert!(response.location.is_some());
1066 }
1067
1068 #[tokio::test]
1069 async fn test_connect_dir() {
1070 let temp_dir = TempDir::new().unwrap();
1071 let mut properties = HashMap::new();
1072 properties.insert(
1073 "root".to_string(),
1074 temp_dir.path().to_string_lossy().to_string(),
1075 );
1076
1077 let namespace = crate::connect("dir", properties).await.unwrap();
1078
1079 let request = ListTablesRequest::new();
1081 let response = namespace.list_tables(request).await.unwrap();
1082 assert_eq!(response.tables.len(), 0);
1083 }
1084
1085 #[test]
1086 fn test_parse_storage_path_local() {
1087 let storage_options = HashMap::new();
1088
1089 let (scheme, config) =
1091 DirectoryNamespace::parse_storage_path("/path/to/data", &storage_options).unwrap();
1092 assert!(matches!(scheme, opendal::Scheme::Fs));
1093 assert_eq!(config.get("root").unwrap(), "/path/to/data");
1094
1095 let (scheme, config) =
1097 DirectoryNamespace::parse_storage_path("./data", &storage_options).unwrap();
1098 assert!(matches!(scheme, opendal::Scheme::Fs));
1099 assert_eq!(config.get("root").unwrap(), "./data");
1100 }
1101
1102 #[test]
1103 fn test_parse_storage_path_s3() {
1104 let storage_options = HashMap::new();
1105
1106 let (scheme, config) =
1108 DirectoryNamespace::parse_storage_path("s3://my-bucket/path/to/data", &storage_options)
1109 .unwrap();
1110 assert!(matches!(scheme, opendal::Scheme::S3));
1111 assert_eq!(config.get("bucket").unwrap(), "my-bucket");
1112 assert_eq!(config.get("root").unwrap(), "path/to/data");
1113
1114 let (scheme, config) =
1116 DirectoryNamespace::parse_storage_path("s3://my-bucket", &storage_options).unwrap();
1117 assert!(matches!(scheme, opendal::Scheme::S3));
1118 assert_eq!(config.get("bucket").unwrap(), "my-bucket");
1119 assert_eq!(config.get("root").unwrap(), "");
1120 }
1121
1122 #[test]
1123 fn test_parse_storage_path_gcs() {
1124 let storage_options = HashMap::new();
1125
1126 let (scheme, config) = DirectoryNamespace::parse_storage_path(
1128 "gcs://my-bucket/path/to/data",
1129 &storage_options,
1130 )
1131 .unwrap();
1132 assert!(matches!(scheme, opendal::Scheme::Gcs));
1133 assert_eq!(config.get("bucket").unwrap(), "my-bucket");
1134 assert_eq!(config.get("root").unwrap(), "path/to/data");
1135 }
1136
1137 #[test]
1138 fn test_parse_storage_path_azblob() {
1139 let storage_options = HashMap::new();
1140
1141 let (scheme, config) = DirectoryNamespace::parse_storage_path(
1143 "azblob://my-container/path/to/data",
1144 &storage_options,
1145 )
1146 .unwrap();
1147 assert!(matches!(scheme, opendal::Scheme::Azblob));
1148 assert_eq!(config.get("container").unwrap(), "my-container");
1149 assert_eq!(config.get("root").unwrap(), "path/to/data");
1150
1151 let (scheme, config) =
1153 DirectoryNamespace::parse_storage_path("abfs://my-container/path", &storage_options)
1154 .unwrap();
1155 assert!(matches!(scheme, opendal::Scheme::Azblob));
1156 assert_eq!(config.get("container").unwrap(), "my-container");
1157 assert_eq!(config.get("root").unwrap(), "path");
1158 }
1159
/// Scheme aliases are mapped to their canonical names regardless of case,
/// while names without an alias are returned as given.
#[test]
fn test_normalize_scheme() {
    // (input scheme, expected normalized scheme)
    let expectations = [
        ("s3a", "s3"),
        ("s3n", "s3"),
        ("S3A", "s3"),
        ("abfs", "azblob"),
        ("ABFS", "azblob"),
        ("file", "fs"),
        ("FILE", "fs"),
        ("gcs", "gcs"),
        ("random", "random"),
    ];

    for (input, normalized) in expectations {
        assert_eq!(DirectoryNamespace::normalize_scheme(input), normalized);
    }
}
1173
/// `file://` and `fs://` URLs must both resolve to the local filesystem
/// scheme, keeping the absolute path as `root`.
#[test]
fn test_fs_scheme_url() {
    let no_options = HashMap::new();

    for url in ["file:///absolute/path", "fs:///absolute/path"] {
        let (scheme, config) = DirectoryNamespace::parse_storage_path(url, &no_options).unwrap();
        assert!(matches!(scheme, opendal::Scheme::Fs));
        assert_eq!(config.get("root").unwrap(), "/absolute/path");
    }
}
1192
/// Caller-supplied storage options must appear in the parsed configuration
/// next to the entries derived from the path itself, for both an S3 URL and
/// a plain local path.
#[test]
fn test_storage_options_passed_through() {
    // Single source of truth for the options handed in and later expected back.
    let supplied = [
        ("aws_access_key_id", "test_key"),
        ("aws_secret_access_key", "test_secret"),
        ("region", "us-west-2"),
    ];
    let storage_options: HashMap<String, String> = supplied
        .iter()
        .map(|(key, value)| (key.to_string(), value.to_string()))
        .collect();

    // Object-store URL: path-derived entries plus every supplied option.
    let (s3_scheme, s3_config) =
        DirectoryNamespace::parse_storage_path("s3://my-bucket/path", &storage_options).unwrap();
    assert!(matches!(s3_scheme, opendal::Scheme::S3));
    assert_eq!(s3_config.get("bucket").unwrap(), "my-bucket");
    assert_eq!(s3_config.get("root").unwrap(), "path");
    for (key, expected) in supplied {
        assert_eq!(s3_config.get(key).unwrap(), expected);
    }

    // Local path: options are still carried along.
    let (fs_scheme, fs_config) =
        DirectoryNamespace::parse_storage_path("/local/path", &storage_options).unwrap();
    assert!(matches!(fs_scheme, opendal::Scheme::Fs));
    assert_eq!(fs_config.get("root").unwrap(), "/local/path");
    assert_eq!(fs_config.get("aws_access_key_id").unwrap(), "test_key");
}
1223
1224 #[tokio::test]
1225 async fn test_create_table_with_ipc_data() {
1226 use arrow::array::{Int32Array, StringArray};
1227 use arrow::ipc::writer::StreamWriter;
1228
1229 let (namespace, _temp_dir) = create_test_namespace().await;
1230
1231 let schema = create_test_schema();
1233
1234 let arrow_schema = convert_json_arrow_schema(&schema).unwrap();
1236 let arrow_schema = Arc::new(arrow_schema);
1237
1238 let id_array = Int32Array::from(vec![1, 2, 3]);
1240 let name_array = StringArray::from(vec!["Alice", "Bob", "Charlie"]);
1241 let batch = arrow::record_batch::RecordBatch::try_new(
1242 arrow_schema.clone(),
1243 vec![Arc::new(id_array), Arc::new(name_array)],
1244 )
1245 .unwrap();
1246
1247 let mut buffer = Vec::new();
1249 {
1250 let mut writer = StreamWriter::try_new(&mut buffer, &arrow_schema).unwrap();
1251 writer.write(&batch).unwrap();
1252 writer.finish().unwrap();
1253 }
1254
1255 let mut request = CreateTableRequest::new();
1257 request.id = Some(vec!["test_table_with_data".to_string()]);
1258
1259 let response = namespace
1260 .create_table(request, Bytes::from(buffer))
1261 .await
1262 .unwrap();
1263
1264 assert_eq!(response.version, Some(1));
1265 assert!(response
1266 .location
1267 .unwrap()
1268 .contains("test_table_with_data.lance"));
1269
1270 let mut exists_request = TableExistsRequest::new();
1272 exists_request.id = Some(vec!["test_table_with_data".to_string()]);
1273 namespace.table_exists(exists_request).await.unwrap();
1274 }
1275
1276 #[tokio::test]
1277 async fn test_create_empty_table() {
1278 let (namespace, temp_dir) = create_test_namespace().await;
1279
1280 let mut request = CreateEmptyTableRequest::new();
1281 request.id = Some(vec!["empty_table".to_string()]);
1282
1283 let response = namespace.create_empty_table(request).await.unwrap();
1284
1285 assert!(response.location.is_some());
1286 assert!(response.location.unwrap().ends_with("empty_table.lance"));
1287
1288 let table_dir = temp_dir.path().join("empty_table.lance");
1290 assert!(table_dir.exists());
1291 assert!(table_dir.is_dir());
1292
1293 let reserved_file = table_dir.join(".lance-reserved");
1294 assert!(reserved_file.exists());
1295 assert!(reserved_file.is_file());
1296
1297 let metadata = std::fs::metadata(&reserved_file).unwrap();
1299 assert_eq!(metadata.len(), 0);
1300
1301 let mut exists_request = TableExistsRequest::new();
1303 exists_request.id = Some(vec!["empty_table".to_string()]);
1304 namespace.table_exists(exists_request).await.unwrap();
1305
1306 let list_request = ListTablesRequest::new();
1308 let list_response = namespace.list_tables(list_request).await.unwrap();
1309 assert!(list_response.tables.contains(&"empty_table".to_string()));
1310
1311 let mut describe_request = DescribeTableRequest::new();
1313 describe_request.id = Some(vec!["empty_table".to_string()]);
1314 let describe_response = namespace.describe_table(describe_request).await.unwrap();
1315 assert!(describe_response.location.is_some());
1316 assert!(describe_response.location.unwrap().contains("empty_table"));
1317 }
1318
1319 #[tokio::test]
1320 async fn test_create_empty_table_with_wrong_location() {
1321 let (namespace, _temp_dir) = create_test_namespace().await;
1322
1323 let mut request = CreateEmptyTableRequest::new();
1324 request.id = Some(vec!["test_table".to_string()]);
1325 request.location = Some("/wrong/path/table.lance".to_string());
1326
1327 let result = namespace.create_empty_table(request).await;
1328 assert!(result.is_err());
1329 assert!(result
1330 .unwrap_err()
1331 .to_string()
1332 .contains("must be at location"));
1333 }
1334
1335 #[tokio::test]
1336 async fn test_create_empty_table_then_drop() {
1337 let (namespace, temp_dir) = create_test_namespace().await;
1338
1339 let mut create_request = CreateEmptyTableRequest::new();
1341 create_request.id = Some(vec!["empty_table_to_drop".to_string()]);
1342
1343 let create_response = namespace.create_empty_table(create_request).await.unwrap();
1344 assert!(create_response.location.is_some());
1345
1346 let table_dir = temp_dir.path().join("empty_table_to_drop.lance");
1348 assert!(table_dir.exists());
1349 let reserved_file = table_dir.join(".lance-reserved");
1350 assert!(reserved_file.exists());
1351
1352 let mut drop_request = DropTableRequest::new();
1354 drop_request.id = Some(vec!["empty_table_to_drop".to_string()]);
1355 let drop_response = namespace.drop_table(drop_request).await.unwrap();
1356 assert!(drop_response.location.is_some());
1357
1358 assert!(!table_dir.exists());
1360 assert!(!reserved_file.exists());
1361
1362 let mut exists_request = TableExistsRequest::new();
1364 exists_request.id = Some(vec!["empty_table_to_drop".to_string()]);
1365 let exists_result = namespace.table_exists(exists_request).await;
1366 assert!(exists_result.is_err());
1367 }
1368}