lance_namespace/
dir.rs

1//! Directory-based Lance Namespace implementation.
2//!
3//! This module provides a directory-based implementation of the Lance namespace
4//! that stores tables as Lance datasets in a filesystem directory structure.
5
6use std::collections::HashMap;
7use std::str::FromStr;
8
9use async_trait::async_trait;
10use bytes::Bytes;
11use lance::dataset::{Dataset, WriteParams};
12use opendal::Operator;
13
14use lance_namespace_reqwest_client::models::{
15    CreateEmptyTableRequest, CreateEmptyTableResponse, CreateNamespaceRequest,
16    CreateNamespaceResponse, CreateTableRequest, CreateTableResponse, DescribeNamespaceRequest,
17    DescribeNamespaceResponse, DescribeTableRequest, DescribeTableResponse, DropNamespaceRequest,
18    DropNamespaceResponse, DropTableRequest, DropTableResponse, ListNamespacesRequest,
19    ListNamespacesResponse, ListTablesRequest, ListTablesResponse, NamespaceExistsRequest,
20    TableExistsRequest,
21};
22
23use crate::namespace::{LanceNamespace, NamespaceError, Result};
24
/// Configuration for DirectoryNamespace.
///
/// Built from a flat property map: the `root` property selects the namespace root
/// (a local path or a storage URL), and every property whose key starts with
/// `storage.` is forwarded, with that prefix removed, as a backend storage option.
#[derive(Debug, Clone)]
pub struct DirectoryNamespaceConfig {
    /// Root directory for the namespace, with trailing `/` characters removed
    root: String,
    /// Storage options for the backend (keys already stripped of `storage.`)
    storage_options: HashMap<String, String>,
}

impl DirectoryNamespaceConfig {
    /// Property key for the root directory
    pub const ROOT: &'static str = "root";
    /// Prefix for storage options
    pub const STORAGE_OPTIONS_PREFIX: &'static str = "storage.";

    /// Create a new configuration from properties.
    ///
    /// If no `root` property is present, the process's current working directory is
    /// used. If the working directory cannot be determined, `"."` is used instead of
    /// panicking; any resulting problem is then reported as an error when the
    /// storage backend is initialised. Trailing `/` characters are stripped from the
    /// root.
    pub fn new(properties: HashMap<String, String>) -> Self {
        let root = properties
            .get(Self::ROOT)
            .cloned()
            .unwrap_or_else(|| {
                std::env::current_dir()
                    .map(|dir| dir.to_string_lossy().into_owned())
                    // This constructor cannot return an error, so fall back to a
                    // relative root rather than aborting the process.
                    .unwrap_or_else(|_| ".".to_string())
            })
            .trim_end_matches('/')
            .to_string();

        let storage_options: HashMap<String, String> = properties
            .iter()
            .filter_map(|(key, value)| {
                key.strip_prefix(Self::STORAGE_OPTIONS_PREFIX)
                    .map(|option| (option.to_string(), value.clone()))
            })
            .collect();

        Self {
            root,
            storage_options,
        }
    }

    /// Get the root directory
    pub fn root(&self) -> &str {
        &self.root
    }

