Skip to main content

lance_namespace_impls/
dir.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Directory-based Lance Namespace implementation.
5//!
6//! This module provides a directory-based implementation of the Lance namespace
7//! that stores tables as Lance datasets in a filesystem directory structure.
8
9pub mod manifest;
10
11use arrow::record_batch::RecordBatchIterator;
12use arrow_ipc::reader::StreamReader;
13use async_trait::async_trait;
14use bytes::Bytes;
15use lance::dataset::{Dataset, WriteParams};
16use lance::session::Session;
17use lance_io::object_store::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry};
18use object_store::path::Path;
19use object_store::{Error as ObjectStoreError, ObjectStore as OSObjectStore, PutMode, PutOptions};
20use std::collections::HashMap;
21use std::io::Cursor;
22use std::sync::Arc;
23
24use crate::context::DynamicContextProvider;
25use lance_namespace::models::{
26    CreateEmptyTableRequest, CreateEmptyTableResponse, CreateNamespaceRequest,
27    CreateNamespaceResponse, CreateTableRequest, CreateTableResponse, DeclareTableRequest,
28    DeclareTableResponse, DescribeNamespaceRequest, DescribeNamespaceResponse,
29    DescribeTableRequest, DescribeTableResponse, DropNamespaceRequest, DropNamespaceResponse,
30    DropTableRequest, DropTableResponse, Identity, ListNamespacesRequest, ListNamespacesResponse,
31    ListTablesRequest, ListTablesResponse, NamespaceExistsRequest, TableExistsRequest,
32};
33
34use lance_core::{box_error, Error, Result};
35use lance_namespace::schema::arrow_schema_to_json;
36use lance_namespace::LanceNamespace;
37
38use crate::credentials::{
39    create_credential_vendor_for_location, has_credential_vendor_config, CredentialVendor,
40};
41
42/// Result of checking table status atomically.
43///
44/// This struct captures the state of a table directory in a single snapshot,
45/// avoiding race conditions between checking existence and other status flags.
// `Default` is the "nothing found" status (all flags false); `Copy`/`PartialEq`/`Debug`
// make the snapshot cheap to pass around, compare in tests, and log.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub(crate) struct TableStatus {
    /// Whether the table directory exists (has any files)
    pub(crate) exists: bool,
    /// Whether the table has a `.lance-deregistered` marker file
    pub(crate) is_deregistered: bool,
    /// Whether the table has a `.lance-reserved` marker file (declared but not written)
    pub(crate) has_reserved_file: bool,
}
54
55/// Builder for creating a DirectoryNamespace.
56///
57/// This builder provides a fluent API for configuring and establishing
58/// connections to directory-based Lance namespaces.
59///
60/// # Examples
61///
62/// ```no_run
63/// # use lance_namespace_impls::DirectoryNamespaceBuilder;
64/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
65/// // Create a local directory namespace
66/// let namespace = DirectoryNamespaceBuilder::new("/path/to/data")
67///     .build()
68///     .await?;
69/// # Ok(())
70/// # }
71/// ```
72///
73/// ```no_run
74/// # use lance_namespace_impls::DirectoryNamespaceBuilder;
75/// # use lance::session::Session;
76/// # use std::sync::Arc;
77/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
78/// // Create with custom storage options and session
79/// let session = Arc::new(Session::default());
80/// let namespace = DirectoryNamespaceBuilder::new("s3://bucket/path")
81///     .storage_option("region", "us-west-2")
82///     .storage_option("access_key_id", "key")
83///     .session(session)
84///     .build()
85///     .await?;
86/// # Ok(())
87/// # }
88/// ```
#[derive(Clone)]
pub struct DirectoryNamespaceBuilder {
    /// Root location of the namespace; the constructors strip trailing `/` characters.
    root: String,
    /// Storage options handed to the object store; `None` when none were supplied.
    storage_options: Option<HashMap<String, String>>,
    /// Optional Lance session whose object store registry is reused by `build`.
    session: Option<Arc<Session>>,
    /// Whether `build` attempts to initialize the manifest-backed namespace.
    manifest_enabled: bool,
    /// Directory-listing switch; forwarded to the manifest namespace and to the
    /// built `DirectoryNamespace`.
    dir_listing_enabled: bool,
    /// Inline-optimization switch; forwarded to the manifest namespace in `build`.
    inline_optimization_enabled: bool,
    /// Credential vendor properties, keyed without the `credential_vendor.` prefix.
    credential_vendor_properties: HashMap<String, String>,
    /// Optional per-request context provider, handed to the built namespace.
    context_provider: Option<Arc<dyn DynamicContextProvider>>,
}
100
101impl std::fmt::Debug for DirectoryNamespaceBuilder {
102    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
103        f.debug_struct("DirectoryNamespaceBuilder")
104            .field("root", &self.root)
105            .field("storage_options", &self.storage_options)
106            .field("manifest_enabled", &self.manifest_enabled)
107            .field("dir_listing_enabled", &self.dir_listing_enabled)
108            .field(
109                "inline_optimization_enabled",
110                &self.inline_optimization_enabled,
111            )
112            .field(
113                "context_provider",
114                &self.context_provider.as_ref().map(|_| "Some(...)"),
115            )
116            .finish()
117    }
118}
119
120impl DirectoryNamespaceBuilder {
121    /// Create a new DirectoryNamespaceBuilder with the specified root path.
122    ///
123    /// # Arguments
124    ///
125    /// * `root` - Root directory path (local path or cloud URI like s3://bucket/path)
126    pub fn new(root: impl Into<String>) -> Self {
127        Self {
128            root: root.into().trim_end_matches('/').to_string(),
129            storage_options: None,
130            session: None,
131            manifest_enabled: true,
132            dir_listing_enabled: true, // Default to enabled for backwards compatibility
133            inline_optimization_enabled: true,
134            credential_vendor_properties: HashMap::new(),
135            context_provider: None,
136        }
137    }
138
139    /// Enable or disable manifest-based listing.
140    ///
141    /// When enabled (default), the namespace uses a `__manifest` table to track tables.
142    /// When disabled, relies solely on directory scanning.
143    pub fn manifest_enabled(mut self, enabled: bool) -> Self {
144        self.manifest_enabled = enabled;
145        self
146    }
147
148    /// Enable or disable directory-based listing fallback.
149    ///
150    /// When enabled (default), falls back to directory scanning for tables not in the manifest.
151    /// When disabled, only consults the manifest table.
152    pub fn dir_listing_enabled(mut self, enabled: bool) -> Self {
153        self.dir_listing_enabled = enabled;
154        self
155    }
156
157    /// Enable or disable inline optimization of the __manifest table.
158    ///
159    /// When enabled (default), performs compaction and indexing on the __manifest table
160    /// after every write operation to maintain optimal performance.
161    /// When disabled, manual optimization must be performed separately.
162    pub fn inline_optimization_enabled(mut self, enabled: bool) -> Self {
163        self.inline_optimization_enabled = enabled;
164        self
165    }
166
167    /// Create a DirectoryNamespaceBuilder from properties HashMap.
168    ///
169    /// This method parses a properties map into builder configuration.
170    /// It expects:
171    /// - `root`: The root directory path (required)
172    /// - `manifest_enabled`: Enable manifest-based table tracking (optional, default: true)
173    /// - `dir_listing_enabled`: Enable directory listing for table discovery (optional, default: true)
174    /// - `inline_optimization_enabled`: Enable inline optimization of __manifest table (optional, default: true)
175    /// - `storage.*`: Storage options (optional, prefix will be stripped)
176    ///
177    /// Credential vendor properties (prefixed with `credential_vendor.`, prefix is stripped):
178    /// - `credential_vendor.enabled`: Set to "true" to enable credential vending (required)
179    /// - `credential_vendor.permission`: Permission level: read, write, or admin (default: read)
180    ///
181    /// AWS-specific properties (for s3:// locations):
182    /// - `credential_vendor.aws_role_arn`: AWS IAM role ARN (required for AWS)
183    /// - `credential_vendor.aws_external_id`: AWS external ID (optional)
184    /// - `credential_vendor.aws_region`: AWS region (optional)
185    /// - `credential_vendor.aws_role_session_name`: AWS role session name (optional)
186    /// - `credential_vendor.aws_duration_millis`: Credential duration in ms (default: 3600000, range: 15min-12hrs)
187    ///
188    /// GCP-specific properties (for gs:// locations):
189    /// - `credential_vendor.gcp_service_account`: Service account to impersonate (optional)
190    ///
191    /// Note: GCP uses Application Default Credentials (ADC). To use a service account key file,
192    /// set the `GOOGLE_APPLICATION_CREDENTIALS` environment variable before starting.
193    /// GCP token duration cannot be configured; it's determined by the STS endpoint (typically 1 hour).
194    ///
195    /// Azure-specific properties (for az:// locations):
196    /// - `credential_vendor.azure_account_name`: Azure storage account name (required for Azure)
197    /// - `credential_vendor.azure_tenant_id`: Azure tenant ID (optional)
198    /// - `credential_vendor.azure_duration_millis`: Credential duration in ms (default: 3600000, up to 7 days)
199    ///
200    /// # Arguments
201    ///
202    /// * `properties` - Configuration properties
203    /// * `session` - Optional Lance session to reuse object store registry
204    ///
205    /// # Returns
206    ///
207    /// Returns a `DirectoryNamespaceBuilder` instance.
208    ///
209    /// # Errors
210    ///
211    /// Returns an error if the `root` property is missing.
212    ///
213    /// # Examples
214    ///
215    /// ```no_run
216    /// # use lance_namespace_impls::DirectoryNamespaceBuilder;
217    /// # use std::collections::HashMap;
218    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
219    /// let mut properties = HashMap::new();
220    /// properties.insert("root".to_string(), "/path/to/data".to_string());
221    /// properties.insert("manifest_enabled".to_string(), "true".to_string());
222    /// properties.insert("dir_listing_enabled".to_string(), "false".to_string());
223    /// properties.insert("storage.region".to_string(), "us-west-2".to_string());
224    ///
225    /// let namespace = DirectoryNamespaceBuilder::from_properties(properties, None)?
226    ///     .build()
227    ///     .await?;
228    /// # Ok(())
229    /// # }
230    /// ```
231    pub fn from_properties(
232        properties: HashMap<String, String>,
233        session: Option<Arc<Session>>,
234    ) -> Result<Self> {
235        // Extract root from properties (required)
236        let root = properties
237            .get("root")
238            .cloned()
239            .ok_or_else(|| Error::Namespace {
240                source: "Missing required property 'root' for directory namespace".into(),
241                location: snafu::location!(),
242            })?;
243
244        // Extract storage options (properties prefixed with "storage.")
245        let storage_options: HashMap<String, String> = properties
246            .iter()
247            .filter_map(|(k, v)| {
248                k.strip_prefix("storage.")
249                    .map(|key| (key.to_string(), v.clone()))
250            })
251            .collect();
252
253        let storage_options = if storage_options.is_empty() {
254            None
255        } else {
256            Some(storage_options)
257        };
258
259        // Extract manifest_enabled (default: true)
260        let manifest_enabled = properties
261            .get("manifest_enabled")
262            .and_then(|v| v.parse::<bool>().ok())
263            .unwrap_or(true);
264
265        // Extract dir_listing_enabled (default: true)
266        let dir_listing_enabled = properties
267            .get("dir_listing_enabled")
268            .and_then(|v| v.parse::<bool>().ok())
269            .unwrap_or(true);
270
271        // Extract inline_optimization_enabled (default: true)
272        let inline_optimization_enabled = properties
273            .get("inline_optimization_enabled")
274            .and_then(|v| v.parse::<bool>().ok())
275            .unwrap_or(true);
276
277        // Extract credential vendor properties (properties prefixed with "credential_vendor.")
278        // The prefix is stripped to get short property names
279        // The build() method will check if enabled=true before creating the vendor
280        let credential_vendor_properties: HashMap<String, String> = properties
281            .iter()
282            .filter_map(|(k, v)| {
283                k.strip_prefix("credential_vendor.")
284                    .map(|key| (key.to_string(), v.clone()))
285            })
286            .collect();
287
288        Ok(Self {
289            root: root.trim_end_matches('/').to_string(),
290            storage_options,
291            session,
292            manifest_enabled,
293            dir_listing_enabled,
294            inline_optimization_enabled,
295            credential_vendor_properties,
296            context_provider: None,
297        })
298    }
299
300    /// Add a storage option.
301    ///
302    /// # Arguments
303    ///
304    /// * `key` - Storage option key (e.g., "region", "access_key_id")
305    /// * `value` - Storage option value
306    pub fn storage_option(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
307        self.storage_options
308            .get_or_insert_with(HashMap::new)
309            .insert(key.into(), value.into());
310        self
311    }
312
313    /// Add multiple storage options.
314    ///
315    /// # Arguments
316    ///
317    /// * `options` - HashMap of storage options to add
318    pub fn storage_options(mut self, options: HashMap<String, String>) -> Self {
319        self.storage_options
320            .get_or_insert_with(HashMap::new)
321            .extend(options);
322        self
323    }
324
325    /// Set the Lance session to use for this namespace.
326    ///
327    /// When a session is provided, the namespace will reuse the session's
328    /// object store registry, allowing multiple namespaces and datasets
329    /// to share the same underlying storage connections.
330    ///
331    /// # Arguments
332    ///
333    /// * `session` - Arc-wrapped Lance session
334    pub fn session(mut self, session: Arc<Session>) -> Self {
335        self.session = Some(session);
336        self
337    }
338
339    /// Add a credential vendor property.
340    ///
341    /// Use short property names without the `credential_vendor.` prefix.
342    /// Common properties: `enabled`, `permission`.
343    /// AWS properties: `aws_role_arn`, `aws_external_id`, `aws_region`, `aws_role_session_name`, `aws_duration_millis`.
344    /// GCP properties: `gcp_service_account`.
345    /// Azure properties: `azure_account_name`, `azure_tenant_id`, `azure_duration_millis`.
346    ///
347    /// # Arguments
348    ///
349    /// * `key` - Property key (e.g., "enabled", "aws_role_arn")
350    /// * `value` - Property value
351    ///
352    /// # Example
353    ///
354    /// ```no_run
355    /// # use lance_namespace_impls::DirectoryNamespaceBuilder;
356    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
357    /// let namespace = DirectoryNamespaceBuilder::new("s3://my-bucket/data")
358    ///     .credential_vendor_property("enabled", "true")
359    ///     .credential_vendor_property("aws_role_arn", "arn:aws:iam::123456789012:role/MyRole")
360    ///     .credential_vendor_property("permission", "read")
361    ///     .build()
362    ///     .await?;
363    /// # Ok(())
364    /// # }
365    /// ```
366    pub fn credential_vendor_property(
367        mut self,
368        key: impl Into<String>,
369        value: impl Into<String>,
370    ) -> Self {
371        self.credential_vendor_properties
372            .insert(key.into(), value.into());
373        self
374    }
375
376    /// Add multiple credential vendor properties.
377    ///
378    /// Use short property names without the `credential_vendor.` prefix.
379    ///
380    /// # Arguments
381    ///
382    /// * `properties` - HashMap of credential vendor properties to add
383    pub fn credential_vendor_properties(mut self, properties: HashMap<String, String>) -> Self {
384        self.credential_vendor_properties.extend(properties);
385        self
386    }
387
388    /// Set a dynamic context provider for per-request context.
389    ///
390    /// The provider can be used to generate additional context for operations.
391    /// For DirectoryNamespace, the context is stored but not directly used
392    /// in operations (unlike RestNamespace where it's converted to HTTP headers).
393    ///
394    /// # Arguments
395    ///
396    /// * `provider` - The context provider implementation
397    pub fn context_provider(mut self, provider: Arc<dyn DynamicContextProvider>) -> Self {
398        self.context_provider = Some(provider);
399        self
400    }
401
402    /// Build the DirectoryNamespace.
403    ///
404    /// # Returns
405    ///
406    /// Returns a `DirectoryNamespace` instance.
407    ///
408    /// # Errors
409    ///
410    /// Returns an error if:
411    /// - The root path is invalid
412    /// - Connection to the storage backend fails
413    /// - Storage options are invalid
414    pub async fn build(self) -> Result<DirectoryNamespace> {
415        let (object_store, base_path) =
416            Self::initialize_object_store(&self.root, &self.storage_options, &self.session).await?;
417
418        let manifest_ns = if self.manifest_enabled {
419            match manifest::ManifestNamespace::from_directory(
420                self.root.clone(),
421                self.storage_options.clone(),
422                self.session.clone(),
423                object_store.clone(),
424                base_path.clone(),
425                self.dir_listing_enabled,
426                self.inline_optimization_enabled,
427            )
428            .await
429            {
430                Ok(ns) => Some(Arc::new(ns)),
431                Err(e) => {
432                    // Failed to initialize manifest namespace, fall back to directory listing only
433                    log::warn!(
434                        "Failed to initialize manifest namespace, falling back to directory listing only: {}",
435                        e
436                    );
437                    None
438                }
439            }
440        } else {
441            None
442        };
443
444        // Create credential vendor once during initialization if enabled
445        let credential_vendor = if has_credential_vendor_config(&self.credential_vendor_properties)
446        {
447            create_credential_vendor_for_location(&self.root, &self.credential_vendor_properties)
448                .await?
449                .map(Arc::from)
450        } else {
451            None
452        };
453
454        Ok(DirectoryNamespace {
455            root: self.root,
456            storage_options: self.storage_options,
457            session: self.session,
458            object_store,
459            base_path,
460            manifest_ns,
461            dir_listing_enabled: self.dir_listing_enabled,
462            credential_vendor,
463            context_provider: self.context_provider,
464        })
465    }
466
467    /// Initialize the Lance ObjectStore based on the configuration
468    async fn initialize_object_store(
469        root: &str,
470        storage_options: &Option<HashMap<String, String>>,
471        session: &Option<Arc<Session>>,
472    ) -> Result<(Arc<ObjectStore>, Path)> {
473        // Build ObjectStoreParams from storage options
474        let accessor = storage_options.clone().map(|opts| {
475            Arc::new(lance_io::object_store::StorageOptionsAccessor::with_static_options(opts))
476        });
477        let params = ObjectStoreParams {
478            storage_options_accessor: accessor,
479            ..Default::default()
480        };
481
482        // Use object store registry from session if provided, otherwise create a new one
483        let registry = if let Some(session) = session {
484            session.store_registry()
485        } else {
486            Arc::new(ObjectStoreRegistry::default())
487        };
488
489        // Use Lance's object store factory to create from URI
490        let (object_store, base_path) = ObjectStore::from_uri_and_params(registry, root, &params)
491            .await
492            .map_err(|e| Error::Namespace {
493                source: format!("Failed to create object store: {}", e).into(),
494                location: snafu::location!(),
495            })?;
496
497        Ok((object_store, base_path))
498    }
499}
500
501/// Directory-based implementation of Lance Namespace.
502///
503/// This implementation stores tables as Lance datasets in a directory structure.
504/// It supports local filesystems and cloud storage backends through Lance's object store.
505///
506/// ## Manifest-based Listing
507///
508/// When `manifest_enabled=true`, the namespace uses a special `__manifest` Lance table to track tables
509/// instead of scanning the filesystem. This provides:
510/// - Better performance for listing operations
511/// - Ability to track table metadata
512/// - Foundation for future features like namespaces and table renaming
513///
514/// When `dir_listing_enabled=true`, the namespace falls back to directory scanning for tables not
515/// found in the manifest, enabling gradual migration.
516///
517/// ## Credential Vending
518///
519/// When credential vendor properties are configured, `describe_table` will vend temporary
520/// credentials based on the table location URI. The vendor type is auto-selected:
521/// - `s3://` locations use AWS STS AssumeRole
522/// - `gs://` locations use GCP OAuth2 tokens
523/// - `az://` locations use Azure SAS tokens
pub struct DirectoryNamespace {
    /// Root location of the namespace; table URIs are formed as `{root}/{table}.lance`.
    root: String,
    /// Static storage options, returned for tables when no credential vendor is configured.
    storage_options: Option<HashMap<String, String>>,
    /// Session supplied to the builder, if any.
    // Kept on the struct even though the visible methods do not read it.
    #[allow(dead_code)]
    session: Option<Arc<Session>>,
    /// Object store used for directory listings and marker files.
    object_store: Arc<ObjectStore>,
    /// Path of the namespace root within `object_store`; table paths are children of it.
    base_path: Path,
    /// Manifest-backed namespace; `None` when manifest mode is disabled or when its
    /// initialization failed during `build`.
    manifest_ns: Option<Arc<manifest::ManifestNamespace>>,
    /// Value of the builder's `dir_listing_enabled` setting.
    dir_listing_enabled: bool,
    /// Credential vendor created once during initialization.
    /// Used to vend temporary credentials for table access.
    credential_vendor: Option<Arc<dyn CredentialVendor>>,
    /// Dynamic context provider for per-request context.
    /// Stored but not directly used in operations (available for future extensions).
    // Unread by design for now, hence the lint allowance.
    #[allow(dead_code)]
    context_provider: Option<Arc<dyn DynamicContextProvider>>,
}
541
542impl std::fmt::Debug for DirectoryNamespace {
543    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
544        write!(f, "{}", self.namespace_id())
545    }
546}
547
548impl std::fmt::Display for DirectoryNamespace {
549    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
550        write!(f, "{}", self.namespace_id())
551    }
552}
553
554impl DirectoryNamespace {
555    /// Apply pagination to a list of table names
556    ///
557    /// Sorts the list alphabetically and applies pagination using page_token (start_after) and limit.
558    ///
559    /// # Arguments
560    /// * `names` - The vector of table names to paginate
561    /// * `page_token` - Skip items until finding one greater than this value (start_after semantics)
562    /// * `limit` - Maximum number of items to keep
563    fn apply_pagination(names: &mut Vec<String>, page_token: Option<String>, limit: Option<i32>) {
564        // Sort alphabetically for consistent ordering
565        names.sort();
566
567        // Apply page_token filtering (start_after semantics)
568        if let Some(start_after) = page_token {
569            if let Some(index) = names
570                .iter()
571                .position(|name| name.as_str() > start_after.as_str())
572            {
573                names.drain(0..index);
574            } else {
575                names.clear();
576            }
577        }
578
579        // Apply limit
580        if let Some(limit) = limit {
581            if limit >= 0 {
582                names.truncate(limit as usize);
583            }
584        }
585    }
586
587    /// List tables using directory scanning (fallback method)
588    async fn list_directory_tables(&self) -> Result<Vec<String>> {
589        let mut tables = Vec::new();
590        let entries = self
591            .object_store
592            .read_dir(self.base_path.clone())
593            .await
594            .map_err(|e| Error::IO {
595                source: box_error(std::io::Error::other(format!(
596                    "Failed to list directory: {}",
597                    e
598                ))),
599                location: snafu::location!(),
600            })?;
601
602        for entry in entries {
603            let path = entry.trim_end_matches('/');
604            if !path.ends_with(".lance") {
605                continue;
606            }
607
608            let table_name = &path[..path.len() - 6];
609
610            // Use atomic check to skip deregistered tables and declared-but-not-written tables
611            let status = self.check_table_status(table_name).await;
612            if status.is_deregistered || status.has_reserved_file {
613                continue;
614            }
615
616            tables.push(table_name.to_string());
617        }
618
619        Ok(tables)
620    }
621
622    /// Validate that the namespace ID represents the root namespace
623    fn validate_root_namespace_id(id: &Option<Vec<String>>) -> Result<()> {
624        if let Some(id) = id {
625            if !id.is_empty() {
626                return Err(Error::Namespace {
627                    source: format!(
628                        "Directory namespace only supports root namespace operations, but got namespace ID: {:?}. Expected empty ID.",
629                        id
630                    ).into(),
631                    location: snafu::location!(),
632                });
633            }
634        }
635        Ok(())
636    }
637
638    /// Extract table name from table ID
639    fn table_name_from_id(id: &Option<Vec<String>>) -> Result<String> {
640        let id = id.as_ref().ok_or_else(|| Error::Namespace {
641            source: "Directory namespace table ID cannot be empty".into(),
642            location: snafu::location!(),
643        })?;
644
645        if id.len() != 1 {
646            return Err(Error::Namespace {
647                source: format!(
648                    "Multi-level table IDs are only supported when manifest mode is enabled, but got: {:?}",
649                    id
650                )
651                .into(),
652                location: snafu::location!(),
653            });
654        }
655
656        Ok(id[0].clone())
657    }
658
659    /// Get the full URI path for a table (for returning in responses)
660    fn table_full_uri(&self, table_name: &str) -> String {
661        format!("{}/{}.lance", &self.root, table_name)
662    }
663
664    /// Get the object store path for a table (relative to base_path)
665    fn table_path(&self, table_name: &str) -> Path {
666        self.base_path
667            .child(format!("{}.lance", table_name).as_str())
668    }
669
670    /// Get the reserved file path for a table
671    fn table_reserved_file_path(&self, table_name: &str) -> Path {
672        self.base_path
673            .child(format!("{}.lance", table_name).as_str())
674            .child(".lance-reserved")
675    }
676
677    /// Get the deregistered marker file path for a table
678    fn table_deregistered_file_path(&self, table_name: &str) -> Path {
679        self.base_path
680            .child(format!("{}.lance", table_name).as_str())
681            .child(".lance-deregistered")
682    }
683
684    /// Atomically check table existence and deregistration status.
685    ///
686    /// This performs a single directory listing to get a consistent snapshot of the
687    /// table's state, avoiding race conditions between checking existence and
688    /// checking deregistration status.
689    pub(crate) async fn check_table_status(&self, table_name: &str) -> TableStatus {
690        let table_path = self.table_path(table_name);
691        match self.object_store.read_dir(table_path).await {
692            Ok(entries) => {
693                let exists = !entries.is_empty();
694                let is_deregistered = entries.iter().any(|e| e.ends_with(".lance-deregistered"));
695                let has_reserved_file = entries.iter().any(|e| e.ends_with(".lance-reserved"));
696                TableStatus {
697                    exists,
698                    is_deregistered,
699                    has_reserved_file,
700                }
701            }
702            Err(_) => TableStatus {
703                exists: false,
704                is_deregistered: false,
705                has_reserved_file: false,
706            },
707        }
708    }
709
710    /// Atomically create a marker file using put_if_not_exists semantics.
711    ///
712    /// This uses `PutMode::Create` which will fail if the file already exists,
713    /// providing atomic creation semantics to avoid race conditions.
714    ///
715    /// Returns Ok(()) if the file was created successfully.
716    /// Returns Err with appropriate message if the file already exists or other error.
717    async fn put_marker_file_atomic(
718        &self,
719        path: &Path,
720        file_description: &str,
721    ) -> std::result::Result<(), String> {
722        let put_opts = PutOptions {
723            mode: PutMode::Create,
724            ..Default::default()
725        };
726
727        match self
728            .object_store
729            .inner
730            .put_opts(path, bytes::Bytes::new().into(), put_opts)
731            .await
732        {
733            Ok(_) => Ok(()),
734            Err(ObjectStoreError::AlreadyExists { .. })
735            | Err(ObjectStoreError::Precondition { .. }) => {
736                Err(format!("{} already exists", file_description))
737            }
738            Err(e) => Err(format!("Failed to create {}: {}", file_description, e)),
739        }
740    }
741
742    /// Get storage options for a table, using credential vending if configured.
743    ///
744    /// If credential vendor properties are configured and the table location matches
745    /// a supported cloud provider, this will create an appropriate vendor and vend
746    /// temporary credentials scoped to the table location. Otherwise, returns the
747    /// static storage options.
748    ///
749    /// The vendor type is auto-selected based on the table URI:
750    /// - `s3://` locations use AWS STS AssumeRole
751    /// - `gs://` locations use GCP OAuth2 tokens
752    /// - `az://` locations use Azure SAS tokens
753    ///
754    /// The permission level (Read, Write, Admin) is configured at namespace
755    /// initialization time via the `credential_vendor_permission` property.
756    ///
757    /// # Arguments
758    ///
759    /// * `table_uri` - The full URI of the table
760    /// * `identity` - Optional identity from the request for identity-based credential vending
761    async fn get_storage_options_for_table(
762        &self,
763        table_uri: &str,
764        identity: Option<&Identity>,
765    ) -> Result<Option<HashMap<String, String>>> {
766        if let Some(ref vendor) = self.credential_vendor {
767            let vended = vendor.vend_credentials(table_uri, identity).await?;
768            return Ok(Some(vended.storage_options));
769        }
770        Ok(self.storage_options.clone())
771    }
772
773    /// Migrate directory-based tables to the manifest.
774    ///
775    /// This is a one-time migration operation that:
776    /// 1. Scans the directory for existing `.lance` tables
777    /// 2. Registers any unmigrated tables in the manifest
778    /// 3. Returns the count of tables that were migrated
779    ///
780    /// This method is safe to run multiple times - it will skip tables that are already
781    /// registered in the manifest.
782    ///
783    /// # Usage
784    ///
785    /// After creating tables in directory-only mode or dual mode, you can migrate them
786    /// to the manifest to enable manifest-only mode:
787    ///
788    /// ```no_run
789    /// # use lance_namespace_impls::DirectoryNamespaceBuilder;
790    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
791    /// // Create namespace with dual mode (manifest + directory listing)
792    /// let namespace = DirectoryNamespaceBuilder::new("/path/to/data")
793    ///     .manifest_enabled(true)
794    ///     .dir_listing_enabled(true)
795    ///     .build()
796    ///     .await?;
797    ///
798    /// // ... tables are created and used ...
799    ///
800    /// // Migrate existing directory tables to manifest
801    /// let migrated_count = namespace.migrate().await?;
802    /// println!("Migrated {} tables", migrated_count);
803    ///
804    /// // Now you can disable directory listing for better performance:
805    /// // (requires rebuilding the namespace)
806    /// let namespace = DirectoryNamespaceBuilder::new("/path/to/data")
807    ///     .manifest_enabled(true)
808    ///     .dir_listing_enabled(false)  // All tables now in manifest
809    ///     .build()
810    ///     .await?;
811    /// # Ok(())
812    /// # }
813    /// ```
814    ///
815    /// # Returns
816    ///
817    /// Returns the number of tables that were migrated to the manifest.
818    ///
819    /// # Errors
820    ///
821    /// Returns an error if:
822    /// - Manifest is not enabled
823    /// - Directory listing fails
824    /// - Manifest registration fails
825    pub async fn migrate(&self) -> Result<usize> {
826        // We only care about tables in the root namespace
827        let Some(ref manifest_ns) = self.manifest_ns else {
828            return Ok(0); // No manifest, nothing to migrate
829        };
830
831        // Get all table locations already in the manifest
832        let manifest_locations = manifest_ns.list_manifest_table_locations().await?;
833
834        // Get all tables from directory
835        let dir_tables = self.list_directory_tables().await?;
836
837        // Register each directory table that doesn't have an overlapping location
838        // If a directory name already exists in the manifest,
839        // that means the table must have already been migrated or created
840        // in the manifest, so we can skip it.
841        let mut migrated_count = 0;
842        for table_name in dir_tables {
843            // For root namespace tables, the directory name is "table_name.lance"
844            let dir_name = format!("{}.lance", table_name);
845            if !manifest_locations.contains(&dir_name) {
846                manifest_ns.register_table(&table_name, dir_name).await?;
847                migrated_count += 1;
848            }
849        }
850
851        Ok(migrated_count)
852    }
853}
854
855#[async_trait]
856impl LanceNamespace for DirectoryNamespace {
857    async fn list_namespaces(
858        &self,
859        request: ListNamespacesRequest,
860    ) -> Result<ListNamespacesResponse> {
861        if let Some(ref manifest_ns) = self.manifest_ns {
862            return manifest_ns.list_namespaces(request).await;
863        }
864
865        Self::validate_root_namespace_id(&request.id)?;
866        Ok(ListNamespacesResponse::new(vec![]))
867    }
868
869    async fn describe_namespace(
870        &self,
871        request: DescribeNamespaceRequest,
872    ) -> Result<DescribeNamespaceResponse> {
873        if let Some(ref manifest_ns) = self.manifest_ns {
874            return manifest_ns.describe_namespace(request).await;
875        }
876
877        Self::validate_root_namespace_id(&request.id)?;
878        #[allow(clippy::needless_update)]
879        Ok(DescribeNamespaceResponse {
880            properties: Some(HashMap::new()),
881            ..Default::default()
882        })
883    }
884
885    async fn create_namespace(
886        &self,
887        request: CreateNamespaceRequest,
888    ) -> Result<CreateNamespaceResponse> {
889        if let Some(ref manifest_ns) = self.manifest_ns {
890            return manifest_ns.create_namespace(request).await;
891        }
892
893        if request.id.is_none() || request.id.as_ref().unwrap().is_empty() {
894            return Err(Error::Namespace {
895                source: "Root namespace already exists and cannot be created".into(),
896                location: snafu::location!(),
897            });
898        }
899
900        Err(Error::NotSupported {
901            source: "Child namespaces are only supported when manifest mode is enabled".into(),
902            location: snafu::location!(),
903        })
904    }
905
906    async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<DropNamespaceResponse> {
907        if let Some(ref manifest_ns) = self.manifest_ns {
908            return manifest_ns.drop_namespace(request).await;
909        }
910
911        if request.id.is_none() || request.id.as_ref().unwrap().is_empty() {
912            return Err(Error::Namespace {
913                source: "Root namespace cannot be dropped".into(),
914                location: snafu::location!(),
915            });
916        }
917
918        Err(Error::NotSupported {
919            source: "Child namespaces are only supported when manifest mode is enabled".into(),
920            location: snafu::location!(),
921        })
922    }
923
924    async fn namespace_exists(&self, request: NamespaceExistsRequest) -> Result<()> {
925        if let Some(ref manifest_ns) = self.manifest_ns {
926            return manifest_ns.namespace_exists(request).await;
927        }
928
929        if request.id.is_none() || request.id.as_ref().unwrap().is_empty() {
930            return Ok(());
931        }
932
933        Err(Error::Namespace {
934            source: "Child namespaces are only supported when manifest mode is enabled".into(),
935            location: snafu::location!(),
936        })
937    }
938
939    async fn list_tables(&self, request: ListTablesRequest) -> Result<ListTablesResponse> {
940        // Validate that namespace ID is provided
941        let namespace_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
942            source: "Namespace ID is required".into(),
943            location: snafu::location!(),
944        })?;
945
946        // For child namespaces, always delegate to manifest (if enabled)
947        if !namespace_id.is_empty() {
948            if let Some(ref manifest_ns) = self.manifest_ns {
949                return manifest_ns.list_tables(request).await;
950            }
951            return Err(Error::NotSupported {
952                source: "Child namespaces are only supported when manifest mode is enabled".into(),
953                location: snafu::location!(),
954            });
955        }
956
957        // When only manifest is enabled (no directory listing), delegate directly to manifest
958        if let Some(ref manifest_ns) = self.manifest_ns {
959            if !self.dir_listing_enabled {
960                return manifest_ns.list_tables(request).await;
961            }
962        }
963
964        // When both manifest and directory listing are enabled, we need to merge and deduplicate
965        let mut tables = if self.manifest_ns.is_some() && self.dir_listing_enabled {
966            // Get all manifest table locations (for deduplication)
967            let manifest_locations = if let Some(ref manifest_ns) = self.manifest_ns {
968                manifest_ns.list_manifest_table_locations().await?
969            } else {
970                std::collections::HashSet::new()
971            };
972
973            // Get all manifest tables (without pagination for merging)
974            let mut manifest_request = request.clone();
975            manifest_request.limit = None;
976            manifest_request.page_token = None;
977            let manifest_tables = if let Some(ref manifest_ns) = self.manifest_ns {
978                let manifest_response = manifest_ns.list_tables(manifest_request).await?;
979                manifest_response.tables
980            } else {
981                vec![]
982            };
983
984            // Start with all manifest table names
985            // Add directory tables that aren't already in the manifest (by location)
986            let mut all_tables: Vec<String> = manifest_tables;
987            let dir_tables = self.list_directory_tables().await?;
988            for table_name in dir_tables {
989                // Check if this table's location is already in the manifest
990                // Manifest stores full URIs, so we need to check both formats
991                let full_location = format!("{}/{}.lance", self.root, table_name);
992                let relative_location = format!("{}.lance", table_name);
993                if !manifest_locations.contains(&full_location)
994                    && !manifest_locations.contains(&relative_location)
995                {
996                    all_tables.push(table_name);
997                }
998            }
999
1000            all_tables
1001        } else {
1002            self.list_directory_tables().await?
1003        };
1004
1005        // Apply sorting and pagination
1006        Self::apply_pagination(&mut tables, request.page_token, request.limit);
1007        let response = ListTablesResponse::new(tables);
1008        Ok(response)
1009    }
1010
1011    async fn describe_table(&self, request: DescribeTableRequest) -> Result<DescribeTableResponse> {
1012        if let Some(ref manifest_ns) = self.manifest_ns {
1013            match manifest_ns.describe_table(request.clone()).await {
1014                Ok(mut response) => {
1015                    // Only apply identity-based credential vending when explicitly requested
1016                    if request.vend_credentials == Some(true) && self.credential_vendor.is_some() {
1017                        if let Some(ref table_uri) = response.table_uri {
1018                            let identity = request.identity.as_deref();
1019                            response.storage_options = self
1020                                .get_storage_options_for_table(table_uri, identity)
1021                                .await?;
1022                        }
1023                    } else if request.vend_credentials == Some(false) {
1024                        response.storage_options = None;
1025                    }
1026                    return Ok(response);
1027                }
1028                Err(_)
1029                    if self.dir_listing_enabled
1030                        && request.id.as_ref().is_some_and(|id| id.len() == 1) =>
1031                {
1032                    // Fall through to directory check only for single-level IDs
1033                }
1034                Err(e) => return Err(e),
1035            }
1036        }
1037
1038        let table_name = Self::table_name_from_id(&request.id)?;
1039        let table_uri = self.table_full_uri(&table_name);
1040
1041        // Atomically check table existence and deregistration status
1042        let status = self.check_table_status(&table_name).await;
1043
1044        if !status.exists {
1045            return Err(Error::Namespace {
1046                source: format!("Table does not exist: {}", table_name).into(),
1047                location: snafu::location!(),
1048            });
1049        }
1050
1051        if status.is_deregistered {
1052            return Err(Error::Namespace {
1053                source: format!("Table is deregistered: {}", table_name).into(),
1054                location: snafu::location!(),
1055            });
1056        }
1057
1058        let load_detailed_metadata = request.load_detailed_metadata.unwrap_or(false);
1059        // For backwards compatibility, only skip vending credentials when explicitly set to false
1060        let vend_credentials = request.vend_credentials.unwrap_or(true);
1061        let identity = request.identity.as_deref();
1062
1063        // If not loading detailed metadata, return minimal response with just location
1064        if !load_detailed_metadata {
1065            let storage_options = if vend_credentials {
1066                self.get_storage_options_for_table(&table_uri, identity)
1067                    .await?
1068            } else {
1069                None
1070            };
1071            return Ok(DescribeTableResponse {
1072                table: Some(table_name),
1073                namespace: request.id.as_ref().map(|id| {
1074                    if id.len() > 1 {
1075                        id[..id.len() - 1].to_vec()
1076                    } else {
1077                        vec![]
1078                    }
1079                }),
1080                location: Some(table_uri.clone()),
1081                table_uri: Some(table_uri),
1082                storage_options,
1083                ..Default::default()
1084            });
1085        }
1086
1087        // Try to load the dataset to get real information
1088        match Dataset::open(&table_uri).await {
1089            Ok(mut dataset) => {
1090                // If a specific version is requested, checkout that version
1091                if let Some(requested_version) = request.version {
1092                    dataset = dataset.checkout_version(requested_version as u64).await?;
1093                }
1094
1095                let version_info = dataset.version();
1096                let lance_schema = dataset.schema();
1097                let arrow_schema: arrow_schema::Schema = lance_schema.into();
1098                let json_schema = arrow_schema_to_json(&arrow_schema)?;
1099                let storage_options = if vend_credentials {
1100                    self.get_storage_options_for_table(&table_uri, identity)
1101                        .await?
1102                } else {
1103                    None
1104                };
1105
1106                // Convert BTreeMap to HashMap for the response
1107                let metadata: std::collections::HashMap<String, String> =
1108                    version_info.metadata.into_iter().collect();
1109
1110                Ok(DescribeTableResponse {
1111                    table: Some(table_name),
1112                    namespace: request.id.as_ref().map(|id| {
1113                        if id.len() > 1 {
1114                            id[..id.len() - 1].to_vec()
1115                        } else {
1116                            vec![]
1117                        }
1118                    }),
1119                    version: Some(version_info.version as i64),
1120                    location: Some(table_uri.clone()),
1121                    table_uri: Some(table_uri),
1122                    schema: Some(Box::new(json_schema)),
1123                    storage_options,
1124                    metadata: Some(metadata),
1125                    ..Default::default()
1126                })
1127            }
1128            Err(err) => {
1129                // Use the reserved file status from the atomic check
1130                if status.has_reserved_file {
1131                    let storage_options = if vend_credentials {
1132                        self.get_storage_options_for_table(&table_uri, identity)
1133                            .await?
1134                    } else {
1135                        None
1136                    };
1137                    Ok(DescribeTableResponse {
1138                        table: Some(table_name),
1139                        namespace: request.id.as_ref().map(|id| {
1140                            if id.len() > 1 {
1141                                id[..id.len() - 1].to_vec()
1142                            } else {
1143                                vec![]
1144                            }
1145                        }),
1146                        location: Some(table_uri.clone()),
1147                        table_uri: Some(table_uri),
1148                        storage_options,
1149                        ..Default::default()
1150                    })
1151                } else {
1152                    Err(Error::Namespace {
1153                        source: format!(
1154                            "Table directory exists but cannot load dataset {}: {:?}",
1155                            table_name, err
1156                        )
1157                        .into(),
1158                        location: snafu::location!(),
1159                    })
1160                }
1161            }
1162        }
1163    }
1164
1165    async fn table_exists(&self, request: TableExistsRequest) -> Result<()> {
1166        if let Some(ref manifest_ns) = self.manifest_ns {
1167            match manifest_ns.table_exists(request.clone()).await {
1168                Ok(()) => return Ok(()),
1169                Err(_) if self.dir_listing_enabled => {
1170                    // Fall through to directory check
1171                }
1172                Err(e) => return Err(e),
1173            }
1174        }
1175
1176        let table_name = Self::table_name_from_id(&request.id)?;
1177
1178        // Atomically check table existence and deregistration status
1179        let status = self.check_table_status(&table_name).await;
1180
1181        if !status.exists {
1182            return Err(Error::Namespace {
1183                source: format!("Table does not exist: {}", table_name).into(),
1184                location: snafu::location!(),
1185            });
1186        }
1187
1188        if status.is_deregistered {
1189            return Err(Error::Namespace {
1190                source: format!("Table is deregistered: {}", table_name).into(),
1191                location: snafu::location!(),
1192            });
1193        }
1194
1195        Ok(())
1196    }
1197
1198    async fn drop_table(&self, request: DropTableRequest) -> Result<DropTableResponse> {
1199        if let Some(ref manifest_ns) = self.manifest_ns {
1200            return manifest_ns.drop_table(request).await;
1201        }
1202
1203        let table_name = Self::table_name_from_id(&request.id)?;
1204        let table_uri = self.table_full_uri(&table_name);
1205        let table_path = self.table_path(&table_name);
1206
1207        self.object_store
1208            .remove_dir_all(table_path)
1209            .await
1210            .map_err(|e| Error::Namespace {
1211                source: format!("Failed to drop table {}: {}", table_name, e).into(),
1212                location: snafu::location!(),
1213            })?;
1214
1215        Ok(DropTableResponse {
1216            id: request.id,
1217            location: Some(table_uri),
1218            ..Default::default()
1219        })
1220    }
1221
1222    async fn create_table(
1223        &self,
1224        request: CreateTableRequest,
1225        request_data: Bytes,
1226    ) -> Result<CreateTableResponse> {
1227        if let Some(ref manifest_ns) = self.manifest_ns {
1228            return manifest_ns.create_table(request, request_data).await;
1229        }
1230
1231        let table_name = Self::table_name_from_id(&request.id)?;
1232        let table_uri = self.table_full_uri(&table_name);
1233        if request_data.is_empty() {
1234            return Err(Error::Namespace {
1235                source: "Request data (Arrow IPC stream) is required for create_table".into(),
1236                location: snafu::location!(),
1237            });
1238        }
1239
1240        // Parse the Arrow IPC stream from request_data
1241        let cursor = Cursor::new(request_data.to_vec());
1242        let stream_reader = StreamReader::try_new(cursor, None).map_err(|e| Error::Namespace {
1243            source: format!("Invalid Arrow IPC stream: {}", e).into(),
1244            location: snafu::location!(),
1245        })?;
1246        let arrow_schema = stream_reader.schema();
1247
1248        // Collect all batches from the stream
1249        let mut batches = Vec::new();
1250        for batch_result in stream_reader {
1251            batches.push(batch_result.map_err(|e| Error::Namespace {
1252                source: format!("Failed to read batch from IPC stream: {}", e).into(),
1253                location: snafu::location!(),
1254            })?);
1255        }
1256
1257        // Create RecordBatchReader from the batches
1258        let reader = if batches.is_empty() {
1259            let batch = arrow::record_batch::RecordBatch::new_empty(arrow_schema.clone());
1260            let batches = vec![Ok(batch)];
1261            RecordBatchIterator::new(batches, arrow_schema.clone())
1262        } else {
1263            let batch_results: Vec<_> = batches.into_iter().map(Ok).collect();
1264            RecordBatchIterator::new(batch_results, arrow_schema)
1265        };
1266
1267        let store_params = self.storage_options.as_ref().map(|opts| ObjectStoreParams {
1268            storage_options_accessor: Some(Arc::new(
1269                lance_io::object_store::StorageOptionsAccessor::with_static_options(opts.clone()),
1270            )),
1271            ..Default::default()
1272        });
1273
1274        let write_params = WriteParams {
1275            mode: lance::dataset::WriteMode::Create,
1276            store_params,
1277            ..Default::default()
1278        };
1279
1280        // Create the Lance dataset using the actual Lance API
1281        Dataset::write(reader, &table_uri, Some(write_params))
1282            .await
1283            .map_err(|e| Error::Namespace {
1284                source: format!("Failed to create Lance dataset: {}", e).into(),
1285                location: snafu::location!(),
1286            })?;
1287
1288        Ok(CreateTableResponse {
1289            version: Some(1),
1290            location: Some(table_uri),
1291            storage_options: self.storage_options.clone(),
1292            ..Default::default()
1293        })
1294    }
1295
1296    async fn create_empty_table(
1297        &self,
1298        request: CreateEmptyTableRequest,
1299    ) -> Result<CreateEmptyTableResponse> {
1300        if let Some(ref manifest_ns) = self.manifest_ns {
1301            #[allow(deprecated)]
1302            let mut response = manifest_ns.create_empty_table(request.clone()).await?;
1303            // Only apply identity-based credential vending when explicitly requested
1304            if request.vend_credentials == Some(true) && self.credential_vendor.is_some() {
1305                if let Some(ref location) = response.location {
1306                    let identity = request.identity.as_deref();
1307                    response.storage_options = self
1308                        .get_storage_options_for_table(location, identity)
1309                        .await?;
1310                }
1311            } else if request.vend_credentials == Some(false) {
1312                response.storage_options = None;
1313            }
1314            return Ok(response);
1315        }
1316
1317        let table_name = Self::table_name_from_id(&request.id)?;
1318        let table_uri = self.table_full_uri(&table_name);
1319
1320        // Validate location if provided
1321        if let Some(location) = &request.location {
1322            let location = location.trim_end_matches('/');
1323            if location != table_uri {
1324                return Err(Error::Namespace {
1325                    source: format!(
1326                        "Cannot create table {} at location {}, must be at location {}",
1327                        table_name, location, table_uri
1328                    )
1329                    .into(),
1330                    location: snafu::location!(),
1331                });
1332            }
1333        }
1334
1335        // Atomically create the .lance-reserved file to mark the table as existing.
1336        // This uses put_if_not_exists semantics to avoid race conditions.
1337        let reserved_file_path = self.table_reserved_file_path(&table_name);
1338
1339        self.put_marker_file_atomic(&reserved_file_path, &format!("table {}", table_name))
1340            .await
1341            .map_err(|e| Error::Namespace {
1342                source: e.into(),
1343                location: snafu::location!(),
1344            })?;
1345
1346        // For backwards compatibility, only skip vending credentials when explicitly set to false
1347        let vend_credentials = request.vend_credentials.unwrap_or(true);
1348        let identity = request.identity.as_deref();
1349        let storage_options = if vend_credentials {
1350            self.get_storage_options_for_table(&table_uri, identity)
1351                .await?
1352        } else {
1353            None
1354        };
1355
1356        Ok(CreateEmptyTableResponse {
1357            location: Some(table_uri),
1358            storage_options,
1359            ..Default::default()
1360        })
1361    }
1362
1363    async fn declare_table(&self, request: DeclareTableRequest) -> Result<DeclareTableResponse> {
1364        if let Some(ref manifest_ns) = self.manifest_ns {
1365            let mut response = manifest_ns.declare_table(request.clone()).await?;
1366            // Only apply identity-based credential vending when explicitly requested
1367            if request.vend_credentials == Some(true) && self.credential_vendor.is_some() {
1368                if let Some(ref location) = response.location {
1369                    let identity = request.identity.as_deref();
1370                    response.storage_options = self
1371                        .get_storage_options_for_table(location, identity)
1372                        .await?;
1373                }
1374            } else if request.vend_credentials == Some(false) {
1375                response.storage_options = None;
1376            }
1377            return Ok(response);
1378        }
1379
1380        let table_name = Self::table_name_from_id(&request.id)?;
1381        let table_uri = self.table_full_uri(&table_name);
1382
1383        // Validate location if provided
1384        if let Some(location) = &request.location {
1385            let location = location.trim_end_matches('/');
1386            if location != table_uri {
1387                return Err(Error::Namespace {
1388                    source: format!(
1389                        "Cannot declare table {} at location {}, must be at location {}",
1390                        table_name, location, table_uri
1391                    )
1392                    .into(),
1393                    location: snafu::location!(),
1394                });
1395            }
1396        }
1397
1398        // Check if table already has data (created via create_table).
1399        // The atomic put only prevents races between concurrent declare_table calls,
1400        // not between declare_table and existing data.
1401        let status = self.check_table_status(&table_name).await;
1402        if status.exists && !status.has_reserved_file {
1403            // Table has data but no reserved file - it was created with data
1404            return Err(Error::Namespace {
1405                source: format!("Table already exists: {}", table_name).into(),
1406                location: snafu::location!(),
1407            });
1408        }
1409
1410        // Atomically create the .lance-reserved file to mark the table as declared.
1411        // This uses put_if_not_exists semantics to avoid race conditions between
1412        // concurrent declare_table calls.
1413        let reserved_file_path = self.table_reserved_file_path(&table_name);
1414
1415        self.put_marker_file_atomic(&reserved_file_path, &format!("table {}", table_name))
1416            .await
1417            .map_err(|e| Error::Namespace {
1418                source: e.into(),
1419                location: snafu::location!(),
1420            })?;
1421
1422        // For backwards compatibility, only skip vending credentials when explicitly set to false
1423        let vend_credentials = request.vend_credentials.unwrap_or(true);
1424        let identity = request.identity.as_deref();
1425        let storage_options = if vend_credentials {
1426            self.get_storage_options_for_table(&table_uri, identity)
1427                .await?
1428        } else {
1429            None
1430        };
1431
1432        Ok(DeclareTableResponse {
1433            location: Some(table_uri),
1434            storage_options,
1435            ..Default::default()
1436        })
1437    }
1438
1439    async fn register_table(
1440        &self,
1441        request: lance_namespace::models::RegisterTableRequest,
1442    ) -> Result<lance_namespace::models::RegisterTableResponse> {
1443        // If manifest is enabled, delegate to manifest namespace
1444        if let Some(ref manifest_ns) = self.manifest_ns {
1445            return LanceNamespace::register_table(manifest_ns.as_ref(), request).await;
1446        }
1447
1448        // Without manifest, register_table is not supported
1449        Err(Error::NotSupported {
1450            source: "register_table is only supported when manifest mode is enabled".into(),
1451            location: snafu::location!(),
1452        })
1453    }
1454
1455    async fn deregister_table(
1456        &self,
1457        request: lance_namespace::models::DeregisterTableRequest,
1458    ) -> Result<lance_namespace::models::DeregisterTableResponse> {
1459        // If manifest is enabled, delegate to manifest namespace
1460        if let Some(ref manifest_ns) = self.manifest_ns {
1461            return LanceNamespace::deregister_table(manifest_ns.as_ref(), request).await;
1462        }
1463
1464        // V1 mode: create a .lance-deregistered marker file in the table directory
1465        let table_name = Self::table_name_from_id(&request.id)?;
1466        let table_uri = self.table_full_uri(&table_name);
1467
1468        // Check table existence and deregistration status.
1469        // This provides better error messages for common cases.
1470        let status = self.check_table_status(&table_name).await;
1471
1472        if !status.exists {
1473            return Err(Error::Namespace {
1474                source: format!("Table does not exist: {}", table_name).into(),
1475                location: snafu::location!(),
1476            });
1477        }
1478
1479        if status.is_deregistered {
1480            return Err(Error::Namespace {
1481                source: format!("Table is already deregistered: {}", table_name).into(),
1482                location: snafu::location!(),
1483            });
1484        }
1485
1486        // Atomically create the .lance-deregistered marker file.
1487        // This uses put_if_not_exists semantics to prevent race conditions
1488        // when multiple processes try to deregister the same table concurrently.
1489        // If a race occurs and another process already created the file,
1490        // we'll get an AlreadyExists error which we convert to a proper message.
1491        let deregistered_path = self.table_deregistered_file_path(&table_name);
1492        self.put_marker_file_atomic(
1493            &deregistered_path,
1494            &format!("deregistration marker for table {}", table_name),
1495        )
1496        .await
1497        .map_err(|e| {
1498            // Convert "already exists" to "already deregistered" for better UX
1499            let message = if e.contains("already exists") {
1500                format!("Table is already deregistered: {}", table_name)
1501            } else {
1502                e
1503            };
1504            Error::Namespace {
1505                source: message.into(),
1506                location: snafu::location!(),
1507            }
1508        })?;
1509
1510        Ok(lance_namespace::models::DeregisterTableResponse {
1511            id: request.id,
1512            location: Some(table_uri),
1513            ..Default::default()
1514        })
1515    }
1516
1517    fn namespace_id(&self) -> String {
1518        format!("DirectoryNamespace {{ root: {:?} }}", self.root)
1519    }
1520}
1521
1522#[cfg(test)]
1523mod tests {
1524    use super::*;
1525    use arrow_ipc::reader::StreamReader;
1526    use lance::dataset::Dataset;
1527    use lance_core::utils::tempfile::TempStdDir;
1528    use lance_namespace::models::{
1529        CreateTableRequest, JsonArrowDataType, JsonArrowField, JsonArrowSchema, ListTablesRequest,
1530    };
1531    use lance_namespace::schema::convert_json_arrow_schema;
1532    use std::io::Cursor;
1533    use std::sync::Arc;
1534
1535    /// Helper to create a test DirectoryNamespace with a temporary directory
1536    async fn create_test_namespace() -> (DirectoryNamespace, TempStdDir) {
1537        let temp_dir = TempStdDir::default();
1538
1539        let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
1540            .build()
1541            .await
1542            .unwrap();
1543        (namespace, temp_dir)
1544    }
1545
1546    /// Helper to create test IPC data from a schema
1547    fn create_test_ipc_data(schema: &JsonArrowSchema) -> Vec<u8> {
1548        use arrow::ipc::writer::StreamWriter;
1549
1550        let arrow_schema = convert_json_arrow_schema(schema).unwrap();
1551        let arrow_schema = Arc::new(arrow_schema);
1552        let batch = arrow::record_batch::RecordBatch::new_empty(arrow_schema.clone());
1553        let mut buffer = Vec::new();
1554        {
1555            let mut writer = StreamWriter::try_new(&mut buffer, &arrow_schema).unwrap();
1556            writer.write(&batch).unwrap();
1557            writer.finish().unwrap();
1558        }
1559        buffer
1560    }
1561
1562    /// Helper to create a simple test schema
1563    fn create_test_schema() -> JsonArrowSchema {
1564        let int_type = JsonArrowDataType::new("int32".to_string());
1565        let string_type = JsonArrowDataType::new("utf8".to_string());
1566
1567        let id_field = JsonArrowField {
1568            name: "id".to_string(),
1569            r#type: Box::new(int_type),
1570            nullable: false,
1571            metadata: None,
1572        };
1573
1574        let name_field = JsonArrowField {
1575            name: "name".to_string(),
1576            r#type: Box::new(string_type),
1577            nullable: true,
1578            metadata: None,
1579        };
1580
1581        JsonArrowSchema {
1582            fields: vec![id_field, name_field],
1583            metadata: None,
1584        }
1585    }
1586
1587    #[tokio::test]
1588    async fn test_create_table() {
1589        let (namespace, _temp_dir) = create_test_namespace().await;
1590
1591        // Create test IPC data
1592        let schema = create_test_schema();
1593        let ipc_data = create_test_ipc_data(&schema);
1594
1595        let mut request = CreateTableRequest::new();
1596        request.id = Some(vec!["test_table".to_string()]);
1597
1598        let response = namespace
1599            .create_table(request, bytes::Bytes::from(ipc_data))
1600            .await
1601            .unwrap();
1602
1603        assert!(response.location.is_some());
1604        assert!(response.location.unwrap().ends_with("test_table.lance"));
1605        assert_eq!(response.version, Some(1));
1606    }
1607
1608    #[tokio::test]
1609    async fn test_create_table_without_data() {
1610        let (namespace, _temp_dir) = create_test_namespace().await;
1611
1612        let mut request = CreateTableRequest::new();
1613        request.id = Some(vec!["test_table".to_string()]);
1614
1615        let result = namespace.create_table(request, bytes::Bytes::new()).await;
1616        assert!(result.is_err());
1617        assert!(result
1618            .unwrap_err()
1619            .to_string()
1620            .contains("Arrow IPC stream) is required"));
1621    }
1622
1623    #[tokio::test]
1624    async fn test_create_table_with_invalid_id() {
1625        let (namespace, _temp_dir) = create_test_namespace().await;
1626
1627        // Create test IPC data
1628        let schema = create_test_schema();
1629        let ipc_data = create_test_ipc_data(&schema);
1630
1631        // Test with empty ID
1632        let mut request = CreateTableRequest::new();
1633        request.id = Some(vec![]);
1634
1635        let result = namespace
1636            .create_table(request, bytes::Bytes::from(ipc_data.clone()))
1637            .await;
1638        assert!(result.is_err());
1639
1640        // Test with multi-level ID - should now work with manifest enabled
1641        // First create the parent namespace
1642        let mut create_ns_req = CreateNamespaceRequest::new();
1643        create_ns_req.id = Some(vec!["test_namespace".to_string()]);
1644        namespace.create_namespace(create_ns_req).await.unwrap();
1645
1646        // Now create table in the namespace
1647        let mut request = CreateTableRequest::new();
1648        request.id = Some(vec!["test_namespace".to_string(), "table".to_string()]);
1649
1650        let result = namespace
1651            .create_table(request, bytes::Bytes::from(ipc_data))
1652            .await;
1653        // Should succeed with manifest enabled
1654        assert!(
1655            result.is_ok(),
1656            "Multi-level table IDs should work with manifest enabled"
1657        );
1658    }
1659
1660    #[tokio::test]
1661    async fn test_list_tables() {
1662        let (namespace, _temp_dir) = create_test_namespace().await;
1663
1664        // Initially, no tables
1665        let mut request = ListTablesRequest::new();
1666        request.id = Some(vec![]);
1667        let response = namespace.list_tables(request).await.unwrap();
1668        assert_eq!(response.tables.len(), 0);
1669
1670        // Create test IPC data
1671        let schema = create_test_schema();
1672        let ipc_data = create_test_ipc_data(&schema);
1673
1674        // Create a table
1675        let mut create_request = CreateTableRequest::new();
1676        create_request.id = Some(vec!["table1".to_string()]);
1677        namespace
1678            .create_table(create_request, bytes::Bytes::from(ipc_data.clone()))
1679            .await
1680            .unwrap();
1681
1682        // Create another table
1683        let mut create_request = CreateTableRequest::new();
1684        create_request.id = Some(vec!["table2".to_string()]);
1685        namespace
1686            .create_table(create_request, bytes::Bytes::from(ipc_data))
1687            .await
1688            .unwrap();
1689
1690        // List tables should return both
1691        let mut request = ListTablesRequest::new();
1692        request.id = Some(vec![]);
1693        let response = namespace.list_tables(request).await.unwrap();
1694        let tables = response.tables;
1695        assert_eq!(tables.len(), 2);
1696        assert!(tables.contains(&"table1".to_string()));
1697        assert!(tables.contains(&"table2".to_string()));
1698    }
1699
1700    #[tokio::test]
1701    async fn test_list_tables_with_namespace_id() {
1702        let (namespace, _temp_dir) = create_test_namespace().await;
1703
1704        // First create a child namespace
1705        let mut create_ns_req = CreateNamespaceRequest::new();
1706        create_ns_req.id = Some(vec!["test_namespace".to_string()]);
1707        namespace.create_namespace(create_ns_req).await.unwrap();
1708
1709        // Now list tables in the child namespace
1710        let mut request = ListTablesRequest::new();
1711        request.id = Some(vec!["test_namespace".to_string()]);
1712
1713        let result = namespace.list_tables(request).await;
1714        // Should succeed (with manifest enabled) and return empty list (no tables yet)
1715        assert!(
1716            result.is_ok(),
1717            "list_tables should work with child namespace when manifest is enabled"
1718        );
1719        let response = result.unwrap();
1720        assert_eq!(
1721            response.tables.len(),
1722            0,
1723            "Namespace should have no tables yet"
1724        );
1725    }
1726
1727    #[tokio::test]
1728    async fn test_describe_table() {
1729        let (namespace, _temp_dir) = create_test_namespace().await;
1730
1731        // Create a table first
1732        let schema = create_test_schema();
1733        let ipc_data = create_test_ipc_data(&schema);
1734
1735        let mut create_request = CreateTableRequest::new();
1736        create_request.id = Some(vec!["test_table".to_string()]);
1737        namespace
1738            .create_table(create_request, bytes::Bytes::from(ipc_data))
1739            .await
1740            .unwrap();
1741
1742        // Describe the table
1743        let mut request = DescribeTableRequest::new();
1744        request.id = Some(vec!["test_table".to_string()]);
1745        let response = namespace.describe_table(request).await.unwrap();
1746
1747        assert!(response.location.is_some());
1748        assert!(response.location.unwrap().ends_with("test_table.lance"));
1749    }
1750
1751    #[tokio::test]
1752    async fn test_describe_nonexistent_table() {
1753        let (namespace, _temp_dir) = create_test_namespace().await;
1754
1755        let mut request = DescribeTableRequest::new();
1756        request.id = Some(vec!["nonexistent".to_string()]);
1757
1758        let result = namespace.describe_table(request).await;
1759        assert!(result.is_err());
1760        assert!(result
1761            .unwrap_err()
1762            .to_string()
1763            .contains("Table does not exist"));
1764    }
1765
1766    #[tokio::test]
1767    async fn test_table_exists() {
1768        let (namespace, _temp_dir) = create_test_namespace().await;
1769
1770        // Create a table
1771        let schema = create_test_schema();
1772        let ipc_data = create_test_ipc_data(&schema);
1773
1774        let mut create_request = CreateTableRequest::new();
1775        create_request.id = Some(vec!["existing_table".to_string()]);
1776        namespace
1777            .create_table(create_request, bytes::Bytes::from(ipc_data))
1778            .await
1779            .unwrap();
1780
1781        // Check existing table
1782        let mut request = TableExistsRequest::new();
1783        request.id = Some(vec!["existing_table".to_string()]);
1784        let result = namespace.table_exists(request).await;
1785        assert!(result.is_ok());
1786
1787        // Check non-existent table
1788        let mut request = TableExistsRequest::new();
1789        request.id = Some(vec!["nonexistent".to_string()]);
1790        let result = namespace.table_exists(request).await;
1791        assert!(result.is_err());
1792        assert!(result
1793            .unwrap_err()
1794            .to_string()
1795            .contains("Table does not exist"));
1796    }
1797
1798    #[tokio::test]
1799    async fn test_drop_table() {
1800        let (namespace, _temp_dir) = create_test_namespace().await;
1801
1802        // Create a table
1803        let schema = create_test_schema();
1804        let ipc_data = create_test_ipc_data(&schema);
1805
1806        let mut create_request = CreateTableRequest::new();
1807        create_request.id = Some(vec!["table_to_drop".to_string()]);
1808        namespace
1809            .create_table(create_request, bytes::Bytes::from(ipc_data))
1810            .await
1811            .unwrap();
1812
1813        // Verify it exists
1814        let mut exists_request = TableExistsRequest::new();
1815        exists_request.id = Some(vec!["table_to_drop".to_string()]);
1816        assert!(namespace.table_exists(exists_request.clone()).await.is_ok());
1817
1818        // Drop the table
1819        let mut drop_request = DropTableRequest::new();
1820        drop_request.id = Some(vec!["table_to_drop".to_string()]);
1821        let response = namespace.drop_table(drop_request).await.unwrap();
1822        assert!(response.location.is_some());
1823
1824        // Verify it no longer exists
1825        assert!(namespace.table_exists(exists_request).await.is_err());
1826    }
1827
1828    #[tokio::test]
1829    async fn test_drop_nonexistent_table() {
1830        let (namespace, _temp_dir) = create_test_namespace().await;
1831
1832        let mut request = DropTableRequest::new();
1833        request.id = Some(vec!["nonexistent".to_string()]);
1834
1835        // Should not fail when dropping non-existent table (idempotent)
1836        let result = namespace.drop_table(request).await;
1837        // The operation might succeed or fail depending on implementation
1838        // But it should not panic
1839        let _ = result;
1840    }
1841
1842    #[tokio::test]
1843    async fn test_root_namespace_operations() {
1844        let (namespace, _temp_dir) = create_test_namespace().await;
1845
1846        // Test list_namespaces - should return empty list for root
1847        let mut request = ListNamespacesRequest::new();
1848        request.id = Some(vec![]);
1849        let result = namespace.list_namespaces(request).await;
1850        assert!(result.is_ok());
1851        assert_eq!(result.unwrap().namespaces.len(), 0);
1852
1853        // Test describe_namespace - should succeed for root
1854        let mut request = DescribeNamespaceRequest::new();
1855        request.id = Some(vec![]);
1856        let result = namespace.describe_namespace(request).await;
1857        assert!(result.is_ok());
1858
1859        // Test namespace_exists - root always exists
1860        let mut request = NamespaceExistsRequest::new();
1861        request.id = Some(vec![]);
1862        let result = namespace.namespace_exists(request).await;
1863        assert!(result.is_ok());
1864
1865        // Test create_namespace - root cannot be created
1866        let mut request = CreateNamespaceRequest::new();
1867        request.id = Some(vec![]);
1868        let result = namespace.create_namespace(request).await;
1869        assert!(result.is_err());
1870        assert!(result.unwrap_err().to_string().contains("already exists"));
1871
1872        // Test drop_namespace - root cannot be dropped
1873        let mut request = DropNamespaceRequest::new();
1874        request.id = Some(vec![]);
1875        let result = namespace.drop_namespace(request).await;
1876        assert!(result.is_err());
1877        assert!(result
1878            .unwrap_err()
1879            .to_string()
1880            .contains("cannot be dropped"));
1881    }
1882
1883    #[tokio::test]
1884    async fn test_non_root_namespace_operations() {
1885        let (namespace, _temp_dir) = create_test_namespace().await;
1886
1887        // With manifest enabled (default), child namespaces are now supported
1888        // Test create_namespace for non-root - should succeed with manifest
1889        let mut request = CreateNamespaceRequest::new();
1890        request.id = Some(vec!["child".to_string()]);
1891        let result = namespace.create_namespace(request).await;
1892        assert!(
1893            result.is_ok(),
1894            "Child namespace creation should succeed with manifest enabled"
1895        );
1896
1897        // Test namespace_exists for non-root - should exist after creation
1898        let mut request = NamespaceExistsRequest::new();
1899        request.id = Some(vec!["child".to_string()]);
1900        let result = namespace.namespace_exists(request).await;
1901        assert!(
1902            result.is_ok(),
1903            "Child namespace should exist after creation"
1904        );
1905
1906        // Test drop_namespace for non-root - should succeed
1907        let mut request = DropNamespaceRequest::new();
1908        request.id = Some(vec!["child".to_string()]);
1909        let result = namespace.drop_namespace(request).await;
1910        assert!(
1911            result.is_ok(),
1912            "Child namespace drop should succeed with manifest enabled"
1913        );
1914
1915        // Verify namespace no longer exists
1916        let mut request = NamespaceExistsRequest::new();
1917        request.id = Some(vec!["child".to_string()]);
1918        let result = namespace.namespace_exists(request).await;
1919        assert!(
1920            result.is_err(),
1921            "Child namespace should not exist after drop"
1922        );
1923    }
1924
1925    #[tokio::test]
1926    async fn test_config_custom_root() {
1927        let temp_dir = TempStdDir::default();
1928        let custom_path = temp_dir.join("custom");
1929        std::fs::create_dir(&custom_path).unwrap();
1930
1931        let namespace = DirectoryNamespaceBuilder::new(custom_path.to_string_lossy().to_string())
1932            .build()
1933            .await
1934            .unwrap();
1935
1936        // Create test IPC data
1937        let schema = create_test_schema();
1938        let ipc_data = create_test_ipc_data(&schema);
1939
1940        // Create a table and verify location
1941        let mut request = CreateTableRequest::new();
1942        request.id = Some(vec!["test_table".to_string()]);
1943
1944        let response = namespace
1945            .create_table(request, bytes::Bytes::from(ipc_data))
1946            .await
1947            .unwrap();
1948
1949        assert!(response.location.unwrap().contains("custom"));
1950    }
1951
1952    #[tokio::test]
1953    async fn test_config_storage_options() {
1954        let temp_dir = TempStdDir::default();
1955
1956        let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
1957            .storage_option("option1", "value1")
1958            .storage_option("option2", "value2")
1959            .build()
1960            .await
1961            .unwrap();
1962
1963        // Create test IPC data
1964        let schema = create_test_schema();
1965        let ipc_data = create_test_ipc_data(&schema);
1966
1967        // Create a table and check storage options are included
1968        let mut request = CreateTableRequest::new();
1969        request.id = Some(vec!["test_table".to_string()]);
1970
1971        let response = namespace
1972            .create_table(request, bytes::Bytes::from(ipc_data))
1973            .await
1974            .unwrap();
1975
1976        let storage_options = response.storage_options.unwrap();
1977        assert_eq!(storage_options.get("option1"), Some(&"value1".to_string()));
1978        assert_eq!(storage_options.get("option2"), Some(&"value2".to_string()));
1979    }
1980
1981    #[tokio::test]
1982    async fn test_from_properties_manifest_enabled() {
1983        let temp_dir = TempStdDir::default();
1984
1985        let mut properties = HashMap::new();
1986        properties.insert("root".to_string(), temp_dir.to_str().unwrap().to_string());
1987        properties.insert("manifest_enabled".to_string(), "true".to_string());
1988        properties.insert("dir_listing_enabled".to_string(), "false".to_string());
1989
1990        let builder = DirectoryNamespaceBuilder::from_properties(properties, None).unwrap();
1991        assert!(builder.manifest_enabled);
1992        assert!(!builder.dir_listing_enabled);
1993
1994        let namespace = builder.build().await.unwrap();
1995
1996        // Create test IPC data
1997        let schema = create_test_schema();
1998        let ipc_data = create_test_ipc_data(&schema);
1999
2000        // Create a table
2001        let mut request = CreateTableRequest::new();
2002        request.id = Some(vec!["test_table".to_string()]);
2003
2004        let response = namespace
2005            .create_table(request, bytes::Bytes::from(ipc_data))
2006            .await
2007            .unwrap();
2008
2009        assert!(response.location.is_some());
2010    }
2011
2012    #[tokio::test]
2013    async fn test_from_properties_dir_listing_enabled() {
2014        let temp_dir = TempStdDir::default();
2015
2016        let mut properties = HashMap::new();
2017        properties.insert("root".to_string(), temp_dir.to_str().unwrap().to_string());
2018        properties.insert("manifest_enabled".to_string(), "false".to_string());
2019        properties.insert("dir_listing_enabled".to_string(), "true".to_string());
2020
2021        let builder = DirectoryNamespaceBuilder::from_properties(properties, None).unwrap();
2022        assert!(!builder.manifest_enabled);
2023        assert!(builder.dir_listing_enabled);
2024
2025        let namespace = builder.build().await.unwrap();
2026
2027        // Create test IPC data
2028        let schema = create_test_schema();
2029        let ipc_data = create_test_ipc_data(&schema);
2030
2031        // Create a table
2032        let mut request = CreateTableRequest::new();
2033        request.id = Some(vec!["test_table".to_string()]);
2034
2035        let response = namespace
2036            .create_table(request, bytes::Bytes::from(ipc_data))
2037            .await
2038            .unwrap();
2039
2040        assert!(response.location.is_some());
2041    }
2042
2043    #[tokio::test]
2044    async fn test_from_properties_defaults() {
2045        let temp_dir = TempStdDir::default();
2046
2047        let mut properties = HashMap::new();
2048        properties.insert("root".to_string(), temp_dir.to_str().unwrap().to_string());
2049
2050        let builder = DirectoryNamespaceBuilder::from_properties(properties, None).unwrap();
2051        // Both should default to true
2052        assert!(builder.manifest_enabled);
2053        assert!(builder.dir_listing_enabled);
2054    }
2055
2056    #[tokio::test]
2057    async fn test_from_properties_with_storage_options() {
2058        let temp_dir = TempStdDir::default();
2059
2060        let mut properties = HashMap::new();
2061        properties.insert("root".to_string(), temp_dir.to_str().unwrap().to_string());
2062        properties.insert("manifest_enabled".to_string(), "true".to_string());
2063        properties.insert("storage.region".to_string(), "us-west-2".to_string());
2064        properties.insert("storage.bucket".to_string(), "my-bucket".to_string());
2065
2066        let builder = DirectoryNamespaceBuilder::from_properties(properties, None).unwrap();
2067        assert!(builder.manifest_enabled);
2068        assert!(builder.storage_options.is_some());
2069
2070        let storage_options = builder.storage_options.unwrap();
2071        assert_eq!(
2072            storage_options.get("region"),
2073            Some(&"us-west-2".to_string())
2074        );
2075        assert_eq!(
2076            storage_options.get("bucket"),
2077            Some(&"my-bucket".to_string())
2078        );
2079    }
2080
2081    #[tokio::test]
2082    async fn test_various_arrow_types() {
2083        let (namespace, _temp_dir) = create_test_namespace().await;
2084
2085        // Create schema with various types
2086        let fields = vec![
2087            JsonArrowField {
2088                name: "bool_col".to_string(),
2089                r#type: Box::new(JsonArrowDataType::new("bool".to_string())),
2090                nullable: true,
2091                metadata: None,
2092            },
2093            JsonArrowField {
2094                name: "int8_col".to_string(),
2095                r#type: Box::new(JsonArrowDataType::new("int8".to_string())),
2096                nullable: true,
2097                metadata: None,
2098            },
2099            JsonArrowField {
2100                name: "float64_col".to_string(),
2101                r#type: Box::new(JsonArrowDataType::new("float64".to_string())),
2102                nullable: true,
2103                metadata: None,
2104            },
2105            JsonArrowField {
2106                name: "binary_col".to_string(),
2107                r#type: Box::new(JsonArrowDataType::new("binary".to_string())),
2108                nullable: true,
2109                metadata: None,
2110            },
2111        ];
2112
2113        let schema = JsonArrowSchema {
2114            fields,
2115            metadata: None,
2116        };
2117
2118        // Create IPC data
2119        let ipc_data = create_test_ipc_data(&schema);
2120
2121        let mut request = CreateTableRequest::new();
2122        request.id = Some(vec!["complex_table".to_string()]);
2123
2124        let response = namespace
2125            .create_table(request, bytes::Bytes::from(ipc_data))
2126            .await
2127            .unwrap();
2128
2129        assert!(response.location.is_some());
2130    }
2131
2132    #[tokio::test]
2133    async fn test_connect_dir() {
2134        let temp_dir = TempStdDir::default();
2135
2136        let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
2137            .build()
2138            .await
2139            .unwrap();
2140
2141        // Test basic operation through the concrete type
2142        let mut request = ListTablesRequest::new();
2143        request.id = Some(vec![]);
2144        let response = namespace.list_tables(request).await.unwrap();
2145        assert_eq!(response.tables.len(), 0);
2146    }
2147
2148    #[tokio::test]
2149    async fn test_create_table_with_ipc_data() {
2150        use arrow::array::{Int32Array, StringArray};
2151        use arrow::ipc::writer::StreamWriter;
2152
2153        let (namespace, _temp_dir) = create_test_namespace().await;
2154
2155        // Create a schema with some fields
2156        let schema = create_test_schema();
2157
2158        // Create some test data that matches the schema
2159        let arrow_schema = convert_json_arrow_schema(&schema).unwrap();
2160        let arrow_schema = Arc::new(arrow_schema);
2161
2162        // Create a RecordBatch with actual data
2163        let id_array = Int32Array::from(vec![1, 2, 3]);
2164        let name_array = StringArray::from(vec!["Alice", "Bob", "Charlie"]);
2165        let batch = arrow::record_batch::RecordBatch::try_new(
2166            arrow_schema.clone(),
2167            vec![Arc::new(id_array), Arc::new(name_array)],
2168        )
2169        .unwrap();
2170
2171        // Write the batch to an IPC stream
2172        let mut buffer = Vec::new();
2173        {
2174            let mut writer = StreamWriter::try_new(&mut buffer, &arrow_schema).unwrap();
2175            writer.write(&batch).unwrap();
2176            writer.finish().unwrap();
2177        }
2178
2179        // Create table with the IPC data
2180        let mut request = CreateTableRequest::new();
2181        request.id = Some(vec!["test_table_with_data".to_string()]);
2182
2183        let response = namespace
2184            .create_table(request, Bytes::from(buffer))
2185            .await
2186            .unwrap();
2187
2188        assert_eq!(response.version, Some(1));
2189        assert!(response
2190            .location
2191            .unwrap()
2192            .contains("test_table_with_data.lance"));
2193
2194        // Verify table exists
2195        let mut exists_request = TableExistsRequest::new();
2196        exists_request.id = Some(vec!["test_table_with_data".to_string()]);
2197        namespace.table_exists(exists_request).await.unwrap();
2198    }
2199
2200    #[tokio::test]
2201    #[allow(deprecated)]
2202    async fn test_create_empty_table() {
2203        let (namespace, temp_dir) = create_test_namespace().await;
2204
2205        let mut request = CreateEmptyTableRequest::new();
2206        request.id = Some(vec!["empty_table".to_string()]);
2207
2208        let response = namespace.create_empty_table(request).await.unwrap();
2209
2210        assert!(response.location.is_some());
2211        assert!(response.location.unwrap().ends_with("empty_table.lance"));
2212
2213        // Verify the .lance-reserved file was created in the correct location
2214        let table_dir = temp_dir.join("empty_table.lance");
2215        assert!(table_dir.exists());
2216        assert!(table_dir.is_dir());
2217
2218        let reserved_file = table_dir.join(".lance-reserved");
2219        assert!(reserved_file.exists());
2220        assert!(reserved_file.is_file());
2221
2222        // Verify file is empty
2223        let metadata = std::fs::metadata(&reserved_file).unwrap();
2224        assert_eq!(metadata.len(), 0);
2225
2226        // Verify table exists by checking for .lance-reserved file
2227        let mut exists_request = TableExistsRequest::new();
2228        exists_request.id = Some(vec!["empty_table".to_string()]);
2229        namespace.table_exists(exists_request).await.unwrap();
2230
2231        // List tables should include the empty table
2232        let mut list_request = ListTablesRequest::new();
2233        list_request.id = Some(vec![]);
2234        let list_response = namespace.list_tables(list_request).await.unwrap();
2235        assert!(list_response.tables.contains(&"empty_table".to_string()));
2236
2237        // Verify describe table works for empty table
2238        let mut describe_request = DescribeTableRequest::new();
2239        describe_request.id = Some(vec!["empty_table".to_string()]);
2240        let describe_response = namespace.describe_table(describe_request).await.unwrap();
2241        assert!(describe_response.location.is_some());
2242        assert!(describe_response.location.unwrap().contains("empty_table"));
2243    }
2244
2245    #[tokio::test]
2246    #[allow(deprecated)]
2247    async fn test_create_empty_table_with_wrong_location() {
2248        let (namespace, _temp_dir) = create_test_namespace().await;
2249
2250        let mut request = CreateEmptyTableRequest::new();
2251        request.id = Some(vec!["test_table".to_string()]);
2252        request.location = Some("/wrong/path/table.lance".to_string());
2253
2254        let result = namespace.create_empty_table(request).await;
2255        assert!(result.is_err());
2256        assert!(result
2257            .unwrap_err()
2258            .to_string()
2259            .contains("must be at location"));
2260    }
2261
2262    #[tokio::test]
2263    #[allow(deprecated)]
2264    async fn test_create_empty_table_then_drop() {
2265        let (namespace, temp_dir) = create_test_namespace().await;
2266
2267        // Create an empty table
2268        let mut create_request = CreateEmptyTableRequest::new();
2269        create_request.id = Some(vec!["empty_table_to_drop".to_string()]);
2270
2271        let create_response = namespace.create_empty_table(create_request).await.unwrap();
2272        assert!(create_response.location.is_some());
2273
2274        // Verify it exists
2275        let table_dir = temp_dir.join("empty_table_to_drop.lance");
2276        assert!(table_dir.exists());
2277        let reserved_file = table_dir.join(".lance-reserved");
2278        assert!(reserved_file.exists());
2279
2280        // Drop the table
2281        let mut drop_request = DropTableRequest::new();
2282        drop_request.id = Some(vec!["empty_table_to_drop".to_string()]);
2283        let drop_response = namespace.drop_table(drop_request).await.unwrap();
2284        assert!(drop_response.location.is_some());
2285
2286        // Verify table directory was removed
2287        assert!(!table_dir.exists());
2288        assert!(!reserved_file.exists());
2289
2290        // Verify table no longer exists
2291        let mut exists_request = TableExistsRequest::new();
2292        exists_request.id = Some(vec!["empty_table_to_drop".to_string()]);
2293        let exists_result = namespace.table_exists(exists_request).await;
2294        assert!(exists_result.is_err());
2295    }
2296
2297    #[tokio::test]
2298    async fn test_child_namespace_create_and_list() {
2299        let (namespace, _temp_dir) = create_test_namespace().await;
2300
2301        // Create multiple child namespaces
2302        for i in 1..=3 {
2303            let mut create_req = CreateNamespaceRequest::new();
2304            create_req.id = Some(vec![format!("ns{}", i)]);
2305            let result = namespace.create_namespace(create_req).await;
2306            assert!(result.is_ok(), "Failed to create child namespace ns{}", i);
2307        }
2308
2309        // List child namespaces
2310        let list_req = ListNamespacesRequest {
2311            id: Some(vec![]),
2312            ..Default::default()
2313        };
2314        let result = namespace.list_namespaces(list_req).await;
2315        assert!(result.is_ok());
2316        let namespaces = result.unwrap().namespaces;
2317        assert_eq!(namespaces.len(), 3);
2318        assert!(namespaces.contains(&"ns1".to_string()));
2319        assert!(namespaces.contains(&"ns2".to_string()));
2320        assert!(namespaces.contains(&"ns3".to_string()));
2321    }
2322
2323    #[tokio::test]
2324    async fn test_nested_namespace_hierarchy() {
2325        let (namespace, _temp_dir) = create_test_namespace().await;
2326
2327        // Create parent namespace
2328        let mut create_req = CreateNamespaceRequest::new();
2329        create_req.id = Some(vec!["parent".to_string()]);
2330        namespace.create_namespace(create_req).await.unwrap();
2331
2332        // Create nested children
2333        let mut create_req = CreateNamespaceRequest::new();
2334        create_req.id = Some(vec!["parent".to_string(), "child1".to_string()]);
2335        namespace.create_namespace(create_req).await.unwrap();
2336
2337        let mut create_req = CreateNamespaceRequest::new();
2338        create_req.id = Some(vec!["parent".to_string(), "child2".to_string()]);
2339        namespace.create_namespace(create_req).await.unwrap();
2340
2341        // List children of parent
2342        let list_req = ListNamespacesRequest {
2343            id: Some(vec!["parent".to_string()]),
2344            ..Default::default()
2345        };
2346        let result = namespace.list_namespaces(list_req).await;
2347        assert!(result.is_ok());
2348        let children = result.unwrap().namespaces;
2349        assert_eq!(children.len(), 2);
2350        assert!(children.contains(&"child1".to_string()));
2351        assert!(children.contains(&"child2".to_string()));
2352
2353        // List root should only show parent
2354        let list_req = ListNamespacesRequest {
2355            id: Some(vec![]),
2356            ..Default::default()
2357        };
2358        let result = namespace.list_namespaces(list_req).await;
2359        assert!(result.is_ok());
2360        let root_namespaces = result.unwrap().namespaces;
2361        assert_eq!(root_namespaces.len(), 1);
2362        assert_eq!(root_namespaces[0], "parent");
2363    }
2364
2365    #[tokio::test]
2366    async fn test_table_in_child_namespace() {
2367        let (namespace, _temp_dir) = create_test_namespace().await;
2368
2369        // Create child namespace
2370        let mut create_ns_req = CreateNamespaceRequest::new();
2371        create_ns_req.id = Some(vec!["test_ns".to_string()]);
2372        namespace.create_namespace(create_ns_req).await.unwrap();
2373
2374        // Create table in child namespace
2375        let schema = create_test_schema();
2376        let ipc_data = create_test_ipc_data(&schema);
2377        let mut create_table_req = CreateTableRequest::new();
2378        create_table_req.id = Some(vec!["test_ns".to_string(), "table1".to_string()]);
2379        let result = namespace
2380            .create_table(create_table_req, bytes::Bytes::from(ipc_data))
2381            .await;
2382        assert!(result.is_ok(), "Failed to create table in child namespace");
2383
2384        // List tables in child namespace
2385        let list_req = ListTablesRequest {
2386            id: Some(vec!["test_ns".to_string()]),
2387            ..Default::default()
2388        };
2389        let result = namespace.list_tables(list_req).await;
2390        assert!(result.is_ok());
2391        let tables = result.unwrap().tables;
2392        assert_eq!(tables.len(), 1);
2393        assert_eq!(tables[0], "table1");
2394
2395        // Verify table exists
2396        let mut exists_req = TableExistsRequest::new();
2397        exists_req.id = Some(vec!["test_ns".to_string(), "table1".to_string()]);
2398        let result = namespace.table_exists(exists_req).await;
2399        assert!(result.is_ok());
2400
2401        // Describe table in child namespace
2402        let mut describe_req = DescribeTableRequest::new();
2403        describe_req.id = Some(vec!["test_ns".to_string(), "table1".to_string()]);
2404        let result = namespace.describe_table(describe_req).await;
2405        assert!(result.is_ok());
2406        let response = result.unwrap();
2407        assert!(response.location.is_some());
2408    }
2409
2410    #[tokio::test]
2411    async fn test_multiple_tables_in_child_namespace() {
2412        let (namespace, _temp_dir) = create_test_namespace().await;
2413
2414        // Create child namespace
2415        let mut create_ns_req = CreateNamespaceRequest::new();
2416        create_ns_req.id = Some(vec!["test_ns".to_string()]);
2417        namespace.create_namespace(create_ns_req).await.unwrap();
2418
2419        // Create multiple tables
2420        let schema = create_test_schema();
2421        let ipc_data = create_test_ipc_data(&schema);
2422        for i in 1..=3 {
2423            let mut create_table_req = CreateTableRequest::new();
2424            create_table_req.id = Some(vec!["test_ns".to_string(), format!("table{}", i)]);
2425            namespace
2426                .create_table(create_table_req, bytes::Bytes::from(ipc_data.clone()))
2427                .await
2428                .unwrap();
2429        }
2430
2431        // List tables
2432        let list_req = ListTablesRequest {
2433            id: Some(vec!["test_ns".to_string()]),
2434            ..Default::default()
2435        };
2436        let result = namespace.list_tables(list_req).await;
2437        assert!(result.is_ok());
2438        let tables = result.unwrap().tables;
2439        assert_eq!(tables.len(), 3);
2440        assert!(tables.contains(&"table1".to_string()));
2441        assert!(tables.contains(&"table2".to_string()));
2442        assert!(tables.contains(&"table3".to_string()));
2443    }
2444
2445    #[tokio::test]
2446    async fn test_drop_table_in_child_namespace() {
2447        let (namespace, _temp_dir) = create_test_namespace().await;
2448
2449        // Create child namespace
2450        let mut create_ns_req = CreateNamespaceRequest::new();
2451        create_ns_req.id = Some(vec!["test_ns".to_string()]);
2452        namespace.create_namespace(create_ns_req).await.unwrap();
2453
2454        // Create table
2455        let schema = create_test_schema();
2456        let ipc_data = create_test_ipc_data(&schema);
2457        let mut create_table_req = CreateTableRequest::new();
2458        create_table_req.id = Some(vec!["test_ns".to_string(), "table1".to_string()]);
2459        namespace
2460            .create_table(create_table_req, bytes::Bytes::from(ipc_data))
2461            .await
2462            .unwrap();
2463
2464        // Drop table
2465        let mut drop_req = DropTableRequest::new();
2466        drop_req.id = Some(vec!["test_ns".to_string(), "table1".to_string()]);
2467        let result = namespace.drop_table(drop_req).await;
2468        assert!(result.is_ok(), "Failed to drop table in child namespace");
2469
2470        // Verify table no longer exists
2471        let mut exists_req = TableExistsRequest::new();
2472        exists_req.id = Some(vec!["test_ns".to_string(), "table1".to_string()]);
2473        let result = namespace.table_exists(exists_req).await;
2474        assert!(result.is_err());
2475    }
2476
2477    #[tokio::test]
2478    #[allow(deprecated)]
2479    async fn test_empty_table_in_child_namespace() {
2480        let (namespace, _temp_dir) = create_test_namespace().await;
2481
2482        // Create child namespace
2483        let mut create_ns_req = CreateNamespaceRequest::new();
2484        create_ns_req.id = Some(vec!["test_ns".to_string()]);
2485        namespace.create_namespace(create_ns_req).await.unwrap();
2486
2487        // Create empty table
2488        let mut create_empty_req = CreateEmptyTableRequest::new();
2489        create_empty_req.id = Some(vec!["test_ns".to_string(), "empty_table".to_string()]);
2490        let result = namespace.create_empty_table(create_empty_req).await;
2491        assert!(
2492            result.is_ok(),
2493            "Failed to create empty table in child namespace"
2494        );
2495
2496        // Verify table exists
2497        let mut exists_req = TableExistsRequest::new();
2498        exists_req.id = Some(vec!["test_ns".to_string(), "empty_table".to_string()]);
2499        let result = namespace.table_exists(exists_req).await;
2500        assert!(result.is_ok());
2501    }
2502
2503    #[tokio::test]
2504    async fn test_deeply_nested_namespace() {
2505        let (namespace, _temp_dir) = create_test_namespace().await;
2506
2507        // Create deeply nested namespace hierarchy
2508        let mut create_req = CreateNamespaceRequest::new();
2509        create_req.id = Some(vec!["level1".to_string()]);
2510        namespace.create_namespace(create_req).await.unwrap();
2511
2512        let mut create_req = CreateNamespaceRequest::new();
2513        create_req.id = Some(vec!["level1".to_string(), "level2".to_string()]);
2514        namespace.create_namespace(create_req).await.unwrap();
2515
2516        let mut create_req = CreateNamespaceRequest::new();
2517        create_req.id = Some(vec![
2518            "level1".to_string(),
2519            "level2".to_string(),
2520            "level3".to_string(),
2521        ]);
2522        namespace.create_namespace(create_req).await.unwrap();
2523
2524        // Create table in deeply nested namespace
2525        let schema = create_test_schema();
2526        let ipc_data = create_test_ipc_data(&schema);
2527        let mut create_table_req = CreateTableRequest::new();
2528        create_table_req.id = Some(vec![
2529            "level1".to_string(),
2530            "level2".to_string(),
2531            "level3".to_string(),
2532            "table1".to_string(),
2533        ]);
2534        let result = namespace
2535            .create_table(create_table_req, bytes::Bytes::from(ipc_data))
2536            .await;
2537        assert!(
2538            result.is_ok(),
2539            "Failed to create table in deeply nested namespace"
2540        );
2541
2542        // Verify table exists
2543        let mut exists_req = TableExistsRequest::new();
2544        exists_req.id = Some(vec![
2545            "level1".to_string(),
2546            "level2".to_string(),
2547            "level3".to_string(),
2548            "table1".to_string(),
2549        ]);
2550        let result = namespace.table_exists(exists_req).await;
2551        assert!(result.is_ok());
2552    }
2553
2554    #[tokio::test]
2555    async fn test_namespace_with_properties() {
2556        let (namespace, _temp_dir) = create_test_namespace().await;
2557
2558        // Create namespace with properties
2559        let mut properties = HashMap::new();
2560        properties.insert("owner".to_string(), "test_user".to_string());
2561        properties.insert("description".to_string(), "Test namespace".to_string());
2562
2563        let mut create_req = CreateNamespaceRequest::new();
2564        create_req.id = Some(vec!["test_ns".to_string()]);
2565        create_req.properties = Some(properties.clone());
2566        namespace.create_namespace(create_req).await.unwrap();
2567
2568        // Describe namespace and verify properties
2569        let describe_req = DescribeNamespaceRequest {
2570            id: Some(vec!["test_ns".to_string()]),
2571            ..Default::default()
2572        };
2573        let result = namespace.describe_namespace(describe_req).await;
2574        assert!(result.is_ok());
2575        let response = result.unwrap();
2576        assert!(response.properties.is_some());
2577        let props = response.properties.unwrap();
2578        assert_eq!(props.get("owner"), Some(&"test_user".to_string()));
2579        assert_eq!(
2580            props.get("description"),
2581            Some(&"Test namespace".to_string())
2582        );
2583    }
2584
2585    #[tokio::test]
2586    async fn test_cannot_drop_namespace_with_tables() {
2587        let (namespace, _temp_dir) = create_test_namespace().await;
2588
2589        // Create namespace
2590        let mut create_ns_req = CreateNamespaceRequest::new();
2591        create_ns_req.id = Some(vec!["test_ns".to_string()]);
2592        namespace.create_namespace(create_ns_req).await.unwrap();
2593
2594        // Create table in namespace
2595        let schema = create_test_schema();
2596        let ipc_data = create_test_ipc_data(&schema);
2597        let mut create_table_req = CreateTableRequest::new();
2598        create_table_req.id = Some(vec!["test_ns".to_string(), "table1".to_string()]);
2599        namespace
2600            .create_table(create_table_req, bytes::Bytes::from(ipc_data))
2601            .await
2602            .unwrap();
2603
2604        // Try to drop namespace - should fail
2605        let mut drop_req = DropNamespaceRequest::new();
2606        drop_req.id = Some(vec!["test_ns".to_string()]);
2607        let result = namespace.drop_namespace(drop_req).await;
2608        assert!(
2609            result.is_err(),
2610            "Should not be able to drop namespace with tables"
2611        );
2612    }
2613
2614    #[tokio::test]
2615    async fn test_isolation_between_namespaces() {
2616        let (namespace, _temp_dir) = create_test_namespace().await;
2617
2618        // Create two namespaces
2619        let mut create_req = CreateNamespaceRequest::new();
2620        create_req.id = Some(vec!["ns1".to_string()]);
2621        namespace.create_namespace(create_req).await.unwrap();
2622
2623        let mut create_req = CreateNamespaceRequest::new();
2624        create_req.id = Some(vec!["ns2".to_string()]);
2625        namespace.create_namespace(create_req).await.unwrap();
2626
2627        // Create table with same name in both namespaces
2628        let schema = create_test_schema();
2629        let ipc_data = create_test_ipc_data(&schema);
2630
2631        let mut create_table_req = CreateTableRequest::new();
2632        create_table_req.id = Some(vec!["ns1".to_string(), "table1".to_string()]);
2633        namespace
2634            .create_table(create_table_req, bytes::Bytes::from(ipc_data.clone()))
2635            .await
2636            .unwrap();
2637
2638        let mut create_table_req = CreateTableRequest::new();
2639        create_table_req.id = Some(vec!["ns2".to_string(), "table1".to_string()]);
2640        namespace
2641            .create_table(create_table_req, bytes::Bytes::from(ipc_data))
2642            .await
2643            .unwrap();
2644
2645        // List tables in each namespace
2646        let list_req = ListTablesRequest {
2647            id: Some(vec!["ns1".to_string()]),
2648            page_token: None,
2649            limit: None,
2650            ..Default::default()
2651        };
2652        let result = namespace.list_tables(list_req).await.unwrap();
2653        assert_eq!(result.tables.len(), 1);
2654        assert_eq!(result.tables[0], "table1");
2655
2656        let list_req = ListTablesRequest {
2657            id: Some(vec!["ns2".to_string()]),
2658            page_token: None,
2659            limit: None,
2660            ..Default::default()
2661        };
2662        let result = namespace.list_tables(list_req).await.unwrap();
2663        assert_eq!(result.tables.len(), 1);
2664        assert_eq!(result.tables[0], "table1");
2665
2666        // Drop table in ns1 shouldn't affect ns2
2667        let mut drop_req = DropTableRequest::new();
2668        drop_req.id = Some(vec!["ns1".to_string(), "table1".to_string()]);
2669        namespace.drop_table(drop_req).await.unwrap();
2670
2671        // Verify ns1 table is gone but ns2 table still exists
2672        let mut exists_req = TableExistsRequest::new();
2673        exists_req.id = Some(vec!["ns1".to_string(), "table1".to_string()]);
2674        assert!(namespace.table_exists(exists_req).await.is_err());
2675
2676        let mut exists_req = TableExistsRequest::new();
2677        exists_req.id = Some(vec!["ns2".to_string(), "table1".to_string()]);
2678        assert!(namespace.table_exists(exists_req).await.is_ok());
2679    }
2680
2681    #[tokio::test]
2682    async fn test_migrate_directory_tables() {
2683        let temp_dir = TempStdDir::default();
2684        let temp_path = temp_dir.to_str().unwrap();
2685
2686        // Step 1: Create tables in directory-only mode
2687        let dir_only_ns = DirectoryNamespaceBuilder::new(temp_path)
2688            .manifest_enabled(false)
2689            .dir_listing_enabled(true)
2690            .build()
2691            .await
2692            .unwrap();
2693
2694        // Create some tables
2695        let schema = create_test_schema();
2696        let ipc_data = create_test_ipc_data(&schema);
2697
2698        for i in 1..=3 {
2699            let mut create_req = CreateTableRequest::new();
2700            create_req.id = Some(vec![format!("table{}", i)]);
2701            dir_only_ns
2702                .create_table(create_req, bytes::Bytes::from(ipc_data.clone()))
2703                .await
2704                .unwrap();
2705        }
2706
2707        drop(dir_only_ns);
2708
2709        // Step 2: Create namespace with dual mode (manifest + directory listing)
2710        let dual_mode_ns = DirectoryNamespaceBuilder::new(temp_path)
2711            .manifest_enabled(true)
2712            .dir_listing_enabled(true)
2713            .build()
2714            .await
2715            .unwrap();
2716
2717        // Before migration, tables should be visible (via directory listing fallback)
2718        let mut list_req = ListTablesRequest::new();
2719        list_req.id = Some(vec![]);
2720        let tables = dual_mode_ns.list_tables(list_req).await.unwrap().tables;
2721        assert_eq!(tables.len(), 3);
2722
2723        // Run migration
2724        let migrated_count = dual_mode_ns.migrate().await.unwrap();
2725        assert_eq!(migrated_count, 3, "Should migrate all 3 tables");
2726
2727        // Verify tables are now in manifest
2728        let mut list_req = ListTablesRequest::new();
2729        list_req.id = Some(vec![]);
2730        let tables = dual_mode_ns.list_tables(list_req).await.unwrap().tables;
2731        assert_eq!(tables.len(), 3);
2732
2733        // Run migration again - should be idempotent
2734        let migrated_count = dual_mode_ns.migrate().await.unwrap();
2735        assert_eq!(
2736            migrated_count, 0,
2737            "Should not migrate already-migrated tables"
2738        );
2739
2740        drop(dual_mode_ns);
2741
2742        // Step 3: Create namespace with manifest-only mode
2743        let manifest_only_ns = DirectoryNamespaceBuilder::new(temp_path)
2744            .manifest_enabled(true)
2745            .dir_listing_enabled(false)
2746            .build()
2747            .await
2748            .unwrap();
2749
2750        // Tables should still be accessible (now from manifest only)
2751        let mut list_req = ListTablesRequest::new();
2752        list_req.id = Some(vec![]);
2753        let tables = manifest_only_ns.list_tables(list_req).await.unwrap().tables;
2754        assert_eq!(tables.len(), 3);
2755        assert!(tables.contains(&"table1".to_string()));
2756        assert!(tables.contains(&"table2".to_string()));
2757        assert!(tables.contains(&"table3".to_string()));
2758    }
2759
2760    #[tokio::test]
2761    async fn test_migrate_without_manifest() {
2762        let temp_dir = TempStdDir::default();
2763        let temp_path = temp_dir.to_str().unwrap();
2764
2765        // Create namespace without manifest
2766        let namespace = DirectoryNamespaceBuilder::new(temp_path)
2767            .manifest_enabled(false)
2768            .dir_listing_enabled(true)
2769            .build()
2770            .await
2771            .unwrap();
2772
2773        // migrate() should return 0 when manifest is not enabled
2774        let migrated_count = namespace.migrate().await.unwrap();
2775        assert_eq!(migrated_count, 0);
2776    }
2777
2778    #[tokio::test]
2779    async fn test_register_table() {
2780        use lance_namespace::models::{RegisterTableRequest, TableExistsRequest};
2781
2782        let temp_dir = TempStdDir::default();
2783        let temp_path = temp_dir.to_str().unwrap();
2784
2785        let namespace = DirectoryNamespaceBuilder::new(temp_path)
2786            .build()
2787            .await
2788            .unwrap();
2789
2790        // Create a physical table first using lance directly
2791        let schema = create_test_schema();
2792        let ipc_data = create_test_ipc_data(&schema);
2793
2794        let table_uri = format!("{}/external_table.lance", temp_path);
2795        let cursor = Cursor::new(ipc_data);
2796        let stream_reader = StreamReader::try_new(cursor, None).unwrap();
2797        let batches: Vec<_> = stream_reader
2798            .collect::<std::result::Result<Vec<_>, _>>()
2799            .unwrap();
2800        let schema = batches[0].schema();
2801        let batch_results: Vec<_> = batches.into_iter().map(Ok).collect();
2802        let reader = RecordBatchIterator::new(batch_results, schema);
2803        Dataset::write(Box::new(reader), &table_uri, None)
2804            .await
2805            .unwrap();
2806
2807        // Register the table
2808        let mut register_req = RegisterTableRequest::new("external_table.lance".to_string());
2809        register_req.id = Some(vec!["registered_table".to_string()]);
2810
2811        let response = namespace.register_table(register_req).await.unwrap();
2812        assert_eq!(response.location, Some("external_table.lance".to_string()));
2813
2814        // Verify table exists in namespace
2815        let mut exists_req = TableExistsRequest::new();
2816        exists_req.id = Some(vec!["registered_table".to_string()]);
2817        assert!(namespace.table_exists(exists_req).await.is_ok());
2818
2819        // Verify we can list the table
2820        let mut list_req = ListTablesRequest::new();
2821        list_req.id = Some(vec![]);
2822        let tables = namespace.list_tables(list_req).await.unwrap();
2823        assert!(tables.tables.contains(&"registered_table".to_string()));
2824    }
2825
2826    #[tokio::test]
2827    async fn test_register_table_duplicate_fails() {
2828        use lance_namespace::models::RegisterTableRequest;
2829
2830        let temp_dir = TempStdDir::default();
2831        let temp_path = temp_dir.to_str().unwrap();
2832
2833        let namespace = DirectoryNamespaceBuilder::new(temp_path)
2834            .build()
2835            .await
2836            .unwrap();
2837
2838        // Register a table
2839        let mut register_req = RegisterTableRequest::new("test_table.lance".to_string());
2840        register_req.id = Some(vec!["test_table".to_string()]);
2841
2842        namespace
2843            .register_table(register_req.clone())
2844            .await
2845            .unwrap();
2846
2847        // Try to register again - should fail
2848        let result = namespace.register_table(register_req).await;
2849        assert!(result.is_err());
2850        assert!(result.unwrap_err().to_string().contains("already exists"));
2851    }
2852
2853    #[tokio::test]
2854    async fn test_deregister_table() {
2855        use lance_namespace::models::{DeregisterTableRequest, TableExistsRequest};
2856
2857        let temp_dir = TempStdDir::default();
2858        let temp_path = temp_dir.to_str().unwrap();
2859
2860        // Create namespace with manifest-only mode (no directory listing fallback)
2861        // This ensures deregistered tables are truly invisible
2862        let namespace = DirectoryNamespaceBuilder::new(temp_path)
2863            .manifest_enabled(true)
2864            .dir_listing_enabled(false)
2865            .build()
2866            .await
2867            .unwrap();
2868
2869        // Create a table
2870        let schema = create_test_schema();
2871        let ipc_data = create_test_ipc_data(&schema);
2872
2873        let mut create_req = CreateTableRequest::new();
2874        create_req.id = Some(vec!["test_table".to_string()]);
2875        namespace
2876            .create_table(create_req, bytes::Bytes::from(ipc_data))
2877            .await
2878            .unwrap();
2879
2880        // Verify table exists
2881        let mut exists_req = TableExistsRequest::new();
2882        exists_req.id = Some(vec!["test_table".to_string()]);
2883        assert!(namespace.table_exists(exists_req.clone()).await.is_ok());
2884
2885        // Deregister the table
2886        let mut deregister_req = DeregisterTableRequest::new();
2887        deregister_req.id = Some(vec!["test_table".to_string()]);
2888        let response = namespace.deregister_table(deregister_req).await.unwrap();
2889
2890        // Should return location and id
2891        assert!(
2892            response.location.is_some(),
2893            "Deregister should return location"
2894        );
2895        let location = response.location.as_ref().unwrap();
2896        // Location should be a proper file:// URI with the temp path
2897        // Use uri_to_url to normalize the temp path to a URL for comparison
2898        let expected_url = lance_io::object_store::uri_to_url(temp_path)
2899            .expect("Failed to convert temp path to URL");
2900        let expected_prefix = expected_url.to_string();
2901        assert!(
2902            location.starts_with(&expected_prefix),
2903            "Location should start with '{}', got: {}",
2904            expected_prefix,
2905            location
2906        );
2907        assert!(
2908            location.contains("test_table"),
2909            "Location should contain table name: {}",
2910            location
2911        );
2912        assert_eq!(response.id, Some(vec!["test_table".to_string()]));
2913
2914        // Verify table no longer exists in namespace (removed from manifest)
2915        assert!(namespace.table_exists(exists_req).await.is_err());
2916
2917        // Verify physical data still exists at the returned location
2918        let dataset = Dataset::open(location).await;
2919        assert!(
2920            dataset.is_ok(),
2921            "Physical table data should still exist at {}",
2922            location
2923        );
2924    }
2925
2926    #[tokio::test]
2927    async fn test_deregister_table_in_child_namespace() {
2928        use lance_namespace::models::{
2929            CreateNamespaceRequest, DeregisterTableRequest, TableExistsRequest,
2930        };
2931
2932        let temp_dir = TempStdDir::default();
2933        let temp_path = temp_dir.to_str().unwrap();
2934
2935        let namespace = DirectoryNamespaceBuilder::new(temp_path)
2936            .build()
2937            .await
2938            .unwrap();
2939
2940        // Create child namespace
2941        let mut create_ns_req = CreateNamespaceRequest::new();
2942        create_ns_req.id = Some(vec!["test_ns".to_string()]);
2943        namespace.create_namespace(create_ns_req).await.unwrap();
2944
2945        // Create a table in the child namespace
2946        let schema = create_test_schema();
2947        let ipc_data = create_test_ipc_data(&schema);
2948
2949        let mut create_req = CreateTableRequest::new();
2950        create_req.id = Some(vec!["test_ns".to_string(), "test_table".to_string()]);
2951        namespace
2952            .create_table(create_req, bytes::Bytes::from(ipc_data))
2953            .await
2954            .unwrap();
2955
2956        // Deregister the table
2957        let mut deregister_req = DeregisterTableRequest::new();
2958        deregister_req.id = Some(vec!["test_ns".to_string(), "test_table".to_string()]);
2959        let response = namespace.deregister_table(deregister_req).await.unwrap();
2960
2961        // Should return location and id in child namespace
2962        assert!(
2963            response.location.is_some(),
2964            "Deregister should return location"
2965        );
2966        let location = response.location.as_ref().unwrap();
2967        // Location should be a proper file:// URI with the temp path
2968        // Use uri_to_url to normalize the temp path to a URL for comparison
2969        let expected_url = lance_io::object_store::uri_to_url(temp_path)
2970            .expect("Failed to convert temp path to URL");
2971        let expected_prefix = expected_url.to_string();
2972        assert!(
2973            location.starts_with(&expected_prefix),
2974            "Location should start with '{}', got: {}",
2975            expected_prefix,
2976            location
2977        );
2978        assert!(
2979            location.contains("test_ns") && location.contains("test_table"),
2980            "Location should contain namespace and table name: {}",
2981            location
2982        );
2983        assert_eq!(
2984            response.id,
2985            Some(vec!["test_ns".to_string(), "test_table".to_string()])
2986        );
2987
2988        // Verify table no longer exists
2989        let mut exists_req = TableExistsRequest::new();
2990        exists_req.id = Some(vec!["test_ns".to_string(), "test_table".to_string()]);
2991        assert!(namespace.table_exists(exists_req).await.is_err());
2992    }
2993
2994    #[tokio::test]
2995    async fn test_register_without_manifest_fails() {
2996        use lance_namespace::models::RegisterTableRequest;
2997
2998        let temp_dir = TempStdDir::default();
2999        let temp_path = temp_dir.to_str().unwrap();
3000
3001        // Create namespace without manifest
3002        let namespace = DirectoryNamespaceBuilder::new(temp_path)
3003            .manifest_enabled(false)
3004            .build()
3005            .await
3006            .unwrap();
3007
3008        // Try to register - should fail (register requires manifest)
3009        let mut register_req = RegisterTableRequest::new("test_table.lance".to_string());
3010        register_req.id = Some(vec!["test_table".to_string()]);
3011        let result = namespace.register_table(register_req).await;
3012        assert!(result.is_err());
3013        assert!(result
3014            .unwrap_err()
3015            .to_string()
3016            .contains("manifest mode is enabled"));
3017
3018        // Note: deregister_table now works in V1 mode via .lance-deregistered marker files
3019        // See test_deregister_table_v1_mode for that test case
3020    }
3021
3022    #[tokio::test]
3023    async fn test_register_table_rejects_absolute_uri() {
3024        use lance_namespace::models::RegisterTableRequest;
3025
3026        let temp_dir = TempStdDir::default();
3027        let temp_path = temp_dir.to_str().unwrap();
3028
3029        let namespace = DirectoryNamespaceBuilder::new(temp_path)
3030            .build()
3031            .await
3032            .unwrap();
3033
3034        // Try to register with absolute URI - should fail
3035        let mut register_req = RegisterTableRequest::new("s3://bucket/table.lance".to_string());
3036        register_req.id = Some(vec!["test_table".to_string()]);
3037        let result = namespace.register_table(register_req).await;
3038        assert!(result.is_err());
3039        let err_msg = result.unwrap_err().to_string();
3040        assert!(err_msg.contains("Absolute URIs are not allowed"));
3041    }
3042
3043    #[tokio::test]
3044    async fn test_register_table_rejects_absolute_path() {
3045        use lance_namespace::models::RegisterTableRequest;
3046
3047        let temp_dir = TempStdDir::default();
3048        let temp_path = temp_dir.to_str().unwrap();
3049
3050        let namespace = DirectoryNamespaceBuilder::new(temp_path)
3051            .build()
3052            .await
3053            .unwrap();
3054
3055        // Try to register with absolute path - should fail
3056        let mut register_req = RegisterTableRequest::new("/tmp/table.lance".to_string());
3057        register_req.id = Some(vec!["test_table".to_string()]);
3058        let result = namespace.register_table(register_req).await;
3059        assert!(result.is_err());
3060        let err_msg = result.unwrap_err().to_string();
3061        assert!(err_msg.contains("Absolute paths are not allowed"));
3062    }
3063
3064    #[tokio::test]
3065    async fn test_register_table_rejects_path_traversal() {
3066        use lance_namespace::models::RegisterTableRequest;
3067
3068        let temp_dir = TempStdDir::default();
3069        let temp_path = temp_dir.to_str().unwrap();
3070
3071        let namespace = DirectoryNamespaceBuilder::new(temp_path)
3072            .build()
3073            .await
3074            .unwrap();
3075
3076        // Try to register with path traversal - should fail
3077        let mut register_req = RegisterTableRequest::new("../outside/table.lance".to_string());
3078        register_req.id = Some(vec!["test_table".to_string()]);
3079        let result = namespace.register_table(register_req).await;
3080        assert!(result.is_err());
3081        let err_msg = result.unwrap_err().to_string();
3082        assert!(err_msg.contains("Path traversal is not allowed"));
3083    }
3084
3085    #[tokio::test]
3086    async fn test_namespace_write() {
3087        use arrow::array::Int32Array;
3088        use arrow::datatypes::{DataType, Field as ArrowField, Schema as ArrowSchema};
3089        use arrow::record_batch::{RecordBatch, RecordBatchIterator};
3090        use lance::dataset::{Dataset, WriteMode, WriteParams};
3091        use lance_namespace::LanceNamespace;
3092
3093        let (namespace, _temp_dir) = create_test_namespace().await;
3094        let namespace = Arc::new(namespace) as Arc<dyn LanceNamespace>;
3095
3096        // Use child namespace instead of root
3097        let table_id = vec!["test_ns".to_string(), "test_table".to_string()];
3098        let schema = Arc::new(ArrowSchema::new(vec![
3099            ArrowField::new("a", DataType::Int32, false),
3100            ArrowField::new("b", DataType::Int32, false),
3101        ]));
3102
3103        // Test 1: CREATE mode
3104        let data1 = RecordBatch::try_new(
3105            schema.clone(),
3106            vec![
3107                Arc::new(Int32Array::from(vec![1, 2, 3])),
3108                Arc::new(Int32Array::from(vec![10, 20, 30])),
3109            ],
3110        )
3111        .unwrap();
3112
3113        let reader1 = RecordBatchIterator::new(vec![data1].into_iter().map(Ok), schema.clone());
3114        let dataset =
3115            Dataset::write_into_namespace(reader1, namespace.clone(), table_id.clone(), None)
3116                .await
3117                .unwrap();
3118
3119        assert_eq!(dataset.count_rows(None).await.unwrap(), 3);
3120        assert_eq!(dataset.version().version, 1);
3121
3122        // Test 2: APPEND mode
3123        let data2 = RecordBatch::try_new(
3124            schema.clone(),
3125            vec![
3126                Arc::new(Int32Array::from(vec![4, 5])),
3127                Arc::new(Int32Array::from(vec![40, 50])),
3128            ],
3129        )
3130        .unwrap();
3131
3132        let params_append = WriteParams {
3133            mode: WriteMode::Append,
3134            ..Default::default()
3135        };
3136
3137        let reader2 = RecordBatchIterator::new(vec![data2].into_iter().map(Ok), schema.clone());
3138        let dataset = Dataset::write_into_namespace(
3139            reader2,
3140            namespace.clone(),
3141            table_id.clone(),
3142            Some(params_append),
3143        )
3144        .await
3145        .unwrap();
3146
3147        assert_eq!(dataset.count_rows(None).await.unwrap(), 5);
3148        assert_eq!(dataset.version().version, 2);
3149
3150        // Test 3: OVERWRITE mode
3151        let data3 = RecordBatch::try_new(
3152            schema.clone(),
3153            vec![
3154                Arc::new(Int32Array::from(vec![100, 200])),
3155                Arc::new(Int32Array::from(vec![1000, 2000])),
3156            ],
3157        )
3158        .unwrap();
3159
3160        let params_overwrite = WriteParams {
3161            mode: WriteMode::Overwrite,
3162            ..Default::default()
3163        };
3164
3165        let reader3 = RecordBatchIterator::new(vec![data3].into_iter().map(Ok), schema.clone());
3166        let dataset = Dataset::write_into_namespace(
3167            reader3,
3168            namespace.clone(),
3169            table_id.clone(),
3170            Some(params_overwrite),
3171        )
3172        .await
3173        .unwrap();
3174
3175        assert_eq!(dataset.count_rows(None).await.unwrap(), 2);
3176        assert_eq!(dataset.version().version, 3);
3177
3178        // Verify old data was replaced
3179        let result = dataset.scan().try_into_batch().await.unwrap();
3180        let a_col = result
3181            .column_by_name("a")
3182            .unwrap()
3183            .as_any()
3184            .downcast_ref::<Int32Array>()
3185            .unwrap();
3186        assert_eq!(a_col.values(), &[100, 200]);
3187    }
3188
3189    // ============================================================
3190    // Tests for declare_table
3191    // ============================================================
3192
3193    #[tokio::test]
3194    async fn test_declare_table_v1_mode() {
3195        use lance_namespace::models::{
3196            DeclareTableRequest, DescribeTableRequest, TableExistsRequest,
3197        };
3198
3199        let temp_dir = TempStdDir::default();
3200        let temp_path = temp_dir.to_str().unwrap();
3201
3202        // Create namespace in V1 mode (no manifest)
3203        let namespace = DirectoryNamespaceBuilder::new(temp_path)
3204            .manifest_enabled(false)
3205            .build()
3206            .await
3207            .unwrap();
3208
3209        // Declare a table
3210        let mut declare_req = DeclareTableRequest::new();
3211        declare_req.id = Some(vec!["test_table".to_string()]);
3212        let response = namespace.declare_table(declare_req).await.unwrap();
3213
3214        // Should return location
3215        assert!(response.location.is_some());
3216        let location = response.location.as_ref().unwrap();
3217        assert!(location.ends_with("test_table.lance"));
3218
3219        // Table should exist (via reserved file)
3220        let mut exists_req = TableExistsRequest::new();
3221        exists_req.id = Some(vec!["test_table".to_string()]);
3222        assert!(namespace.table_exists(exists_req).await.is_ok());
3223
3224        // Describe should work but return no version/schema (not written yet)
3225        let mut describe_req = DescribeTableRequest::new();
3226        describe_req.id = Some(vec!["test_table".to_string()]);
3227        let describe_response = namespace.describe_table(describe_req).await.unwrap();
3228        assert!(describe_response.location.is_some());
3229        assert!(describe_response.version.is_none()); // Not written yet
3230        assert!(describe_response.schema.is_none()); // Not written yet
3231    }
3232
3233    #[tokio::test]
3234    async fn test_declare_table_with_manifest() {
3235        use lance_namespace::models::{DeclareTableRequest, TableExistsRequest};
3236
3237        let temp_dir = TempStdDir::default();
3238        let temp_path = temp_dir.to_str().unwrap();
3239
3240        // Create namespace with manifest
3241        let namespace = DirectoryNamespaceBuilder::new(temp_path)
3242            .manifest_enabled(true)
3243            .dir_listing_enabled(false)
3244            .build()
3245            .await
3246            .unwrap();
3247
3248        // Declare a table
3249        let mut declare_req = DeclareTableRequest::new();
3250        declare_req.id = Some(vec!["test_table".to_string()]);
3251        let response = namespace.declare_table(declare_req).await.unwrap();
3252
3253        // Should return location
3254        assert!(response.location.is_some());
3255
3256        // Table should exist in manifest
3257        let mut exists_req = TableExistsRequest::new();
3258        exists_req.id = Some(vec!["test_table".to_string()]);
3259        assert!(namespace.table_exists(exists_req).await.is_ok());
3260    }
3261
3262    #[tokio::test]
3263    async fn test_declare_table_when_table_exists() {
3264        use lance_namespace::models::DeclareTableRequest;
3265
3266        let temp_dir = TempStdDir::default();
3267        let temp_path = temp_dir.to_str().unwrap();
3268
3269        let namespace = DirectoryNamespaceBuilder::new(temp_path)
3270            .manifest_enabled(false)
3271            .build()
3272            .await
3273            .unwrap();
3274
3275        // First create a table with actual data
3276        let schema = create_test_schema();
3277        let ipc_data = create_test_ipc_data(&schema);
3278        let mut create_req = CreateTableRequest::new();
3279        create_req.id = Some(vec!["test_table".to_string()]);
3280        namespace
3281            .create_table(create_req, bytes::Bytes::from(ipc_data))
3282            .await
3283            .unwrap();
3284
3285        // Try to declare the same table - should fail because it already has data
3286        let mut declare_req = DeclareTableRequest::new();
3287        declare_req.id = Some(vec!["test_table".to_string()]);
3288        let result = namespace.declare_table(declare_req).await;
3289        assert!(result.is_err());
3290    }
3291
3292    // ============================================================
3293    // Tests for deregister_table in V1 mode
3294    // ============================================================
3295
3296    #[tokio::test]
3297    async fn test_deregister_table_v1_mode() {
3298        use lance_namespace::models::{DeregisterTableRequest, TableExistsRequest};
3299
3300        let temp_dir = TempStdDir::default();
3301        let temp_path = temp_dir.to_str().unwrap();
3302
3303        // Create namespace in V1 mode (no manifest, with dir listing)
3304        let namespace = DirectoryNamespaceBuilder::new(temp_path)
3305            .manifest_enabled(false)
3306            .dir_listing_enabled(true)
3307            .build()
3308            .await
3309            .unwrap();
3310
3311        // Create a table with data
3312        let schema = create_test_schema();
3313        let ipc_data = create_test_ipc_data(&schema);
3314        let mut create_req = CreateTableRequest::new();
3315        create_req.id = Some(vec!["test_table".to_string()]);
3316        namespace
3317            .create_table(create_req, bytes::Bytes::from(ipc_data))
3318            .await
3319            .unwrap();
3320
3321        // Verify table exists
3322        let mut exists_req = TableExistsRequest::new();
3323        exists_req.id = Some(vec!["test_table".to_string()]);
3324        assert!(namespace.table_exists(exists_req.clone()).await.is_ok());
3325
3326        // Deregister the table
3327        let mut deregister_req = DeregisterTableRequest::new();
3328        deregister_req.id = Some(vec!["test_table".to_string()]);
3329        let response = namespace.deregister_table(deregister_req).await.unwrap();
3330
3331        // Should return location
3332        assert!(response.location.is_some());
3333        let location = response.location.as_ref().unwrap();
3334        assert!(location.contains("test_table"));
3335
3336        // Table should no longer exist (deregistered)
3337        let result = namespace.table_exists(exists_req).await;
3338        assert!(result.is_err());
3339        assert!(result.unwrap_err().to_string().contains("deregistered"));
3340
3341        // Physical data should still exist
3342        let dataset = Dataset::open(location).await;
3343        assert!(dataset.is_ok(), "Physical table data should still exist");
3344    }
3345
3346    #[tokio::test]
3347    async fn test_deregister_table_v1_already_deregistered() {
3348        use lance_namespace::models::DeregisterTableRequest;
3349
3350        let temp_dir = TempStdDir::default();
3351        let temp_path = temp_dir.to_str().unwrap();
3352
3353        let namespace = DirectoryNamespaceBuilder::new(temp_path)
3354            .manifest_enabled(false)
3355            .dir_listing_enabled(true)
3356            .build()
3357            .await
3358            .unwrap();
3359
3360        // Create a table
3361        let schema = create_test_schema();
3362        let ipc_data = create_test_ipc_data(&schema);
3363        let mut create_req = CreateTableRequest::new();
3364        create_req.id = Some(vec!["test_table".to_string()]);
3365        namespace
3366            .create_table(create_req, bytes::Bytes::from(ipc_data))
3367            .await
3368            .unwrap();
3369
3370        // Deregister once
3371        let mut deregister_req = DeregisterTableRequest::new();
3372        deregister_req.id = Some(vec!["test_table".to_string()]);
3373        namespace
3374            .deregister_table(deregister_req.clone())
3375            .await
3376            .unwrap();
3377
3378        // Try to deregister again - should fail
3379        let result = namespace.deregister_table(deregister_req).await;
3380        assert!(result.is_err());
3381        assert!(result
3382            .unwrap_err()
3383            .to_string()
3384            .contains("already deregistered"));
3385    }
3386
3387    // ============================================================
3388    // Tests for list_tables skipping deregistered tables
3389    // ============================================================
3390
3391    #[tokio::test]
3392    async fn test_list_tables_skips_deregistered_v1() {
3393        use lance_namespace::models::DeregisterTableRequest;
3394
3395        let temp_dir = TempStdDir::default();
3396        let temp_path = temp_dir.to_str().unwrap();
3397
3398        let namespace = DirectoryNamespaceBuilder::new(temp_path)
3399            .manifest_enabled(false)
3400            .dir_listing_enabled(true)
3401            .build()
3402            .await
3403            .unwrap();
3404
3405        // Create two tables
3406        let schema = create_test_schema();
3407        let ipc_data = create_test_ipc_data(&schema);
3408
3409        let mut create_req1 = CreateTableRequest::new();
3410        create_req1.id = Some(vec!["table1".to_string()]);
3411        namespace
3412            .create_table(create_req1, bytes::Bytes::from(ipc_data.clone()))
3413            .await
3414            .unwrap();
3415
3416        let mut create_req2 = CreateTableRequest::new();
3417        create_req2.id = Some(vec!["table2".to_string()]);
3418        namespace
3419            .create_table(create_req2, bytes::Bytes::from(ipc_data))
3420            .await
3421            .unwrap();
3422
3423        // List tables - should see both (root namespace = empty vec)
3424        let mut list_req = ListTablesRequest::new();
3425        list_req.id = Some(vec![]);
3426        let list_response = namespace.list_tables(list_req.clone()).await.unwrap();
3427        assert_eq!(list_response.tables.len(), 2);
3428
3429        // Deregister table1
3430        let mut deregister_req = DeregisterTableRequest::new();
3431        deregister_req.id = Some(vec!["table1".to_string()]);
3432        namespace.deregister_table(deregister_req).await.unwrap();
3433
3434        // List tables - should only see table2
3435        let list_response = namespace.list_tables(list_req).await.unwrap();
3436        assert_eq!(list_response.tables.len(), 1);
3437        assert!(list_response.tables.contains(&"table2".to_string()));
3438        assert!(!list_response.tables.contains(&"table1".to_string()));
3439    }
3440
3441    // ============================================================
3442    // Tests for describe_table and table_exists with deregistered tables
3443    // ============================================================
3444
3445    #[tokio::test]
3446    async fn test_describe_table_fails_for_deregistered_v1() {
3447        use lance_namespace::models::{DeregisterTableRequest, DescribeTableRequest};
3448
3449        let temp_dir = TempStdDir::default();
3450        let temp_path = temp_dir.to_str().unwrap();
3451
3452        let namespace = DirectoryNamespaceBuilder::new(temp_path)
3453            .manifest_enabled(false)
3454            .dir_listing_enabled(true)
3455            .build()
3456            .await
3457            .unwrap();
3458
3459        // Create a table
3460        let schema = create_test_schema();
3461        let ipc_data = create_test_ipc_data(&schema);
3462        let mut create_req = CreateTableRequest::new();
3463        create_req.id = Some(vec!["test_table".to_string()]);
3464        namespace
3465            .create_table(create_req, bytes::Bytes::from(ipc_data))
3466            .await
3467            .unwrap();
3468
3469        // Describe should work before deregistration
3470        let mut describe_req = DescribeTableRequest::new();
3471        describe_req.id = Some(vec!["test_table".to_string()]);
3472        assert!(namespace.describe_table(describe_req.clone()).await.is_ok());
3473
3474        // Deregister
3475        let mut deregister_req = DeregisterTableRequest::new();
3476        deregister_req.id = Some(vec!["test_table".to_string()]);
3477        namespace.deregister_table(deregister_req).await.unwrap();
3478
3479        // Describe should fail after deregistration
3480        let result = namespace.describe_table(describe_req).await;
3481        assert!(result.is_err());
3482        assert!(result.unwrap_err().to_string().contains("deregistered"));
3483    }
3484
3485    #[tokio::test]
3486    async fn test_table_exists_fails_for_deregistered_v1() {
3487        use lance_namespace::models::{DeregisterTableRequest, TableExistsRequest};
3488
3489        let temp_dir = TempStdDir::default();
3490        let temp_path = temp_dir.to_str().unwrap();
3491
3492        let namespace = DirectoryNamespaceBuilder::new(temp_path)
3493            .manifest_enabled(false)
3494            .dir_listing_enabled(true)
3495            .build()
3496            .await
3497            .unwrap();
3498
3499        // Create a table
3500        let schema = create_test_schema();
3501        let ipc_data = create_test_ipc_data(&schema);
3502        let mut create_req = CreateTableRequest::new();
3503        create_req.id = Some(vec!["test_table".to_string()]);
3504        namespace
3505            .create_table(create_req, bytes::Bytes::from(ipc_data))
3506            .await
3507            .unwrap();
3508
3509        // Table exists should work before deregistration
3510        let mut exists_req = TableExistsRequest::new();
3511        exists_req.id = Some(vec!["test_table".to_string()]);
3512        assert!(namespace.table_exists(exists_req.clone()).await.is_ok());
3513
3514        // Deregister
3515        let mut deregister_req = DeregisterTableRequest::new();
3516        deregister_req.id = Some(vec!["test_table".to_string()]);
3517        namespace.deregister_table(deregister_req).await.unwrap();
3518
3519        // Table exists should fail after deregistration
3520        let result = namespace.table_exists(exists_req).await;
3521        assert!(result.is_err());
3522        assert!(result.unwrap_err().to_string().contains("deregistered"));
3523    }
3524
3525    #[tokio::test]
3526    async fn test_atomic_table_status_check() {
3527        // This test verifies that the TableStatus check is atomic
3528        // by ensuring a single directory listing is used
3529
3530        let temp_dir = TempStdDir::default();
3531        let temp_path = temp_dir.to_str().unwrap();
3532
3533        let namespace = DirectoryNamespaceBuilder::new(temp_path)
3534            .manifest_enabled(false)
3535            .dir_listing_enabled(true)
3536            .build()
3537            .await
3538            .unwrap();
3539
3540        // Create a table
3541        let schema = create_test_schema();
3542        let ipc_data = create_test_ipc_data(&schema);
3543        let mut create_req = CreateTableRequest::new();
3544        create_req.id = Some(vec!["test_table".to_string()]);
3545        namespace
3546            .create_table(create_req, bytes::Bytes::from(ipc_data))
3547            .await
3548            .unwrap();
3549
3550        // Table status should show exists=true, is_deregistered=false
3551        let status = namespace.check_table_status("test_table").await;
3552        assert!(status.exists);
3553        assert!(!status.is_deregistered);
3554        assert!(!status.has_reserved_file);
3555    }
3556}