lance_namespace/
dir.rs

1//! Directory-based Lance Namespace implementation.
2//!
3//! This module provides a directory-based implementation of the Lance namespace
4//! that stores tables as Lance datasets in a filesystem directory structure.
5
6use std::collections::HashMap;
7use std::str::FromStr;
8
9use async_trait::async_trait;
10use bytes::Bytes;
11use lance::dataset::{Dataset, WriteParams};
12use opendal::Operator;
13
14use crate::models::{
15    CreateEmptyTableRequest, CreateEmptyTableResponse, CreateNamespaceRequest,
16    CreateNamespaceResponse, CreateTableRequest, CreateTableResponse, DescribeNamespaceRequest,
17    DescribeNamespaceResponse, DescribeTableRequest, DescribeTableResponse, DropNamespaceRequest,
18    DropNamespaceResponse, DropTableRequest, DropTableResponse, ListNamespacesRequest,
19    ListNamespacesResponse, ListTablesRequest, ListTablesResponse, NamespaceExistsRequest,
20    TableExistsRequest,
21};
22
23use crate::namespace::{LanceNamespace, NamespaceError, Result};
24
/// Configuration for `DirectoryNamespace`.
///
/// Built from a flat string-to-string property map: the `root` key selects the
/// namespace root, and every key starting with `storage.` is kept (with that
/// prefix removed) as a storage backend option. All other keys are ignored.
#[derive(Debug, Clone)]
pub struct DirectoryNamespaceConfig {
    /// Root directory for the namespace, stored without trailing `/` characters
    root: String,
    /// Storage options for the backend, keyed without the `storage.` prefix
    storage_options: HashMap<String, String>,
}

impl DirectoryNamespaceConfig {
    /// Property key for the root directory
    pub const ROOT: &'static str = "root";
    /// Prefix for storage options
    pub const STORAGE_OPTIONS_PREFIX: &'static str = "storage.";

    /// Create a new configuration from properties.
    ///
    /// When no `root` property is present the current working directory is
    /// used. If the working directory cannot be determined (for example it was
    /// deleted or is not accessible) the relative path `.` is used instead of
    /// panicking, since this constructor is infallible.
    pub fn new(properties: HashMap<String, String>) -> Self {
        let root = properties
            .get(Self::ROOT)
            .cloned()
            .unwrap_or_else(|| {
                std::env::current_dir()
                    .map(|dir| dir.to_string_lossy().into_owned())
                    .unwrap_or_else(|_| ".".to_string())
            })
            .trim_end_matches('/')
            .to_string();

        // Keep only `storage.*` entries, dropping the prefix from each key.
        let storage_options: HashMap<String, String> = properties
            .iter()
            .filter_map(|(key, value)| {
                key.strip_prefix(Self::STORAGE_OPTIONS_PREFIX)
                    .map(|option| (option.to_string(), value.clone()))
            })
            .collect();

        Self {
            root,
            storage_options,
        }
    }

    /// Get the root directory
    pub fn root(&self) -> &str {
        &self.root
    }