    /// Get the storage options
    pub fn storage_options(&self) -> &HashMap<String, String> {
        &self.storage_options
    }
}
78
79/// Directory-based implementation of Lance Namespace.
80///
81/// This implementation stores tables as Lance datasets in a directory structure.
82/// It supports local filesystems and cloud storage backends through OpenDAL.
83pub struct DirectoryNamespace {
84    config: DirectoryNamespaceConfig,
85    operator: Operator,
86}
87
88impl DirectoryNamespace {
89    /// Create a new DirectoryNamespace instance
90    pub fn new(properties: HashMap<String, String>) -> Result<Self> {
91        let config = DirectoryNamespaceConfig::new(properties);
92        let operator = Self::initialize_operator(&config)?;
93
94        Ok(Self { config, operator })
95    }
96
97    /// Initialize the OpenDAL operator based on the configuration
98    fn initialize_operator(config: &DirectoryNamespaceConfig) -> Result<Operator> {
99        let root = config.root();
100        let storage_options = &config.storage_options;
101
102        // Parse the root path to determine scheme and configuration
103        let (scheme, opendal_config) = Self::parse_storage_path(root, storage_options)?;
104
105        // Create the operator with the determined scheme and configuration
106        let operator = Operator::via_iter(scheme, opendal_config)
107            .map_err(|e| NamespaceError::Other(format!("Failed to create operator: {}", e)))?;
108
109        Ok(operator)
110    }
111
112    /// Parse storage path and return scheme and configuration
113    fn parse_storage_path(
114        root: &str,
115        storage_options: &HashMap<String, String>,
116    ) -> Result<(opendal::Scheme, HashMap<String, String>)> {
117        use url::Url;
118
119        let mut config = HashMap::new();
120
121        // Try to parse as URL, if it fails, treat as local filesystem path
122        let (scheme, authority, path) = if let Ok(url) = Url::parse(root) {
123            let scheme = Self::normalize_scheme(url.scheme());
124            let authority = url.host_str().unwrap_or("");
125            // For file:// and fs:// URLs, preserve the full path including leading slash
126            // For cloud storage URLs, remove the leading slash
127            let path = if scheme == "fs" || scheme == "file" {
128                url.path().to_string()
129            } else {
130                url.path().trim_start_matches('/').to_string()
131            };
132            (scheme, authority.to_string(), path)
133        } else {
134            // Not a URL, treat as local filesystem - prepend fs://
135            ("fs".to_string(), String::new(), root.to_string())
136        };
137
138        // Configure based on scheme
139        let opendal_scheme = match scheme.as_str() {
140            "fs" | "file" => {
141                // For filesystem, use the full path (authority is empty for local paths)
142                if authority.is_empty() {
143                    config.insert("root".to_string(), path);
144                } else {
145                    // Handle file:///absolute/path or fs://hostname/path
146                    config.insert("root".to_string(), format!("{}/{}", authority, path));
147                }
148                opendal::Scheme::Fs
149            }
150            "s3" => {
151                config.insert("root".to_string(), path);
152                config.insert("bucket".to_string(), authority);
153                opendal::Scheme::S3
154            }
155            "gcs" => {
156                config.insert("root".to_string(), path);
157                config.insert("bucket".to_string(), authority);
158                opendal::Scheme::Gcs
159            }
160            "azblob" => {
161                config.insert("root".to_string(), path);
162                config.insert("container".to_string(), authority);
163                opendal::Scheme::Azblob
164            }
165            _ => {
166                // For unknown schemes, try to parse as OpenDAL scheme
167                config.insert("root".to_string(), path);
168                if !authority.is_empty() {
169                    config.insert("bucket".to_string(), authority);
170                }
171                opendal::Scheme::from_str(&scheme).map_err(|_| {
172                    NamespaceError::Other(format!("Unsupported storage scheme: {}", scheme))
173                })?
174            }
175        };
176
177        // Add storage options for all schemes
178        config.extend(storage_options.clone());
179
180        Ok((opendal_scheme, config))
181    }
182
183    /// Normalize scheme names with common aliases
184    fn normalize_scheme(scheme: &str) -> String {
185        match scheme.to_lowercase().as_str() {
186            "s3a" | "s3n" => "s3".to_string(),
187            "abfs" => "azblob".to_string(),
188            "file" => "fs".to_string(),
189            s => s.to_string(),
190        }
191    }
192
193    /// Validate that the namespace ID represents the root namespace
194    fn validate_root_namespace_id(id: &Option<Vec<String>>) -> Result<()> {
195        if let Some(id) = id {
196            if !id.is_empty() {
197                return Err(NamespaceError::Other(format!(
198                    "Directory namespace only supports root namespace operations, but got namespace ID: {:?}. Expected empty ID.",
199                    id
200                )));
201            }
202        }
203        Ok(())
204    }
205
206    /// Extract table name from table ID
207    fn table_name_from_id(id: &Option<Vec<String>>) -> Result<String> {
208        let id = id.as_ref().ok_or_else(|| {
209            NamespaceError::Other("Directory namespace table ID cannot be empty".to_string())
210        })?;
211
212        if id.len() != 1 {
213            return Err(NamespaceError::Other(format!(
214                "Directory namespace only supports single-level table IDs, but got: {:?}",
215                id
216            )));
217        }
218
219        Ok(id[0].clone())
220    }
221
222    /// Get the full path for a table
223    fn table_full_path(&self, table_name: &str) -> String {
224        format!("{}/{}.lance", self.config.root(), table_name)
225    }
226
227    /// Get the versions path for a table
228    fn table_versions_path(&self, table_name: &str) -> String {
229        format!("{}.lance/_versions/", table_name)
230    }
231}
232
233#[async_trait]
234impl LanceNamespace for DirectoryNamespace {
235    async fn list_namespaces(
236        &self,
237        request: ListNamespacesRequest,
238    ) -> Result<ListNamespacesResponse> {
239        // Validate this is a request for the root namespace
240        Self::validate_root_namespace_id(&request.id)?;
241
242        // Directory namespace only contains the root namespace (empty list)
243        Ok(ListNamespacesResponse::new(vec![]))
244    }
245
246    async fn describe_namespace(
247        &self,
248        request: DescribeNamespaceRequest,
249    ) -> Result<DescribeNamespaceResponse> {
250        // Validate this is a request for the root namespace
251        Self::validate_root_namespace_id(&request.id)?;
252
253        // Return description of the root namespace
254        Ok(DescribeNamespaceResponse {
255            properties: Some(HashMap::new()),
256        })
257    }
258
259    async fn create_namespace(
260        &self,
261        request: CreateNamespaceRequest,
262    ) -> Result<CreateNamespaceResponse> {
263        // Root namespace always exists and cannot be created
264        if request.id.is_none() || request.id.as_ref().unwrap().is_empty() {
265            return Err(NamespaceError::Other(
266                "Root namespace already exists and cannot be created".to_string(),
267            ));
268        }
269
270        // Non-root namespaces are not supported
271        Err(NamespaceError::NotSupported(
272            "Directory namespace only supports the root namespace".to_string(),
273        ))
274    }
275
276    async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<DropNamespaceResponse> {
277        // Root namespace always exists and cannot be dropped
278        if request.id.is_none() || request.id.as_ref().unwrap().is_empty() {
279            return Err(NamespaceError::Other(
280                "Root namespace cannot be dropped".to_string(),
281            ));
282        }
283
284        // Non-root namespaces are not supported
285        Err(NamespaceError::NotSupported(
286            "Directory namespace only supports the root namespace".to_string(),
287        ))
288    }
289
290    async fn namespace_exists(&self, request: NamespaceExistsRequest) -> Result<()> {
291        // Root namespace always exists
292        if request.id.is_none() || request.id.as_ref().unwrap().is_empty() {
293            return Ok(());
294        }
295
296        // Non-root namespaces don't exist
297        Err(NamespaceError::Other(
298            "Only root namespace exists in directory namespace".to_string(),
299        ))
300    }
301
302    async fn list_tables(&self, request: ListTablesRequest) -> Result<ListTablesResponse> {
303        Self::validate_root_namespace_id(&request.id)?;
304
305        let mut tables = Vec::new();
306
307        // Use non-recursive listing to avoid issues with object stores that don't have directory concept
308        let entries = self.operator.list("").await.map_err(|e| {
309            NamespaceError::Io(std::io::Error::new(
310                std::io::ErrorKind::Other,
311                format!("Failed to list directory: {}", e),
312            ))
313        })?;
314
315        for entry in entries {
316            let path = entry.path().trim_end_matches('/');
317
318            // Only process directory-like paths that end with .lance
319            if !path.ends_with(".lance") {
320                continue;
321            }
322
323            // Extract table name (remove .lance suffix)
324            let table_name = &path[..path.len() - 6];
325
326            // Check if it's a valid Lance dataset or has .lance-reserved file
327            let mut is_table = false;
328
329            // First check for .lance-reserved file
330            let reserved_file_path = format!("{}.lance/.lance-reserved", table_name);
331            if self
332                .operator
333                .exists(&reserved_file_path)
334                .await
335                .unwrap_or(false)
336            {
337                is_table = true;
338            }
339
340            // If not found, check for _versions directory
341            if !is_table {
342                let versions_path = self.table_versions_path(table_name);
343                if let Ok(version_entries) = self.operator.list(&versions_path).await {
344                    // If there's at least one version file, it's a valid Lance dataset
345                    if !version_entries.is_empty() {
346                        is_table = true;
347                    }
348                }
349            }
350
351            if is_table {
352                tables.push(table_name.to_string());
353            }
354        }
355
356        let response = ListTablesResponse::new(tables);
357        Ok(response)
358    }
359
360    async fn describe_table(&self, request: DescribeTableRequest) -> Result<DescribeTableResponse> {
361        let table_name = Self::table_name_from_id(&request.id)?;
362        let table_path = self.table_full_path(&table_name);
363
364        // Check if table exists - either as Lance dataset or with .lance-reserved file
365        let mut table_exists = false;
366
367        // First check for .lance-reserved file
368        let reserved_file_path = format!("{}.lance/.lance-reserved", table_name);
369        if self
370            .operator
371            .exists(&reserved_file_path)
372            .await
373            .unwrap_or(false)
374        {
375            table_exists = true;
376        }
377
378        // If not found, check if it's a Lance dataset by looking for _versions directory
379        if !table_exists {
380            let versions_path = self.table_versions_path(&table_name);
381            if let Ok(entries) = self.operator.list(&versions_path).await {
382                if !entries.is_empty() {
383                    table_exists = true;
384                }
385            }
386        }
387
388        if !table_exists {
389            return Err(NamespaceError::Other(format!(
390                "Table does not exist: {}",
391                table_name
392            )));
393        }
394
395        Ok(DescribeTableResponse {
396            version: None,
397            location: Some(table_path),
398            schema: None,
399            properties: None,
400            storage_options: Some(self.config.storage_options.clone()),
401        })
402    }
403
404    async fn table_exists(&self, request: TableExistsRequest) -> Result<()> {
405        let table_name = Self::table_name_from_id(&request.id)?;
406
407        // Check if table exists - either as Lance dataset or with .lance-reserved file
408        let mut table_exists = false;
409
410        // First check for .lance-reserved file
411        let reserved_file_path = format!("{}.lance/.lance-reserved", table_name);
412        if self
413            .operator
414            .exists(&reserved_file_path)
415            .await
416            .unwrap_or(false)
417        {
418            table_exists = true;
419        }
420
421        // If not found, check if it's a Lance dataset by looking for _versions directory
422        if !table_exists {
423            let versions_path = self.table_versions_path(&table_name);
424            if let Ok(entries) = self.operator.list(&versions_path).await {
425                if !entries.is_empty() {
426                    table_exists = true;
427                }
428            }
429        }
430
431        if !table_exists {
432            return Err(NamespaceError::Other(format!(
433                "Table does not exist: {}",
434                table_name
435            )));
436        }
437
438        Ok(())
439    }
440
441    async fn create_table(
442        &self,
443        request: CreateTableRequest,
444        request_data: Bytes,
445    ) -> Result<CreateTableResponse> {
446        let table_name = Self::table_name_from_id(&request.id)?;
447        let table_path = self.table_full_path(&table_name);
448
449        // Validate that request_data is provided and is a valid Arrow IPC stream
450        if request_data.is_empty() {
451            return Err(NamespaceError::Other(
452                "Request data (Arrow IPC stream) is required for create_table".to_string(),
453            ));
454        }
455
456        // Validate location if provided
457        if let Some(location) = &request.location {
458            let location = location.trim_end_matches('/');
459            if location != table_path {
460                return Err(NamespaceError::Other(format!(
461                    "Cannot create table {} at location {}, must be at location {}",
462                    table_name, location, table_path
463                )));
464            }
465        }
466
467        // Parse the Arrow IPC stream from request_data
468        use arrow::ipc::reader::StreamReader;
469        use std::io::Cursor;
470
471        let cursor = Cursor::new(request_data.to_vec());
472        let stream_reader = StreamReader::try_new(cursor, None)
473            .map_err(|e| NamespaceError::Other(format!("Invalid Arrow IPC stream: {}", e)))?;
474
475        // Extract schema from the IPC stream
476        let arrow_schema = stream_reader.schema();
477
478        // Collect all batches from the stream
479        let mut batches = Vec::new();
480        for batch_result in stream_reader {
481            batches.push(batch_result.map_err(|e| {
482                NamespaceError::Other(format!("Failed to read batch from IPC stream: {}", e))
483            })?);
484        }
485
486        // Create RecordBatchReader from the batches
487        let reader = if batches.is_empty() {
488            // If no batches in the stream, create an empty batch with the schema
489            let batch = arrow::record_batch::RecordBatch::new_empty(arrow_schema.clone());
490            let batches = vec![Ok(batch)];
491            arrow::record_batch::RecordBatchIterator::new(batches, arrow_schema.clone())
492        } else {
493            // Convert to RecordBatchIterator
494            let batch_results: Vec<_> = batches.into_iter().map(Ok).collect();
495            arrow::record_batch::RecordBatchIterator::new(batch_results, arrow_schema)
496        };
497
498        // Set up write parameters for creating a new dataset
499        let write_params = WriteParams {
500            mode: lance::dataset::WriteMode::Create,
501            ..Default::default()
502        };
503
504        // Create the Lance dataset using the actual Lance API
505        Dataset::write(reader, &table_path, Some(write_params))
506            .await
507            .map_err(|e| NamespaceError::Other(format!("Failed to create Lance dataset: {}", e)))?;
508
509        Ok(CreateTableResponse {
510            version: Some(1),
511            location: Some(table_path),
512            properties: None,
513            storage_options: Some(self.config.storage_options.clone()),
514        })
515    }
516
517    async fn create_empty_table(
518        &self,
519        request: CreateEmptyTableRequest,
520    ) -> Result<CreateEmptyTableResponse> {
521        let table_name = Self::table_name_from_id(&request.id)?;
522        let table_path = self.table_full_path(&table_name);
523
524        // Validate location if provided
525        if let Some(location) = &request.location {
526            let location = location.trim_end_matches('/');
527            if location != table_path {
528                return Err(NamespaceError::Other(format!(
529                    "Cannot create table {} at location {}, must be at location {}",
530                    table_name, location, table_path
531                )));
532            }
533        }
534
535        // Create the .lance-reserved file to mark the table as existing
536        let reserved_file_path = format!("{}.lance/.lance-reserved", table_name);
537        self.operator
538            .write(&reserved_file_path, Vec::<u8>::new())
539            .await
540            .map_err(|e| {
541                NamespaceError::Other(format!(
542                    "Failed to create .lance-reserved file for table {}: {}",
543                    table_name, e
544                ))
545            })?;
546
547        Ok(CreateEmptyTableResponse {
548            location: Some(table_path),
549            properties: None,
550            storage_options: Some(self.config.storage_options.clone()),
551        })
552    }
553
554    async fn drop_table(&self, request: DropTableRequest) -> Result<DropTableResponse> {
555        let table_name = Self::table_name_from_id(&request.id)?;
556        let table_path = self.table_full_path(&table_name);
557
558        // Remove the entire table directory
559        let table_dir = format!("{}.lance/", table_name);
560        self.operator.remove_all(&table_dir).await.map_err(|e| {
561            NamespaceError::Other(format!("Failed to drop table {}: {}", table_name, e))
562        })?;
563
564        Ok(DropTableResponse {
565            id: request.id,
566            location: Some(table_path),
567            properties: None,
568            transaction_id: None,
569        })
570    }
571}
572
573#[cfg(test)]
574mod tests {
575    use super::*;
576    use crate::schema::convert_json_arrow_schema;
577    use lance_namespace_reqwest_client::models::{
578        JsonArrowDataType, JsonArrowField, JsonArrowSchema,
579    };
580    use std::collections::HashMap;
581    use std::sync::Arc;
582    use tempfile::TempDir;
583
584    /// Helper to create a test DirectoryNamespace with a temporary directory
585    async fn create_test_namespace() -> (DirectoryNamespace, TempDir) {
586        let temp_dir = TempDir::new().unwrap();
587        let mut properties = HashMap::new();
588        properties.insert(
589            "root".to_string(),
590            temp_dir.path().to_string_lossy().to_string(),
591        );
592
593        let namespace = DirectoryNamespace::new(properties).unwrap();
594        (namespace, temp_dir)
595    }
596
597    /// Helper to create test IPC data from a schema
598    fn create_test_ipc_data(schema: &JsonArrowSchema) -> Vec<u8> {
599        use arrow::ipc::writer::StreamWriter;
600
601        let arrow_schema = convert_json_arrow_schema(schema).unwrap();
602        let arrow_schema = Arc::new(arrow_schema);
603        let batch = arrow::record_batch::RecordBatch::new_empty(arrow_schema.clone());
604        let mut buffer = Vec::new();
605        {
606            let mut writer = StreamWriter::try_new(&mut buffer, &arrow_schema).unwrap();
607            writer.write(&batch).unwrap();
608            writer.finish().unwrap();
609        }
610        buffer
611    }
612
613    /// Helper to create a simple test schema
614    fn create_test_schema() -> JsonArrowSchema {
615        let int_type = JsonArrowDataType::new("int32".to_string());
616        let string_type = JsonArrowDataType::new("utf8".to_string());
617
618        let id_field = JsonArrowField {
619            name: "id".to_string(),
620            r#type: Box::new(int_type.clone()),
621            nullable: false,
622            metadata: None,
623        };
624
625        let name_field = JsonArrowField {
626            name: "name".to_string(),
627            r#type: Box::new(string_type),
628            nullable: true,
629            metadata: None,
630        };
631
632        JsonArrowSchema {
633            fields: vec![id_field, name_field],
634            metadata: None,
635        }
636    }
637
638    #[tokio::test]
639    async fn test_create_table() {
640        let (namespace, _temp_dir) = create_test_namespace().await;
641
642        // Create test IPC data
643        let schema = create_test_schema();
644        let ipc_data = create_test_ipc_data(&schema);
645
646        let mut request = CreateTableRequest::new();
647        request.id = Some(vec!["test_table".to_string()]);
648
649        let response = namespace
650            .create_table(request, bytes::Bytes::from(ipc_data))
651            .await
652            .unwrap();
653
654        assert!(response.location.is_some());
655        assert!(response.location.unwrap().ends_with("test_table.lance"));
656        assert_eq!(response.version, Some(1));
657    }
658
659    #[tokio::test]
660    async fn test_create_table_without_data() {
661        let (namespace, _temp_dir) = create_test_namespace().await;
662
663        let mut request = CreateTableRequest::new();
664        request.id = Some(vec!["test_table".to_string()]);
665
666        let result = namespace.create_table(request, bytes::Bytes::new()).await;
667        assert!(result.is_err());
668        assert!(result
669            .unwrap_err()
670            .to_string()
671            .contains("Arrow IPC stream) is required"));
672    }
673
674    #[tokio::test]
675    async fn test_create_table_with_invalid_id() {
676        let (namespace, _temp_dir) = create_test_namespace().await;
677
678        // Create test IPC data
679        let schema = create_test_schema();
680        let ipc_data = create_test_ipc_data(&schema);
681
682        // Test with empty ID
683        let mut request = CreateTableRequest::new();
684        request.id = Some(vec![]);
685
686        let result = namespace
687            .create_table(request, bytes::Bytes::from(ipc_data.clone()))
688            .await;
689        assert!(result.is_err());
690
691        // Test with multi-level ID
692        let mut request = CreateTableRequest::new();
693        request.id = Some(vec!["namespace".to_string(), "table".to_string()]);
694
695        let result = namespace
696            .create_table(request, bytes::Bytes::from(ipc_data))
697            .await;
698        assert!(result.is_err());
699        assert!(result
700            .unwrap_err()
701            .to_string()
702            .contains("single-level table IDs"));
703    }
704
705    #[tokio::test]
706    async fn test_create_table_with_wrong_location() {
707        let (namespace, _temp_dir) = create_test_namespace().await;
708
709        // Create test IPC data
710        let schema = create_test_schema();
711        let ipc_data = create_test_ipc_data(&schema);
712
713        let mut request = CreateTableRequest::new();
714        request.id = Some(vec!["test_table".to_string()]);
715        request.location = Some("/wrong/path/table.lance".to_string());
716
717        let result = namespace
718            .create_table(request, bytes::Bytes::from(ipc_data))
719            .await;
720        assert!(result.is_err());
721        assert!(result
722            .unwrap_err()
723            .to_string()
724            .contains("must be at location"));
725    }
726
727    #[tokio::test]
728    async fn test_list_tables() {
729        let (namespace, _temp_dir) = create_test_namespace().await;
730
731        // Initially, no tables
732        let request = ListTablesRequest::new();
733        let response = namespace.list_tables(request).await.unwrap();
734        assert_eq!(response.tables.len(), 0);
735
736        // Create test IPC data
737        let schema = create_test_schema();
738        let ipc_data = create_test_ipc_data(&schema);
739
740        // Create a table
741        let mut create_request = CreateTableRequest::new();
742        create_request.id = Some(vec!["table1".to_string()]);
743        namespace
744            .create_table(create_request, bytes::Bytes::from(ipc_data.clone()))
745            .await
746            .unwrap();
747
748        // Create another table
749        let mut create_request = CreateTableRequest::new();
750        create_request.id = Some(vec!["table2".to_string()]);
751        namespace
752            .create_table(create_request, bytes::Bytes::from(ipc_data))
753            .await
754            .unwrap();
755
756        // List tables should return both
757        let request = ListTablesRequest::new();
758        let response = namespace.list_tables(request).await.unwrap();
759        let tables = response.tables;
760        assert_eq!(tables.len(), 2);
761        assert!(tables.contains(&"table1".to_string()));
762        assert!(tables.contains(&"table2".to_string()));
763    }
764
765    #[tokio::test]
766    async fn test_list_tables_with_namespace_id() {
767        let (namespace, _temp_dir) = create_test_namespace().await;
768
769        let mut request = ListTablesRequest::new();
770        request.id = Some(vec!["namespace".to_string()]);
771
772        let result = namespace.list_tables(request).await;
773        assert!(result.is_err());
774        assert!(result
775            .unwrap_err()
776            .to_string()
777            .contains("root namespace operations"));
778    }
779
780    #[tokio::test]
781    async fn test_describe_table() {
782        let (namespace, _temp_dir) = create_test_namespace().await;
783
784        // Create a table first
785        let schema = create_test_schema();
786        let ipc_data = create_test_ipc_data(&schema);
787
788        let mut create_request = CreateTableRequest::new();
789        create_request.id = Some(vec!["test_table".to_string()]);
790        namespace
791            .create_table(create_request, bytes::Bytes::from(ipc_data))
792            .await
793            .unwrap();
794
795        // Describe the table
796        let mut request = DescribeTableRequest::new();
797        request.id = Some(vec!["test_table".to_string()]);
798        let response = namespace.describe_table(request).await.unwrap();
799
800        assert!(response.location.is_some());
801        assert!(response.location.unwrap().ends_with("test_table.lance"));
802    }
803
804    #[tokio::test]
805    async fn test_describe_nonexistent_table() {
806        let (namespace, _temp_dir) = create_test_namespace().await;
807
808        let mut request = DescribeTableRequest::new();
809        request.id = Some(vec!["nonexistent".to_string()]);
810
811        let result = namespace.describe_table(request).await;
812        assert!(result.is_err());
813        assert!(result
814            .unwrap_err()
815            .to_string()
816            .contains("Table does not exist"));
817    }
818
819    #[tokio::test]
820    async fn test_table_exists() {
821        let (namespace, _temp_dir) = create_test_namespace().await;
822
823        // Create a table
824        let schema = create_test_schema();
825        let ipc_data = create_test_ipc_data(&schema);
826
827        let mut create_request = CreateTableRequest::new();
828        create_request.id = Some(vec!["existing_table".to_string()]);
829        namespace
830            .create_table(create_request, bytes::Bytes::from(ipc_data))
831            .await
832            .unwrap();
833
834        // Check existing table
835        let mut request = TableExistsRequest::new();
836        request.id = Some(vec!["existing_table".to_string()]);
837        let result = namespace.table_exists(request).await;
838        assert!(result.is_ok());
839
840        // Check non-existent table
841        let mut request = TableExistsRequest::new();
842        request.id = Some(vec!["nonexistent".to_string()]);
843        let result = namespace.table_exists(request).await;
844        assert!(result.is_err());
845        assert!(result
846            .unwrap_err()
847            .to_string()
848            .contains("Table does not exist"));
849    }
850
851    #[tokio::test]
852    async fn test_drop_table() {
853        let (namespace, _temp_dir) = create_test_namespace().await;
854
855        // Create a table
856        let schema = create_test_schema();
857        let ipc_data = create_test_ipc_data(&schema);
858
859        let mut create_request = CreateTableRequest::new();
860        create_request.id = Some(vec!["table_to_drop".to_string()]);
861        namespace
862            .create_table(create_request, bytes::Bytes::from(ipc_data))
863            .await
864            .unwrap();
865
866        // Verify it exists
867        let mut exists_request = TableExistsRequest::new();
868        exists_request.id = Some(vec!["table_to_drop".to_string()]);
869        assert!(namespace.table_exists(exists_request.clone()).await.is_ok());
870
871        // Drop the table
872        let mut drop_request = DropTableRequest::new();
873        drop_request.id = Some(vec!["table_to_drop".to_string()]);
874        let response = namespace.drop_table(drop_request).await.unwrap();
875        assert!(response.location.is_some());
876
877        // Verify it no longer exists
878        assert!(namespace.table_exists(exists_request).await.is_err());
879    }
880
881    #[tokio::test]
882    async fn test_drop_nonexistent_table() {
883        let (namespace, _temp_dir) = create_test_namespace().await;
884
885        let mut request = DropTableRequest::new();
886        request.id = Some(vec!["nonexistent".to_string()]);
887
888        // Should not fail when dropping non-existent table (idempotent)
889        let result = namespace.drop_table(request).await;
890        // The operation might succeed or fail depending on implementation
891        // But it should not panic
892        let _ = result;
893    }
894
895    #[tokio::test]
896    async fn test_root_namespace_operations() {
897        let (namespace, _temp_dir) = create_test_namespace().await;
898
899        // Test list_namespaces - should return empty list for root
900        let request = ListNamespacesRequest::new();
901        let result = namespace.list_namespaces(request).await;
902        assert!(result.is_ok());
903        assert_eq!(result.unwrap().namespaces.len(), 0);
904
905        // Test describe_namespace - should succeed for root
906        let request = DescribeNamespaceRequest::new();
907        let result = namespace.describe_namespace(request).await;
908        assert!(result.is_ok());
909
910        // Test namespace_exists - root always exists
911        let request = NamespaceExistsRequest::new();
912        let result = namespace.namespace_exists(request).await;
913        assert!(result.is_ok());
914
915        // Test create_namespace - root cannot be created
916        let request = CreateNamespaceRequest::new();
917        let result = namespace.create_namespace(request).await;
918        assert!(result.is_err());
919        assert!(result.unwrap_err().to_string().contains("already exists"));
920
921        // Test drop_namespace - root cannot be dropped
922        let request = DropNamespaceRequest::new();
923        let result = namespace.drop_namespace(request).await;
924        assert!(result.is_err());
925        assert!(result
926            .unwrap_err()
927            .to_string()
928            .contains("cannot be dropped"));
929    }
930
931    #[tokio::test]
932    async fn test_non_root_namespace_operations() {
933        let (namespace, _temp_dir) = create_test_namespace().await;
934
935        // Test create_namespace for non-root - not supported
936        let mut request = CreateNamespaceRequest::new();
937        request.id = Some(vec!["child".to_string()]);
938        let result = namespace.create_namespace(request).await;
939        assert!(matches!(result, Err(NamespaceError::NotSupported(_))));
940
941        // Test namespace_exists for non-root - should not exist
942        let mut request = NamespaceExistsRequest::new();
943        request.id = Some(vec!["child".to_string()]);
944        let result = namespace.namespace_exists(request).await;
945        assert!(result.is_err());
946        assert!(result
947            .unwrap_err()
948            .to_string()
949            .contains("Only root namespace exists"));
950
951        // Test drop_namespace for non-root - not supported
952        let mut request = DropNamespaceRequest::new();
953        request.id = Some(vec!["child".to_string()]);
954        let result = namespace.drop_namespace(request).await;
955        assert!(matches!(result, Err(NamespaceError::NotSupported(_))));
956    }
957
958    #[tokio::test]
959    async fn test_config_custom_root() {
960        let temp_dir = TempDir::new().unwrap();
961        let custom_path = temp_dir.path().join("custom");
962        std::fs::create_dir(&custom_path).unwrap();
963
964        let mut properties = HashMap::new();
965        properties.insert(
966            "root".to_string(),
967            custom_path.to_string_lossy().to_string(),
968        );
969
970        let namespace = DirectoryNamespace::new(properties).unwrap();
971
972        // Create test IPC data
973        let schema = create_test_schema();
974        let ipc_data = create_test_ipc_data(&schema);
975
976        // Create a table and verify location
977        let mut request = CreateTableRequest::new();
978        request.id = Some(vec!["test_table".to_string()]);
979
980        let response = namespace
981            .create_table(request, bytes::Bytes::from(ipc_data))
982            .await
983            .unwrap();
984
985        assert!(response.location.unwrap().contains("custom"));
986    }
987
988    #[tokio::test]
989    async fn test_config_storage_options() {
990        let temp_dir = TempDir::new().unwrap();
991        let mut properties = HashMap::new();
992        properties.insert(
993            "root".to_string(),
994            temp_dir.path().to_string_lossy().to_string(),
995        );
996        properties.insert("storage.option1".to_string(), "value1".to_string());
997        properties.insert("storage.option2".to_string(), "value2".to_string());
998
999        let namespace = DirectoryNamespace::new(properties).unwrap();
1000
1001        // Create test IPC data
1002        let schema = create_test_schema();
1003        let ipc_data = create_test_ipc_data(&schema);
1004
1005        // Create a table and check storage options are included
1006        let mut request = CreateTableRequest::new();
1007        request.id = Some(vec!["test_table".to_string()]);
1008
1009        let response = namespace
1010            .create_table(request, bytes::Bytes::from(ipc_data))
1011            .await
1012            .unwrap();
1013
1014        let storage_options = response.storage_options.unwrap();
1015        assert_eq!(storage_options.get("option1"), Some(&"value1".to_string()));
1016        assert_eq!(storage_options.get("option2"), Some(&"value2".to_string()));
1017    }
1018
1019    #[tokio::test]
1020    async fn test_various_arrow_types() {
1021        let (namespace, _temp_dir) = create_test_namespace().await;
1022
1023        // Create schema with various types
1024        let fields = vec![
1025            JsonArrowField {
1026                name: "bool_col".to_string(),
1027                r#type: Box::new(JsonArrowDataType::new("bool".to_string())),
1028                nullable: true,
1029                metadata: None,
1030            },
1031            JsonArrowField {
1032                name: "int8_col".to_string(),
1033                r#type: Box::new(JsonArrowDataType::new("int8".to_string())),
1034                nullable: true,
1035                metadata: None,
1036            },
1037            JsonArrowField {
1038                name: "float64_col".to_string(),
1039                r#type: Box::new(JsonArrowDataType::new("float64".to_string())),
1040                nullable: true,
1041                metadata: None,
1042            },
1043            JsonArrowField {
1044                name: "binary_col".to_string(),
1045                r#type: Box::new(JsonArrowDataType::new("binary".to_string())),
1046                nullable: true,
1047                metadata: None,
1048            },
1049        ];
1050
1051        let schema = JsonArrowSchema {
1052            fields,
1053            metadata: None,
1054        };
1055
1056        // Create IPC data
1057        let ipc_data = create_test_ipc_data(&schema);
1058
1059        let mut request = CreateTableRequest::new();
1060        request.id = Some(vec!["complex_table".to_string()]);
1061
1062        let response = namespace
1063            .create_table(request, bytes::Bytes::from(ipc_data))
1064            .await
1065            .unwrap();
1066
1067        assert!(response.location.is_some());
1068    }
1069
1070    #[tokio::test]
1071    async fn test_connect_dir() {
1072        let temp_dir = TempDir::new().unwrap();
1073        let mut properties = HashMap::new();
1074        properties.insert(
1075            "root".to_string(),
1076            temp_dir.path().to_string_lossy().to_string(),
1077        );
1078
1079        let namespace = crate::connect("dir", properties).await.unwrap();
1080
1081        // Test basic operation through the trait object
1082        let request = ListTablesRequest::new();
1083        let response = namespace.list_tables(request).await.unwrap();
1084        assert_eq!(response.tables.len(), 0);
1085    }
1086
1087    #[test]
1088    fn test_parse_storage_path_local() {
1089        let storage_options = HashMap::new();
1090
1091        // Test local filesystem paths
1092        let (scheme, config) =
1093            DirectoryNamespace::parse_storage_path("/path/to/data", &storage_options).unwrap();
1094        assert!(matches!(scheme, opendal::Scheme::Fs));
1095        assert_eq!(config.get("root").unwrap(), "/path/to/data");
1096
1097        // Test relative path
1098        let (scheme, config) =
1099            DirectoryNamespace::parse_storage_path("./data", &storage_options).unwrap();
1100        assert!(matches!(scheme, opendal::Scheme::Fs));
1101        assert_eq!(config.get("root").unwrap(), "./data");
1102    }
1103
1104    #[test]
1105    fn test_parse_storage_path_s3() {
1106        let storage_options = HashMap::new();
1107
1108        // Test S3 URL
1109        let (scheme, config) =
1110            DirectoryNamespace::parse_storage_path("s3://my-bucket/path/to/data", &storage_options)
1111                .unwrap();
1112        assert!(matches!(scheme, opendal::Scheme::S3));
1113        assert_eq!(config.get("bucket").unwrap(), "my-bucket");
1114        assert_eq!(config.get("root").unwrap(), "path/to/data");
1115
1116        // Test S3 with just bucket
1117        let (scheme, config) =
1118            DirectoryNamespace::parse_storage_path("s3://my-bucket", &storage_options).unwrap();
1119        assert!(matches!(scheme, opendal::Scheme::S3));
1120        assert_eq!(config.get("bucket").unwrap(), "my-bucket");
1121        assert_eq!(config.get("root").unwrap(), "");
1122    }
1123
1124    #[test]
1125    fn test_parse_storage_path_gcs() {
1126        let storage_options = HashMap::new();
1127
1128        // Test GCS URL
1129        let (scheme, config) = DirectoryNamespace::parse_storage_path(
1130            "gcs://my-bucket/path/to/data",
1131            &storage_options,
1132        )
1133        .unwrap();
1134        assert!(matches!(scheme, opendal::Scheme::Gcs));
1135        assert_eq!(config.get("bucket").unwrap(), "my-bucket");
1136        assert_eq!(config.get("root").unwrap(), "path/to/data");
1137    }
1138
1139    #[test]
1140    fn test_parse_storage_path_azblob() {
1141        let storage_options = HashMap::new();
1142
1143        // Test Azure Blob URL
1144        let (scheme, config) = DirectoryNamespace::parse_storage_path(
1145            "azblob://my-container/path/to/data",
1146            &storage_options,
1147        )
1148        .unwrap();
1149        assert!(matches!(scheme, opendal::Scheme::Azblob));
1150        assert_eq!(config.get("container").unwrap(), "my-container");
1151        assert_eq!(config.get("root").unwrap(), "path/to/data");
1152
1153        // Test with abfs alias
1154        let (scheme, config) =
1155            DirectoryNamespace::parse_storage_path("abfs://my-container/path", &storage_options)
1156                .unwrap();
1157        assert!(matches!(scheme, opendal::Scheme::Azblob));
1158        assert_eq!(config.get("container").unwrap(), "my-container");
1159        assert_eq!(config.get("root").unwrap(), "path");
1160    }
1161
1162    #[test]
1163    fn test_normalize_scheme() {
1164        // Test scheme normalization
1165        assert_eq!(DirectoryNamespace::normalize_scheme("s3a"), "s3");
1166        assert_eq!(DirectoryNamespace::normalize_scheme("s3n"), "s3");
1167        assert_eq!(DirectoryNamespace::normalize_scheme("S3A"), "s3");
1168        assert_eq!(DirectoryNamespace::normalize_scheme("abfs"), "azblob");
1169        assert_eq!(DirectoryNamespace::normalize_scheme("ABFS"), "azblob");
1170        assert_eq!(DirectoryNamespace::normalize_scheme("file"), "fs");
1171        assert_eq!(DirectoryNamespace::normalize_scheme("FILE"), "fs");
1172        assert_eq!(DirectoryNamespace::normalize_scheme("gcs"), "gcs");
1173        assert_eq!(DirectoryNamespace::normalize_scheme("random"), "random");
1174    }
1175
1176    #[test]
1177    fn test_fs_scheme_url() {
1178        let storage_options = HashMap::new();
1179
1180        // Test file:// URLs
1181        let (scheme, config) =
1182            DirectoryNamespace::parse_storage_path("file:///absolute/path", &storage_options)
1183                .unwrap();
1184        assert!(matches!(scheme, opendal::Scheme::Fs));
1185        assert_eq!(config.get("root").unwrap(), "/absolute/path");
1186
1187        // Test fs:// URLs
1188        let (scheme, config) =
1189            DirectoryNamespace::parse_storage_path("fs:///absolute/path", &storage_options)
1190                .unwrap();
1191        assert!(matches!(scheme, opendal::Scheme::Fs));
1192        assert_eq!(config.get("root").unwrap(), "/absolute/path");
1193    }
1194
1195    #[test]
1196    fn test_storage_options_passed_through() {
1197        // Test that storage options are properly passed through parse_storage_path
1198        let mut storage_options = HashMap::new();
1199        storage_options.insert("aws_access_key_id".to_string(), "test_key".to_string());
1200        storage_options.insert(
1201            "aws_secret_access_key".to_string(),
1202            "test_secret".to_string(),
1203        );
1204        storage_options.insert("region".to_string(), "us-west-2".to_string());
1205
1206        // Test with S3 - storage options should be included
1207        let (scheme, config) =
1208            DirectoryNamespace::parse_storage_path("s3://my-bucket/path", &storage_options)
1209                .unwrap();
1210        assert!(matches!(scheme, opendal::Scheme::S3));
1211        assert_eq!(config.get("bucket").unwrap(), "my-bucket");
1212        assert_eq!(config.get("root").unwrap(), "path");
1213        assert_eq!(config.get("aws_access_key_id").unwrap(), "test_key");
1214        assert_eq!(config.get("aws_secret_access_key").unwrap(), "test_secret");
1215        assert_eq!(config.get("region").unwrap(), "us-west-2");
1216
1217        // Test with local filesystem - storage options should still be included
1218        let (scheme, config) =
1219            DirectoryNamespace::parse_storage_path("/local/path", &storage_options).unwrap();
1220        assert!(matches!(scheme, opendal::Scheme::Fs));
1221        assert_eq!(config.get("root").unwrap(), "/local/path");
1222        // Even for local fs, storage options should be passed through
1223        assert_eq!(config.get("aws_access_key_id").unwrap(), "test_key");
1224    }
1225
1226    #[tokio::test]
1227    async fn test_create_table_with_ipc_data() {
1228        use arrow::array::{Int32Array, StringArray};
1229        use arrow::ipc::writer::StreamWriter;
1230
1231        let (namespace, _temp_dir) = create_test_namespace().await;
1232
1233        // Create a schema with some fields
1234        let schema = create_test_schema();
1235
1236        // Create some test data that matches the schema
1237        let arrow_schema = convert_json_arrow_schema(&schema).unwrap();
1238        let arrow_schema = Arc::new(arrow_schema);
1239
1240        // Create a RecordBatch with actual data
1241        let id_array = Int32Array::from(vec![1, 2, 3]);
1242        let name_array = StringArray::from(vec!["Alice", "Bob", "Charlie"]);
1243        let batch = arrow::record_batch::RecordBatch::try_new(
1244            arrow_schema.clone(),
1245            vec![Arc::new(id_array), Arc::new(name_array)],
1246        )
1247        .unwrap();
1248
1249        // Write the batch to an IPC stream
1250        let mut buffer = Vec::new();
1251        {
1252            let mut writer = StreamWriter::try_new(&mut buffer, &arrow_schema).unwrap();
1253            writer.write(&batch).unwrap();
1254            writer.finish().unwrap();
1255        }
1256
1257        // Create table with the IPC data
1258        let mut request = CreateTableRequest::new();
1259        request.id = Some(vec!["test_table_with_data".to_string()]);
1260
1261        let response = namespace
1262            .create_table(request, Bytes::from(buffer))
1263            .await
1264            .unwrap();
1265
1266        assert_eq!(response.version, Some(1));
1267        assert!(response
1268            .location
1269            .unwrap()
1270            .contains("test_table_with_data.lance"));
1271
1272        // Verify table exists
1273        let mut exists_request = TableExistsRequest::new();
1274        exists_request.id = Some(vec!["test_table_with_data".to_string()]);
1275        namespace.table_exists(exists_request).await.unwrap();
1276    }
1277
1278    #[tokio::test]
1279    async fn test_create_empty_table() {
1280        let (namespace, _temp_dir) = create_test_namespace().await;
1281
1282        let mut request = CreateEmptyTableRequest::new();
1283        request.id = Some(vec!["empty_table".to_string()]);
1284
1285        let response = namespace.create_empty_table(request).await.unwrap();
1286
1287        assert!(response.location.is_some());
1288        assert!(response.location.unwrap().ends_with("empty_table.lance"));
1289
1290        // Verify table exists by checking for .lance-reserved file
1291        let mut exists_request = TableExistsRequest::new();
1292        exists_request.id = Some(vec!["empty_table".to_string()]);
1293        namespace.table_exists(exists_request).await.unwrap();
1294
1295        // List tables should include the empty table
1296        let list_request = ListTablesRequest::new();
1297        let list_response = namespace.list_tables(list_request).await.unwrap();
1298        assert!(list_response.tables.contains(&"empty_table".to_string()));
1299    }
1300
1301    #[tokio::test]
1302    async fn test_create_empty_table_with_wrong_location() {
1303        let (namespace, _temp_dir) = create_test_namespace().await;
1304
1305        let mut request = CreateEmptyTableRequest::new();
1306        request.id = Some(vec!["test_table".to_string()]);
1307        request.location = Some("/wrong/path/table.lance".to_string());
1308
1309        let result = namespace.create_empty_table(request).await;
1310        assert!(result.is_err());
1311        assert!(result
1312            .unwrap_err()
1313            .to_string()
1314            .contains("must be at location"));
1315    }
1316}