lance_namespace_impls/
dir.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Directory-based Lance Namespace implementation.
5//!
6//! This module provides a directory-based implementation of the Lance namespace
7//! that stores tables as Lance datasets in a filesystem directory structure.
8
9use std::collections::HashMap;
10use std::sync::Arc;
11
12use async_trait::async_trait;
13use bytes::Bytes;
14use lance::dataset::{Dataset, WriteParams};
15use lance::session::Session;
16use lance_io::object_store::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry};
17use object_store::path::Path;
18
19use lance_namespace::models::{
20    CreateEmptyTableRequest, CreateEmptyTableResponse, CreateNamespaceRequest,
21    CreateNamespaceResponse, CreateTableRequest, CreateTableResponse, DescribeNamespaceRequest,
22    DescribeNamespaceResponse, DescribeTableRequest, DescribeTableResponse, DropNamespaceRequest,
23    DropNamespaceResponse, DropTableRequest, DropTableResponse, ListNamespacesRequest,
24    ListNamespacesResponse, ListTablesRequest, ListTablesResponse, NamespaceExistsRequest,
25    TableExistsRequest,
26};
27
28use lance_core::{box_error, Error, Result};
29use lance_namespace::LanceNamespace;
30
31/// Builder for creating a DirectoryNamespace.
32///
33/// This builder provides a fluent API for configuring and establishing
34/// connections to directory-based Lance namespaces.
35///
36/// # Examples
37///
38/// ```no_run
39/// # use lance_namespace_impls::DirectoryNamespaceBuilder;
40/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
41/// // Create a local directory namespace
42/// let namespace = DirectoryNamespaceBuilder::new("/path/to/data")
43///     .build()
44///     .await?;
45/// # Ok(())
46/// # }
47/// ```
48///
49/// ```no_run
50/// # use lance_namespace_impls::DirectoryNamespaceBuilder;
51/// # use lance::session::Session;
52/// # use std::sync::Arc;
53/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
54/// // Create with custom storage options and session
55/// let session = Arc::new(Session::default());
56/// let namespace = DirectoryNamespaceBuilder::new("s3://bucket/path")
57///     .storage_option("region", "us-west-2")
58///     .storage_option("access_key_id", "key")
59///     .session(session)
60///     .build()
61///     .await?;
62/// # Ok(())
63/// # }
64/// ```
65#[derive(Debug, Clone)]
66pub struct DirectoryNamespaceBuilder {
67    root: String,
68    storage_options: Option<HashMap<String, String>>,
69    session: Option<Arc<Session>>,
70}
71
72impl DirectoryNamespaceBuilder {
73    /// Create a new DirectoryNamespaceBuilder with the specified root path.
74    ///
75    /// # Arguments
76    ///
77    /// * `root` - Root directory path (local path or cloud URI like s3://bucket/path)
78    pub fn new(root: impl Into<String>) -> Self {
79        Self {
80            root: root.into().trim_end_matches('/').to_string(),
81            storage_options: None,
82            session: None,
83        }
84    }
85
86    /// Create a DirectoryNamespaceBuilder from properties HashMap.
87    ///
88    /// This method parses a properties map into builder configuration.
89    /// It expects:
90    /// - `root`: The root directory path (required)
91    /// - `storage.*`: Storage options (optional, prefix will be stripped)
92    ///
93    /// # Arguments
94    ///
95    /// * `properties` - Configuration properties
96    /// * `session` - Optional Lance session to reuse object store registry
97    ///
98    /// # Returns
99    ///
100    /// Returns a `DirectoryNamespaceBuilder` instance.
101    ///
102    /// # Errors
103    ///
104    /// Returns an error if the `root` property is missing.
105    ///
106    /// # Examples
107    ///
108    /// ```no_run
109    /// # use lance_namespace_impls::DirectoryNamespaceBuilder;
110    /// # use std::collections::HashMap;
111    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
112    /// let mut properties = HashMap::new();
113    /// properties.insert("root".to_string(), "/path/to/data".to_string());
114    /// properties.insert("storage.region".to_string(), "us-west-2".to_string());
115    ///
116    /// let namespace = DirectoryNamespaceBuilder::from_properties(properties, None)?
117    ///     .build()
118    ///     .await?;
119    /// # Ok(())
120    /// # }
121    /// ```
122    pub fn from_properties(
123        properties: HashMap<String, String>,
124        session: Option<Arc<Session>>,
125    ) -> Result<Self> {
126        // Extract root from properties (required)
127        let root = properties
128            .get("root")
129            .cloned()
130            .ok_or_else(|| Error::Namespace {
131                source: "Missing required property 'root' for directory namespace".into(),
132                location: snafu::location!(),
133            })?;
134
135        // Extract storage options (properties prefixed with "storage.")
136        let storage_options: HashMap<String, String> = properties
137            .iter()
138            .filter_map(|(k, v)| {
139                k.strip_prefix("storage.")
140                    .map(|key| (key.to_string(), v.clone()))
141            })
142            .collect();
143
144        let storage_options = if storage_options.is_empty() {
145            None
146        } else {
147            Some(storage_options)
148        };
149
150        Ok(Self {
151            root: root.trim_end_matches('/').to_string(),
152            storage_options,
153            session,
154        })
155    }
156
157    /// Add a storage option.
158    ///
159    /// # Arguments
160    ///
161    /// * `key` - Storage option key (e.g., "region", "access_key_id")
162    /// * `value` - Storage option value
163    pub fn storage_option(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
164        self.storage_options
165            .get_or_insert_with(HashMap::new)
166            .insert(key.into(), value.into());
167        self
168    }
169
170    /// Add multiple storage options.
171    ///
172    /// # Arguments
173    ///
174    /// * `options` - HashMap of storage options to add
175    pub fn storage_options(mut self, options: HashMap<String, String>) -> Self {
176        self.storage_options
177            .get_or_insert_with(HashMap::new)
178            .extend(options);
179        self
180    }
181
182    /// Set the Lance session to use for this namespace.
183    ///
184    /// When a session is provided, the namespace will reuse the session's
185    /// object store registry, allowing multiple namespaces and datasets
186    /// to share the same underlying storage connections.
187    ///
188    /// # Arguments
189    ///
190    /// * `session` - Arc-wrapped Lance session
191    pub fn session(mut self, session: Arc<Session>) -> Self {
192        self.session = Some(session);
193        self
194    }
195
196    /// Build the DirectoryNamespace.
197    ///
198    /// # Returns
199    ///
200    /// Returns a `DirectoryNamespace` instance.
201    ///
202    /// # Errors
203    ///
204    /// Returns an error if:
205    /// - The root path is invalid
206    /// - Connection to the storage backend fails
207    /// - Storage options are invalid
208    pub async fn build(self) -> Result<DirectoryNamespace> {
209        let (object_store, base_path) =
210            Self::initialize_object_store(&self.root, &self.storage_options, &self.session).await?;
211
212        Ok(DirectoryNamespace {
213            root: self.root,
214            storage_options: self.storage_options,
215            session: self.session,
216            object_store,
217            base_path,
218        })
219    }
220
221    /// Initialize the Lance ObjectStore based on the configuration
222    async fn initialize_object_store(
223        root: &str,
224        storage_options: &Option<HashMap<String, String>>,
225        session: &Option<Arc<Session>>,
226    ) -> Result<(Arc<ObjectStore>, Path)> {
227        // Build ObjectStoreParams from storage options
228        let params = ObjectStoreParams {
229            storage_options: storage_options.clone(),
230            ..Default::default()
231        };
232
233        // Use object store registry from session if provided, otherwise create a new one
234        let registry = if let Some(session) = session {
235            session.store_registry()
236        } else {
237            Arc::new(ObjectStoreRegistry::default())
238        };
239
240        // Use Lance's object store factory to create from URI
241        let (object_store, base_path) = ObjectStore::from_uri_and_params(registry, root, &params)
242            .await
243            .map_err(|e| Error::Namespace {
244                source: format!("Failed to create object store: {}", e).into(),
245                location: snafu::location!(),
246            })?;
247
248        Ok((object_store, base_path))
249    }
250}
251
252/// Directory-based implementation of Lance Namespace.
253///
254/// This implementation stores tables as Lance datasets in a directory structure.
255/// It supports local filesystems and cloud storage backends through Lance's object store.
256pub struct DirectoryNamespace {
257    root: String,
258    storage_options: Option<HashMap<String, String>>,
259    #[allow(dead_code)]
260    session: Option<Arc<Session>>,
261    object_store: Arc<ObjectStore>,
262    base_path: Path,
263}
264
265impl std::fmt::Debug for DirectoryNamespace {
266    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
267        write!(f, "{}", self.namespace_id())
268    }
269}
270
271impl std::fmt::Display for DirectoryNamespace {
272    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
273        write!(f, "{}", self.namespace_id())
274    }
275}
276
277impl DirectoryNamespace {
278    /// Validate that the namespace ID represents the root namespace
279    fn validate_root_namespace_id(id: &Option<Vec<String>>) -> Result<()> {
280        if let Some(id) = id {
281            if !id.is_empty() {
282                return Err(Error::Namespace {
283                    source: format!(
284                        "Directory namespace only supports root namespace operations, but got namespace ID: {:?}. Expected empty ID.",
285                        id
286                    ).into(),
287                    location: snafu::location!(),
288                });
289            }
290        }
291        Ok(())
292    }
293
294    /// Extract table name from table ID
295    fn table_name_from_id(id: &Option<Vec<String>>) -> Result<String> {
296        let id = id.as_ref().ok_or_else(|| Error::Namespace {
297            source: "Directory namespace table ID cannot be empty".into(),
298            location: snafu::location!(),
299        })?;
300
301        if id.len() != 1 {
302            return Err(Error::Namespace {
303                source: format!(
304                    "Directory namespace only supports single-level table IDs, but got: {:?}",
305                    id
306                )
307                .into(),
308                location: snafu::location!(),
309            });
310        }
311
312        Ok(id[0].clone())
313    }
314
315    /// Get the full URI path for a table (for returning in responses)
316    fn table_full_uri(&self, table_name: &str) -> String {
317        format!("{}/{}.lance", &self.root, table_name)
318    }
319
320    /// Get the object store path for a table (relative to base_path)
321    fn table_path(&self, table_name: &str) -> Path {
322        self.base_path
323            .child(format!("{}.lance", table_name).as_str())
324    }
325
326    /// Get the versions directory path for a table
327    fn table_versions_path(&self, table_name: &str) -> Path {
328        // Need to chain child calls to avoid URL encoding the slash
329        self.base_path
330            .child(format!("{}.lance", table_name).as_str())
331            .child("_versions")
332    }
333
334    /// Get the reserved file path for a table
335    fn table_reserved_file_path(&self, table_name: &str) -> Path {
336        // Need to chain child calls to avoid URL encoding the slash
337        self.base_path
338            .child(format!("{}.lance", table_name).as_str())
339            .child(".lance-reserved")
340    }
341}
342
343#[async_trait]
344impl LanceNamespace for DirectoryNamespace {
345    async fn list_namespaces(
346        &self,
347        request: ListNamespacesRequest,
348    ) -> Result<ListNamespacesResponse> {
349        // Validate this is a request for the root namespace
350        Self::validate_root_namespace_id(&request.id)?;
351
352        // Directory namespace only contains the root namespace (empty list)
353        Ok(ListNamespacesResponse::new(vec![]))
354    }
355
356    async fn describe_namespace(
357        &self,
358        request: DescribeNamespaceRequest,
359    ) -> Result<DescribeNamespaceResponse> {
360        // Validate this is a request for the root namespace
361        Self::validate_root_namespace_id(&request.id)?;
362
363        // Return description of the root namespace
364        Ok(DescribeNamespaceResponse {
365            properties: Some(HashMap::new()),
366        })
367    }
368
369    async fn create_namespace(
370        &self,
371        request: CreateNamespaceRequest,
372    ) -> Result<CreateNamespaceResponse> {
373        // Root namespace always exists and cannot be created
374        if request.id.is_none() || request.id.as_ref().unwrap().is_empty() {
375            return Err(Error::Namespace {
376                source: "Root namespace already exists and cannot be created".into(),
377                location: snafu::location!(),
378            });
379        }
380
381        // Non-root namespaces are not supported
382        Err(Error::NotSupported {
383            source: "Directory namespace only supports the root namespace".into(),
384            location: snafu::location!(),
385        })
386    }
387
388    async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<DropNamespaceResponse> {
389        // Root namespace always exists and cannot be dropped
390        if request.id.is_none() || request.id.as_ref().unwrap().is_empty() {
391            return Err(Error::Namespace {
392                source: "Root namespace cannot be dropped".into(),
393                location: snafu::location!(),
394            });
395        }
396
397        // Non-root namespaces are not supported
398        Err(Error::NotSupported {
399            source: "Directory namespace only supports the root namespace".into(),
400            location: snafu::location!(),
401        })
402    }
403
404    async fn namespace_exists(&self, request: NamespaceExistsRequest) -> Result<()> {
405        // Root namespace always exists
406        if request.id.is_none() || request.id.as_ref().unwrap().is_empty() {
407            return Ok(());
408        }
409
410        // Non-root namespaces don't exist
411        Err(Error::Namespace {
412            source: "Only root namespace exists in directory namespace".into(),
413            location: snafu::location!(),
414        })
415    }
416
417    async fn list_tables(&self, request: ListTablesRequest) -> Result<ListTablesResponse> {
418        Self::validate_root_namespace_id(&request.id)?;
419
420        let mut tables = Vec::new();
421
422        // List all entries in the base directory
423        let entries = self
424            .object_store
425            .read_dir(self.base_path.clone())
426            .await
427            .map_err(|e| Error::IO {
428                source: box_error(std::io::Error::other(format!(
429                    "Failed to list directory: {}",
430                    e
431                ))),
432                location: snafu::location!(),
433            })?;
434
435        for entry in entries {
436            let path = entry.trim_end_matches('/');
437
438            // Only process directory-like paths that end with .lance
439            if !path.ends_with(".lance") {
440                continue;
441            }
442
443            // Extract table name (remove .lance suffix)
444            let table_name = &path[..path.len() - 6];
445
446            // Check if it's a valid Lance dataset or has .lance-reserved file
447            let mut is_table = false;
448
449            // First check for .lance-reserved file
450            let reserved_file_path = self.table_reserved_file_path(table_name);
451            if self
452                .object_store
453                .exists(&reserved_file_path)
454                .await
455                .unwrap_or(false)
456            {
457                is_table = true;
458            }
459
460            // If not found, check for _versions directory
461            if !is_table {
462                let versions_path = self.table_versions_path(table_name);
463                if let Ok(version_entries) = self.object_store.read_dir(versions_path).await {
464                    // If there's at least one version file, it's a valid Lance dataset
465                    if !version_entries.is_empty() {
466                        is_table = true;
467                    }
468                }
469            }
470
471            if is_table {
472                tables.push(table_name.to_string());
473            }
474        }
475
476        let response = ListTablesResponse::new(tables);
477        Ok(response)
478    }
479
480    async fn describe_table(&self, request: DescribeTableRequest) -> Result<DescribeTableResponse> {
481        let table_name = Self::table_name_from_id(&request.id)?;
482        let table_uri = self.table_full_uri(&table_name);
483
484        // Check if table exists - either as Lance dataset or with .lance-reserved file
485        let mut table_exists = false;
486
487        // First check for .lance-reserved file
488        let reserved_file_path = self.table_reserved_file_path(&table_name);
489        if self
490            .object_store
491            .exists(&reserved_file_path)
492            .await
493            .unwrap_or(false)
494        {
495            table_exists = true;
496        }
497
498        // If not found, check if it's a Lance dataset by looking for _versions directory
499        if !table_exists {
500            let versions_path = self.table_versions_path(&table_name);
501            if let Ok(entries) = self.object_store.read_dir(versions_path).await {
502                if !entries.is_empty() {
503                    table_exists = true;
504                }
505            }
506        }
507
508        if !table_exists {
509            return Err(Error::Namespace {
510                source: format!("Table does not exist: {}", table_name).into(),
511                location: snafu::location!(),
512            });
513        }
514
515        Ok(DescribeTableResponse {
516            version: None,
517            location: Some(table_uri),
518            schema: None,
519            properties: None,
520            storage_options: self.storage_options.clone(),
521        })
522    }
523
524    async fn table_exists(&self, request: TableExistsRequest) -> Result<()> {
525        let table_name = Self::table_name_from_id(&request.id)?;
526
527        // Check if table exists - either as Lance dataset or with .lance-reserved file
528        let mut table_exists = false;
529
530        // First check for .lance-reserved file
531        let reserved_file_path = self.table_reserved_file_path(&table_name);
532        if self
533            .object_store
534            .exists(&reserved_file_path)
535            .await
536            .unwrap_or(false)
537        {
538            table_exists = true;
539        }
540
541        // If not found, check if it's a Lance dataset by looking for _versions directory
542        if !table_exists {
543            let versions_path = self.table_versions_path(&table_name);
544            if let Ok(entries) = self.object_store.read_dir(versions_path).await {
545                if !entries.is_empty() {
546                    table_exists = true;
547                }
548            }
549        }
550
551        if !table_exists {
552            return Err(Error::Namespace {
553                source: format!("Table does not exist: {}", table_name).into(),
554                location: snafu::location!(),
555            });
556        }
557
558        Ok(())
559    }
560
561    async fn drop_table(&self, request: DropTableRequest) -> Result<DropTableResponse> {
562        let table_name = Self::table_name_from_id(&request.id)?;
563        let table_uri = self.table_full_uri(&table_name);
564
565        // Remove the entire table directory
566        let table_path = self.table_path(&table_name);
567
568        self.object_store
569            .remove_dir_all(table_path)
570            .await
571            .map_err(|e| Error::Namespace {
572                source: format!("Failed to drop table {}: {}", table_name, e).into(),
573                location: snafu::location!(),
574            })?;
575
576        Ok(DropTableResponse {
577            id: request.id,
578            location: Some(table_uri),
579            properties: None,
580            transaction_id: None,
581        })
582    }
583
584    async fn create_table(
585        &self,
586        request: CreateTableRequest,
587        request_data: Bytes,
588    ) -> Result<CreateTableResponse> {
589        let table_name = Self::table_name_from_id(&request.id)?;
590        let table_uri = self.table_full_uri(&table_name);
591
592        // Validate that request_data is provided and is a valid Arrow IPC stream
593        if request_data.is_empty() {
594            return Err(Error::Namespace {
595                source: "Request data (Arrow IPC stream) is required for create_table".into(),
596                location: snafu::location!(),
597            });
598        }
599
600        // Validate location if provided
601        if let Some(location) = &request.location {
602            let location = location.trim_end_matches('/');
603            if location != table_uri {
604                return Err(Error::Namespace {
605                    source: format!(
606                        "Cannot create table {} at location {}, must be at location {}",
607                        table_name, location, table_uri
608                    )
609                    .into(),
610                    location: snafu::location!(),
611                });
612            }
613        }
614
615        // Parse the Arrow IPC stream from request_data
616        use arrow::ipc::reader::StreamReader;
617        use std::io::Cursor;
618
619        let cursor = Cursor::new(request_data.to_vec());
620        let stream_reader = StreamReader::try_new(cursor, None).map_err(|e| Error::Namespace {
621            source: format!("Invalid Arrow IPC stream: {}", e).into(),
622            location: snafu::location!(),
623        })?;
624
625        // Extract schema from the IPC stream
626        let arrow_schema = stream_reader.schema();
627
628        // Collect all batches from the stream
629        let mut batches = Vec::new();
630        for batch_result in stream_reader {
631            batches.push(batch_result.map_err(|e| Error::Namespace {
632                source: format!("Failed to read batch from IPC stream: {}", e).into(),
633                location: snafu::location!(),
634            })?);
635        }
636
637        // Create RecordBatchReader from the batches
638        let reader = if batches.is_empty() {
639            // If no batches in the stream, create an empty batch with the schema
640            let batch = arrow::record_batch::RecordBatch::new_empty(arrow_schema.clone());
641            let batches = vec![Ok(batch)];
642            arrow::record_batch::RecordBatchIterator::new(batches, arrow_schema.clone())
643        } else {
644            // Convert to RecordBatchIterator
645            let batch_results: Vec<_> = batches.into_iter().map(Ok).collect();
646            arrow::record_batch::RecordBatchIterator::new(batch_results, arrow_schema)
647        };
648
649        // Set up write parameters for creating a new dataset
650        // Populate store_params with storage options to ensure they're forwarded to Dataset::write
651        let store_params = self.storage_options.as_ref().map(|opts| ObjectStoreParams {
652            storage_options: Some(opts.clone()),
653            ..Default::default()
654        });
655
656        let write_params = WriteParams {
657            mode: lance::dataset::WriteMode::Create,
658            store_params,
659            ..Default::default()
660        };
661
662        // Create the Lance dataset using the actual Lance API
663        Dataset::write(reader, &table_uri, Some(write_params))
664            .await
665            .map_err(|e| Error::Namespace {
666                source: format!("Failed to create Lance dataset: {}", e).into(),
667                location: snafu::location!(),
668            })?;
669
670        Ok(CreateTableResponse {
671            version: Some(1),
672            location: Some(table_uri),
673            properties: None,
674            storage_options: self.storage_options.clone(),
675        })
676    }
677
678    async fn create_empty_table(
679        &self,
680        request: CreateEmptyTableRequest,
681    ) -> Result<CreateEmptyTableResponse> {
682        let table_name = Self::table_name_from_id(&request.id)?;
683        let table_uri = self.table_full_uri(&table_name);
684
685        // Validate location if provided
686        if let Some(location) = &request.location {
687            let location = location.trim_end_matches('/');
688            if location != table_uri {
689                return Err(Error::Namespace {
690                    source: format!(
691                        "Cannot create table {} at location {}, must be at location {}",
692                        table_name, location, table_uri
693                    )
694                    .into(),
695                    location: snafu::location!(),
696                });
697            }
698        }
699
700        // Create the .lance-reserved file to mark the table as existing
701        let reserved_file_path = self.table_reserved_file_path(&table_name);
702
703        self.object_store
704            .create(&reserved_file_path)
705            .await
706            .map_err(|e| Error::Namespace {
707                source: format!(
708                    "Failed to create .lance-reserved file for table {}: {}",
709                    table_name, e
710                )
711                .into(),
712                location: snafu::location!(),
713            })?
714            .shutdown()
715            .await
716            .map_err(|e| Error::Namespace {
717                source: format!(
718                    "Failed to finalize .lance-reserved file for table {}: {}",
719                    table_name, e
720                )
721                .into(),
722                location: snafu::location!(),
723            })?;
724
725        Ok(CreateEmptyTableResponse {
726            location: Some(table_uri),
727            properties: None,
728            storage_options: self.storage_options.clone(),
729        })
730    }
731
732    fn namespace_id(&self) -> String {
733        format!("DirectoryNamespace {{ root: {:?} }}", self.root)
734    }
735}
736
737#[cfg(test)]
738mod tests {
739    use super::*;
740    use lance_core::utils::tempfile::TempStdDir;
741    use lance_namespace::models::{JsonArrowDataType, JsonArrowField, JsonArrowSchema};
742    use lance_namespace::schema::convert_json_arrow_schema;
743    use std::sync::Arc;
744
745    /// Helper to create a test DirectoryNamespace with a temporary directory
746    async fn create_test_namespace() -> (DirectoryNamespace, TempStdDir) {
747        let temp_dir = TempStdDir::default();
748
749        let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
750            .build()
751            .await
752            .unwrap();
753        (namespace, temp_dir)
754    }
755
756    /// Helper to create test IPC data from a schema
757    fn create_test_ipc_data(schema: &JsonArrowSchema) -> Vec<u8> {
758        use arrow::ipc::writer::StreamWriter;
759
760        let arrow_schema = convert_json_arrow_schema(schema).unwrap();
761        let arrow_schema = Arc::new(arrow_schema);
762        let batch = arrow::record_batch::RecordBatch::new_empty(arrow_schema.clone());
763        let mut buffer = Vec::new();
764        {
765            let mut writer = StreamWriter::try_new(&mut buffer, &arrow_schema).unwrap();
766            writer.write(&batch).unwrap();
767            writer.finish().unwrap();
768        }
769        buffer
770    }
771
772    /// Helper to create a simple test schema
773    fn create_test_schema() -> JsonArrowSchema {
774        let int_type = JsonArrowDataType::new("int32".to_string());
775        let string_type = JsonArrowDataType::new("utf8".to_string());
776
777        let id_field = JsonArrowField {
778            name: "id".to_string(),
779            r#type: Box::new(int_type),
780            nullable: false,
781            metadata: None,
782        };
783
784        let name_field = JsonArrowField {
785            name: "name".to_string(),
786            r#type: Box::new(string_type),
787            nullable: true,
788            metadata: None,
789        };
790
791        JsonArrowSchema {
792            fields: vec![id_field, name_field],
793            metadata: None,
794        }
795    }
796
797    #[tokio::test]
798    async fn test_create_table() {
799        let (namespace, _temp_dir) = create_test_namespace().await;
800
801        // Create test IPC data
802        let schema = create_test_schema();
803        let ipc_data = create_test_ipc_data(&schema);
804
805        let mut request = CreateTableRequest::new();
806        request.id = Some(vec!["test_table".to_string()]);
807
808        let response = namespace
809            .create_table(request, bytes::Bytes::from(ipc_data))
810            .await
811            .unwrap();
812
813        assert!(response.location.is_some());
814        assert!(response.location.unwrap().ends_with("test_table.lance"));
815        assert_eq!(response.version, Some(1));
816    }
817
818    #[tokio::test]
819    async fn test_create_table_without_data() {
820        let (namespace, _temp_dir) = create_test_namespace().await;
821
822        let mut request = CreateTableRequest::new();
823        request.id = Some(vec!["test_table".to_string()]);
824
825        let result = namespace.create_table(request, bytes::Bytes::new()).await;
826        assert!(result.is_err());
827        assert!(result
828            .unwrap_err()
829            .to_string()
830            .contains("Arrow IPC stream) is required"));
831    }
832
833    #[tokio::test]
834    async fn test_create_table_with_invalid_id() {
835        let (namespace, _temp_dir) = create_test_namespace().await;
836
837        // Create test IPC data
838        let schema = create_test_schema();
839        let ipc_data = create_test_ipc_data(&schema);
840
841        // Test with empty ID
842        let mut request = CreateTableRequest::new();
843        request.id = Some(vec![]);
844
845        let result = namespace
846            .create_table(request, bytes::Bytes::from(ipc_data.clone()))
847            .await;
848        assert!(result.is_err());
849
850        // Test with multi-level ID
851        let mut request = CreateTableRequest::new();
852        request.id = Some(vec!["namespace".to_string(), "table".to_string()]);
853
854        let result = namespace
855            .create_table(request, bytes::Bytes::from(ipc_data))
856            .await;
857        assert!(result.is_err());
858        assert!(result
859            .unwrap_err()
860            .to_string()
861            .contains("single-level table IDs"));
862    }
863
864    #[tokio::test]
865    async fn test_create_table_with_wrong_location() {
866        let (namespace, _temp_dir) = create_test_namespace().await;
867
868        // Create test IPC data
869        let schema = create_test_schema();
870        let ipc_data = create_test_ipc_data(&schema);
871
872        let mut request = CreateTableRequest::new();
873        request.id = Some(vec!["test_table".to_string()]);
874        request.location = Some("/wrong/path/table.lance".to_string());
875
876        let result = namespace
877            .create_table(request, bytes::Bytes::from(ipc_data))
878            .await;
879        assert!(result.is_err());
880        assert!(result
881            .unwrap_err()
882            .to_string()
883            .contains("must be at location"));
884    }
885
886    #[tokio::test]
887    async fn test_list_tables() {
888        let (namespace, _temp_dir) = create_test_namespace().await;
889
890        // Initially, no tables
891        let request = ListTablesRequest::new();
892        let response = namespace.list_tables(request).await.unwrap();
893        assert_eq!(response.tables.len(), 0);
894
895        // Create test IPC data
896        let schema = create_test_schema();
897        let ipc_data = create_test_ipc_data(&schema);
898
899        // Create a table
900        let mut create_request = CreateTableRequest::new();
901        create_request.id = Some(vec!["table1".to_string()]);
902        namespace
903            .create_table(create_request, bytes::Bytes::from(ipc_data.clone()))
904            .await
905            .unwrap();
906
907        // Create another table
908        let mut create_request = CreateTableRequest::new();
909        create_request.id = Some(vec!["table2".to_string()]);
910        namespace
911            .create_table(create_request, bytes::Bytes::from(ipc_data))
912            .await
913            .unwrap();
914
915        // List tables should return both
916        let request = ListTablesRequest::new();
917        let response = namespace.list_tables(request).await.unwrap();
918        let tables = response.tables;
919        assert_eq!(tables.len(), 2);
920        assert!(tables.contains(&"table1".to_string()));
921        assert!(tables.contains(&"table2".to_string()));
922    }
923
924    #[tokio::test]
925    async fn test_list_tables_with_namespace_id() {
926        let (namespace, _temp_dir) = create_test_namespace().await;
927
928        let mut request = ListTablesRequest::new();
929        request.id = Some(vec!["namespace".to_string()]);
930
931        let result = namespace.list_tables(request).await;
932        assert!(result.is_err());
933        assert!(result
934            .unwrap_err()
935            .to_string()
936            .contains("root namespace operations"));
937    }
938
939    #[tokio::test]
940    async fn test_describe_table() {
941        let (namespace, _temp_dir) = create_test_namespace().await;
942
943        // Create a table first
944        let schema = create_test_schema();
945        let ipc_data = create_test_ipc_data(&schema);
946
947        let mut create_request = CreateTableRequest::new();
948        create_request.id = Some(vec!["test_table".to_string()]);
949        namespace
950            .create_table(create_request, bytes::Bytes::from(ipc_data))
951            .await
952            .unwrap();
953
954        // Describe the table
955        let mut request = DescribeTableRequest::new();
956        request.id = Some(vec!["test_table".to_string()]);
957        let response = namespace.describe_table(request).await.unwrap();
958
959        assert!(response.location.is_some());
960        assert!(response.location.unwrap().ends_with("test_table.lance"));
961    }
962
963    #[tokio::test]
964    async fn test_describe_nonexistent_table() {
965        let (namespace, _temp_dir) = create_test_namespace().await;
966
967        let mut request = DescribeTableRequest::new();
968        request.id = Some(vec!["nonexistent".to_string()]);
969
970        let result = namespace.describe_table(request).await;
971        assert!(result.is_err());
972        assert!(result
973            .unwrap_err()
974            .to_string()
975            .contains("Table does not exist"));
976    }
977
978    #[tokio::test]
979    async fn test_table_exists() {
980        let (namespace, _temp_dir) = create_test_namespace().await;
981
982        // Create a table
983        let schema = create_test_schema();
984        let ipc_data = create_test_ipc_data(&schema);
985
986        let mut create_request = CreateTableRequest::new();
987        create_request.id = Some(vec!["existing_table".to_string()]);
988        namespace
989            .create_table(create_request, bytes::Bytes::from(ipc_data))
990            .await
991            .unwrap();
992
993        // Check existing table
994        let mut request = TableExistsRequest::new();
995        request.id = Some(vec!["existing_table".to_string()]);
996        let result = namespace.table_exists(request).await;
997        assert!(result.is_ok());
998
999        // Check non-existent table
1000        let mut request = TableExistsRequest::new();
1001        request.id = Some(vec!["nonexistent".to_string()]);
1002        let result = namespace.table_exists(request).await;
1003        assert!(result.is_err());
1004        assert!(result
1005            .unwrap_err()
1006            .to_string()
1007            .contains("Table does not exist"));
1008    }
1009
1010    #[tokio::test]
1011    async fn test_drop_table() {
1012        let (namespace, _temp_dir) = create_test_namespace().await;
1013
1014        // Create a table
1015        let schema = create_test_schema();
1016        let ipc_data = create_test_ipc_data(&schema);
1017
1018        let mut create_request = CreateTableRequest::new();
1019        create_request.id = Some(vec!["table_to_drop".to_string()]);
1020        namespace
1021            .create_table(create_request, bytes::Bytes::from(ipc_data))
1022            .await
1023            .unwrap();
1024
1025        // Verify it exists
1026        let mut exists_request = TableExistsRequest::new();
1027        exists_request.id = Some(vec!["table_to_drop".to_string()]);
1028        assert!(namespace.table_exists(exists_request.clone()).await.is_ok());
1029
1030        // Drop the table
1031        let mut drop_request = DropTableRequest::new();
1032        drop_request.id = Some(vec!["table_to_drop".to_string()]);
1033        let response = namespace.drop_table(drop_request).await.unwrap();
1034        assert!(response.location.is_some());
1035
1036        // Verify it no longer exists
1037        assert!(namespace.table_exists(exists_request).await.is_err());
1038    }
1039
1040    #[tokio::test]
1041    async fn test_drop_nonexistent_table() {
1042        let (namespace, _temp_dir) = create_test_namespace().await;
1043
1044        let mut request = DropTableRequest::new();
1045        request.id = Some(vec!["nonexistent".to_string()]);
1046
1047        // Should not fail when dropping non-existent table (idempotent)
1048        let result = namespace.drop_table(request).await;
1049        // The operation might succeed or fail depending on implementation
1050        // But it should not panic
1051        let _ = result;
1052    }
1053
1054    #[tokio::test]
1055    async fn test_root_namespace_operations() {
1056        let (namespace, _temp_dir) = create_test_namespace().await;
1057
1058        // Test list_namespaces - should return empty list for root
1059        let request = ListNamespacesRequest::new();
1060        let result = namespace.list_namespaces(request).await;
1061        assert!(result.is_ok());
1062        assert_eq!(result.unwrap().namespaces.len(), 0);
1063
1064        // Test describe_namespace - should succeed for root
1065        let request = DescribeNamespaceRequest::new();
1066        let result = namespace.describe_namespace(request).await;
1067        assert!(result.is_ok());
1068
1069        // Test namespace_exists - root always exists
1070        let request = NamespaceExistsRequest::new();
1071        let result = namespace.namespace_exists(request).await;
1072        assert!(result.is_ok());
1073
1074        // Test create_namespace - root cannot be created
1075        let request = CreateNamespaceRequest::new();
1076        let result = namespace.create_namespace(request).await;
1077        assert!(result.is_err());
1078        assert!(result.unwrap_err().to_string().contains("already exists"));
1079
1080        // Test drop_namespace - root cannot be dropped
1081        let request = DropNamespaceRequest::new();
1082        let result = namespace.drop_namespace(request).await;
1083        assert!(result.is_err());
1084        assert!(result
1085            .unwrap_err()
1086            .to_string()
1087            .contains("cannot be dropped"));
1088    }
1089
1090    #[tokio::test]
1091    async fn test_non_root_namespace_operations() {
1092        let (namespace, _temp_dir) = create_test_namespace().await;
1093
1094        // Test create_namespace for non-root - not supported
1095        let mut request = CreateNamespaceRequest::new();
1096        request.id = Some(vec!["child".to_string()]);
1097        let result = namespace.create_namespace(request).await;
1098        assert!(matches!(result, Err(Error::NotSupported { .. })));
1099
1100        // Test namespace_exists for non-root - should not exist
1101        let mut request = NamespaceExistsRequest::new();
1102        request.id = Some(vec!["child".to_string()]);
1103        let result = namespace.namespace_exists(request).await;
1104        assert!(result.is_err());
1105        assert!(result
1106            .unwrap_err()
1107            .to_string()
1108            .contains("Only root namespace exists"));
1109
1110        // Test drop_namespace for non-root - not supported
1111        let mut request = DropNamespaceRequest::new();
1112        request.id = Some(vec!["child".to_string()]);
1113        let result = namespace.drop_namespace(request).await;
1114        assert!(matches!(result, Err(Error::NotSupported { .. })));
1115    }
1116
1117    #[tokio::test]
1118    async fn test_config_custom_root() {
1119        let temp_dir = TempStdDir::default();
1120        let custom_path = temp_dir.join("custom");
1121        std::fs::create_dir(&custom_path).unwrap();
1122
1123        let namespace = DirectoryNamespaceBuilder::new(custom_path.to_string_lossy().to_string())
1124            .build()
1125            .await
1126            .unwrap();
1127
1128        // Create test IPC data
1129        let schema = create_test_schema();
1130        let ipc_data = create_test_ipc_data(&schema);
1131
1132        // Create a table and verify location
1133        let mut request = CreateTableRequest::new();
1134        request.id = Some(vec!["test_table".to_string()]);
1135
1136        let response = namespace
1137            .create_table(request, bytes::Bytes::from(ipc_data))
1138            .await
1139            .unwrap();
1140
1141        assert!(response.location.unwrap().contains("custom"));
1142    }
1143
1144    #[tokio::test]
1145    async fn test_config_storage_options() {
1146        let temp_dir = TempStdDir::default();
1147
1148        let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
1149            .storage_option("option1", "value1")
1150            .storage_option("option2", "value2")
1151            .build()
1152            .await
1153            .unwrap();
1154
1155        // Create test IPC data
1156        let schema = create_test_schema();
1157        let ipc_data = create_test_ipc_data(&schema);
1158
1159        // Create a table and check storage options are included
1160        let mut request = CreateTableRequest::new();
1161        request.id = Some(vec!["test_table".to_string()]);
1162
1163        let response = namespace
1164            .create_table(request, bytes::Bytes::from(ipc_data))
1165            .await
1166            .unwrap();
1167
1168        let storage_options = response.storage_options.unwrap();
1169        assert_eq!(storage_options.get("option1"), Some(&"value1".to_string()));
1170        assert_eq!(storage_options.get("option2"), Some(&"value2".to_string()));
1171    }
1172
1173    #[tokio::test]
1174    async fn test_various_arrow_types() {
1175        let (namespace, _temp_dir) = create_test_namespace().await;
1176
1177        // Create schema with various types
1178        let fields = vec![
1179            JsonArrowField {
1180                name: "bool_col".to_string(),
1181                r#type: Box::new(JsonArrowDataType::new("bool".to_string())),
1182                nullable: true,
1183                metadata: None,
1184            },
1185            JsonArrowField {
1186                name: "int8_col".to_string(),
1187                r#type: Box::new(JsonArrowDataType::new("int8".to_string())),
1188                nullable: true,
1189                metadata: None,
1190            },
1191            JsonArrowField {
1192                name: "float64_col".to_string(),
1193                r#type: Box::new(JsonArrowDataType::new("float64".to_string())),
1194                nullable: true,
1195                metadata: None,
1196            },
1197            JsonArrowField {
1198                name: "binary_col".to_string(),
1199                r#type: Box::new(JsonArrowDataType::new("binary".to_string())),
1200                nullable: true,
1201                metadata: None,
1202            },
1203        ];
1204
1205        let schema = JsonArrowSchema {
1206            fields,
1207            metadata: None,
1208        };
1209
1210        // Create IPC data
1211        let ipc_data = create_test_ipc_data(&schema);
1212
1213        let mut request = CreateTableRequest::new();
1214        request.id = Some(vec!["complex_table".to_string()]);
1215
1216        let response = namespace
1217            .create_table(request, bytes::Bytes::from(ipc_data))
1218            .await
1219            .unwrap();
1220
1221        assert!(response.location.is_some());
1222    }
1223
1224    #[tokio::test]
1225    async fn test_connect_dir() {
1226        let temp_dir = TempStdDir::default();
1227
1228        let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
1229            .build()
1230            .await
1231            .unwrap();
1232
1233        // Test basic operation through the concrete type
1234        let request = ListTablesRequest::new();
1235        let response = namespace.list_tables(request).await.unwrap();
1236        assert_eq!(response.tables.len(), 0);
1237    }
1238
1239    #[tokio::test]
1240    async fn test_create_table_with_ipc_data() {
1241        use arrow::array::{Int32Array, StringArray};
1242        use arrow::ipc::writer::StreamWriter;
1243
1244        let (namespace, _temp_dir) = create_test_namespace().await;
1245
1246        // Create a schema with some fields
1247        let schema = create_test_schema();
1248
1249        // Create some test data that matches the schema
1250        let arrow_schema = convert_json_arrow_schema(&schema).unwrap();
1251        let arrow_schema = Arc::new(arrow_schema);
1252
1253        // Create a RecordBatch with actual data
1254        let id_array = Int32Array::from(vec![1, 2, 3]);
1255        let name_array = StringArray::from(vec!["Alice", "Bob", "Charlie"]);
1256        let batch = arrow::record_batch::RecordBatch::try_new(
1257            arrow_schema.clone(),
1258            vec![Arc::new(id_array), Arc::new(name_array)],
1259        )
1260        .unwrap();
1261
1262        // Write the batch to an IPC stream
1263        let mut buffer = Vec::new();
1264        {
1265            let mut writer = StreamWriter::try_new(&mut buffer, &arrow_schema).unwrap();
1266            writer.write(&batch).unwrap();
1267            writer.finish().unwrap();
1268        }
1269
1270        // Create table with the IPC data
1271        let mut request = CreateTableRequest::new();
1272        request.id = Some(vec!["test_table_with_data".to_string()]);
1273
1274        let response = namespace
1275            .create_table(request, Bytes::from(buffer))
1276            .await
1277            .unwrap();
1278
1279        assert_eq!(response.version, Some(1));
1280        assert!(response
1281            .location
1282            .unwrap()
1283            .contains("test_table_with_data.lance"));
1284
1285        // Verify table exists
1286        let mut exists_request = TableExistsRequest::new();
1287        exists_request.id = Some(vec!["test_table_with_data".to_string()]);
1288        namespace.table_exists(exists_request).await.unwrap();
1289    }
1290
1291    #[tokio::test]
1292    async fn test_create_empty_table() {
1293        let (namespace, temp_dir) = create_test_namespace().await;
1294
1295        let mut request = CreateEmptyTableRequest::new();
1296        request.id = Some(vec!["empty_table".to_string()]);
1297
1298        let response = namespace.create_empty_table(request).await.unwrap();
1299
1300        assert!(response.location.is_some());
1301        assert!(response.location.unwrap().ends_with("empty_table.lance"));
1302
1303        // Verify the .lance-reserved file was created in the correct location
1304        let table_dir = temp_dir.join("empty_table.lance");
1305        assert!(table_dir.exists());
1306        assert!(table_dir.is_dir());
1307
1308        let reserved_file = table_dir.join(".lance-reserved");
1309        assert!(reserved_file.exists());
1310        assert!(reserved_file.is_file());
1311
1312        // Verify file is empty
1313        let metadata = std::fs::metadata(&reserved_file).unwrap();
1314        assert_eq!(metadata.len(), 0);
1315
1316        // Verify table exists by checking for .lance-reserved file
1317        let mut exists_request = TableExistsRequest::new();
1318        exists_request.id = Some(vec!["empty_table".to_string()]);
1319        namespace.table_exists(exists_request).await.unwrap();
1320
1321        // List tables should include the empty table
1322        let list_request = ListTablesRequest::new();
1323        let list_response = namespace.list_tables(list_request).await.unwrap();
1324        assert!(list_response.tables.contains(&"empty_table".to_string()));
1325
1326        // Verify describe table works for empty table
1327        let mut describe_request = DescribeTableRequest::new();
1328        describe_request.id = Some(vec!["empty_table".to_string()]);
1329        let describe_response = namespace.describe_table(describe_request).await.unwrap();
1330        assert!(describe_response.location.is_some());
1331        assert!(describe_response.location.unwrap().contains("empty_table"));
1332    }
1333
1334    #[tokio::test]
1335    async fn test_create_empty_table_with_wrong_location() {
1336        let (namespace, _temp_dir) = create_test_namespace().await;
1337
1338        let mut request = CreateEmptyTableRequest::new();
1339        request.id = Some(vec!["test_table".to_string()]);
1340        request.location = Some("/wrong/path/table.lance".to_string());
1341
1342        let result = namespace.create_empty_table(request).await;
1343        assert!(result.is_err());
1344        assert!(result
1345            .unwrap_err()
1346            .to_string()
1347            .contains("must be at location"));
1348    }
1349
1350    #[tokio::test]
1351    async fn test_create_empty_table_then_drop() {
1352        let (namespace, temp_dir) = create_test_namespace().await;
1353
1354        // Create an empty table
1355        let mut create_request = CreateEmptyTableRequest::new();
1356        create_request.id = Some(vec!["empty_table_to_drop".to_string()]);
1357
1358        let create_response = namespace.create_empty_table(create_request).await.unwrap();
1359        assert!(create_response.location.is_some());
1360
1361        // Verify it exists
1362        let table_dir = temp_dir.join("empty_table_to_drop.lance");
1363        assert!(table_dir.exists());
1364        let reserved_file = table_dir.join(".lance-reserved");
1365        assert!(reserved_file.exists());
1366
1367        // Drop the table
1368        let mut drop_request = DropTableRequest::new();
1369        drop_request.id = Some(vec!["empty_table_to_drop".to_string()]);
1370        let drop_response = namespace.drop_table(drop_request).await.unwrap();
1371        assert!(drop_response.location.is_some());
1372
1373        // Verify table directory was removed
1374        assert!(!table_dir.exists());
1375        assert!(!reserved_file.exists());
1376
1377        // Verify table no longer exists
1378        let mut exists_request = TableExistsRequest::new();
1379        exists_request.id = Some(vec!["empty_table_to_drop".to_string()]);
1380        let exists_result = namespace.table_exists(exists_request).await;
1381        assert!(exists_result.is_err());
1382    }
1383}