lance_namespace_impls/
dir.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Directory-based Lance Namespace implementation.
5//!
6//! This module provides a directory-based implementation of the Lance namespace
7//! that stores tables as Lance datasets in a filesystem directory structure.
8
9use std::collections::HashMap;
10use std::sync::Arc;
11
12use async_trait::async_trait;
13use bytes::Bytes;
14use lance::dataset::{Dataset, WriteParams};
15use lance::session::Session;
16use lance_io::object_store::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry};
17use object_store::path::Path;
18
19use lance_namespace::models::{
20    CreateEmptyTableRequest, CreateEmptyTableResponse, CreateNamespaceRequest,
21    CreateNamespaceResponse, CreateTableRequest, CreateTableResponse, DescribeNamespaceRequest,
22    DescribeNamespaceResponse, DescribeTableRequest, DescribeTableResponse, DropNamespaceRequest,
23    DropNamespaceResponse, DropTableRequest, DropTableResponse, ListNamespacesRequest,
24    ListNamespacesResponse, ListTablesRequest, ListTablesResponse, NamespaceExistsRequest,
25    TableExistsRequest,
26};
27
28use lance_core::{box_error, Error, Result};
29use lance_namespace::LanceNamespace;
30
31/// Builder for creating a DirectoryNamespace.
32///
33/// This builder provides a fluent API for configuring and establishing
34/// connections to directory-based Lance namespaces.
35///
36/// # Examples
37///
38/// ```no_run
39/// # use lance_namespace_impls::DirectoryNamespaceBuilder;
40/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
41/// // Create a local directory namespace
42/// let namespace = DirectoryNamespaceBuilder::new("/path/to/data")
43///     .build()
44///     .await?;
45/// # Ok(())
46/// # }
47/// ```
48///
49/// ```no_run
50/// # use lance_namespace_impls::DirectoryNamespaceBuilder;
51/// # use lance::session::Session;
52/// # use std::sync::Arc;
53/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
54/// // Create with custom storage options and session
55/// let session = Arc::new(Session::default());
56/// let namespace = DirectoryNamespaceBuilder::new("s3://bucket/path")
57///     .storage_option("region", "us-west-2")
58///     .storage_option("access_key_id", "key")
59///     .session(session)
60///     .build()
61///     .await?;
62/// # Ok(())
63/// # }
64/// ```
65#[derive(Debug, Clone)]
66pub struct DirectoryNamespaceBuilder {
67    root: String,
68    storage_options: Option<HashMap<String, String>>,
69    session: Option<Arc<Session>>,
70}
71
72impl DirectoryNamespaceBuilder {
73    /// Create a new DirectoryNamespaceBuilder with the specified root path.
74    ///
75    /// # Arguments
76    ///
77    /// * `root` - Root directory path (local path or cloud URI like s3://bucket/path)
78    pub fn new(root: impl Into<String>) -> Self {
79        Self {
80            root: root.into().trim_end_matches('/').to_string(),
81            storage_options: None,
82            session: None,
83        }
84    }
85
86    /// Create a DirectoryNamespaceBuilder from properties HashMap.
87    ///
88    /// This method parses a properties map into builder configuration.
89    /// It expects:
90    /// - `root`: The root directory path (required)
91    /// - `storage.*`: Storage options (optional, prefix will be stripped)
92    ///
93    /// # Arguments
94    ///
95    /// * `properties` - Configuration properties
96    /// * `session` - Optional Lance session to reuse object store registry
97    ///
98    /// # Returns
99    ///
100    /// Returns a `DirectoryNamespaceBuilder` instance.
101    ///
102    /// # Errors
103    ///
104    /// Returns an error if the `root` property is missing.
105    ///
106    /// # Examples
107    ///
108    /// ```no_run
109    /// # use lance_namespace_impls::DirectoryNamespaceBuilder;
110    /// # use std::collections::HashMap;
111    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
112    /// let mut properties = HashMap::new();
113    /// properties.insert("root".to_string(), "/path/to/data".to_string());
114    /// properties.insert("storage.region".to_string(), "us-west-2".to_string());
115    ///
116    /// let namespace = DirectoryNamespaceBuilder::from_properties(properties, None)?
117    ///     .build()
118    ///     .await?;
119    /// # Ok(())
120    /// # }
121    /// ```
122    pub fn from_properties(
123        properties: HashMap<String, String>,
124        session: Option<Arc<Session>>,
125    ) -> Result<Self> {
126        // Extract root from properties (required)
127        let root = properties
128            .get("root")
129            .cloned()
130            .ok_or_else(|| Error::Namespace {
131                source: "Missing required property 'root' for directory namespace".into(),
132                location: snafu::location!(),
133            })?;
134
135        // Extract storage options (properties prefixed with "storage.")
136        let storage_options: HashMap<String, String> = properties
137            .iter()
138            .filter_map(|(k, v)| {
139                k.strip_prefix("storage.")
140                    .map(|key| (key.to_string(), v.clone()))
141            })
142            .collect();
143
144        let storage_options = if storage_options.is_empty() {
145            None
146        } else {
147            Some(storage_options)
148        };
149
150        Ok(Self {
151            root: root.trim_end_matches('/').to_string(),
152            storage_options,
153            session,
154        })
155    }
156
157    /// Add a storage option.
158    ///
159    /// # Arguments
160    ///
161    /// * `key` - Storage option key (e.g., "region", "access_key_id")
162    /// * `value` - Storage option value
163    pub fn storage_option(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
164        self.storage_options
165            .get_or_insert_with(HashMap::new)
166            .insert(key.into(), value.into());
167        self
168    }
169
170    /// Add multiple storage options.
171    ///
172    /// # Arguments
173    ///
174    /// * `options` - HashMap of storage options to add
175    pub fn storage_options(mut self, options: HashMap<String, String>) -> Self {
176        self.storage_options
177            .get_or_insert_with(HashMap::new)
178            .extend(options);
179        self
180    }
181
182    /// Set the Lance session to use for this namespace.
183    ///
184    /// When a session is provided, the namespace will reuse the session's
185    /// object store registry, allowing multiple namespaces and datasets
186    /// to share the same underlying storage connections.
187    ///
188    /// # Arguments
189    ///
190    /// * `session` - Arc-wrapped Lance session
191    pub fn session(mut self, session: Arc<Session>) -> Self {
192        self.session = Some(session);
193        self
194    }
195
196    /// Build the DirectoryNamespace.
197    ///
198    /// # Returns
199    ///
200    /// Returns a `DirectoryNamespace` instance.
201    ///
202    /// # Errors
203    ///
204    /// Returns an error if:
205    /// - The root path is invalid
206    /// - Connection to the storage backend fails
207    /// - Storage options are invalid
208    pub async fn build(self) -> Result<DirectoryNamespace> {
209        let (object_store, base_path) =
210            Self::initialize_object_store(&self.root, &self.storage_options, &self.session).await?;
211
212        Ok(DirectoryNamespace {
213            root: self.root,
214            storage_options: self.storage_options,
215            session: self.session,
216            object_store,
217            base_path,
218        })
219    }
220
221    /// Initialize the Lance ObjectStore based on the configuration
222    async fn initialize_object_store(
223        root: &str,
224        storage_options: &Option<HashMap<String, String>>,
225        session: &Option<Arc<Session>>,
226    ) -> Result<(Arc<ObjectStore>, Path)> {
227        // Build ObjectStoreParams from storage options
228        let params = ObjectStoreParams {
229            storage_options: storage_options.clone(),
230            ..Default::default()
231        };
232
233        // Use object store registry from session if provided, otherwise create a new one
234        let registry = if let Some(session) = session {
235            session.store_registry()
236        } else {
237            Arc::new(ObjectStoreRegistry::default())
238        };
239
240        // Use Lance's object store factory to create from URI
241        let (object_store, base_path) = ObjectStore::from_uri_and_params(registry, root, &params)
242            .await
243            .map_err(|e| Error::Namespace {
244                source: format!("Failed to create object store: {}", e).into(),
245                location: snafu::location!(),
246            })?;
247
248        Ok((object_store, base_path))
249    }
250}
251
252/// Directory-based implementation of Lance Namespace.
253///
254/// This implementation stores tables as Lance datasets in a directory structure.
255/// It supports local filesystems and cloud storage backends through Lance's object store.
256pub struct DirectoryNamespace {
257    root: String,
258    storage_options: Option<HashMap<String, String>>,
259    #[allow(dead_code)]
260    session: Option<Arc<Session>>,
261    object_store: Arc<ObjectStore>,
262    base_path: Path,
263}
264
265impl DirectoryNamespace {
266    /// Validate that the namespace ID represents the root namespace
267    fn validate_root_namespace_id(id: &Option<Vec<String>>) -> Result<()> {
268        if let Some(id) = id {
269            if !id.is_empty() {
270                return Err(Error::Namespace {
271                    source: format!(
272                        "Directory namespace only supports root namespace operations, but got namespace ID: {:?}. Expected empty ID.",
273                        id
274                    ).into(),
275                    location: snafu::location!(),
276                });
277            }
278        }
279        Ok(())
280    }
281
282    /// Extract table name from table ID
283    fn table_name_from_id(id: &Option<Vec<String>>) -> Result<String> {
284        let id = id.as_ref().ok_or_else(|| Error::Namespace {
285            source: "Directory namespace table ID cannot be empty".into(),
286            location: snafu::location!(),
287        })?;
288
289        if id.len() != 1 {
290            return Err(Error::Namespace {
291                source: format!(
292                    "Directory namespace only supports single-level table IDs, but got: {:?}",
293                    id
294                )
295                .into(),
296                location: snafu::location!(),
297            });
298        }
299
300        Ok(id[0].clone())
301    }
302
303    /// Get the full URI path for a table (for returning in responses)
304    fn table_full_uri(&self, table_name: &str) -> String {
305        format!("{}/{}.lance", &self.root, table_name)
306    }
307
308    /// Get the object store path for a table (relative to base_path)
309    fn table_path(&self, table_name: &str) -> Path {
310        self.base_path
311            .child(format!("{}.lance", table_name).as_str())
312    }
313
314    /// Get the versions directory path for a table
315    fn table_versions_path(&self, table_name: &str) -> Path {
316        // Need to chain child calls to avoid URL encoding the slash
317        self.base_path
318            .child(format!("{}.lance", table_name).as_str())
319            .child("_versions")
320    }
321
322    /// Get the reserved file path for a table
323    fn table_reserved_file_path(&self, table_name: &str) -> Path {
324        // Need to chain child calls to avoid URL encoding the slash
325        self.base_path
326            .child(format!("{}.lance", table_name).as_str())
327            .child(".lance-reserved")
328    }
329}
330
331#[async_trait]
332impl LanceNamespace for DirectoryNamespace {
333    async fn list_namespaces(
334        &self,
335        request: ListNamespacesRequest,
336    ) -> Result<ListNamespacesResponse> {
337        // Validate this is a request for the root namespace
338        Self::validate_root_namespace_id(&request.id)?;
339
340        // Directory namespace only contains the root namespace (empty list)
341        Ok(ListNamespacesResponse::new(vec![]))
342    }
343
344    async fn describe_namespace(
345        &self,
346        request: DescribeNamespaceRequest,
347    ) -> Result<DescribeNamespaceResponse> {
348        // Validate this is a request for the root namespace
349        Self::validate_root_namespace_id(&request.id)?;
350
351        // Return description of the root namespace
352        Ok(DescribeNamespaceResponse {
353            properties: Some(HashMap::new()),
354        })
355    }
356
357    async fn create_namespace(
358        &self,
359        request: CreateNamespaceRequest,
360    ) -> Result<CreateNamespaceResponse> {
361        // Root namespace always exists and cannot be created
362        if request.id.is_none() || request.id.as_ref().unwrap().is_empty() {
363            return Err(Error::Namespace {
364                source: "Root namespace already exists and cannot be created".into(),
365                location: snafu::location!(),
366            });
367        }
368
369        // Non-root namespaces are not supported
370        Err(Error::NotSupported {
371            source: "Directory namespace only supports the root namespace".into(),
372            location: snafu::location!(),
373        })
374    }
375
376    async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<DropNamespaceResponse> {
377        // Root namespace always exists and cannot be dropped
378        if request.id.is_none() || request.id.as_ref().unwrap().is_empty() {
379            return Err(Error::Namespace {
380                source: "Root namespace cannot be dropped".into(),
381                location: snafu::location!(),
382            });
383        }
384
385        // Non-root namespaces are not supported
386        Err(Error::NotSupported {
387            source: "Directory namespace only supports the root namespace".into(),
388            location: snafu::location!(),
389        })
390    }
391
392    async fn namespace_exists(&self, request: NamespaceExistsRequest) -> Result<()> {
393        // Root namespace always exists
394        if request.id.is_none() || request.id.as_ref().unwrap().is_empty() {
395            return Ok(());
396        }
397
398        // Non-root namespaces don't exist
399        Err(Error::Namespace {
400            source: "Only root namespace exists in directory namespace".into(),
401            location: snafu::location!(),
402        })
403    }
404
405    async fn list_tables(&self, request: ListTablesRequest) -> Result<ListTablesResponse> {
406        Self::validate_root_namespace_id(&request.id)?;
407
408        let mut tables = Vec::new();
409
410        // List all entries in the base directory
411        let entries = self
412            .object_store
413            .read_dir(self.base_path.clone())
414            .await
415            .map_err(|e| Error::IO {
416                source: box_error(std::io::Error::other(format!(
417                    "Failed to list directory: {}",
418                    e
419                ))),
420                location: snafu::location!(),
421            })?;
422
423        for entry in entries {
424            let path = entry.trim_end_matches('/');
425
426            // Only process directory-like paths that end with .lance
427            if !path.ends_with(".lance") {
428                continue;
429            }
430
431            // Extract table name (remove .lance suffix)
432            let table_name = &path[..path.len() - 6];
433
434            // Check if it's a valid Lance dataset or has .lance-reserved file
435            let mut is_table = false;
436
437            // First check for .lance-reserved file
438            let reserved_file_path = self.table_reserved_file_path(table_name);
439            if self
440                .object_store
441                .exists(&reserved_file_path)
442                .await
443                .unwrap_or(false)
444            {
445                is_table = true;
446            }
447
448            // If not found, check for _versions directory
449            if !is_table {
450                let versions_path = self.table_versions_path(table_name);
451                if let Ok(version_entries) = self.object_store.read_dir(versions_path).await {
452                    // If there's at least one version file, it's a valid Lance dataset
453                    if !version_entries.is_empty() {
454                        is_table = true;
455                    }
456                }
457            }
458
459            if is_table {
460                tables.push(table_name.to_string());
461            }
462        }
463
464        let response = ListTablesResponse::new(tables);
465        Ok(response)
466    }
467
468    async fn describe_table(&self, request: DescribeTableRequest) -> Result<DescribeTableResponse> {
469        let table_name = Self::table_name_from_id(&request.id)?;
470        let table_uri = self.table_full_uri(&table_name);
471
472        // Check if table exists - either as Lance dataset or with .lance-reserved file
473        let mut table_exists = false;
474
475        // First check for .lance-reserved file
476        let reserved_file_path = self.table_reserved_file_path(&table_name);
477        if self
478            .object_store
479            .exists(&reserved_file_path)
480            .await
481            .unwrap_or(false)
482        {
483            table_exists = true;
484        }
485
486        // If not found, check if it's a Lance dataset by looking for _versions directory
487        if !table_exists {
488            let versions_path = self.table_versions_path(&table_name);
489            if let Ok(entries) = self.object_store.read_dir(versions_path).await {
490                if !entries.is_empty() {
491                    table_exists = true;
492                }
493            }
494        }
495
496        if !table_exists {
497            return Err(Error::Namespace {
498                source: format!("Table does not exist: {}", table_name).into(),
499                location: snafu::location!(),
500            });
501        }
502
503        Ok(DescribeTableResponse {
504            version: None,
505            location: Some(table_uri),
506            schema: None,
507            properties: None,
508            storage_options: self.storage_options.clone(),
509        })
510    }
511
512    async fn table_exists(&self, request: TableExistsRequest) -> Result<()> {
513        let table_name = Self::table_name_from_id(&request.id)?;
514
515        // Check if table exists - either as Lance dataset or with .lance-reserved file
516        let mut table_exists = false;
517
518        // First check for .lance-reserved file
519        let reserved_file_path = self.table_reserved_file_path(&table_name);
520        if self
521            .object_store
522            .exists(&reserved_file_path)
523            .await
524            .unwrap_or(false)
525        {
526            table_exists = true;
527        }
528
529        // If not found, check if it's a Lance dataset by looking for _versions directory
530        if !table_exists {
531            let versions_path = self.table_versions_path(&table_name);
532            if let Ok(entries) = self.object_store.read_dir(versions_path).await {
533                if !entries.is_empty() {
534                    table_exists = true;
535                }
536            }
537        }
538
539        if !table_exists {
540            return Err(Error::Namespace {
541                source: format!("Table does not exist: {}", table_name).into(),
542                location: snafu::location!(),
543            });
544        }
545
546        Ok(())
547    }
548
549    async fn drop_table(&self, request: DropTableRequest) -> Result<DropTableResponse> {
550        let table_name = Self::table_name_from_id(&request.id)?;
551        let table_uri = self.table_full_uri(&table_name);
552
553        // Remove the entire table directory
554        let table_path = self.table_path(&table_name);
555
556        self.object_store
557            .remove_dir_all(table_path)
558            .await
559            .map_err(|e| Error::Namespace {
560                source: format!("Failed to drop table {}: {}", table_name, e).into(),
561                location: snafu::location!(),
562            })?;
563
564        Ok(DropTableResponse {
565            id: request.id,
566            location: Some(table_uri),
567            properties: None,
568            transaction_id: None,
569        })
570    }
571
572    async fn create_table(
573        &self,
574        request: CreateTableRequest,
575        request_data: Bytes,
576    ) -> Result<CreateTableResponse> {
577        let table_name = Self::table_name_from_id(&request.id)?;
578        let table_uri = self.table_full_uri(&table_name);
579
580        // Validate that request_data is provided and is a valid Arrow IPC stream
581        if request_data.is_empty() {
582            return Err(Error::Namespace {
583                source: "Request data (Arrow IPC stream) is required for create_table".into(),
584                location: snafu::location!(),
585            });
586        }
587
588        // Validate location if provided
589        if let Some(location) = &request.location {
590            let location = location.trim_end_matches('/');
591            if location != table_uri {
592                return Err(Error::Namespace {
593                    source: format!(
594                        "Cannot create table {} at location {}, must be at location {}",
595                        table_name, location, table_uri
596                    )
597                    .into(),
598                    location: snafu::location!(),
599                });
600            }
601        }
602
603        // Parse the Arrow IPC stream from request_data
604        use arrow::ipc::reader::StreamReader;
605        use std::io::Cursor;
606
607        let cursor = Cursor::new(request_data.to_vec());
608        let stream_reader = StreamReader::try_new(cursor, None).map_err(|e| Error::Namespace {
609            source: format!("Invalid Arrow IPC stream: {}", e).into(),
610            location: snafu::location!(),
611        })?;
612
613        // Extract schema from the IPC stream
614        let arrow_schema = stream_reader.schema();
615
616        // Collect all batches from the stream
617        let mut batches = Vec::new();
618        for batch_result in stream_reader {
619            batches.push(batch_result.map_err(|e| Error::Namespace {
620                source: format!("Failed to read batch from IPC stream: {}", e).into(),
621                location: snafu::location!(),
622            })?);
623        }
624
625        // Create RecordBatchReader from the batches
626        let reader = if batches.is_empty() {
627            // If no batches in the stream, create an empty batch with the schema
628            let batch = arrow::record_batch::RecordBatch::new_empty(arrow_schema.clone());
629            let batches = vec![Ok(batch)];
630            arrow::record_batch::RecordBatchIterator::new(batches, arrow_schema.clone())
631        } else {
632            // Convert to RecordBatchIterator
633            let batch_results: Vec<_> = batches.into_iter().map(Ok).collect();
634            arrow::record_batch::RecordBatchIterator::new(batch_results, arrow_schema)
635        };
636
637        // Set up write parameters for creating a new dataset
638        // Populate store_params with storage options to ensure they're forwarded to Dataset::write
639        let store_params = self.storage_options.as_ref().map(|opts| ObjectStoreParams {
640            storage_options: Some(opts.clone()),
641            ..Default::default()
642        });
643
644        let write_params = WriteParams {
645            mode: lance::dataset::WriteMode::Create,
646            store_params,
647            ..Default::default()
648        };
649
650        // Create the Lance dataset using the actual Lance API
651        Dataset::write(reader, &table_uri, Some(write_params))
652            .await
653            .map_err(|e| Error::Namespace {
654                source: format!("Failed to create Lance dataset: {}", e).into(),
655                location: snafu::location!(),
656            })?;
657
658        Ok(CreateTableResponse {
659            version: Some(1),
660            location: Some(table_uri),
661            properties: None,
662            storage_options: self.storage_options.clone(),
663        })
664    }
665
666    async fn create_empty_table(
667        &self,
668        request: CreateEmptyTableRequest,
669    ) -> Result<CreateEmptyTableResponse> {
670        let table_name = Self::table_name_from_id(&request.id)?;
671        let table_uri = self.table_full_uri(&table_name);
672
673        // Validate location if provided
674        if let Some(location) = &request.location {
675            let location = location.trim_end_matches('/');
676            if location != table_uri {
677                return Err(Error::Namespace {
678                    source: format!(
679                        "Cannot create table {} at location {}, must be at location {}",
680                        table_name, location, table_uri
681                    )
682                    .into(),
683                    location: snafu::location!(),
684                });
685            }
686        }
687
688        // Create the .lance-reserved file to mark the table as existing
689        let reserved_file_path = self.table_reserved_file_path(&table_name);
690
691        self.object_store
692            .create(&reserved_file_path)
693            .await
694            .map_err(|e| Error::Namespace {
695                source: format!(
696                    "Failed to create .lance-reserved file for table {}: {}",
697                    table_name, e
698                )
699                .into(),
700                location: snafu::location!(),
701            })?
702            .shutdown()
703            .await
704            .map_err(|e| Error::Namespace {
705                source: format!(
706                    "Failed to finalize .lance-reserved file for table {}: {}",
707                    table_name, e
708                )
709                .into(),
710                location: snafu::location!(),
711            })?;
712
713        Ok(CreateEmptyTableResponse {
714            location: Some(table_uri),
715            properties: None,
716            storage_options: self.storage_options.clone(),
717        })
718    }
719}
720
721#[cfg(test)]
722mod tests {
723    use super::*;
724    use lance_core::utils::tempfile::TempStdDir;
725    use lance_namespace::models::{JsonArrowDataType, JsonArrowField, JsonArrowSchema};
726    use lance_namespace::schema::convert_json_arrow_schema;
727    use std::sync::Arc;
728
729    /// Helper to create a test DirectoryNamespace with a temporary directory
730    async fn create_test_namespace() -> (DirectoryNamespace, TempStdDir) {
731        let temp_dir = TempStdDir::default();
732
733        let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
734            .build()
735            .await
736            .unwrap();
737        (namespace, temp_dir)
738    }
739
740    /// Helper to create test IPC data from a schema
741    fn create_test_ipc_data(schema: &JsonArrowSchema) -> Vec<u8> {
742        use arrow::ipc::writer::StreamWriter;
743
744        let arrow_schema = convert_json_arrow_schema(schema).unwrap();
745        let arrow_schema = Arc::new(arrow_schema);
746        let batch = arrow::record_batch::RecordBatch::new_empty(arrow_schema.clone());
747        let mut buffer = Vec::new();
748        {
749            let mut writer = StreamWriter::try_new(&mut buffer, &arrow_schema).unwrap();
750            writer.write(&batch).unwrap();
751            writer.finish().unwrap();
752        }
753        buffer
754    }
755
756    /// Helper to create a simple test schema
757    fn create_test_schema() -> JsonArrowSchema {
758        let int_type = JsonArrowDataType::new("int32".to_string());
759        let string_type = JsonArrowDataType::new("utf8".to_string());
760
761        let id_field = JsonArrowField {
762            name: "id".to_string(),
763            r#type: Box::new(int_type),
764            nullable: false,
765            metadata: None,
766        };
767
768        let name_field = JsonArrowField {
769            name: "name".to_string(),
770            r#type: Box::new(string_type),
771            nullable: true,
772            metadata: None,
773        };
774
775        JsonArrowSchema {
776            fields: vec![id_field, name_field],
777            metadata: None,
778        }
779    }
780
781    #[tokio::test]
782    async fn test_create_table() {
783        let (namespace, _temp_dir) = create_test_namespace().await;
784
785        // Create test IPC data
786        let schema = create_test_schema();
787        let ipc_data = create_test_ipc_data(&schema);
788
789        let mut request = CreateTableRequest::new();
790        request.id = Some(vec!["test_table".to_string()]);
791
792        let response = namespace
793            .create_table(request, bytes::Bytes::from(ipc_data))
794            .await
795            .unwrap();
796
797        assert!(response.location.is_some());
798        assert!(response.location.unwrap().ends_with("test_table.lance"));
799        assert_eq!(response.version, Some(1));
800    }
801
802    #[tokio::test]
803    async fn test_create_table_without_data() {
804        let (namespace, _temp_dir) = create_test_namespace().await;
805
806        let mut request = CreateTableRequest::new();
807        request.id = Some(vec!["test_table".to_string()]);
808
809        let result = namespace.create_table(request, bytes::Bytes::new()).await;
810        assert!(result.is_err());
811        assert!(result
812            .unwrap_err()
813            .to_string()
814            .contains("Arrow IPC stream) is required"));
815    }
816
817    #[tokio::test]
818    async fn test_create_table_with_invalid_id() {
819        let (namespace, _temp_dir) = create_test_namespace().await;
820
821        // Create test IPC data
822        let schema = create_test_schema();
823        let ipc_data = create_test_ipc_data(&schema);
824
825        // Test with empty ID
826        let mut request = CreateTableRequest::new();
827        request.id = Some(vec![]);
828
829        let result = namespace
830            .create_table(request, bytes::Bytes::from(ipc_data.clone()))
831            .await;
832        assert!(result.is_err());
833
834        // Test with multi-level ID
835        let mut request = CreateTableRequest::new();
836        request.id = Some(vec!["namespace".to_string(), "table".to_string()]);
837
838        let result = namespace
839            .create_table(request, bytes::Bytes::from(ipc_data))
840            .await;
841        assert!(result.is_err());
842        assert!(result
843            .unwrap_err()
844            .to_string()
845            .contains("single-level table IDs"));
846    }
847
848    #[tokio::test]
849    async fn test_create_table_with_wrong_location() {
850        let (namespace, _temp_dir) = create_test_namespace().await;
851
852        // Create test IPC data
853        let schema = create_test_schema();
854        let ipc_data = create_test_ipc_data(&schema);
855
856        let mut request = CreateTableRequest::new();
857        request.id = Some(vec!["test_table".to_string()]);
858        request.location = Some("/wrong/path/table.lance".to_string());
859
860        let result = namespace
861            .create_table(request, bytes::Bytes::from(ipc_data))
862            .await;
863        assert!(result.is_err());
864        assert!(result
865            .unwrap_err()
866            .to_string()
867            .contains("must be at location"));
868    }
869
870    #[tokio::test]
871    async fn test_list_tables() {
872        let (namespace, _temp_dir) = create_test_namespace().await;
873
874        // Initially, no tables
875        let request = ListTablesRequest::new();
876        let response = namespace.list_tables(request).await.unwrap();
877        assert_eq!(response.tables.len(), 0);
878
879        // Create test IPC data
880        let schema = create_test_schema();
881        let ipc_data = create_test_ipc_data(&schema);
882
883        // Create a table
884        let mut create_request = CreateTableRequest::new();
885        create_request.id = Some(vec!["table1".to_string()]);
886        namespace
887            .create_table(create_request, bytes::Bytes::from(ipc_data.clone()))
888            .await
889            .unwrap();
890
891        // Create another table
892        let mut create_request = CreateTableRequest::new();
893        create_request.id = Some(vec!["table2".to_string()]);
894        namespace
895            .create_table(create_request, bytes::Bytes::from(ipc_data))
896            .await
897            .unwrap();
898
899        // List tables should return both
900        let request = ListTablesRequest::new();
901        let response = namespace.list_tables(request).await.unwrap();
902        let tables = response.tables;
903        assert_eq!(tables.len(), 2);
904        assert!(tables.contains(&"table1".to_string()));
905        assert!(tables.contains(&"table2".to_string()));
906    }
907
908    #[tokio::test]
909    async fn test_list_tables_with_namespace_id() {
910        let (namespace, _temp_dir) = create_test_namespace().await;
911
912        let mut request = ListTablesRequest::new();
913        request.id = Some(vec!["namespace".to_string()]);
914
915        let result = namespace.list_tables(request).await;
916        assert!(result.is_err());
917        assert!(result
918            .unwrap_err()
919            .to_string()
920            .contains("root namespace operations"));
921    }
922
923    #[tokio::test]
924    async fn test_describe_table() {
925        let (namespace, _temp_dir) = create_test_namespace().await;
926
927        // Create a table first
928        let schema = create_test_schema();
929        let ipc_data = create_test_ipc_data(&schema);
930
931        let mut create_request = CreateTableRequest::new();
932        create_request.id = Some(vec!["test_table".to_string()]);
933        namespace
934            .create_table(create_request, bytes::Bytes::from(ipc_data))
935            .await
936            .unwrap();
937
938        // Describe the table
939        let mut request = DescribeTableRequest::new();
940        request.id = Some(vec!["test_table".to_string()]);
941        let response = namespace.describe_table(request).await.unwrap();
942
943        assert!(response.location.is_some());
944        assert!(response.location.unwrap().ends_with("test_table.lance"));
945    }
946
947    #[tokio::test]
948    async fn test_describe_nonexistent_table() {
949        let (namespace, _temp_dir) = create_test_namespace().await;
950
951        let mut request = DescribeTableRequest::new();
952        request.id = Some(vec!["nonexistent".to_string()]);
953
954        let result = namespace.describe_table(request).await;
955        assert!(result.is_err());
956        assert!(result
957            .unwrap_err()
958            .to_string()
959            .contains("Table does not exist"));
960    }
961
962    #[tokio::test]
963    async fn test_table_exists() {
964        let (namespace, _temp_dir) = create_test_namespace().await;
965
966        // Create a table
967        let schema = create_test_schema();
968        let ipc_data = create_test_ipc_data(&schema);
969
970        let mut create_request = CreateTableRequest::new();
971        create_request.id = Some(vec!["existing_table".to_string()]);
972        namespace
973            .create_table(create_request, bytes::Bytes::from(ipc_data))
974            .await
975            .unwrap();
976
977        // Check existing table
978        let mut request = TableExistsRequest::new();
979        request.id = Some(vec!["existing_table".to_string()]);
980        let result = namespace.table_exists(request).await;
981        assert!(result.is_ok());
982
983        // Check non-existent table
984        let mut request = TableExistsRequest::new();
985        request.id = Some(vec!["nonexistent".to_string()]);
986        let result = namespace.table_exists(request).await;
987        assert!(result.is_err());
988        assert!(result
989            .unwrap_err()
990            .to_string()
991            .contains("Table does not exist"));
992    }
993
994    #[tokio::test]
995    async fn test_drop_table() {
996        let (namespace, _temp_dir) = create_test_namespace().await;
997
998        // Create a table
999        let schema = create_test_schema();
1000        let ipc_data = create_test_ipc_data(&schema);
1001
1002        let mut create_request = CreateTableRequest::new();
1003        create_request.id = Some(vec!["table_to_drop".to_string()]);
1004        namespace
1005            .create_table(create_request, bytes::Bytes::from(ipc_data))
1006            .await
1007            .unwrap();
1008
1009        // Verify it exists
1010        let mut exists_request = TableExistsRequest::new();
1011        exists_request.id = Some(vec!["table_to_drop".to_string()]);
1012        assert!(namespace.table_exists(exists_request.clone()).await.is_ok());
1013
1014        // Drop the table
1015        let mut drop_request = DropTableRequest::new();
1016        drop_request.id = Some(vec!["table_to_drop".to_string()]);
1017        let response = namespace.drop_table(drop_request).await.unwrap();
1018        assert!(response.location.is_some());
1019
1020        // Verify it no longer exists
1021        assert!(namespace.table_exists(exists_request).await.is_err());
1022    }
1023
1024    #[tokio::test]
1025    async fn test_drop_nonexistent_table() {
1026        let (namespace, _temp_dir) = create_test_namespace().await;
1027
1028        let mut request = DropTableRequest::new();
1029        request.id = Some(vec!["nonexistent".to_string()]);
1030
1031        // Should not fail when dropping non-existent table (idempotent)
1032        let result = namespace.drop_table(request).await;
1033        // The operation might succeed or fail depending on implementation
1034        // But it should not panic
1035        let _ = result;
1036    }
1037
1038    #[tokio::test]
1039    async fn test_root_namespace_operations() {
1040        let (namespace, _temp_dir) = create_test_namespace().await;
1041
1042        // Test list_namespaces - should return empty list for root
1043        let request = ListNamespacesRequest::new();
1044        let result = namespace.list_namespaces(request).await;
1045        assert!(result.is_ok());
1046        assert_eq!(result.unwrap().namespaces.len(), 0);
1047
1048        // Test describe_namespace - should succeed for root
1049        let request = DescribeNamespaceRequest::new();
1050        let result = namespace.describe_namespace(request).await;
1051        assert!(result.is_ok());
1052
1053        // Test namespace_exists - root always exists
1054        let request = NamespaceExistsRequest::new();
1055        let result = namespace.namespace_exists(request).await;
1056        assert!(result.is_ok());
1057
1058        // Test create_namespace - root cannot be created
1059        let request = CreateNamespaceRequest::new();
1060        let result = namespace.create_namespace(request).await;
1061        assert!(result.is_err());
1062        assert!(result.unwrap_err().to_string().contains("already exists"));
1063
1064        // Test drop_namespace - root cannot be dropped
1065        let request = DropNamespaceRequest::new();
1066        let result = namespace.drop_namespace(request).await;
1067        assert!(result.is_err());
1068        assert!(result
1069            .unwrap_err()
1070            .to_string()
1071            .contains("cannot be dropped"));
1072    }
1073
1074    #[tokio::test]
1075    async fn test_non_root_namespace_operations() {
1076        let (namespace, _temp_dir) = create_test_namespace().await;
1077
1078        // Test create_namespace for non-root - not supported
1079        let mut request = CreateNamespaceRequest::new();
1080        request.id = Some(vec!["child".to_string()]);
1081        let result = namespace.create_namespace(request).await;
1082        assert!(matches!(result, Err(Error::NotSupported { .. })));
1083
1084        // Test namespace_exists for non-root - should not exist
1085        let mut request = NamespaceExistsRequest::new();
1086        request.id = Some(vec!["child".to_string()]);
1087        let result = namespace.namespace_exists(request).await;
1088        assert!(result.is_err());
1089        assert!(result
1090            .unwrap_err()
1091            .to_string()
1092            .contains("Only root namespace exists"));
1093
1094        // Test drop_namespace for non-root - not supported
1095        let mut request = DropNamespaceRequest::new();
1096        request.id = Some(vec!["child".to_string()]);
1097        let result = namespace.drop_namespace(request).await;
1098        assert!(matches!(result, Err(Error::NotSupported { .. })));
1099    }
1100
1101    #[tokio::test]
1102    async fn test_config_custom_root() {
1103        let temp_dir = TempStdDir::default();
1104        let custom_path = temp_dir.join("custom");
1105        std::fs::create_dir(&custom_path).unwrap();
1106
1107        let namespace = DirectoryNamespaceBuilder::new(custom_path.to_string_lossy().to_string())
1108            .build()
1109            .await
1110            .unwrap();
1111
1112        // Create test IPC data
1113        let schema = create_test_schema();
1114        let ipc_data = create_test_ipc_data(&schema);
1115
1116        // Create a table and verify location
1117        let mut request = CreateTableRequest::new();
1118        request.id = Some(vec!["test_table".to_string()]);
1119
1120        let response = namespace
1121            .create_table(request, bytes::Bytes::from(ipc_data))
1122            .await
1123            .unwrap();
1124
1125        assert!(response.location.unwrap().contains("custom"));
1126    }
1127
1128    #[tokio::test]
1129    async fn test_config_storage_options() {
1130        let temp_dir = TempStdDir::default();
1131
1132        let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
1133            .storage_option("option1", "value1")
1134            .storage_option("option2", "value2")
1135            .build()
1136            .await
1137            .unwrap();
1138
1139        // Create test IPC data
1140        let schema = create_test_schema();
1141        let ipc_data = create_test_ipc_data(&schema);
1142
1143        // Create a table and check storage options are included
1144        let mut request = CreateTableRequest::new();
1145        request.id = Some(vec!["test_table".to_string()]);
1146
1147        let response = namespace
1148            .create_table(request, bytes::Bytes::from(ipc_data))
1149            .await
1150            .unwrap();
1151
1152        let storage_options = response.storage_options.unwrap();
1153        assert_eq!(storage_options.get("option1"), Some(&"value1".to_string()));
1154        assert_eq!(storage_options.get("option2"), Some(&"value2".to_string()));
1155    }
1156
1157    #[tokio::test]
1158    async fn test_various_arrow_types() {
1159        let (namespace, _temp_dir) = create_test_namespace().await;
1160
1161        // Create schema with various types
1162        let fields = vec![
1163            JsonArrowField {
1164                name: "bool_col".to_string(),
1165                r#type: Box::new(JsonArrowDataType::new("bool".to_string())),
1166                nullable: true,
1167                metadata: None,
1168            },
1169            JsonArrowField {
1170                name: "int8_col".to_string(),
1171                r#type: Box::new(JsonArrowDataType::new("int8".to_string())),
1172                nullable: true,
1173                metadata: None,
1174            },
1175            JsonArrowField {
1176                name: "float64_col".to_string(),
1177                r#type: Box::new(JsonArrowDataType::new("float64".to_string())),
1178                nullable: true,
1179                metadata: None,
1180            },
1181            JsonArrowField {
1182                name: "binary_col".to_string(),
1183                r#type: Box::new(JsonArrowDataType::new("binary".to_string())),
1184                nullable: true,
1185                metadata: None,
1186            },
1187        ];
1188
1189        let schema = JsonArrowSchema {
1190            fields,
1191            metadata: None,
1192        };
1193
1194        // Create IPC data
1195        let ipc_data = create_test_ipc_data(&schema);
1196
1197        let mut request = CreateTableRequest::new();
1198        request.id = Some(vec!["complex_table".to_string()]);
1199
1200        let response = namespace
1201            .create_table(request, bytes::Bytes::from(ipc_data))
1202            .await
1203            .unwrap();
1204
1205        assert!(response.location.is_some());
1206    }
1207
1208    #[tokio::test]
1209    async fn test_connect_dir() {
1210        let temp_dir = TempStdDir::default();
1211
1212        let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
1213            .build()
1214            .await
1215            .unwrap();
1216
1217        // Test basic operation through the concrete type
1218        let request = ListTablesRequest::new();
1219        let response = namespace.list_tables(request).await.unwrap();
1220        assert_eq!(response.tables.len(), 0);
1221    }
1222
1223    #[tokio::test]
1224    async fn test_create_table_with_ipc_data() {
1225        use arrow::array::{Int32Array, StringArray};
1226        use arrow::ipc::writer::StreamWriter;
1227
1228        let (namespace, _temp_dir) = create_test_namespace().await;
1229
1230        // Create a schema with some fields
1231        let schema = create_test_schema();
1232
1233        // Create some test data that matches the schema
1234        let arrow_schema = convert_json_arrow_schema(&schema).unwrap();
1235        let arrow_schema = Arc::new(arrow_schema);
1236
1237        // Create a RecordBatch with actual data
1238        let id_array = Int32Array::from(vec![1, 2, 3]);
1239        let name_array = StringArray::from(vec!["Alice", "Bob", "Charlie"]);
1240        let batch = arrow::record_batch::RecordBatch::try_new(
1241            arrow_schema.clone(),
1242            vec![Arc::new(id_array), Arc::new(name_array)],
1243        )
1244        .unwrap();
1245
1246        // Write the batch to an IPC stream
1247        let mut buffer = Vec::new();
1248        {
1249            let mut writer = StreamWriter::try_new(&mut buffer, &arrow_schema).unwrap();
1250            writer.write(&batch).unwrap();
1251            writer.finish().unwrap();
1252        }
1253
1254        // Create table with the IPC data
1255        let mut request = CreateTableRequest::new();
1256        request.id = Some(vec!["test_table_with_data".to_string()]);
1257
1258        let response = namespace
1259            .create_table(request, Bytes::from(buffer))
1260            .await
1261            .unwrap();
1262
1263        assert_eq!(response.version, Some(1));
1264        assert!(response
1265            .location
1266            .unwrap()
1267            .contains("test_table_with_data.lance"));
1268
1269        // Verify table exists
1270        let mut exists_request = TableExistsRequest::new();
1271        exists_request.id = Some(vec!["test_table_with_data".to_string()]);
1272        namespace.table_exists(exists_request).await.unwrap();
1273    }
1274
1275    #[tokio::test]
1276    async fn test_create_empty_table() {
1277        let (namespace, temp_dir) = create_test_namespace().await;
1278
1279        let mut request = CreateEmptyTableRequest::new();
1280        request.id = Some(vec!["empty_table".to_string()]);
1281
1282        let response = namespace.create_empty_table(request).await.unwrap();
1283
1284        assert!(response.location.is_some());
1285        assert!(response.location.unwrap().ends_with("empty_table.lance"));
1286
1287        // Verify the .lance-reserved file was created in the correct location
1288        let table_dir = temp_dir.join("empty_table.lance");
1289        assert!(table_dir.exists());
1290        assert!(table_dir.is_dir());
1291
1292        let reserved_file = table_dir.join(".lance-reserved");
1293        assert!(reserved_file.exists());
1294        assert!(reserved_file.is_file());
1295
1296        // Verify file is empty
1297        let metadata = std::fs::metadata(&reserved_file).unwrap();
1298        assert_eq!(metadata.len(), 0);
1299
1300        // Verify table exists by checking for .lance-reserved file
1301        let mut exists_request = TableExistsRequest::new();
1302        exists_request.id = Some(vec!["empty_table".to_string()]);
1303        namespace.table_exists(exists_request).await.unwrap();
1304
1305        // List tables should include the empty table
1306        let list_request = ListTablesRequest::new();
1307        let list_response = namespace.list_tables(list_request).await.unwrap();
1308        assert!(list_response.tables.contains(&"empty_table".to_string()));
1309
1310        // Verify describe table works for empty table
1311        let mut describe_request = DescribeTableRequest::new();
1312        describe_request.id = Some(vec!["empty_table".to_string()]);
1313        let describe_response = namespace.describe_table(describe_request).await.unwrap();
1314        assert!(describe_response.location.is_some());
1315        assert!(describe_response.location.unwrap().contains("empty_table"));
1316    }
1317
1318    #[tokio::test]
1319    async fn test_create_empty_table_with_wrong_location() {
1320        let (namespace, _temp_dir) = create_test_namespace().await;
1321
1322        let mut request = CreateEmptyTableRequest::new();
1323        request.id = Some(vec!["test_table".to_string()]);
1324        request.location = Some("/wrong/path/table.lance".to_string());
1325
1326        let result = namespace.create_empty_table(request).await;
1327        assert!(result.is_err());
1328        assert!(result
1329            .unwrap_err()
1330            .to_string()
1331            .contains("must be at location"));
1332    }
1333
1334    #[tokio::test]
1335    async fn test_create_empty_table_then_drop() {
1336        let (namespace, temp_dir) = create_test_namespace().await;
1337
1338        // Create an empty table
1339        let mut create_request = CreateEmptyTableRequest::new();
1340        create_request.id = Some(vec!["empty_table_to_drop".to_string()]);
1341
1342        let create_response = namespace.create_empty_table(create_request).await.unwrap();
1343        assert!(create_response.location.is_some());
1344
1345        // Verify it exists
1346        let table_dir = temp_dir.join("empty_table_to_drop.lance");
1347        assert!(table_dir.exists());
1348        let reserved_file = table_dir.join(".lance-reserved");
1349        assert!(reserved_file.exists());
1350
1351        // Drop the table
1352        let mut drop_request = DropTableRequest::new();
1353        drop_request.id = Some(vec!["empty_table_to_drop".to_string()]);
1354        let drop_response = namespace.drop_table(drop_request).await.unwrap();
1355        assert!(drop_response.location.is_some());
1356
1357        // Verify table directory was removed
1358        assert!(!table_dir.exists());
1359        assert!(!reserved_file.exists());
1360
1361        // Verify table no longer exists
1362        let mut exists_request = TableExistsRequest::new();
1363        exists_request.id = Some(vec!["empty_table_to_drop".to_string()]);
1364        let exists_result = namespace.table_exists(exists_request).await;
1365        assert!(exists_result.is_err());
1366    }
1367}