    /// Get the storage options
    pub fn storage_options(&self) -> &HashMap<String, String> {
        &self.storage_options
    }
}
78
/// Directory-based implementation of Lance Namespace.
///
/// This implementation stores tables as Lance datasets in a directory structure.
/// It supports local filesystems and cloud storage backends through OpenDAL.
///
/// Each table `name` lives under `<root>/<name>.lance`; only the root namespace
/// and single-level table IDs are accepted by the methods of this type.
pub struct DirectoryNamespace {
    /// Parsed root location and storage options this namespace was built from
    config: DirectoryNamespaceConfig,
    /// OpenDAL operator created from `config`; used for listing, existence
    /// checks, marker-file writes and removals
    operator: Operator,
}
87
88impl DirectoryNamespace {
89    /// Create a new DirectoryNamespace instance
90    pub fn new(properties: HashMap<String, String>) -> Result<Self> {
91        let config = DirectoryNamespaceConfig::new(properties);
92        let operator = Self::initialize_operator(&config)?;
93
94        Ok(Self { config, operator })
95    }
96
97    /// Initialize the OpenDAL operator based on the configuration
98    fn initialize_operator(config: &DirectoryNamespaceConfig) -> Result<Operator> {
99        let root = config.root();
100        let storage_options = &config.storage_options;
101
102        // Parse the root path to determine scheme and configuration
103        let (scheme, opendal_config) = Self::parse_storage_path(root, storage_options)?;
104
105        // Create the operator with the determined scheme and configuration
106        let operator = Operator::via_iter(scheme, opendal_config)
107            .map_err(|e| NamespaceError::Other(format!("Failed to create operator: {}", e)))?;
108
109        Ok(operator)
110    }
111
112    /// Parse storage path and return scheme and configuration
113    fn parse_storage_path(
114        root: &str,
115        storage_options: &HashMap<String, String>,
116    ) -> Result<(opendal::Scheme, HashMap<String, String>)> {
117        use url::Url;
118
119        let mut config = HashMap::new();
120
121        // Try to parse as URL, if it fails, treat as local filesystem path
122        let (scheme, authority, path) = if let Ok(url) = Url::parse(root) {
123            let scheme = Self::normalize_scheme(url.scheme());
124            let authority = url.host_str().unwrap_or("");
125            // For file:// and fs:// URLs, preserve the full path including leading slash
126            // For cloud storage URLs, remove the leading slash
127            let path = if scheme == "fs" || scheme == "file" {
128                url.path().to_string()
129            } else {
130                url.path().trim_start_matches('/').to_string()
131            };
132            (scheme, authority.to_string(), path)
133        } else {
134            // Not a URL, treat as local filesystem - prepend fs://
135            ("fs".to_string(), String::new(), root.to_string())
136        };
137
138        // Configure based on scheme
139        let opendal_scheme = match scheme.as_str() {
140            "fs" | "file" => {
141                // For filesystem, use the full path (authority is empty for local paths)
142                if authority.is_empty() {
143                    config.insert("root".to_string(), path);
144                } else {
145                    // Handle file:///absolute/path or fs://hostname/path
146                    config.insert("root".to_string(), format!("{}/{}", authority, path));
147                }
148                opendal::Scheme::Fs
149            }
150            "s3" => {
151                config.insert("root".to_string(), path);
152                config.insert("bucket".to_string(), authority);
153                opendal::Scheme::S3
154            }
155            "gcs" => {
156                config.insert("root".to_string(), path);
157                config.insert("bucket".to_string(), authority);
158                opendal::Scheme::Gcs
159            }
160            "azblob" => {
161                config.insert("root".to_string(), path);
162                config.insert("container".to_string(), authority);
163                opendal::Scheme::Azblob
164            }
165            _ => {
166                // For unknown schemes, try to parse as OpenDAL scheme
167                config.insert("root".to_string(), path);
168                if !authority.is_empty() {
169                    config.insert("bucket".to_string(), authority);
170                }
171                opendal::Scheme::from_str(&scheme).map_err(|_| {
172                    NamespaceError::Other(format!("Unsupported storage scheme: {}", scheme))
173                })?
174            }
175        };
176
177        // Add storage options for all schemes
178        config.extend(storage_options.clone());
179
180        Ok((opendal_scheme, config))
181    }
182
183    /// Normalize scheme names with common aliases
184    fn normalize_scheme(scheme: &str) -> String {
185        match scheme.to_lowercase().as_str() {
186            "s3a" | "s3n" => "s3".to_string(),
187            "abfs" => "azblob".to_string(),
188            "file" => "fs".to_string(),
189            s => s.to_string(),
190        }
191    }
192
193    /// Validate that the namespace ID represents the root namespace
194    fn validate_root_namespace_id(id: &Option<Vec<String>>) -> Result<()> {
195        if let Some(id) = id {
196            if !id.is_empty() {
197                return Err(NamespaceError::Other(format!(
198                    "Directory namespace only supports root namespace operations, but got namespace ID: {:?}. Expected empty ID.",
199                    id
200                )));
201            }
202        }
203        Ok(())
204    }
205
206    /// Extract table name from table ID
207    fn table_name_from_id(id: &Option<Vec<String>>) -> Result<String> {
208        let id = id.as_ref().ok_or_else(|| {
209            NamespaceError::Other("Directory namespace table ID cannot be empty".to_string())
210        })?;
211
212        if id.len() != 1 {
213            return Err(NamespaceError::Other(format!(
214                "Directory namespace only supports single-level table IDs, but got: {:?}",
215                id
216            )));
217        }
218
219        Ok(id[0].clone())
220    }
221
222    /// Get the full path for a table
223    fn table_full_path(&self, table_name: &str) -> String {
224        format!("{}/{}.lance", self.config.root(), table_name)
225    }
226
227    /// Get the versions path for a table
228    fn table_versions_path(&self, table_name: &str) -> String {
229        format!("{}.lance/_versions/", table_name)
230    }
231}
232
233#[async_trait]
234impl LanceNamespace for DirectoryNamespace {
235    async fn list_namespaces(
236        &self,
237        request: ListNamespacesRequest,
238    ) -> Result<ListNamespacesResponse> {
239        // Validate this is a request for the root namespace
240        Self::validate_root_namespace_id(&request.id)?;
241
242        // Directory namespace only contains the root namespace (empty list)
243        Ok(ListNamespacesResponse::new(vec![]))
244    }
245
246    async fn describe_namespace(
247        &self,
248        request: DescribeNamespaceRequest,
249    ) -> Result<DescribeNamespaceResponse> {
250        // Validate this is a request for the root namespace
251        Self::validate_root_namespace_id(&request.id)?;
252
253        // Return description of the root namespace
254        Ok(DescribeNamespaceResponse {
255            properties: Some(HashMap::new()),
256        })
257    }
258
259    async fn create_namespace(
260        &self,
261        request: CreateNamespaceRequest,
262    ) -> Result<CreateNamespaceResponse> {
263        // Root namespace always exists and cannot be created
264        if request.id.is_none() || request.id.as_ref().unwrap().is_empty() {
265            return Err(NamespaceError::Other(
266                "Root namespace already exists and cannot be created".to_string(),
267            ));
268        }
269
270        // Non-root namespaces are not supported
271        Err(NamespaceError::NotSupported(
272            "Directory namespace only supports the root namespace".to_string(),
273        ))
274    }
275
276    async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<DropNamespaceResponse> {
277        // Root namespace always exists and cannot be dropped
278        if request.id.is_none() || request.id.as_ref().unwrap().is_empty() {
279            return Err(NamespaceError::Other(
280                "Root namespace cannot be dropped".to_string(),
281            ));
282        }
283
284        // Non-root namespaces are not supported
285        Err(NamespaceError::NotSupported(
286            "Directory namespace only supports the root namespace".to_string(),
287        ))
288    }
289
290    async fn namespace_exists(&self, request: NamespaceExistsRequest) -> Result<()> {
291        // Root namespace always exists
292        if request.id.is_none() || request.id.as_ref().unwrap().is_empty() {
293            return Ok(());
294        }
295
296        // Non-root namespaces don't exist
297        Err(NamespaceError::Other(
298            "Only root namespace exists in directory namespace".to_string(),
299        ))
300    }
301
302    async fn list_tables(&self, request: ListTablesRequest) -> Result<ListTablesResponse> {
303        Self::validate_root_namespace_id(&request.id)?;
304
305        let mut tables = Vec::new();
306
307        // Use non-recursive listing to avoid issues with object stores that don't have directory concept
308        let entries = self.operator.list("").await.map_err(|e| {
309            NamespaceError::Io(std::io::Error::new(
310                std::io::ErrorKind::Other,
311                format!("Failed to list directory: {}", e),
312            ))
313        })?;
314
315        for entry in entries {
316            let path = entry.path().trim_end_matches('/');
317
318            // Only process directory-like paths that end with .lance
319            if !path.ends_with(".lance") {
320                continue;
321            }
322
323            // Extract table name (remove .lance suffix)
324            let table_name = &path[..path.len() - 6];
325
326            // Check if it's a valid Lance dataset or has .lance-reserved file
327            let mut is_table = false;
328
329            // First check for .lance-reserved file
330            let reserved_file_path = format!("{}.lance/.lance-reserved", table_name);
331            if self
332                .operator
333                .exists(&reserved_file_path)
334                .await
335                .unwrap_or(false)
336            {
337                is_table = true;
338            }
339
340            // If not found, check for _versions directory
341            if !is_table {
342                let versions_path = self.table_versions_path(table_name);
343                if let Ok(version_entries) = self.operator.list(&versions_path).await {
344                    // If there's at least one version file, it's a valid Lance dataset
345                    if !version_entries.is_empty() {
346                        is_table = true;
347                    }
348                }
349            }
350
351            if is_table {
352                tables.push(table_name.to_string());
353            }
354        }
355
356        let response = ListTablesResponse::new(tables);
357        Ok(response)
358    }
359
360    async fn describe_table(&self, request: DescribeTableRequest) -> Result<DescribeTableResponse> {
361        let table_name = Self::table_name_from_id(&request.id)?;
362        let table_path = self.table_full_path(&table_name);
363
364        // Check if table exists - either as Lance dataset or with .lance-reserved file
365        let mut table_exists = false;
366
367        // First check for .lance-reserved file
368        let reserved_file_path = format!("{}.lance/.lance-reserved", table_name);
369        if self
370            .operator
371            .exists(&reserved_file_path)
372            .await
373            .unwrap_or(false)
374        {
375            table_exists = true;
376        }
377
378        // If not found, check if it's a Lance dataset by looking for _versions directory
379        if !table_exists {
380            let versions_path = self.table_versions_path(&table_name);
381            if let Ok(entries) = self.operator.list(&versions_path).await {
382                if !entries.is_empty() {
383                    table_exists = true;
384                }
385            }
386        }
387
388        if !table_exists {
389            return Err(NamespaceError::Other(format!(
390                "Table does not exist: {}",
391                table_name
392            )));
393        }
394
395        Ok(DescribeTableResponse {
396            version: None,
397            location: Some(table_path),
398            schema: None,
399            properties: None,
400            storage_options: Some(self.config.storage_options.clone()),
401        })
402    }
403
404    async fn table_exists(&self, request: TableExistsRequest) -> Result<()> {
405        let table_name = Self::table_name_from_id(&request.id)?;
406
407        // Check if table exists - either as Lance dataset or with .lance-reserved file
408        let mut table_exists = false;
409
410        // First check for .lance-reserved file
411        let reserved_file_path = format!("{}.lance/.lance-reserved", table_name);
412        if self
413            .operator
414            .exists(&reserved_file_path)
415            .await
416            .unwrap_or(false)
417        {
418            table_exists = true;
419        }
420
421        // If not found, check if it's a Lance dataset by looking for _versions directory
422        if !table_exists {
423            let versions_path = self.table_versions_path(&table_name);
424            if let Ok(entries) = self.operator.list(&versions_path).await {
425                if !entries.is_empty() {
426                    table_exists = true;
427                }
428            }
429        }
430
431        if !table_exists {
432            return Err(NamespaceError::Other(format!(
433                "Table does not exist: {}",
434                table_name
435            )));
436        }
437
438        Ok(())
439    }
440
441    async fn create_table(
442        &self,
443        request: CreateTableRequest,
444        request_data: Bytes,
445    ) -> Result<CreateTableResponse> {
446        let table_name = Self::table_name_from_id(&request.id)?;
447        let table_path = self.table_full_path(&table_name);
448
449        // Validate that request_data is provided and is a valid Arrow IPC stream
450        if request_data.is_empty() {
451            return Err(NamespaceError::Other(
452                "Request data (Arrow IPC stream) is required for create_table".to_string(),
453            ));
454        }
455
456        // Validate location if provided
457        if let Some(location) = &request.location {
458            let location = location.trim_end_matches('/');
459            if location != table_path {
460                return Err(NamespaceError::Other(format!(
461                    "Cannot create table {} at location {}, must be at location {}",
462                    table_name, location, table_path
463                )));
464            }
465        }
466
467        // Parse the Arrow IPC stream from request_data
468        use arrow::ipc::reader::StreamReader;
469        use std::io::Cursor;
470
471        let cursor = Cursor::new(request_data.to_vec());
472        let stream_reader = StreamReader::try_new(cursor, None)
473            .map_err(|e| NamespaceError::Other(format!("Invalid Arrow IPC stream: {}", e)))?;
474
475        // Extract schema from the IPC stream
476        let arrow_schema = stream_reader.schema();
477
478        // Collect all batches from the stream
479        let mut batches = Vec::new();
480        for batch_result in stream_reader {
481            batches.push(batch_result.map_err(|e| {
482                NamespaceError::Other(format!("Failed to read batch from IPC stream: {}", e))
483            })?);
484        }
485
486        // Create RecordBatchReader from the batches
487        let reader = if batches.is_empty() {
488            // If no batches in the stream, create an empty batch with the schema
489            let batch = arrow::record_batch::RecordBatch::new_empty(arrow_schema.clone());
490            let batches = vec![Ok(batch)];
491            arrow::record_batch::RecordBatchIterator::new(batches, arrow_schema.clone())
492        } else {
493            // Convert to RecordBatchIterator
494            let batch_results: Vec<_> = batches.into_iter().map(Ok).collect();
495            arrow::record_batch::RecordBatchIterator::new(batch_results, arrow_schema)
496        };
497
498        // Set up write parameters for creating a new dataset
499        let write_params = WriteParams {
500            mode: lance::dataset::WriteMode::Create,
501            ..Default::default()
502        };
503
504        // Create the Lance dataset using the actual Lance API
505        Dataset::write(reader, &table_path, Some(write_params))
506            .await
507            .map_err(|e| NamespaceError::Other(format!("Failed to create Lance dataset: {}", e)))?;
508
509        Ok(CreateTableResponse {
510            version: Some(1),
511            location: Some(table_path),
512            properties: None,
513            storage_options: Some(self.config.storage_options.clone()),
514        })
515    }
516
517    async fn create_empty_table(
518        &self,
519        request: CreateEmptyTableRequest,
520    ) -> Result<CreateEmptyTableResponse> {
521        let table_name = Self::table_name_from_id(&request.id)?;
522        let table_path = self.table_full_path(&table_name);
523
524        // Validate location if provided
525        if let Some(location) = &request.location {
526            let location = location.trim_end_matches('/');
527            if location != table_path {
528                return Err(NamespaceError::Other(format!(
529                    "Cannot create table {} at location {}, must be at location {}",
530                    table_name, location, table_path
531                )));
532            }
533        }
534
535        // Create the .lance-reserved file to mark the table as existing
536        let reserved_file_path = format!("{}.lance/.lance-reserved", table_name);
537        self.operator
538            .write(&reserved_file_path, Vec::<u8>::new())
539            .await
540            .map_err(|e| {
541                NamespaceError::Other(format!(
542                    "Failed to create .lance-reserved file for table {}: {}",
543                    table_name, e
544                ))
545            })?;
546
547        Ok(CreateEmptyTableResponse {
548            location: Some(table_path),
549            properties: None,
550            storage_options: Some(self.config.storage_options.clone()),
551        })
552    }
553
554    async fn drop_table(&self, request: DropTableRequest) -> Result<DropTableResponse> {
555        let table_name = Self::table_name_from_id(&request.id)?;
556        let table_path = self.table_full_path(&table_name);
557
558        // Remove the entire table directory
559        let table_dir = format!("{}.lance/", table_name);
560        self.operator.remove_all(&table_dir).await.map_err(|e| {
561            NamespaceError::Other(format!("Failed to drop table {}: {}", table_name, e))
562        })?;
563
564        Ok(DropTableResponse {
565            id: request.id,
566            location: Some(table_path),
567            properties: None,
568            transaction_id: None,
569        })
570    }
571}
572
573#[cfg(test)]
574mod tests {
575    use super::*;
576    use crate::models::{JsonArrowDataType, JsonArrowField, JsonArrowSchema};
577    use crate::schema::convert_json_arrow_schema;
578    use std::collections::HashMap;
579    use std::sync::Arc;
580    use tempfile::TempDir;
581
582    /// Helper to create a test DirectoryNamespace with a temporary directory
583    async fn create_test_namespace() -> (DirectoryNamespace, TempDir) {
584        let temp_dir = TempDir::new().unwrap();
585        let mut properties = HashMap::new();
586        properties.insert(
587            "root".to_string(),
588            temp_dir.path().to_string_lossy().to_string(),
589        );
590
591        let namespace = DirectoryNamespace::new(properties).unwrap();
592        (namespace, temp_dir)
593    }
594
595    /// Helper to create test IPC data from a schema
596    fn create_test_ipc_data(schema: &JsonArrowSchema) -> Vec<u8> {
597        use arrow::ipc::writer::StreamWriter;
598
599        let arrow_schema = convert_json_arrow_schema(schema).unwrap();
600        let arrow_schema = Arc::new(arrow_schema);
601        let batch = arrow::record_batch::RecordBatch::new_empty(arrow_schema.clone());
602        let mut buffer = Vec::new();
603        {
604            let mut writer = StreamWriter::try_new(&mut buffer, &arrow_schema).unwrap();
605            writer.write(&batch).unwrap();
606            writer.finish().unwrap();
607        }
608        buffer
609    }
610
611    /// Helper to create a simple test schema
612    fn create_test_schema() -> JsonArrowSchema {
613        let int_type = JsonArrowDataType::new("int32".to_string());
614        let string_type = JsonArrowDataType::new("utf8".to_string());
615
616        let id_field = JsonArrowField {
617            name: "id".to_string(),
618            r#type: Box::new(int_type.clone()),
619            nullable: false,
620            metadata: None,
621        };
622
623        let name_field = JsonArrowField {
624            name: "name".to_string(),
625            r#type: Box::new(string_type),
626            nullable: true,
627            metadata: None,
628        };
629
630        JsonArrowSchema {
631            fields: vec![id_field, name_field],
632            metadata: None,
633        }
634    }
635
636    #[tokio::test]
637    async fn test_create_table() {
638        let (namespace, _temp_dir) = create_test_namespace().await;
639
640        // Create test IPC data
641        let schema = create_test_schema();
642        let ipc_data = create_test_ipc_data(&schema);
643
644        let mut request = CreateTableRequest::new();
645        request.id = Some(vec!["test_table".to_string()]);
646
647        let response = namespace
648            .create_table(request, bytes::Bytes::from(ipc_data))
649            .await
650            .unwrap();
651
652        assert!(response.location.is_some());
653        assert!(response.location.unwrap().ends_with("test_table.lance"));
654        assert_eq!(response.version, Some(1));
655    }
656
657    #[tokio::test]
658    async fn test_create_table_without_data() {
659        let (namespace, _temp_dir) = create_test_namespace().await;
660
661        let mut request = CreateTableRequest::new();
662        request.id = Some(vec!["test_table".to_string()]);
663
664        let result = namespace.create_table(request, bytes::Bytes::new()).await;
665        assert!(result.is_err());
666        assert!(result
667            .unwrap_err()
668            .to_string()
669            .contains("Arrow IPC stream) is required"));
670    }
671
672    #[tokio::test]
673    async fn test_create_table_with_invalid_id() {
674        let (namespace, _temp_dir) = create_test_namespace().await;
675
676        // Create test IPC data
677        let schema = create_test_schema();
678        let ipc_data = create_test_ipc_data(&schema);
679
680        // Test with empty ID
681        let mut request = CreateTableRequest::new();
682        request.id = Some(vec![]);
683
684        let result = namespace
685            .create_table(request, bytes::Bytes::from(ipc_data.clone()))
686            .await;
687        assert!(result.is_err());
688
689        // Test with multi-level ID
690        let mut request = CreateTableRequest::new();
691        request.id = Some(vec!["namespace".to_string(), "table".to_string()]);
692
693        let result = namespace
694            .create_table(request, bytes::Bytes::from(ipc_data))
695            .await;
696        assert!(result.is_err());
697        assert!(result
698            .unwrap_err()
699            .to_string()
700            .contains("single-level table IDs"));
701    }
702
703    #[tokio::test]
704    async fn test_create_table_with_wrong_location() {
705        let (namespace, _temp_dir) = create_test_namespace().await;
706
707        // Create test IPC data
708        let schema = create_test_schema();
709        let ipc_data = create_test_ipc_data(&schema);
710
711        let mut request = CreateTableRequest::new();
712        request.id = Some(vec!["test_table".to_string()]);
713        request.location = Some("/wrong/path/table.lance".to_string());
714
715        let result = namespace
716            .create_table(request, bytes::Bytes::from(ipc_data))
717            .await;
718        assert!(result.is_err());
719        assert!(result
720            .unwrap_err()
721            .to_string()
722            .contains("must be at location"));
723    }
724
725    #[tokio::test]
726    async fn test_list_tables() {
727        let (namespace, _temp_dir) = create_test_namespace().await;
728
729        // Initially, no tables
730        let request = ListTablesRequest::new();
731        let response = namespace.list_tables(request).await.unwrap();
732        assert_eq!(response.tables.len(), 0);
733
734        // Create test IPC data
735        let schema = create_test_schema();
736        let ipc_data = create_test_ipc_data(&schema);
737
738        // Create a table
739        let mut create_request = CreateTableRequest::new();
740        create_request.id = Some(vec!["table1".to_string()]);
741        namespace
742            .create_table(create_request, bytes::Bytes::from(ipc_data.clone()))
743            .await
744            .unwrap();
745
746        // Create another table
747        let mut create_request = CreateTableRequest::new();
748        create_request.id = Some(vec!["table2".to_string()]);
749        namespace
750            .create_table(create_request, bytes::Bytes::from(ipc_data))
751            .await
752            .unwrap();
753
754        // List tables should return both
755        let request = ListTablesRequest::new();
756        let response = namespace.list_tables(request).await.unwrap();
757        let tables = response.tables;
758        assert_eq!(tables.len(), 2);
759        assert!(tables.contains(&"table1".to_string()));
760        assert!(tables.contains(&"table2".to_string()));
761    }
762
763    #[tokio::test]
764    async fn test_list_tables_with_namespace_id() {
765        let (namespace, _temp_dir) = create_test_namespace().await;
766
767        let mut request = ListTablesRequest::new();
768        request.id = Some(vec!["namespace".to_string()]);
769
770        let result = namespace.list_tables(request).await;
771        assert!(result.is_err());
772        assert!(result
773            .unwrap_err()
774            .to_string()
775            .contains("root namespace operations"));
776    }
777
778    #[tokio::test]
779    async fn test_describe_table() {
780        let (namespace, _temp_dir) = create_test_namespace().await;
781
782        // Create a table first
783        let schema = create_test_schema();
784        let ipc_data = create_test_ipc_data(&schema);
785
786        let mut create_request = CreateTableRequest::new();
787        create_request.id = Some(vec!["test_table".to_string()]);
788        namespace
789            .create_table(create_request, bytes::Bytes::from(ipc_data))
790            .await
791            .unwrap();
792
793        // Describe the table
794        let mut request = DescribeTableRequest::new();
795        request.id = Some(vec!["test_table".to_string()]);
796        let response = namespace.describe_table(request).await.unwrap();
797
798        assert!(response.location.is_some());
799        assert!(response.location.unwrap().ends_with("test_table.lance"));
800    }
801
802    #[tokio::test]
803    async fn test_describe_nonexistent_table() {
804        let (namespace, _temp_dir) = create_test_namespace().await;
805
806        let mut request = DescribeTableRequest::new();
807        request.id = Some(vec!["nonexistent".to_string()]);
808
809        let result = namespace.describe_table(request).await;
810        assert!(result.is_err());
811        assert!(result
812            .unwrap_err()
813            .to_string()
814            .contains("Table does not exist"));
815    }
816
817    #[tokio::test]
818    async fn test_table_exists() {
819        let (namespace, _temp_dir) = create_test_namespace().await;
820
821        // Create a table
822        let schema = create_test_schema();
823        let ipc_data = create_test_ipc_data(&schema);
824
825        let mut create_request = CreateTableRequest::new();
826        create_request.id = Some(vec!["existing_table".to_string()]);
827        namespace
828            .create_table(create_request, bytes::Bytes::from(ipc_data))
829            .await
830            .unwrap();
831
832        // Check existing table
833        let mut request = TableExistsRequest::new();
834        request.id = Some(vec!["existing_table".to_string()]);
835        let result = namespace.table_exists(request).await;
836        assert!(result.is_ok());
837
838        // Check non-existent table
839        let mut request = TableExistsRequest::new();
840        request.id = Some(vec!["nonexistent".to_string()]);
841        let result = namespace.table_exists(request).await;
842        assert!(result.is_err());
843        assert!(result
844            .unwrap_err()
845            .to_string()
846            .contains("Table does not exist"));
847    }
848
849    #[tokio::test]
850    async fn test_drop_table() {
851        let (namespace, _temp_dir) = create_test_namespace().await;
852
853        // Create a table
854        let schema = create_test_schema();
855        let ipc_data = create_test_ipc_data(&schema);
856
857        let mut create_request = CreateTableRequest::new();
858        create_request.id = Some(vec!["table_to_drop".to_string()]);
859        namespace
860            .create_table(create_request, bytes::Bytes::from(ipc_data))
861            .await
862            .unwrap();
863
864        // Verify it exists
865        let mut exists_request = TableExistsRequest::new();
866        exists_request.id = Some(vec!["table_to_drop".to_string()]);
867        assert!(namespace.table_exists(exists_request.clone()).await.is_ok());
868
869        // Drop the table
870        let mut drop_request = DropTableRequest::new();
871        drop_request.id = Some(vec!["table_to_drop".to_string()]);
872        let response = namespace.drop_table(drop_request).await.unwrap();
873        assert!(response.location.is_some());
874
875        // Verify it no longer exists
876        assert!(namespace.table_exists(exists_request).await.is_err());
877    }
878
879    #[tokio::test]
880    async fn test_drop_nonexistent_table() {
881        let (namespace, _temp_dir) = create_test_namespace().await;
882
883        let mut request = DropTableRequest::new();
884        request.id = Some(vec!["nonexistent".to_string()]);
885
886        // Should not fail when dropping non-existent table (idempotent)
887        let result = namespace.drop_table(request).await;
888        // The operation might succeed or fail depending on implementation
889        // But it should not panic
890        let _ = result;
891    }
892
893    #[tokio::test]
894    async fn test_root_namespace_operations() {
895        let (namespace, _temp_dir) = create_test_namespace().await;
896
897        // Test list_namespaces - should return empty list for root
898        let request = ListNamespacesRequest::new();
899        let result = namespace.list_namespaces(request).await;
900        assert!(result.is_ok());
901        assert_eq!(result.unwrap().namespaces.len(), 0);
902
903        // Test describe_namespace - should succeed for root
904        let request = DescribeNamespaceRequest::new();
905        let result = namespace.describe_namespace(request).await;
906        assert!(result.is_ok());
907
908        // Test namespace_exists - root always exists
909        let request = NamespaceExistsRequest::new();
910        let result = namespace.namespace_exists(request).await;
911        assert!(result.is_ok());
912
913        // Test create_namespace - root cannot be created
914        let request = CreateNamespaceRequest::new();
915        let result = namespace.create_namespace(request).await;
916        assert!(result.is_err());
917        assert!(result.unwrap_err().to_string().contains("already exists"));
918
919        // Test drop_namespace - root cannot be dropped
920        let request = DropNamespaceRequest::new();
921        let result = namespace.drop_namespace(request).await;
922        assert!(result.is_err());
923        assert!(result
924            .unwrap_err()
925            .to_string()
926            .contains("cannot be dropped"));
927    }
928
929    #[tokio::test]
930    async fn test_non_root_namespace_operations() {
931        let (namespace, _temp_dir) = create_test_namespace().await;
932
933        // Test create_namespace for non-root - not supported
934        let mut request = CreateNamespaceRequest::new();
935        request.id = Some(vec!["child".to_string()]);
936        let result = namespace.create_namespace(request).await;
937        assert!(matches!(result, Err(NamespaceError::NotSupported(_))));
938
939        // Test namespace_exists for non-root - should not exist
940        let mut request = NamespaceExistsRequest::new();
941        request.id = Some(vec!["child".to_string()]);
942        let result = namespace.namespace_exists(request).await;
943        assert!(result.is_err());
944        assert!(result
945            .unwrap_err()
946            .to_string()
947            .contains("Only root namespace exists"));
948
949        // Test drop_namespace for non-root - not supported
950        let mut request = DropNamespaceRequest::new();
951        request.id = Some(vec!["child".to_string()]);
952        let result = namespace.drop_namespace(request).await;
953        assert!(matches!(result, Err(NamespaceError::NotSupported(_))));
954    }
955
956    #[tokio::test]
957    async fn test_config_custom_root() {
958        let temp_dir = TempDir::new().unwrap();
959        let custom_path = temp_dir.path().join("custom");
960        std::fs::create_dir(&custom_path).unwrap();
961
962        let mut properties = HashMap::new();
963        properties.insert(
964            "root".to_string(),
965            custom_path.to_string_lossy().to_string(),
966        );
967
968        let namespace = DirectoryNamespace::new(properties).unwrap();
969
970        // Create test IPC data
971        let schema = create_test_schema();
972        let ipc_data = create_test_ipc_data(&schema);
973
974        // Create a table and verify location
975        let mut request = CreateTableRequest::new();
976        request.id = Some(vec!["test_table".to_string()]);
977
978        let response = namespace
979            .create_table(request, bytes::Bytes::from(ipc_data))
980            .await
981            .unwrap();
982
983        assert!(response.location.unwrap().contains("custom"));
984    }
985
986    #[tokio::test]
987    async fn test_config_storage_options() {
988        let temp_dir = TempDir::new().unwrap();
989        let mut properties = HashMap::new();
990        properties.insert(
991            "root".to_string(),
992            temp_dir.path().to_string_lossy().to_string(),
993        );
994        properties.insert("storage.option1".to_string(), "value1".to_string());
995        properties.insert("storage.option2".to_string(), "value2".to_string());
996
997        let namespace = DirectoryNamespace::new(properties).unwrap();
998
999        // Create test IPC data
1000        let schema = create_test_schema();
1001        let ipc_data = create_test_ipc_data(&schema);
1002
1003        // Create a table and check storage options are included
1004        let mut request = CreateTableRequest::new();
1005        request.id = Some(vec!["test_table".to_string()]);
1006
1007        let response = namespace
1008            .create_table(request, bytes::Bytes::from(ipc_data))
1009            .await
1010            .unwrap();
1011
1012        let storage_options = response.storage_options.unwrap();
1013        assert_eq!(storage_options.get("option1"), Some(&"value1".to_string()));
1014        assert_eq!(storage_options.get("option2"), Some(&"value2".to_string()));
1015    }
1016
1017    #[tokio::test]
1018    async fn test_various_arrow_types() {
1019        let (namespace, _temp_dir) = create_test_namespace().await;
1020
1021        // Create schema with various types
1022        let fields = vec![
1023            JsonArrowField {
1024                name: "bool_col".to_string(),
1025                r#type: Box::new(JsonArrowDataType::new("bool".to_string())),
1026                nullable: true,
1027                metadata: None,
1028            },
1029            JsonArrowField {
1030                name: "int8_col".to_string(),
1031                r#type: Box::new(JsonArrowDataType::new("int8".to_string())),
1032                nullable: true,
1033                metadata: None,
1034            },
1035            JsonArrowField {
1036                name: "float64_col".to_string(),
1037                r#type: Box::new(JsonArrowDataType::new("float64".to_string())),
1038                nullable: true,
1039                metadata: None,
1040            },
1041            JsonArrowField {
1042                name: "binary_col".to_string(),
1043                r#type: Box::new(JsonArrowDataType::new("binary".to_string())),
1044                nullable: true,
1045                metadata: None,
1046            },
1047        ];
1048
1049        let schema = JsonArrowSchema {
1050            fields,
1051            metadata: None,
1052        };
1053
1054        // Create IPC data
1055        let ipc_data = create_test_ipc_data(&schema);
1056
1057        let mut request = CreateTableRequest::new();
1058        request.id = Some(vec!["complex_table".to_string()]);
1059
1060        let response = namespace
1061            .create_table(request, bytes::Bytes::from(ipc_data))
1062            .await
1063            .unwrap();
1064
1065        assert!(response.location.is_some());
1066    }
1067
1068    #[tokio::test]
1069    async fn test_connect_dir() {
1070        let temp_dir = TempDir::new().unwrap();
1071        let mut properties = HashMap::new();
1072        properties.insert(
1073            "root".to_string(),
1074            temp_dir.path().to_string_lossy().to_string(),
1075        );
1076
1077        let namespace = crate::connect("dir", properties).await.unwrap();
1078
1079        // Test basic operation through the trait object
1080        let request = ListTablesRequest::new();
1081        let response = namespace.list_tables(request).await.unwrap();
1082        assert_eq!(response.tables.len(), 0);
1083    }
1084
/// Plain filesystem paths (absolute and relative) parse to the `Fs` scheme
/// with the path itself as `root`.
#[test]
fn test_parse_storage_path_local() {
    let storage_options = HashMap::new();

    // First an absolute path, then a relative one.
    for path in ["/path/to/data", "./data"] {
        let (scheme, config) =
            DirectoryNamespace::parse_storage_path(path, &storage_options).unwrap();
        assert!(matches!(scheme, opendal::Scheme::Fs));
        assert_eq!(config.get("root").unwrap(), path);
    }
}
1101
/// `s3://` URLs parse to the `S3` scheme with the bucket and root split out;
/// a bucket-only URL yields an empty root.
#[test]
fn test_parse_storage_path_s3() {
    let storage_options = HashMap::new();

    // (URL, expected root) — the bucket is "my-bucket" in both cases.
    let cases = [
        ("s3://my-bucket/path/to/data", "path/to/data"),
        ("s3://my-bucket", ""),
    ];
    for (url, expected_root) in cases {
        let (scheme, config) =
            DirectoryNamespace::parse_storage_path(url, &storage_options).unwrap();
        assert!(matches!(scheme, opendal::Scheme::S3));
        assert_eq!(config.get("bucket").unwrap(), "my-bucket");
        assert_eq!(config.get("root").unwrap(), expected_root);
    }
}
1121
/// A `gcs://` URL parses to the `Gcs` scheme with bucket and root split out.
#[test]
fn test_parse_storage_path_gcs() {
    let storage_options = HashMap::new();

    let parsed =
        DirectoryNamespace::parse_storage_path("gcs://my-bucket/path/to/data", &storage_options);
    let (scheme, config) = parsed.unwrap();
    assert!(matches!(scheme, opendal::Scheme::Gcs));

    let bucket = config.get("bucket").unwrap();
    let root = config.get("root").unwrap();
    assert_eq!(bucket, "my-bucket");
    assert_eq!(root, "path/to/data");
}
1136
/// `azblob://` URLs and their `abfs://` alias both parse to the `Azblob`
/// scheme with the container and root split out.
#[test]
fn test_parse_storage_path_azblob() {
    let storage_options = HashMap::new();

    // (URL, expected root) — the container is "my-container" in both cases.
    let cases = [
        ("azblob://my-container/path/to/data", "path/to/data"),
        ("abfs://my-container/path", "path"),
    ];
    for (url, expected_root) in cases {
        let (scheme, config) =
            DirectoryNamespace::parse_storage_path(url, &storage_options).unwrap();
        assert!(matches!(scheme, opendal::Scheme::Azblob));
        assert_eq!(config.get("container").unwrap(), "my-container");
        assert_eq!(config.get("root").unwrap(), expected_root);
    }
}
1159
/// `normalize_scheme` maps known aliases to their canonical scheme,
/// ignoring case, and passes other names through unchanged.
#[test]
fn test_normalize_scheme() {
    // (input, expected normalized scheme)
    let cases = [
        ("s3a", "s3"),
        ("s3n", "s3"),
        ("S3A", "s3"),
        ("abfs", "azblob"),
        ("ABFS", "azblob"),
        ("file", "fs"),
        ("FILE", "fs"),
        ("gcs", "gcs"),
        ("random", "random"),
    ];
    for (input, expected) in cases {
        assert_eq!(DirectoryNamespace::normalize_scheme(input), expected);
    }
}
1173
/// `file://` and `fs://` URLs both parse to the `Fs` scheme with the
/// absolute path as `root`.
#[test]
fn test_fs_scheme_url() {
    let storage_options = HashMap::new();

    for url in ["file:///absolute/path", "fs:///absolute/path"] {
        let (scheme, config) =
            DirectoryNamespace::parse_storage_path(url, &storage_options).unwrap();
        assert!(matches!(scheme, opendal::Scheme::Fs));
        assert_eq!(config.get("root").unwrap(), "/absolute/path");
    }
}
1192
/// Storage options handed to `parse_storage_path` appear in the returned
/// config, for an S3 URL and for a local path alike.
#[test]
fn test_storage_options_passed_through() {
    let storage_options: HashMap<String, String> = [
        ("aws_access_key_id", "test_key"),
        ("aws_secret_access_key", "test_secret"),
        ("region", "us-west-2"),
    ]
    .iter()
    .map(|&(key, value)| (key.to_string(), value.to_string()))
    .collect();

    // S3: bucket and root come from the URL, everything else from the options.
    let (s3_scheme, s3_config) =
        DirectoryNamespace::parse_storage_path("s3://my-bucket/path", &storage_options).unwrap();
    assert!(matches!(s3_scheme, opendal::Scheme::S3));
    let expected_s3 = [
        ("bucket", "my-bucket"),
        ("root", "path"),
        ("aws_access_key_id", "test_key"),
        ("aws_secret_access_key", "test_secret"),
        ("region", "us-west-2"),
    ];
    for (key, expected) in expected_s3 {
        assert_eq!(s3_config.get(key).unwrap(), expected);
    }

    // Local filesystem: the options are still carried along.
    let (fs_scheme, fs_config) =
        DirectoryNamespace::parse_storage_path("/local/path", &storage_options).unwrap();
    assert!(matches!(fs_scheme, opendal::Scheme::Fs));
    assert_eq!(fs_config.get("root").unwrap(), "/local/path");
    assert_eq!(fs_config.get("aws_access_key_id").unwrap(), "test_key");
}
1223
1224    #[tokio::test]
1225    async fn test_create_table_with_ipc_data() {
1226        use arrow::array::{Int32Array, StringArray};
1227        use arrow::ipc::writer::StreamWriter;
1228
1229        let (namespace, _temp_dir) = create_test_namespace().await;
1230
1231        // Create a schema with some fields
1232        let schema = create_test_schema();
1233
1234        // Create some test data that matches the schema
1235        let arrow_schema = convert_json_arrow_schema(&schema).unwrap();
1236        let arrow_schema = Arc::new(arrow_schema);
1237
1238        // Create a RecordBatch with actual data
1239        let id_array = Int32Array::from(vec![1, 2, 3]);
1240        let name_array = StringArray::from(vec!["Alice", "Bob", "Charlie"]);
1241        let batch = arrow::record_batch::RecordBatch::try_new(
1242            arrow_schema.clone(),
1243            vec![Arc::new(id_array), Arc::new(name_array)],
1244        )
1245        .unwrap();
1246
1247        // Write the batch to an IPC stream
1248        let mut buffer = Vec::new();
1249        {
1250            let mut writer = StreamWriter::try_new(&mut buffer, &arrow_schema).unwrap();
1251            writer.write(&batch).unwrap();
1252            writer.finish().unwrap();
1253        }
1254
1255        // Create table with the IPC data
1256        let mut request = CreateTableRequest::new();
1257        request.id = Some(vec!["test_table_with_data".to_string()]);
1258
1259        let response = namespace
1260            .create_table(request, Bytes::from(buffer))
1261            .await
1262            .unwrap();
1263
1264        assert_eq!(response.version, Some(1));
1265        assert!(response
1266            .location
1267            .unwrap()
1268            .contains("test_table_with_data.lance"));
1269
1270        // Verify table exists
1271        let mut exists_request = TableExistsRequest::new();
1272        exists_request.id = Some(vec!["test_table_with_data".to_string()]);
1273        namespace.table_exists(exists_request).await.unwrap();
1274    }
1275
1276    #[tokio::test]
1277    async fn test_create_empty_table() {
1278        let (namespace, temp_dir) = create_test_namespace().await;
1279
1280        let mut request = CreateEmptyTableRequest::new();
1281        request.id = Some(vec!["empty_table".to_string()]);
1282
1283        let response = namespace.create_empty_table(request).await.unwrap();
1284
1285        assert!(response.location.is_some());
1286        assert!(response.location.unwrap().ends_with("empty_table.lance"));
1287
1288        // Verify the .lance-reserved file was created in the correct location
1289        let table_dir = temp_dir.path().join("empty_table.lance");
1290        assert!(table_dir.exists());
1291        assert!(table_dir.is_dir());
1292
1293        let reserved_file = table_dir.join(".lance-reserved");
1294        assert!(reserved_file.exists());
1295        assert!(reserved_file.is_file());
1296
1297        // Verify file is empty
1298        let metadata = std::fs::metadata(&reserved_file).unwrap();
1299        assert_eq!(metadata.len(), 0);
1300
1301        // Verify table exists by checking for .lance-reserved file
1302        let mut exists_request = TableExistsRequest::new();
1303        exists_request.id = Some(vec!["empty_table".to_string()]);
1304        namespace.table_exists(exists_request).await.unwrap();
1305
1306        // List tables should include the empty table
1307        let list_request = ListTablesRequest::new();
1308        let list_response = namespace.list_tables(list_request).await.unwrap();
1309        assert!(list_response.tables.contains(&"empty_table".to_string()));
1310
1311        // Verify describe table works for empty table
1312        let mut describe_request = DescribeTableRequest::new();
1313        describe_request.id = Some(vec!["empty_table".to_string()]);
1314        let describe_response = namespace.describe_table(describe_request).await.unwrap();
1315        assert!(describe_response.location.is_some());
1316        assert!(describe_response.location.unwrap().contains("empty_table"));
1317    }
1318
1319    #[tokio::test]
1320    async fn test_create_empty_table_with_wrong_location() {
1321        let (namespace, _temp_dir) = create_test_namespace().await;
1322
1323        let mut request = CreateEmptyTableRequest::new();
1324        request.id = Some(vec!["test_table".to_string()]);
1325        request.location = Some("/wrong/path/table.lance".to_string());
1326
1327        let result = namespace.create_empty_table(request).await;
1328        assert!(result.is_err());
1329        assert!(result
1330            .unwrap_err()
1331            .to_string()
1332            .contains("must be at location"));
1333    }
1334
1335    #[tokio::test]
1336    async fn test_create_empty_table_then_drop() {
1337        let (namespace, temp_dir) = create_test_namespace().await;
1338
1339        // Create an empty table
1340        let mut create_request = CreateEmptyTableRequest::new();
1341        create_request.id = Some(vec!["empty_table_to_drop".to_string()]);
1342
1343        let create_response = namespace.create_empty_table(create_request).await.unwrap();
1344        assert!(create_response.location.is_some());
1345
1346        // Verify it exists
1347        let table_dir = temp_dir.path().join("empty_table_to_drop.lance");
1348        assert!(table_dir.exists());
1349        let reserved_file = table_dir.join(".lance-reserved");
1350        assert!(reserved_file.exists());
1351
1352        // Drop the table
1353        let mut drop_request = DropTableRequest::new();
1354        drop_request.id = Some(vec!["empty_table_to_drop".to_string()]);
1355        let drop_response = namespace.drop_table(drop_request).await.unwrap();
1356        assert!(drop_response.location.is_some());
1357
1358        // Verify table directory was removed
1359        assert!(!table_dir.exists());
1360        assert!(!reserved_file.exists());
1361
1362        // Verify table no longer exists
1363        let mut exists_request = TableExistsRequest::new();
1364        exists_request.id = Some(vec!["empty_table_to_drop".to_string()]);
1365        let exists_result = namespace.table_exists(exists_request).await;
1366        assert!(exists_result.is_err());
1367    }
1368}