Skip to main content

lance_namespace_impls/
dir.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Directory-based Lance Namespace implementation.
5//!
6//! This module provides a directory-based implementation of the Lance namespace
7//! that stores tables as Lance datasets in a filesystem directory structure.
8
9pub mod manifest;
10
11use arrow::record_batch::RecordBatchIterator;
12use arrow_ipc::reader::StreamReader;
13use async_trait::async_trait;
14use bytes::Bytes;
15use futures::TryStreamExt;
16use lance::dataset::builder::DatasetBuilder;
17use lance::dataset::{Dataset, WriteMode, WriteParams};
18use lance::session::Session;
19use lance_io::object_store::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry};
20use lance_table::io::commit::ManifestNamingScheme;
21use object_store::path::Path;
22use object_store::{Error as ObjectStoreError, ObjectStore as OSObjectStore, PutMode, PutOptions};
23use std::collections::HashMap;
24use std::io::Cursor;
25use std::sync::Arc;
26
27use crate::context::DynamicContextProvider;
28use lance_namespace::models::{
29    BatchDeleteTableVersionsRequest, BatchDeleteTableVersionsResponse, CreateNamespaceRequest,
30    CreateNamespaceResponse, CreateTableRequest, CreateTableResponse, CreateTableVersionRequest,
31    CreateTableVersionResponse, DeclareTableRequest, DeclareTableResponse,
32    DescribeNamespaceRequest, DescribeNamespaceResponse, DescribeTableRequest,
33    DescribeTableResponse, DescribeTableVersionRequest, DescribeTableVersionResponse,
34    DropNamespaceRequest, DropNamespaceResponse, DropTableRequest, DropTableResponse, Identity,
35    ListNamespacesRequest, ListNamespacesResponse, ListTableVersionsRequest,
36    ListTableVersionsResponse, ListTablesRequest, ListTablesResponse, NamespaceExistsRequest,
37    TableExistsRequest, TableVersion,
38};
39
40use lance_core::{Error, Result, box_error};
41use lance_namespace::LanceNamespace;
42use lance_namespace::schema::arrow_schema_to_json;
43
44use crate::credentials::{
45    CredentialVendor, create_credential_vendor_for_location, has_credential_vendor_config,
46};
47
/// Result of checking table status atomically.
///
/// This struct captures the state of a table directory in a single snapshot,
/// avoiding race conditions between checking existence and other status flags.
///
/// All fields are plain flags, so the type is cheap to copy. It derives `Debug`
/// and `PartialEq` so a status can be logged and compared in assertions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct TableStatus {
    /// Whether the table directory exists (its listing returned any entries).
    pub(crate) exists: bool,
    /// Whether the table has a `.lance-deregistered` marker file.
    pub(crate) is_deregistered: bool,
    /// Whether the table has a `.lance-reserved` marker file (declared but not written).
    pub(crate) has_reserved_file: bool,
}
60
61/// Builder for creating a DirectoryNamespace.
62///
63/// This builder provides a fluent API for configuring and establishing
64/// connections to directory-based Lance namespaces.
65///
66/// # Examples
67///
68/// ```no_run
69/// # use lance_namespace_impls::DirectoryNamespaceBuilder;
70/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
71/// // Create a local directory namespace
72/// let namespace = DirectoryNamespaceBuilder::new("/path/to/data")
73///     .build()
74///     .await?;
75/// # Ok(())
76/// # }
77/// ```
78///
79/// ```no_run
80/// # use lance_namespace_impls::DirectoryNamespaceBuilder;
81/// # use lance::session::Session;
82/// # use std::sync::Arc;
83/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
84/// // Create with custom storage options and session
85/// let session = Arc::new(Session::default());
86/// let namespace = DirectoryNamespaceBuilder::new("s3://bucket/path")
87///     .storage_option("region", "us-west-2")
88///     .storage_option("access_key_id", "key")
89///     .session(session)
90///     .build()
91///     .await?;
92/// # Ok(())
93/// # }
94/// ```
95#[derive(Clone)]
96pub struct DirectoryNamespaceBuilder {
97    root: String,
98    storage_options: Option<HashMap<String, String>>,
99    session: Option<Arc<Session>>,
100    manifest_enabled: bool,
101    dir_listing_enabled: bool,
102    inline_optimization_enabled: bool,
103    table_version_tracking_enabled: bool,
104    /// When true, table versions are stored in the `__manifest` table instead of
105    /// relying on Lance's native version management.
106    table_version_storage_enabled: bool,
107    credential_vendor_properties: HashMap<String, String>,
108    context_provider: Option<Arc<dyn DynamicContextProvider>>,
109    commit_retries: Option<u32>,
110}
111
112impl std::fmt::Debug for DirectoryNamespaceBuilder {
113    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114        f.debug_struct("DirectoryNamespaceBuilder")
115            .field("root", &self.root)
116            .field("storage_options", &self.storage_options)
117            .field("manifest_enabled", &self.manifest_enabled)
118            .field("dir_listing_enabled", &self.dir_listing_enabled)
119            .field(
120                "inline_optimization_enabled",
121                &self.inline_optimization_enabled,
122            )
123            .field(
124                "table_version_tracking_enabled",
125                &self.table_version_tracking_enabled,
126            )
127            .field(
128                "table_version_storage_enabled",
129                &self.table_version_storage_enabled,
130            )
131            .field(
132                "context_provider",
133                &self.context_provider.as_ref().map(|_| "Some(...)"),
134            )
135            .finish()
136    }
137}
138
139impl DirectoryNamespaceBuilder {
140    /// Create a new DirectoryNamespaceBuilder with the specified root path.
141    ///
142    /// # Arguments
143    ///
144    /// * `root` - Root directory path (local path or cloud URI like s3://bucket/path)
145    pub fn new(root: impl Into<String>) -> Self {
146        Self {
147            root: root.into().trim_end_matches('/').to_string(),
148            storage_options: None,
149            session: None,
150            manifest_enabled: true,
151            dir_listing_enabled: true, // Default to enabled for backwards compatibility
152            inline_optimization_enabled: true,
153            table_version_tracking_enabled: false, // Default to disabled
154            table_version_storage_enabled: false,  // Default to disabled
155            credential_vendor_properties: HashMap::new(),
156            context_provider: None,
157            commit_retries: None,
158        }
159    }
160
161    /// Enable or disable manifest-based listing.
162    ///
163    /// When enabled (default), the namespace uses a `__manifest` table to track tables.
164    /// When disabled, relies solely on directory scanning.
165    pub fn manifest_enabled(mut self, enabled: bool) -> Self {
166        self.manifest_enabled = enabled;
167        self
168    }
169
170    /// Enable or disable directory-based listing fallback.
171    ///
172    /// When enabled (default), falls back to directory scanning for tables not in the manifest.
173    /// When disabled, only consults the manifest table.
174    pub fn dir_listing_enabled(mut self, enabled: bool) -> Self {
175        self.dir_listing_enabled = enabled;
176        self
177    }
178
179    /// Enable or disable inline optimization of the __manifest table.
180    ///
181    /// When enabled (default), performs compaction and indexing on the __manifest table
182    /// after every write operation to maintain optimal performance.
183    /// When disabled, manual optimization must be performed separately.
184    pub fn inline_optimization_enabled(mut self, enabled: bool) -> Self {
185        self.inline_optimization_enabled = enabled;
186        self
187    }
188
189    /// Enable or disable table version tracking through the namespace.
190    ///
191    /// When enabled, `describe_table` returns `managed_versioning: true` to indicate
192    /// that commits should go through the namespace's table version APIs rather than
193    /// direct object store operations.
194    ///
195    /// When disabled (default), `managed_versioning` is not set.
196    pub fn table_version_tracking_enabled(mut self, enabled: bool) -> Self {
197        self.table_version_tracking_enabled = enabled;
198        self
199    }
200
201    /// Enable or disable table version management through the `__manifest` table.
202    ///
203    /// When enabled, table versions are tracked as `table_version` entries in the
204    /// `__manifest` Lance table. This enables:
205    /// - Centralized version tracking instead of per-table `_versions/` directories
206    ///
207    /// Requires `manifest_enabled` to be true.
208    /// When disabled (default), version storage uses per-table storage operations.
209    pub fn table_version_storage_enabled(mut self, enabled: bool) -> Self {
210        self.table_version_storage_enabled = enabled;
211        self
212    }
213
214    /// Create a DirectoryNamespaceBuilder from properties HashMap.
215    ///
216    /// This method parses a properties map into builder configuration.
217    /// It expects:
218    /// - `root`: The root directory path (required)
219    /// - `manifest_enabled`: Enable manifest-based table tracking (optional, default: true)
220    /// - `dir_listing_enabled`: Enable directory listing for table discovery (optional, default: true)
221    /// - `inline_optimization_enabled`: Enable inline optimization of __manifest table (optional, default: true)
222    /// - `storage.*`: Storage options (optional, prefix will be stripped)
223    ///
224    /// Credential vendor properties (prefixed with `credential_vendor.`, prefix is stripped):
225    /// - `credential_vendor.enabled`: Set to "true" to enable credential vending (required)
226    /// - `credential_vendor.permission`: Permission level: read, write, or admin (default: read)
227    ///
228    /// AWS-specific properties (for s3:// locations):
229    /// - `credential_vendor.aws_role_arn`: AWS IAM role ARN (required for AWS)
230    /// - `credential_vendor.aws_external_id`: AWS external ID (optional)
231    /// - `credential_vendor.aws_region`: AWS region (optional)
232    /// - `credential_vendor.aws_role_session_name`: AWS role session name (optional)
233    /// - `credential_vendor.aws_duration_millis`: Credential duration in ms (default: 3600000, range: 15min-12hrs)
234    ///
235    /// GCP-specific properties (for gs:// locations):
236    /// - `credential_vendor.gcp_service_account`: Service account to impersonate (optional)
237    ///
238    /// Note: GCP uses Application Default Credentials (ADC). To use a service account key file,
239    /// set the `GOOGLE_APPLICATION_CREDENTIALS` environment variable before starting.
240    /// GCP token duration cannot be configured; it's determined by the STS endpoint (typically 1 hour).
241    ///
242    /// Azure-specific properties (for az:// locations):
243    /// - `credential_vendor.azure_account_name`: Azure storage account name (required for Azure)
244    /// - `credential_vendor.azure_tenant_id`: Azure tenant ID (optional)
245    /// - `credential_vendor.azure_duration_millis`: Credential duration in ms (default: 3600000, up to 7 days)
246    ///
247    /// # Arguments
248    ///
249    /// * `properties` - Configuration properties
250    /// * `session` - Optional Lance session to reuse object store registry
251    ///
252    /// # Returns
253    ///
254    /// Returns a `DirectoryNamespaceBuilder` instance.
255    ///
256    /// # Errors
257    ///
258    /// Returns an error if the `root` property is missing.
259    ///
260    /// # Examples
261    ///
262    /// ```no_run
263    /// # use lance_namespace_impls::DirectoryNamespaceBuilder;
264    /// # use std::collections::HashMap;
265    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
266    /// let mut properties = HashMap::new();
267    /// properties.insert("root".to_string(), "/path/to/data".to_string());
268    /// properties.insert("manifest_enabled".to_string(), "true".to_string());
269    /// properties.insert("dir_listing_enabled".to_string(), "false".to_string());
270    /// properties.insert("storage.region".to_string(), "us-west-2".to_string());
271    ///
272    /// let namespace = DirectoryNamespaceBuilder::from_properties(properties, None)?
273    ///     .build()
274    ///     .await?;
275    /// # Ok(())
276    /// # }
277    /// ```
278    pub fn from_properties(
279        properties: HashMap<String, String>,
280        session: Option<Arc<Session>>,
281    ) -> Result<Self> {
282        // Extract root from properties (required)
283        let root = properties.get("root").cloned().ok_or_else(|| {
284            Error::namespace_source(
285                "Missing required property 'root' for directory namespace".into(),
286            )
287        })?;
288
289        // Extract storage options (properties prefixed with "storage.")
290        let storage_options: HashMap<String, String> = properties
291            .iter()
292            .filter_map(|(k, v)| {
293                k.strip_prefix("storage.")
294                    .map(|key| (key.to_string(), v.clone()))
295            })
296            .collect();
297
298        let storage_options = if storage_options.is_empty() {
299            None
300        } else {
301            Some(storage_options)
302        };
303
304        // Extract manifest_enabled (default: true)
305        let manifest_enabled = properties
306            .get("manifest_enabled")
307            .and_then(|v| v.parse::<bool>().ok())
308            .unwrap_or(true);
309
310        // Extract dir_listing_enabled (default: true)
311        let dir_listing_enabled = properties
312            .get("dir_listing_enabled")
313            .and_then(|v| v.parse::<bool>().ok())
314            .unwrap_or(true);
315
316        // Extract inline_optimization_enabled (default: true)
317        let inline_optimization_enabled = properties
318            .get("inline_optimization_enabled")
319            .and_then(|v| v.parse::<bool>().ok())
320            .unwrap_or(true);
321
322        // Extract table_version_tracking_enabled (default: false)
323        let table_version_tracking_enabled = properties
324            .get("table_version_tracking_enabled")
325            .and_then(|v| v.parse::<bool>().ok())
326            .unwrap_or(false);
327
328        // Extract table_version_storage_enabled (default: false)
329        let table_version_storage_enabled = properties
330            .get("table_version_storage_enabled")
331            .and_then(|v| v.parse::<bool>().ok())
332            .unwrap_or(false);
333
334        // Extract credential vendor properties (properties prefixed with "credential_vendor.")
335        // The prefix is stripped to get short property names
336        // The build() method will check if enabled=true before creating the vendor
337        let credential_vendor_properties: HashMap<String, String> = properties
338            .iter()
339            .filter_map(|(k, v)| {
340                k.strip_prefix("credential_vendor.")
341                    .map(|key| (key.to_string(), v.clone()))
342            })
343            .collect();
344
345        let commit_retries = properties
346            .get("commit_retries")
347            .and_then(|v| v.parse::<u32>().ok());
348
349        Ok(Self {
350            root: root.trim_end_matches('/').to_string(),
351            storage_options,
352            session,
353            manifest_enabled,
354            dir_listing_enabled,
355            inline_optimization_enabled,
356            table_version_tracking_enabled,
357            table_version_storage_enabled,
358            credential_vendor_properties,
359            context_provider: None,
360            commit_retries,
361        })
362    }
363
364    /// Add a storage option.
365    ///
366    /// # Arguments
367    ///
368    /// * `key` - Storage option key (e.g., "region", "access_key_id")
369    /// * `value` - Storage option value
370    pub fn storage_option(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
371        self.storage_options
372            .get_or_insert_with(HashMap::new)
373            .insert(key.into(), value.into());
374        self
375    }
376
377    /// Add multiple storage options.
378    ///
379    /// # Arguments
380    ///
381    /// * `options` - HashMap of storage options to add
382    pub fn storage_options(mut self, options: HashMap<String, String>) -> Self {
383        self.storage_options
384            .get_or_insert_with(HashMap::new)
385            .extend(options);
386        self
387    }
388
389    /// Set the Lance session to use for this namespace.
390    ///
391    /// When a session is provided, the namespace will reuse the session's
392    /// object store registry, allowing multiple namespaces and datasets
393    /// to share the same underlying storage connections.
394    ///
395    /// # Arguments
396    ///
397    /// * `session` - Arc-wrapped Lance session
398    pub fn session(mut self, session: Arc<Session>) -> Self {
399        self.session = Some(session);
400        self
401    }
402
403    /// Set the number of retries for commit operations on the manifest table.
404    /// If not set, defaults to [`lance_table::io::commit::CommitConfig`] default (20).
405    pub fn commit_retries(mut self, retries: u32) -> Self {
406        self.commit_retries = Some(retries);
407        self
408    }
409
410    /// Add a credential vendor property.
411    ///
412    /// Use short property names without the `credential_vendor.` prefix.
413    /// Common properties: `enabled`, `permission`.
414    /// AWS properties: `aws_role_arn`, `aws_external_id`, `aws_region`, `aws_role_session_name`, `aws_duration_millis`.
415    /// GCP properties: `gcp_service_account`.
416    /// Azure properties: `azure_account_name`, `azure_tenant_id`, `azure_duration_millis`.
417    ///
418    /// # Arguments
419    ///
420    /// * `key` - Property key (e.g., "enabled", "aws_role_arn")
421    /// * `value` - Property value
422    ///
423    /// # Example
424    ///
425    /// ```no_run
426    /// # use lance_namespace_impls::DirectoryNamespaceBuilder;
427    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
428    /// let namespace = DirectoryNamespaceBuilder::new("s3://my-bucket/data")
429    ///     .credential_vendor_property("enabled", "true")
430    ///     .credential_vendor_property("aws_role_arn", "arn:aws:iam::123456789012:role/MyRole")
431    ///     .credential_vendor_property("permission", "read")
432    ///     .build()
433    ///     .await?;
434    /// # Ok(())
435    /// # }
436    /// ```
437    pub fn credential_vendor_property(
438        mut self,
439        key: impl Into<String>,
440        value: impl Into<String>,
441    ) -> Self {
442        self.credential_vendor_properties
443            .insert(key.into(), value.into());
444        self
445    }
446
447    /// Add multiple credential vendor properties.
448    ///
449    /// Use short property names without the `credential_vendor.` prefix.
450    ///
451    /// # Arguments
452    ///
453    /// * `properties` - HashMap of credential vendor properties to add
454    pub fn credential_vendor_properties(mut self, properties: HashMap<String, String>) -> Self {
455        self.credential_vendor_properties.extend(properties);
456        self
457    }
458
459    /// Set a dynamic context provider for per-request context.
460    ///
461    /// The provider can be used to generate additional context for operations.
462    /// For DirectoryNamespace, the context is stored but not directly used
463    /// in operations (unlike RestNamespace where it's converted to HTTP headers).
464    ///
465    /// # Arguments
466    ///
467    /// * `provider` - The context provider implementation
468    pub fn context_provider(mut self, provider: Arc<dyn DynamicContextProvider>) -> Self {
469        self.context_provider = Some(provider);
470        self
471    }
472
473    /// Build the DirectoryNamespace.
474    ///
475    /// # Returns
476    ///
477    /// Returns a `DirectoryNamespace` instance.
478    ///
479    /// # Errors
480    ///
481    /// Returns an error if:
482    /// - The root path is invalid
483    /// - Connection to the storage backend fails
484    /// - Storage options are invalid
485    pub async fn build(self) -> Result<DirectoryNamespace> {
486        // Validate: table_version_storage_enabled requires manifest_enabled
487        if self.table_version_storage_enabled && !self.manifest_enabled {
488            return Err(Error::invalid_input(
489                "table_version_storage_enabled requires manifest_enabled=true",
490            ));
491        }
492
493        let (object_store, base_path) =
494            Self::initialize_object_store(&self.root, &self.storage_options, &self.session).await?;
495
496        let manifest_ns = if self.manifest_enabled {
497            match manifest::ManifestNamespace::from_directory(
498                self.root.clone(),
499                self.storage_options.clone(),
500                self.session.clone(),
501                object_store.clone(),
502                base_path.clone(),
503                self.dir_listing_enabled,
504                self.inline_optimization_enabled,
505                self.commit_retries,
506                self.table_version_storage_enabled,
507            )
508            .await
509            {
510                Ok(ns) => Some(Arc::new(ns)),
511                Err(e) => {
512                    // Failed to initialize manifest namespace, fall back to directory listing only
513                    log::warn!(
514                        "Failed to initialize manifest namespace, falling back to directory listing only: {}",
515                        e
516                    );
517                    None
518                }
519            }
520        } else {
521            None
522        };
523
524        // Create credential vendor once during initialization if enabled
525        let credential_vendor = if has_credential_vendor_config(&self.credential_vendor_properties)
526        {
527            create_credential_vendor_for_location(&self.root, &self.credential_vendor_properties)
528                .await?
529                .map(Arc::from)
530        } else {
531            None
532        };
533
534        Ok(DirectoryNamespace {
535            root: self.root,
536            storage_options: self.storage_options,
537            session: self.session,
538            object_store,
539            base_path,
540            manifest_ns,
541            dir_listing_enabled: self.dir_listing_enabled,
542            table_version_tracking_enabled: self.table_version_tracking_enabled,
543            table_version_storage_enabled: self.table_version_storage_enabled,
544            credential_vendor,
545            context_provider: self.context_provider,
546        })
547    }
548
549    /// Initialize the Lance ObjectStore based on the configuration
550    async fn initialize_object_store(
551        root: &str,
552        storage_options: &Option<HashMap<String, String>>,
553        session: &Option<Arc<Session>>,
554    ) -> Result<(Arc<ObjectStore>, Path)> {
555        // Build ObjectStoreParams from storage options
556        let accessor = storage_options.clone().map(|opts| {
557            Arc::new(lance_io::object_store::StorageOptionsAccessor::with_static_options(opts))
558        });
559        let params = ObjectStoreParams {
560            storage_options_accessor: accessor,
561            ..Default::default()
562        };
563
564        // Use object store registry from session if provided, otherwise create a new one
565        let registry = if let Some(session) = session {
566            session.store_registry()
567        } else {
568            Arc::new(ObjectStoreRegistry::default())
569        };
570
571        // Use Lance's object store factory to create from URI
572        let (object_store, base_path) = ObjectStore::from_uri_and_params(registry, root, &params)
573            .await
574            .map_err(|e| {
575                Error::namespace_source(format!("Failed to create object store: {}", e).into())
576            })?;
577
578        Ok((object_store, base_path))
579    }
580}
581
582/// Directory-based implementation of Lance Namespace.
583///
584/// This implementation stores tables as Lance datasets in a directory structure.
585/// It supports local filesystems and cloud storage backends through Lance's object store.
586///
587/// ## Manifest-based Listing
588///
589/// When `manifest_enabled=true`, the namespace uses a special `__manifest` Lance table to track tables
590/// instead of scanning the filesystem. This provides:
591/// - Better performance for listing operations
592/// - Ability to track table metadata
593/// - Foundation for future features like namespaces and table renaming
594///
595/// When `dir_listing_enabled=true`, the namespace falls back to directory scanning for tables not
596/// found in the manifest, enabling gradual migration.
597///
598/// ## Credential Vending
599///
600/// When credential vendor properties are configured, `describe_table` will vend temporary
601/// credentials based on the table location URI. The vendor type is auto-selected:
602/// - `s3://` locations use AWS STS AssumeRole
603/// - `gs://` locations use GCP OAuth2 tokens
604/// - `az://` locations use Azure SAS tokens
605pub struct DirectoryNamespace {
606    root: String,
607    storage_options: Option<HashMap<String, String>>,
608    #[allow(dead_code)]
609    session: Option<Arc<Session>>,
610    object_store: Arc<ObjectStore>,
611    base_path: Path,
612    manifest_ns: Option<Arc<manifest::ManifestNamespace>>,
613    dir_listing_enabled: bool,
614    /// When true, `describe_table` returns `managed_versioning: true` to indicate
615    /// commits should go through namespace table version APIs.
616    table_version_tracking_enabled: bool,
617    /// When true, table versions are stored in the `__manifest` table.
618    table_version_storage_enabled: bool,
619    /// Credential vendor created once during initialization.
620    /// Used to vend temporary credentials for table access.
621    credential_vendor: Option<Arc<dyn CredentialVendor>>,
622    /// Dynamic context provider for per-request context.
623    /// Stored but not directly used in operations (available for future extensions).
624    #[allow(dead_code)]
625    context_provider: Option<Arc<dyn DynamicContextProvider>>,
626}
627
628impl std::fmt::Debug for DirectoryNamespace {
629    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
630        write!(f, "{}", self.namespace_id())
631    }
632}
633
634impl std::fmt::Display for DirectoryNamespace {
635    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
636        write!(f, "{}", self.namespace_id())
637    }
638}
639
/// Version ranges scheduled for deletion on one table.
///
/// Shared by `batch_delete_table_versions` and `delete_physical_version_files`.
struct TableDeleteEntry {
    /// Identifier of the table whose versions are deleted.
    table_id: Option<Vec<String>>,
    /// Version ranges as `(i64, i64)` pairs.
    // NOTE(review): bound inclusivity is defined by the consumers — confirm there.
    ranges: Vec<(i64, i64)>,
}
646
647impl DirectoryNamespace {
648    /// Apply pagination to a list of table names
649    ///
650    /// Sorts the list alphabetically and applies pagination using page_token (start_after) and limit.
651    ///
652    /// # Arguments
653    /// * `names` - The vector of table names to paginate
654    /// * `page_token` - Skip items until finding one greater than this value (start_after semantics)
655    /// * `limit` - Maximum number of items to keep
656    fn apply_pagination(names: &mut Vec<String>, page_token: Option<String>, limit: Option<i32>) {
657        // Sort alphabetically for consistent ordering
658        names.sort();
659
660        // Apply page_token filtering (start_after semantics)
661        if let Some(start_after) = page_token {
662            if let Some(index) = names
663                .iter()
664                .position(|name| name.as_str() > start_after.as_str())
665            {
666                names.drain(0..index);
667            } else {
668                names.clear();
669            }
670        }
671
672        // Apply limit
673        if let Some(limit) = limit
674            && limit >= 0
675        {
676            names.truncate(limit as usize);
677        }
678    }
679
680    /// List tables using directory scanning (fallback method)
681    async fn list_directory_tables(&self) -> Result<Vec<String>> {
682        let mut tables = Vec::new();
683        let entries = self
684            .object_store
685            .read_dir(self.base_path.clone())
686            .await
687            .map_err(|e| {
688                Error::io_source(box_error(std::io::Error::other(format!(
689                    "Failed to list directory: {}",
690                    e
691                ))))
692            })?;
693
694        for entry in entries {
695            let path = entry.trim_end_matches('/');
696            if !path.ends_with(".lance") {
697                continue;
698            }
699
700            let table_name = &path[..path.len() - 6];
701
702            // Use atomic check to skip deregistered tables and declared-but-not-written tables
703            let status = self.check_table_status(table_name).await;
704            if status.is_deregistered || status.has_reserved_file {
705                continue;
706            }
707
708            tables.push(table_name.to_string());
709        }
710
711        Ok(tables)
712    }
713
714    /// Validate that the namespace ID represents the root namespace
715    fn validate_root_namespace_id(id: &Option<Vec<String>>) -> Result<()> {
716        if let Some(id) = id
717            && !id.is_empty()
718        {
719            return Err(Error::namespace_source(format!(
720                "Directory namespace only supports root namespace operations, but got namespace ID: {:?}. Expected empty ID.",
721                id
722            ).into()));
723        }
724        Ok(())
725    }
726
727    /// Extract table name from table ID
728    fn table_name_from_id(id: &Option<Vec<String>>) -> Result<String> {
729        let id = id.as_ref().ok_or_else(|| {
730            Error::namespace_source("Directory namespace table ID cannot be empty".into())
731        })?;
732
733        if id.len() != 1 {
734            return Err(Error::namespace_source(format!(
735                "Multi-level table IDs are only supported when manifest mode is enabled, but got: {:?}",
736                id
737            )
738            .into()));
739        }
740
741        Ok(id[0].clone())
742    }
743
744    async fn resolve_table_location(&self, id: &Option<Vec<String>>) -> Result<String> {
745        let mut describe_req = DescribeTableRequest::new();
746        describe_req.id = id.clone();
747        describe_req.load_detailed_metadata = Some(false);
748
749        let describe_resp = self.describe_table(describe_req).await?;
750
751        describe_resp.location.ok_or_else(|| {
752            Error::namespace_source(format!("Table location not found for: {:?}", id).into())
753        })
754    }
755
756    fn table_full_uri(&self, table_name: &str) -> String {
757        format!("{}/{}.lance", &self.root, table_name)
758    }
759
760    fn uri_to_object_store_path(uri: &str) -> Path {
761        let path_str = if let Some(rest) = uri.strip_prefix("file://") {
762            rest
763        } else if let Some(rest) = uri.strip_prefix("s3://") {
764            rest.split_once('/').map(|(_, p)| p).unwrap_or(rest)
765        } else if let Some(rest) = uri.strip_prefix("gs://") {
766            rest.split_once('/').map(|(_, p)| p).unwrap_or(rest)
767        } else if let Some(rest) = uri.strip_prefix("az://") {
768            rest.split_once('/').map(|(_, p)| p).unwrap_or(rest)
769        } else {
770            uri
771        };
772        Path::from(path_str)
773    }
774
775    /// Get the object store path for a table (relative to base_path)
776    fn table_path(&self, table_name: &str) -> Path {
777        self.base_path
778            .child(format!("{}.lance", table_name).as_str())
779    }
780
781    /// Get the reserved file path for a table
782    fn table_reserved_file_path(&self, table_name: &str) -> Path {
783        self.base_path
784            .child(format!("{}.lance", table_name).as_str())
785            .child(".lance-reserved")
786    }
787
788    /// Get the deregistered marker file path for a table
789    fn table_deregistered_file_path(&self, table_name: &str) -> Path {
790        self.base_path
791            .child(format!("{}.lance", table_name).as_str())
792            .child(".lance-deregistered")
793    }
794
795    /// Atomically check table existence and deregistration status.
796    ///
797    /// This performs a single directory listing to get a consistent snapshot of the
798    /// table's state, avoiding race conditions between checking existence and
799    /// checking deregistration status.
800    pub(crate) async fn check_table_status(&self, table_name: &str) -> TableStatus {
801        let table_path = self.table_path(table_name);
802        match self.object_store.read_dir(table_path).await {
803            Ok(entries) => {
804                let exists = !entries.is_empty();
805                let is_deregistered = entries.iter().any(|e| e.ends_with(".lance-deregistered"));
806                let has_reserved_file = entries.iter().any(|e| e.ends_with(".lance-reserved"));
807                TableStatus {
808                    exists,
809                    is_deregistered,
810                    has_reserved_file,
811                }
812            }
813            Err(_) => TableStatus {
814                exists: false,
815                is_deregistered: false,
816                has_reserved_file: false,
817            },
818        }
819    }
820
821    async fn put_marker_file_atomic(
822        &self,
823        path: &Path,
824        file_description: &str,
825    ) -> std::result::Result<(), String> {
826        let put_opts = PutOptions {
827            mode: PutMode::Create,
828            ..Default::default()
829        };
830
831        match self
832            .object_store
833            .inner
834            .put_opts(path, bytes::Bytes::new().into(), put_opts)
835            .await
836        {
837            Ok(_) => Ok(()),
838            Err(ObjectStoreError::AlreadyExists { .. })
839            | Err(ObjectStoreError::Precondition { .. }) => {
840                Err(format!("{} already exists", file_description))
841            }
842            Err(e) => Err(format!("Failed to create {}: {}", file_description, e)),
843        }
844    }
845
846    /// Get storage options for a table, using credential vending if configured.
847    ///
848    /// If credential vendor properties are configured and the table location matches
849    /// a supported cloud provider, this will create an appropriate vendor and vend
850    /// temporary credentials scoped to the table location. Otherwise, returns the
851    /// static storage options.
852    ///
853    /// The vendor type is auto-selected based on the table URI:
854    /// - `s3://` locations use AWS STS AssumeRole
855    /// - `gs://` locations use GCP OAuth2 tokens
856    /// - `az://` locations use Azure SAS tokens
857    ///
858    /// The permission level (Read, Write, Admin) is configured at namespace
859    /// initialization time via the `credential_vendor_permission` property.
860    ///
861    /// # Arguments
862    ///
863    /// * `table_uri` - The full URI of the table
864    /// * `identity` - Optional identity from the request for identity-based credential vending
865    async fn get_storage_options_for_table(
866        &self,
867        table_uri: &str,
868        vend_credentials: bool,
869        identity: Option<&Identity>,
870    ) -> Result<Option<HashMap<String, String>>> {
871        if vend_credentials && let Some(ref vendor) = self.credential_vendor {
872            let vended = vendor.vend_credentials(table_uri, identity).await?;
873            return Ok(Some(vended.storage_options));
874        }
875        // When no credential vendor is configured, return None to avoid
876        // leaking the namespace's own static credentials to clients.
877        Ok(None)
878    }
879
880    /// Migrate directory-based tables to the manifest.
881    ///
882    /// This is a one-time migration operation that:
883    /// 1. Scans the directory for existing `.lance` tables
884    /// 2. Registers any unmigrated tables in the manifest
885    /// 3. Returns the count of tables that were migrated
886    ///
887    /// This method is safe to run multiple times - it will skip tables that are already
888    /// registered in the manifest.
889    ///
890    /// # Usage
891    ///
892    /// After creating tables in directory-only mode or dual mode, you can migrate them
893    /// to the manifest to enable manifest-only mode:
894    ///
895    /// ```no_run
896    /// # use lance_namespace_impls::DirectoryNamespaceBuilder;
897    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
898    /// // Create namespace with dual mode (manifest + directory listing)
899    /// let namespace = DirectoryNamespaceBuilder::new("/path/to/data")
900    ///     .manifest_enabled(true)
901    ///     .dir_listing_enabled(true)
902    ///     .build()
903    ///     .await?;
904    ///
905    /// // ... tables are created and used ...
906    ///
907    /// // Migrate existing directory tables to manifest
908    /// let migrated_count = namespace.migrate().await?;
909    /// println!("Migrated {} tables", migrated_count);
910    ///
911    /// // Now you can disable directory listing for better performance:
912    /// // (requires rebuilding the namespace)
913    /// let namespace = DirectoryNamespaceBuilder::new("/path/to/data")
914    ///     .manifest_enabled(true)
915    ///     .dir_listing_enabled(false)  // All tables now in manifest
916    ///     .build()
917    ///     .await?;
918    /// # Ok(())
919    /// # }
920    /// ```
921    ///
922    /// # Returns
923    ///
924    /// Returns the number of tables that were migrated to the manifest.
925    ///
926    /// # Errors
927    ///
928    /// Returns an error if:
929    /// - Manifest is not enabled
930    /// - Directory listing fails
931    /// - Manifest registration fails
932    pub async fn migrate(&self) -> Result<usize> {
933        // We only care about tables in the root namespace
934        let Some(ref manifest_ns) = self.manifest_ns else {
935            return Ok(0); // No manifest, nothing to migrate
936        };
937
938        // Get all table locations already in the manifest
939        let manifest_locations = manifest_ns.list_manifest_table_locations().await?;
940
941        // Get all tables from directory
942        let dir_tables = self.list_directory_tables().await?;
943
944        // Register each directory table that doesn't have an overlapping location
945        // If a directory name already exists in the manifest,
946        // that means the table must have already been migrated or created
947        // in the manifest, so we can skip it.
948        let mut migrated_count = 0;
949        for table_name in dir_tables {
950            // For root namespace tables, the directory name is "table_name.lance"
951            let dir_name = format!("{}.lance", table_name);
952            if !manifest_locations.contains(&dir_name) {
953                manifest_ns.register_table(&table_name, dir_name).await?;
954                migrated_count += 1;
955            }
956        }
957
958        Ok(migrated_count)
959    }
960
961    /// Delete physical manifest files for the given table version ranges (best-effort).
962    ///
963    /// This helper is used by `batch_delete_table_versions` in both the manifest-enabled
964    /// and non-manifest paths. It resolves each table's storage location, computes the
965    /// version file paths, and attempts to delete them. Errors are logged (best-effort)
966    /// when `best_effort` is true, or returned immediately when false.
967    ///
968    /// Returns the number of files successfully deleted.
969    async fn delete_physical_version_files(
970        &self,
971        table_entries: &[TableDeleteEntry],
972        best_effort: bool,
973    ) -> Result<i64> {
974        let mut deleted_count = 0i64;
975        for te in table_entries {
976            let table_uri = self.resolve_table_location(&te.table_id).await?;
977            let table_path = Self::uri_to_object_store_path(&table_uri);
978            let table_path_str = table_path.as_ref();
979            let versions_dir_path = Path::from(format!("{}_versions", table_path_str));
980
981            for (start, end) in &te.ranges {
982                for version in *start..=*end {
983                    let version_path =
984                        versions_dir_path.child(format!("{}.manifest", version as u64));
985                    match self.object_store.inner.delete(&version_path).await {
986                        Ok(_) => {
987                            deleted_count += 1;
988                        }
989                        Err(object_store::Error::NotFound { .. }) => {}
990                        Err(e) => {
991                            if best_effort {
992                                log::warn!(
993                                    "Failed to delete manifest file for version {} of table {:?}: {:?}",
994                                    version,
995                                    te.table_id,
996                                    e
997                                );
998                            } else {
999                                return Err(Error::namespace_source(
1000                                    format!(
1001                                        "Failed to delete version {} for table at '{}': {}",
1002                                        version, table_uri, e
1003                                    )
1004                                    .into(),
1005                                ));
1006                            }
1007                        }
1008                    }
1009                }
1010            }
1011        }
1012        Ok(deleted_count)
1013    }
1014}
1015
1016#[async_trait]
1017impl LanceNamespace for DirectoryNamespace {
1018    async fn list_namespaces(
1019        &self,
1020        request: ListNamespacesRequest,
1021    ) -> Result<ListNamespacesResponse> {
1022        if let Some(ref manifest_ns) = self.manifest_ns {
1023            return manifest_ns.list_namespaces(request).await;
1024        }
1025
1026        Self::validate_root_namespace_id(&request.id)?;
1027        Ok(ListNamespacesResponse::new(vec![]))
1028    }
1029
1030    async fn describe_namespace(
1031        &self,
1032        request: DescribeNamespaceRequest,
1033    ) -> Result<DescribeNamespaceResponse> {
1034        if let Some(ref manifest_ns) = self.manifest_ns {
1035            return manifest_ns.describe_namespace(request).await;
1036        }
1037
1038        Self::validate_root_namespace_id(&request.id)?;
1039        #[allow(clippy::needless_update)]
1040        Ok(DescribeNamespaceResponse {
1041            properties: Some(HashMap::new()),
1042            ..Default::default()
1043        })
1044    }
1045
1046    async fn create_namespace(
1047        &self,
1048        request: CreateNamespaceRequest,
1049    ) -> Result<CreateNamespaceResponse> {
1050        if let Some(ref manifest_ns) = self.manifest_ns {
1051            return manifest_ns.create_namespace(request).await;
1052        }
1053
1054        if request.id.is_none() || request.id.as_ref().unwrap().is_empty() {
1055            return Err(Error::namespace_source(
1056                "Root namespace already exists and cannot be created".into(),
1057            ));
1058        }
1059
1060        Err(Error::not_supported_source(
1061            "Child namespaces are only supported when manifest mode is enabled".into(),
1062        ))
1063    }
1064
1065    async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<DropNamespaceResponse> {
1066        if let Some(ref manifest_ns) = self.manifest_ns {
1067            return manifest_ns.drop_namespace(request).await;
1068        }
1069
1070        if request.id.is_none() || request.id.as_ref().unwrap().is_empty() {
1071            return Err(Error::namespace_source(
1072                "Root namespace cannot be dropped".into(),
1073            ));
1074        }
1075
1076        Err(Error::not_supported_source(
1077            "Child namespaces are only supported when manifest mode is enabled".into(),
1078        ))
1079    }
1080
1081    async fn namespace_exists(&self, request: NamespaceExistsRequest) -> Result<()> {
1082        if let Some(ref manifest_ns) = self.manifest_ns {
1083            return manifest_ns.namespace_exists(request).await;
1084        }
1085
1086        if request.id.is_none() || request.id.as_ref().unwrap().is_empty() {
1087            return Ok(());
1088        }
1089
1090        Err(Error::namespace_source(
1091            "Child namespaces are only supported when manifest mode is enabled".into(),
1092        ))
1093    }
1094
1095    async fn list_tables(&self, request: ListTablesRequest) -> Result<ListTablesResponse> {
1096        // Validate that namespace ID is provided
1097        let namespace_id = request
1098            .id
1099            .as_ref()
1100            .ok_or_else(|| Error::invalid_input_source("Namespace ID is required".into()))?;
1101
1102        // For child namespaces, always delegate to manifest (if enabled)
1103        if !namespace_id.is_empty() {
1104            if let Some(ref manifest_ns) = self.manifest_ns {
1105                return manifest_ns.list_tables(request).await;
1106            }
1107            return Err(Error::not_supported_source(
1108                "Child namespaces are only supported when manifest mode is enabled".into(),
1109            ));
1110        }
1111
1112        // When only manifest is enabled (no directory listing), delegate directly to manifest
1113        if let Some(ref manifest_ns) = self.manifest_ns
1114            && !self.dir_listing_enabled
1115        {
1116            return manifest_ns.list_tables(request).await;
1117        }
1118
1119        // When both manifest and directory listing are enabled, we need to merge and deduplicate
1120        let mut tables = if self.manifest_ns.is_some() && self.dir_listing_enabled {
1121            // Get all manifest table locations (for deduplication)
1122            let manifest_locations = if let Some(ref manifest_ns) = self.manifest_ns {
1123                manifest_ns.list_manifest_table_locations().await?
1124            } else {
1125                std::collections::HashSet::new()
1126            };
1127
1128            // Get all manifest tables (without pagination for merging)
1129            let mut manifest_request = request.clone();
1130            manifest_request.limit = None;
1131            manifest_request.page_token = None;
1132            let manifest_tables = if let Some(ref manifest_ns) = self.manifest_ns {
1133                let manifest_response = manifest_ns.list_tables(manifest_request).await?;
1134                manifest_response.tables
1135            } else {
1136                vec![]
1137            };
1138
1139            // Start with all manifest table names
1140            // Add directory tables that aren't already in the manifest (by location)
1141            let mut all_tables: Vec<String> = manifest_tables;
1142            let dir_tables = self.list_directory_tables().await?;
1143            for table_name in dir_tables {
1144                // Check if this table's location is already in the manifest
1145                // Manifest stores full URIs, so we need to check both formats
1146                let full_location = format!("{}/{}.lance", self.root, table_name);
1147                let relative_location = format!("{}.lance", table_name);
1148                if !manifest_locations.contains(&full_location)
1149                    && !manifest_locations.contains(&relative_location)
1150                {
1151                    all_tables.push(table_name);
1152                }
1153            }
1154
1155            all_tables
1156        } else {
1157            self.list_directory_tables().await?
1158        };
1159
1160        // Apply sorting and pagination
1161        Self::apply_pagination(&mut tables, request.page_token, request.limit);
1162        let response = ListTablesResponse::new(tables);
1163        Ok(response)
1164    }
1165
1166    async fn describe_table(&self, request: DescribeTableRequest) -> Result<DescribeTableResponse> {
1167        if let Some(ref manifest_ns) = self.manifest_ns {
1168            match manifest_ns.describe_table(request.clone()).await {
1169                Ok(mut response) => {
1170                    if let Some(ref table_uri) = response.table_uri {
1171                        // For backwards compatibility, only skip vending credentials when explicitly set to false
1172                        let vend = request.vend_credentials.unwrap_or(true);
1173                        let identity = request.identity.as_deref();
1174                        response.storage_options = self
1175                            .get_storage_options_for_table(table_uri, vend, identity)
1176                            .await?;
1177                    }
1178                    // Set managed_versioning flag when table_version_tracking_enabled
1179                    if self.table_version_tracking_enabled {
1180                        response.managed_versioning = Some(true);
1181                    }
1182                    return Ok(response);
1183                }
1184                Err(_)
1185                    if self.dir_listing_enabled
1186                        && request.id.as_ref().is_some_and(|id| id.len() == 1) =>
1187                {
1188                    // Fall through to directory check only for single-level IDs
1189                }
1190                Err(e) => return Err(e),
1191            }
1192        }
1193
1194        let table_name = Self::table_name_from_id(&request.id)?;
1195        let table_uri = self.table_full_uri(&table_name);
1196
1197        // Atomically check table existence and deregistration status
1198        let status = self.check_table_status(&table_name).await;
1199
1200        if !status.exists {
1201            return Err(Error::namespace_source(
1202                format!("Table does not exist: {}", table_name).into(),
1203            ));
1204        }
1205
1206        if status.is_deregistered {
1207            return Err(Error::namespace_source(
1208                format!("Table is deregistered: {}", table_name).into(),
1209            ));
1210        }
1211
1212        let load_detailed_metadata = request.load_detailed_metadata.unwrap_or(false);
1213        // For backwards compatibility, only skip vending credentials when explicitly set to false
1214        let vend_credentials = request.vend_credentials.unwrap_or(true);
1215        let identity = request.identity.as_deref();
1216
1217        // If not loading detailed metadata, return minimal response with just location
1218        if !load_detailed_metadata {
1219            let storage_options = self
1220                .get_storage_options_for_table(&table_uri, vend_credentials, identity)
1221                .await?;
1222            return Ok(DescribeTableResponse {
1223                table: Some(table_name),
1224                namespace: request.id.as_ref().map(|id| {
1225                    if id.len() > 1 {
1226                        id[..id.len() - 1].to_vec()
1227                    } else {
1228                        vec![]
1229                    }
1230                }),
1231                location: Some(table_uri.clone()),
1232                table_uri: Some(table_uri),
1233                storage_options,
1234                managed_versioning: if self.table_version_tracking_enabled {
1235                    Some(true)
1236                } else {
1237                    None
1238                },
1239                ..Default::default()
1240            });
1241        }
1242
1243        // Try to load the dataset to get real information
1244        // Use DatasetBuilder with storage options to support S3 with custom endpoints
1245        let mut builder = DatasetBuilder::from_uri(&table_uri);
1246        if let Some(opts) = &self.storage_options {
1247            builder = builder.with_storage_options(opts.clone());
1248        }
1249        if let Some(sess) = &self.session {
1250            builder = builder.with_session(sess.clone());
1251        }
1252        match builder.load().await {
1253            Ok(mut dataset) => {
1254                // If a specific version is requested, checkout that version
1255                if let Some(requested_version) = request.version {
1256                    dataset = dataset.checkout_version(requested_version as u64).await?;
1257                }
1258
1259                let version_info = dataset.version();
1260                let lance_schema = dataset.schema();
1261                let arrow_schema: arrow_schema::Schema = lance_schema.into();
1262                let json_schema = arrow_schema_to_json(&arrow_schema)?;
1263                let storage_options = self
1264                    .get_storage_options_for_table(&table_uri, vend_credentials, identity)
1265                    .await?;
1266
1267                // Convert BTreeMap to HashMap for the response
1268                let metadata: std::collections::HashMap<String, String> =
1269                    version_info.metadata.into_iter().collect();
1270
1271                Ok(DescribeTableResponse {
1272                    table: Some(table_name),
1273                    namespace: request.id.as_ref().map(|id| {
1274                        if id.len() > 1 {
1275                            id[..id.len() - 1].to_vec()
1276                        } else {
1277                            vec![]
1278                        }
1279                    }),
1280                    version: Some(version_info.version as i64),
1281                    location: Some(table_uri.clone()),
1282                    table_uri: Some(table_uri),
1283                    schema: Some(Box::new(json_schema)),
1284                    storage_options,
1285                    metadata: Some(metadata),
1286                    managed_versioning: if self.table_version_tracking_enabled {
1287                        Some(true)
1288                    } else {
1289                        None
1290                    },
1291                    ..Default::default()
1292                })
1293            }
1294            Err(err) => {
1295                // Use the reserved file status from the atomic check
1296                if status.has_reserved_file {
1297                    let storage_options = self
1298                        .get_storage_options_for_table(&table_uri, vend_credentials, identity)
1299                        .await?;
1300                    Ok(DescribeTableResponse {
1301                        table: Some(table_name),
1302                        namespace: request.id.as_ref().map(|id| {
1303                            if id.len() > 1 {
1304                                id[..id.len() - 1].to_vec()
1305                            } else {
1306                                vec![]
1307                            }
1308                        }),
1309                        location: Some(table_uri.clone()),
1310                        table_uri: Some(table_uri),
1311                        storage_options,
1312                        managed_versioning: if self.table_version_tracking_enabled {
1313                            Some(true)
1314                        } else {
1315                            None
1316                        },
1317                        ..Default::default()
1318                    })
1319                } else {
1320                    Err(Error::namespace_source(
1321                        format!(
1322                            "Table directory exists but cannot load dataset {}: {:?}",
1323                            table_name, err
1324                        )
1325                        .into(),
1326                    ))
1327                }
1328            }
1329        }
1330    }
1331
1332    async fn table_exists(&self, request: TableExistsRequest) -> Result<()> {
1333        if let Some(ref manifest_ns) = self.manifest_ns {
1334            match manifest_ns.table_exists(request.clone()).await {
1335                Ok(()) => return Ok(()),
1336                Err(_) if self.dir_listing_enabled => {
1337                    // Fall through to directory check
1338                }
1339                Err(e) => return Err(e),
1340            }
1341        }
1342
1343        let table_name = Self::table_name_from_id(&request.id)?;
1344
1345        // Atomically check table existence and deregistration status
1346        let status = self.check_table_status(&table_name).await;
1347
1348        if !status.exists {
1349            return Err(Error::namespace_source(
1350                format!("Table does not exist: {}", table_name).into(),
1351            ));
1352        }
1353
1354        if status.is_deregistered {
1355            return Err(Error::namespace_source(
1356                format!("Table is deregistered: {}", table_name).into(),
1357            ));
1358        }
1359
1360        Ok(())
1361    }
1362
1363    async fn drop_table(&self, request: DropTableRequest) -> Result<DropTableResponse> {
1364        if let Some(ref manifest_ns) = self.manifest_ns {
1365            return manifest_ns.drop_table(request).await;
1366        }
1367
1368        let table_name = Self::table_name_from_id(&request.id)?;
1369        let table_uri = self.table_full_uri(&table_name);
1370        let table_path = self.table_path(&table_name);
1371
1372        self.object_store
1373            .remove_dir_all(table_path)
1374            .await
1375            .map_err(|e| {
1376                Error::namespace_source(
1377                    format!("Failed to drop table {}: {}", table_name, e).into(),
1378                )
1379            })?;
1380
1381        Ok(DropTableResponse {
1382            id: request.id,
1383            location: Some(table_uri),
1384            ..Default::default()
1385        })
1386    }
1387
1388    async fn create_table(
1389        &self,
1390        request: CreateTableRequest,
1391        request_data: Bytes,
1392    ) -> Result<CreateTableResponse> {
1393        if let Some(ref manifest_ns) = self.manifest_ns {
1394            return manifest_ns.create_table(request, request_data).await;
1395        }
1396
1397        let table_name = Self::table_name_from_id(&request.id)?;
1398        let table_uri = self.table_full_uri(&table_name);
1399        if request_data.is_empty() {
1400            return Err(Error::namespace_source(
1401                "Request data (Arrow IPC stream) is required for create_table".into(),
1402            ));
1403        }
1404
1405        // Parse the Arrow IPC stream from request_data
1406        let cursor = Cursor::new(request_data.to_vec());
1407        let stream_reader = StreamReader::try_new(cursor, None).map_err(|e| {
1408            Error::namespace_source(format!("Invalid Arrow IPC stream: {}", e).into())
1409        })?;
1410        let arrow_schema = stream_reader.schema();
1411
1412        // Collect all batches from the stream
1413        let mut batches = Vec::new();
1414        for batch_result in stream_reader {
1415            batches.push(batch_result.map_err(|e| {
1416                Error::namespace_source(
1417                    format!("Failed to read batch from IPC stream: {}", e).into(),
1418                )
1419            })?);
1420        }
1421
1422        // Create RecordBatchReader from the batches
1423        let reader = if batches.is_empty() {
1424            let batch = arrow::record_batch::RecordBatch::new_empty(arrow_schema.clone());
1425            let batches = vec![Ok(batch)];
1426            RecordBatchIterator::new(batches, arrow_schema.clone())
1427        } else {
1428            let batch_results: Vec<_> = batches.into_iter().map(Ok).collect();
1429            RecordBatchIterator::new(batch_results, arrow_schema)
1430        };
1431
1432        let store_params = self.storage_options.as_ref().map(|opts| ObjectStoreParams {
1433            storage_options_accessor: Some(Arc::new(
1434                lance_io::object_store::StorageOptionsAccessor::with_static_options(opts.clone()),
1435            )),
1436            ..Default::default()
1437        });
1438
1439        let write_params = WriteParams {
1440            mode: WriteMode::Create,
1441            store_params,
1442            ..Default::default()
1443        };
1444
1445        // Create the Lance dataset using the actual Lance API
1446        Dataset::write(reader, &table_uri, Some(write_params))
1447            .await
1448            .map_err(|e| {
1449                Error::namespace_source(format!("Failed to create Lance dataset: {}", e).into())
1450            })?;
1451
1452        Ok(CreateTableResponse {
1453            version: Some(1),
1454            location: Some(table_uri),
1455            storage_options: self.storage_options.clone(),
1456            ..Default::default()
1457        })
1458    }
1459
1460    async fn declare_table(&self, request: DeclareTableRequest) -> Result<DeclareTableResponse> {
1461        if let Some(ref manifest_ns) = self.manifest_ns {
1462            let mut response = manifest_ns.declare_table(request.clone()).await?;
1463            if let Some(ref location) = response.location {
1464                // For backwards compatibility, only skip vending credentials when explicitly set to false
1465                let vend = request.vend_credentials.unwrap_or(true);
1466                let identity = request.identity.as_deref();
1467                response.storage_options = self
1468                    .get_storage_options_for_table(location, vend, identity)
1469                    .await?;
1470            }
1471            // Set managed_versioning when table_version_tracking_enabled
1472            if self.table_version_tracking_enabled {
1473                response.managed_versioning = Some(true);
1474            }
1475            return Ok(response);
1476        }
1477
1478        let table_name = Self::table_name_from_id(&request.id)?;
1479        let table_uri = self.table_full_uri(&table_name);
1480
1481        // Validate location if provided
1482        if let Some(location) = &request.location {
1483            let location = location.trim_end_matches('/');
1484            if location != table_uri {
1485                return Err(Error::namespace_source(
1486                    format!(
1487                        "Cannot declare table {} at location {}, must be at location {}",
1488                        table_name, location, table_uri
1489                    )
1490                    .into(),
1491                ));
1492            }
1493        }
1494
1495        // Check if table already has data (created via create_table).
1496        // The atomic put only prevents races between concurrent declare_table calls,
1497        // not between declare_table and existing data.
1498        let status = self.check_table_status(&table_name).await;
1499        if status.exists && !status.has_reserved_file {
1500            // Table has data but no reserved file - it was created with data
1501            return Err(Error::namespace_source(
1502                format!("Table already exists: {}", table_name).into(),
1503            ));
1504        }
1505
1506        // Atomically create the .lance-reserved file to mark the table as declared.
1507        // This uses put_if_not_exists semantics to avoid race conditions between
1508        // concurrent declare_table calls.
1509        let reserved_file_path = self.table_reserved_file_path(&table_name);
1510
1511        self.put_marker_file_atomic(&reserved_file_path, &format!("table {}", table_name))
1512            .await
1513            .map_err(|e| Error::namespace_source(e.into()))?;
1514
1515        // For backwards compatibility, only skip vending credentials when explicitly set to false
1516        let vend_credentials = request.vend_credentials.unwrap_or(true);
1517        let identity = request.identity.as_deref();
1518        let storage_options = self
1519            .get_storage_options_for_table(&table_uri, vend_credentials, identity)
1520            .await?;
1521
1522        Ok(DeclareTableResponse {
1523            location: Some(table_uri),
1524            storage_options,
1525            managed_versioning: if self.table_version_tracking_enabled {
1526                Some(true)
1527            } else {
1528                None
1529            },
1530            ..Default::default()
1531        })
1532    }
1533
1534    async fn register_table(
1535        &self,
1536        request: lance_namespace::models::RegisterTableRequest,
1537    ) -> Result<lance_namespace::models::RegisterTableResponse> {
1538        // If manifest is enabled, delegate to manifest namespace
1539        if let Some(ref manifest_ns) = self.manifest_ns {
1540            return LanceNamespace::register_table(manifest_ns.as_ref(), request).await;
1541        }
1542
1543        // Without manifest, register_table is not supported
1544        Err(Error::not_supported_source(
1545            "register_table is only supported when manifest mode is enabled".into(),
1546        ))
1547    }
1548
1549    async fn deregister_table(
1550        &self,
1551        request: lance_namespace::models::DeregisterTableRequest,
1552    ) -> Result<lance_namespace::models::DeregisterTableResponse> {
1553        // If manifest is enabled, delegate to manifest namespace
1554        if let Some(ref manifest_ns) = self.manifest_ns {
1555            return LanceNamespace::deregister_table(manifest_ns.as_ref(), request).await;
1556        }
1557
1558        // V1 mode: create a .lance-deregistered marker file in the table directory
1559        let table_name = Self::table_name_from_id(&request.id)?;
1560        let table_uri = self.table_full_uri(&table_name);
1561
1562        // Check table existence and deregistration status.
1563        // This provides better error messages for common cases.
1564        let status = self.check_table_status(&table_name).await;
1565
1566        if !status.exists {
1567            return Err(Error::namespace_source(
1568                format!("Table does not exist: {}", table_name).into(),
1569            ));
1570        }
1571
1572        if status.is_deregistered {
1573            return Err(Error::namespace_source(
1574                format!("Table is already deregistered: {}", table_name).into(),
1575            ));
1576        }
1577
1578        // Atomically create the .lance-deregistered marker file.
1579        // This uses put_if_not_exists semantics to prevent race conditions
1580        // when multiple processes try to deregister the same table concurrently.
1581        // If a race occurs and another process already created the file,
1582        // we'll get an AlreadyExists error which we convert to a proper message.
1583        let deregistered_path = self.table_deregistered_file_path(&table_name);
1584        self.put_marker_file_atomic(
1585            &deregistered_path,
1586            &format!("deregistration marker for table {}", table_name),
1587        )
1588        .await
1589        .map_err(|e| {
1590            // Convert "already exists" to "already deregistered" for better UX
1591            let message = if e.contains("already exists") {
1592                format!("Table is already deregistered: {}", table_name)
1593            } else {
1594                e
1595            };
1596            Error::namespace_source(message.into())
1597        })?;
1598
1599        Ok(lance_namespace::models::DeregisterTableResponse {
1600            id: request.id,
1601            location: Some(table_uri),
1602            ..Default::default()
1603        })
1604    }
1605
1606    async fn list_table_versions(
1607        &self,
1608        request: ListTableVersionsRequest,
1609    ) -> Result<ListTableVersionsResponse> {
1610        // When table_version_storage_enabled, query from __manifest
1611        if self.table_version_storage_enabled
1612            && let Some(ref manifest_ns) = self.manifest_ns
1613        {
1614            let table_id = request.id.clone().unwrap_or_default();
1615            let want_descending = request.descending == Some(true);
1616            return manifest_ns
1617                .list_table_versions(&table_id, want_descending, request.limit)
1618                .await;
1619        }
1620
1621        // Fallback when table_version_storage is not enabled: list from _versions/ directory
1622        let table_uri = self.resolve_table_location(&request.id).await?;
1623
1624        let table_path = Self::uri_to_object_store_path(&table_uri);
1625        let versions_dir = table_path.child("_versions");
1626        let manifest_metas: Vec<_> = self
1627            .object_store
1628            .read_dir_all(&versions_dir, None)
1629            .try_collect()
1630            .await
1631            .map_err(|e| {
1632                Error::namespace_source(
1633                    format!(
1634                        "Failed to list manifest files for table at '{}': {}",
1635                        table_uri, e
1636                    )
1637                    .into(),
1638                )
1639            })?;
1640
1641        let is_v2_naming = manifest_metas
1642            .first()
1643            .is_some_and(|meta| meta.location.filename().is_some_and(|f| f.len() == 29));
1644
1645        let mut table_versions: Vec<TableVersion> = manifest_metas
1646            .into_iter()
1647            .filter_map(|meta| {
1648                let filename = meta.location.filename()?;
1649                let version_str = filename.strip_suffix(".manifest")?;
1650                if version_str.starts_with('d') {
1651                    return None;
1652                }
1653                let file_version: u64 = version_str.parse().ok()?;
1654
1655                let actual_version = if file_version > u64::MAX / 2 {
1656                    u64::MAX - file_version
1657                } else {
1658                    file_version
1659                };
1660
1661                // Use full path from object_store (relative to object store root)
1662                Some(TableVersion {
1663                    version: actual_version as i64,
1664                    manifest_path: meta.location.to_string(),
1665                    manifest_size: Some(meta.size as i64),
1666                    e_tag: meta.e_tag,
1667                    timestamp_millis: Some(meta.last_modified.timestamp_millis()),
1668                    metadata: None,
1669                })
1670            })
1671            .collect();
1672
1673        let list_is_ordered = self.object_store.list_is_lexically_ordered;
1674        let want_descending = request.descending == Some(true);
1675
1676        let needs_sort = if list_is_ordered {
1677            if is_v2_naming {
1678                !want_descending
1679            } else {
1680                want_descending
1681            }
1682        } else {
1683            true
1684        };
1685
1686        if needs_sort {
1687            if want_descending {
1688                table_versions.sort_by(|a, b| b.version.cmp(&a.version));
1689            } else {
1690                table_versions.sort_by(|a, b| a.version.cmp(&b.version));
1691            }
1692        }
1693
1694        if let Some(limit) = request.limit {
1695            table_versions.truncate(limit as usize);
1696        }
1697
1698        Ok(ListTableVersionsResponse {
1699            versions: table_versions,
1700            page_token: None,
1701        })
1702    }
1703
1704    async fn create_table_version(
1705        &self,
1706        request: CreateTableVersionRequest,
1707    ) -> Result<CreateTableVersionResponse> {
1708        let table_uri = self.resolve_table_location(&request.id).await?;
1709
1710        let staging_manifest_path = &request.manifest_path;
1711        let version = request.version as u64;
1712
1713        let table_path = Self::uri_to_object_store_path(&table_uri);
1714
1715        // Determine naming scheme from request, default to V2
1716        let naming_scheme = match request.naming_scheme.as_deref() {
1717            Some("V1") => ManifestNamingScheme::V1,
1718            _ => ManifestNamingScheme::V2,
1719        };
1720
1721        // Compute final path using the naming scheme
1722        let final_path = naming_scheme.manifest_path(&table_path, version);
1723
1724        let staging_path = Self::uri_to_object_store_path(staging_manifest_path);
1725        let manifest_data = self
1726            .object_store
1727            .inner
1728            .get(&staging_path)
1729            .await
1730            .map_err(|e| {
1731                Error::namespace_source(
1732                    format!(
1733                        "Failed to read staging manifest at '{}': {}",
1734                        staging_manifest_path, e
1735                    )
1736                    .into(),
1737                )
1738            })?
1739            .bytes()
1740            .await
1741            .map_err(|e| {
1742                Error::namespace_source(
1743                    format!(
1744                        "Failed to read staging manifest bytes at '{}': {}",
1745                        staging_manifest_path, e
1746                    )
1747                    .into(),
1748                )
1749            })?;
1750
1751        let manifest_size = manifest_data.len() as i64;
1752
1753        let put_result = self
1754            .object_store
1755            .inner
1756            .put_opts(
1757                &final_path,
1758                manifest_data.into(),
1759                PutOptions {
1760                    mode: PutMode::Create,
1761                    ..Default::default()
1762                },
1763            )
1764            .await
1765            .map_err(|e| match e {
1766                object_store::Error::AlreadyExists { .. }
1767                | object_store::Error::Precondition { .. } => Error::namespace_source(
1768                    format!(
1769                        "Version {} already exists for table at '{}'",
1770                        version, table_uri
1771                    )
1772                    .into(),
1773                ),
1774                _ => Error::namespace_source(
1775                    format!(
1776                        "Failed to create version {} for table at '{}': {}",
1777                        version, table_uri, e
1778                    )
1779                    .into(),
1780                ),
1781            })?;
1782
1783        // Delete the staging manifest after successful copy
1784        if let Err(e) = self.object_store.inner.delete(&staging_path).await {
1785            log::warn!(
1786                "Failed to delete staging manifest at '{}': {:?}",
1787                staging_path,
1788                e
1789            );
1790        }
1791
1792        // If table_version_storage_enabled is enabled, also record in __manifest (best-effort)
1793        if self.table_version_storage_enabled
1794            && let Some(ref manifest_ns) = self.manifest_ns
1795        {
1796            let table_id_str =
1797                manifest::ManifestNamespace::str_object_id(&request.id.clone().unwrap_or_default());
1798            let object_id =
1799                manifest::ManifestNamespace::build_version_object_id(&table_id_str, version as i64);
1800            let metadata_json = serde_json::json!({
1801                "manifest_path": final_path.to_string(),
1802                "manifest_size": manifest_size,
1803                "e_tag": put_result.e_tag,
1804                "naming_scheme": request.naming_scheme.as_deref().unwrap_or("V2"),
1805            })
1806            .to_string();
1807
1808            if let Err(e) = manifest_ns
1809                .insert_into_manifest_with_metadata(
1810                    vec![manifest::ManifestEntry {
1811                        object_id,
1812                        object_type: manifest::ObjectType::TableVersion,
1813                        location: None,
1814                        metadata: Some(metadata_json),
1815                    }],
1816                    None,
1817                )
1818                .await
1819            {
1820                log::warn!(
1821                    "Failed to record table version in __manifest (best-effort): {:?}",
1822                    e
1823                );
1824            }
1825        }
1826
1827        Ok(CreateTableVersionResponse {
1828            transaction_id: None,
1829            version: Some(Box::new(TableVersion {
1830                version: version as i64,
1831                manifest_path: final_path.to_string(),
1832                manifest_size: Some(manifest_size),
1833                e_tag: put_result.e_tag,
1834                timestamp_millis: None,
1835                metadata: None,
1836            })),
1837        })
1838    }
1839
1840    async fn describe_table_version(
1841        &self,
1842        request: DescribeTableVersionRequest,
1843    ) -> Result<DescribeTableVersionResponse> {
1844        // When table_version_storage_enabled and a specific version is requested,
1845        // query from __manifest to avoid opening the entire dataset
1846        if self.table_version_storage_enabled
1847            && let (Some(manifest_ns), Some(version)) = (&self.manifest_ns, request.version)
1848        {
1849            let table_id = request.id.clone().unwrap_or_default();
1850            return manifest_ns.describe_table_version(&table_id, version).await;
1851        }
1852
1853        // Fallback when table_version_storage is not enabled: open the dataset to describe the version
1854        let table_uri = self.resolve_table_location(&request.id).await?;
1855
1856        // Use DatasetBuilder with storage options to support S3 with custom endpoints
1857        let mut builder = DatasetBuilder::from_uri(&table_uri);
1858        if let Some(opts) = &self.storage_options {
1859            builder = builder.with_storage_options(opts.clone());
1860        }
1861        if let Some(sess) = &self.session {
1862            builder = builder.with_session(sess.clone());
1863        }
1864        let mut dataset = builder.load().await.map_err(|e| {
1865            Error::namespace_source(
1866                format!("Failed to open table at '{}': {}", table_uri, e).into(),
1867            )
1868        })?;
1869
1870        if let Some(version) = request.version {
1871            dataset = dataset
1872                .checkout_version(version as u64)
1873                .await
1874                .map_err(|e| {
1875                    Error::namespace_source(
1876                        format!(
1877                            "Failed to checkout version {} for table at '{}': {}",
1878                            version, table_uri, e
1879                        )
1880                        .into(),
1881                    )
1882                })?;
1883        }
1884
1885        let version_info = dataset.version();
1886        let manifest_location = dataset.manifest_location();
1887        let metadata: std::collections::HashMap<String, String> =
1888            version_info.metadata.into_iter().collect();
1889
1890        let table_version = TableVersion {
1891            version: version_info.version as i64,
1892            manifest_path: manifest_location.path.to_string(),
1893            manifest_size: manifest_location.size.map(|s| s as i64),
1894            e_tag: manifest_location.e_tag.clone(),
1895            timestamp_millis: Some(version_info.timestamp.timestamp_millis()),
1896            metadata: if metadata.is_empty() {
1897                None
1898            } else {
1899                Some(metadata)
1900            },
1901        };
1902
1903        Ok(DescribeTableVersionResponse {
1904            version: Box::new(table_version),
1905        })
1906    }
1907
1908    async fn batch_delete_table_versions(
1909        &self,
1910        request: BatchDeleteTableVersionsRequest,
1911    ) -> Result<BatchDeleteTableVersionsResponse> {
1912        // Single-table mode: use `id` (from path parameter) + `ranges` to delete
1913        // versions from one table.
1914        let ranges: Vec<(i64, i64)> = request
1915            .ranges
1916            .iter()
1917            .map(|r| {
1918                let start = r.start_version;
1919                let end = if r.end_version > 0 {
1920                    r.end_version
1921                } else {
1922                    start
1923                };
1924                (start, end)
1925            })
1926            .collect();
1927        let table_entries = vec![TableDeleteEntry {
1928            table_id: request.id.clone(),
1929            ranges,
1930        }];
1931
1932        let mut total_deleted_count = 0i64;
1933
1934        if self.table_version_storage_enabled
1935            && let Some(ref manifest_ns) = self.manifest_ns
1936        {
1937            // Phase 1 (atomic commit point): Delete version records from __manifest
1938            // for ALL tables in a single atomic operation. This is the authoritative
1939            // source of truth — once __manifest entries are removed, the versions
1940            // are logically deleted across all tables atomically.
1941
1942            // Collect all (table_id_str, ranges) for batch deletion
1943            let mut all_object_ids: Vec<String> = Vec::new();
1944            for te in &table_entries {
1945                let table_id_str = manifest::ManifestNamespace::str_object_id(
1946                    &te.table_id.clone().unwrap_or_default(),
1947                );
1948                for (start, end) in &te.ranges {
1949                    for version in *start..=*end {
1950                        let object_id = manifest::ManifestNamespace::build_version_object_id(
1951                            &table_id_str,
1952                            version,
1953                        );
1954                        all_object_ids.push(object_id);
1955                    }
1956                }
1957            }
1958
1959            if !all_object_ids.is_empty() {
1960                total_deleted_count = manifest_ns
1961                    .batch_delete_table_versions_by_object_ids(&all_object_ids)
1962                    .await?;
1963            }
1964
1965            // Phase 2: Delete physical manifest files (best-effort).
1966            // Even if some file deletions fail, the versions are already removed from
1967            // __manifest, so they won't be visible to readers. Leftover files are
1968            // orphaned but harmless and can be cleaned up later.
1969            let _ = self
1970                .delete_physical_version_files(&table_entries, true)
1971                .await;
1972
1973            return Ok(BatchDeleteTableVersionsResponse {
1974                deleted_count: Some(total_deleted_count),
1975                transaction_id: None,
1976            });
1977        }
1978
1979        // Fallback when table_version_storage is not enabled: delete physical files directly (no __manifest)
1980        total_deleted_count = self
1981            .delete_physical_version_files(&table_entries, false)
1982            .await?;
1983
1984        Ok(BatchDeleteTableVersionsResponse {
1985            deleted_count: Some(total_deleted_count),
1986            transaction_id: None,
1987        })
1988    }
1989
1990    fn namespace_id(&self) -> String {
1991        format!("DirectoryNamespace {{ root: {:?} }}", self.root)
1992    }
1993}
1994
1995#[cfg(test)]
1996mod tests {
1997    use super::*;
1998    use arrow_ipc::reader::StreamReader;
1999    use lance::dataset::Dataset;
2000    use lance_core::utils::tempfile::{TempStdDir, TempStrDir};
2001    use lance_namespace::models::{
2002        CreateTableRequest, JsonArrowDataType, JsonArrowField, JsonArrowSchema, ListTablesRequest,
2003    };
2004    use lance_namespace::schema::convert_json_arrow_schema;
2005    use std::io::Cursor;
2006    use std::sync::Arc;
2007
2008    /// Helper to create a test DirectoryNamespace with a temporary directory
2009    async fn create_test_namespace() -> (DirectoryNamespace, TempStdDir) {
2010        let temp_dir = TempStdDir::default();
2011
2012        let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
2013            .build()
2014            .await
2015            .unwrap();
2016        (namespace, temp_dir)
2017    }
2018
2019    /// Helper to create test IPC data from a schema
2020    fn create_test_ipc_data(schema: &JsonArrowSchema) -> Vec<u8> {
2021        use arrow::ipc::writer::StreamWriter;
2022
2023        let arrow_schema = convert_json_arrow_schema(schema).unwrap();
2024        let arrow_schema = Arc::new(arrow_schema);
2025        let batch = arrow::record_batch::RecordBatch::new_empty(arrow_schema.clone());
2026        let mut buffer = Vec::new();
2027        {
2028            let mut writer = StreamWriter::try_new(&mut buffer, &arrow_schema).unwrap();
2029            writer.write(&batch).unwrap();
2030            writer.finish().unwrap();
2031        }
2032        buffer
2033    }
2034
2035    /// Helper to create a simple test schema
2036    fn create_test_schema() -> JsonArrowSchema {
2037        let int_type = JsonArrowDataType::new("int32".to_string());
2038        let string_type = JsonArrowDataType::new("utf8".to_string());
2039
2040        let id_field = JsonArrowField {
2041            name: "id".to_string(),
2042            r#type: Box::new(int_type),
2043            nullable: false,
2044            metadata: None,
2045        };
2046
2047        let name_field = JsonArrowField {
2048            name: "name".to_string(),
2049            r#type: Box::new(string_type),
2050            nullable: true,
2051            metadata: None,
2052        };
2053
2054        JsonArrowSchema {
2055            fields: vec![id_field, name_field],
2056            metadata: None,
2057        }
2058    }
2059
2060    #[tokio::test]
2061    async fn test_create_table() {
2062        let (namespace, _temp_dir) = create_test_namespace().await;
2063
2064        // Create test IPC data
2065        let schema = create_test_schema();
2066        let ipc_data = create_test_ipc_data(&schema);
2067
2068        let mut request = CreateTableRequest::new();
2069        request.id = Some(vec!["test_table".to_string()]);
2070
2071        let response = namespace
2072            .create_table(request, bytes::Bytes::from(ipc_data))
2073            .await
2074            .unwrap();
2075
2076        assert!(response.location.is_some());
2077        assert!(response.location.unwrap().ends_with("test_table.lance"));
2078        assert_eq!(response.version, Some(1));
2079    }
2080
2081    #[tokio::test]
2082    async fn test_create_table_without_data() {
2083        let (namespace, _temp_dir) = create_test_namespace().await;
2084
2085        let mut request = CreateTableRequest::new();
2086        request.id = Some(vec!["test_table".to_string()]);
2087
2088        let result = namespace.create_table(request, bytes::Bytes::new()).await;
2089        assert!(result.is_err());
2090        assert!(
2091            result
2092                .unwrap_err()
2093                .to_string()
2094                .contains("Arrow IPC stream) is required")
2095        );
2096    }
2097
2098    #[tokio::test]
2099    async fn test_create_table_with_invalid_id() {
2100        let (namespace, _temp_dir) = create_test_namespace().await;
2101
2102        // Create test IPC data
2103        let schema = create_test_schema();
2104        let ipc_data = create_test_ipc_data(&schema);
2105
2106        // Test with empty ID
2107        let mut request = CreateTableRequest::new();
2108        request.id = Some(vec![]);
2109
2110        let result = namespace
2111            .create_table(request, bytes::Bytes::from(ipc_data.clone()))
2112            .await;
2113        assert!(result.is_err());
2114
2115        // Test with multi-level ID - should now work with manifest enabled
2116        // First create the parent namespace
2117        let mut create_ns_req = CreateNamespaceRequest::new();
2118        create_ns_req.id = Some(vec!["test_namespace".to_string()]);
2119        namespace.create_namespace(create_ns_req).await.unwrap();
2120
2121        // Now create table in the namespace
2122        let mut request = CreateTableRequest::new();
2123        request.id = Some(vec!["test_namespace".to_string(), "table".to_string()]);
2124
2125        let result = namespace
2126            .create_table(request, bytes::Bytes::from(ipc_data))
2127            .await;
2128        // Should succeed with manifest enabled
2129        assert!(
2130            result.is_ok(),
2131            "Multi-level table IDs should work with manifest enabled"
2132        );
2133    }
2134
2135    #[tokio::test]
2136    async fn test_list_tables() {
2137        let (namespace, _temp_dir) = create_test_namespace().await;
2138
2139        // Initially, no tables
2140        let mut request = ListTablesRequest::new();
2141        request.id = Some(vec![]);
2142        let response = namespace.list_tables(request).await.unwrap();
2143        assert_eq!(response.tables.len(), 0);
2144
2145        // Create test IPC data
2146        let schema = create_test_schema();
2147        let ipc_data = create_test_ipc_data(&schema);
2148
2149        // Create a table
2150        let mut create_request = CreateTableRequest::new();
2151        create_request.id = Some(vec!["table1".to_string()]);
2152        namespace
2153            .create_table(create_request, bytes::Bytes::from(ipc_data.clone()))
2154            .await
2155            .unwrap();
2156
2157        // Create another table
2158        let mut create_request = CreateTableRequest::new();
2159        create_request.id = Some(vec!["table2".to_string()]);
2160        namespace
2161            .create_table(create_request, bytes::Bytes::from(ipc_data))
2162            .await
2163            .unwrap();
2164
2165        // List tables should return both
2166        let mut request = ListTablesRequest::new();
2167        request.id = Some(vec![]);
2168        let response = namespace.list_tables(request).await.unwrap();
2169        let tables = response.tables;
2170        assert_eq!(tables.len(), 2);
2171        assert!(tables.contains(&"table1".to_string()));
2172        assert!(tables.contains(&"table2".to_string()));
2173    }
2174
2175    #[tokio::test]
2176    async fn test_list_tables_with_namespace_id() {
2177        let (namespace, _temp_dir) = create_test_namespace().await;
2178
2179        // First create a child namespace
2180        let mut create_ns_req = CreateNamespaceRequest::new();
2181        create_ns_req.id = Some(vec!["test_namespace".to_string()]);
2182        namespace.create_namespace(create_ns_req).await.unwrap();
2183
2184        // Now list tables in the child namespace
2185        let mut request = ListTablesRequest::new();
2186        request.id = Some(vec!["test_namespace".to_string()]);
2187
2188        let result = namespace.list_tables(request).await;
2189        // Should succeed (with manifest enabled) and return empty list (no tables yet)
2190        assert!(
2191            result.is_ok(),
2192            "list_tables should work with child namespace when manifest is enabled"
2193        );
2194        let response = result.unwrap();
2195        assert_eq!(
2196            response.tables.len(),
2197            0,
2198            "Namespace should have no tables yet"
2199        );
2200    }
2201
2202    #[tokio::test]
2203    async fn test_describe_table() {
2204        let (namespace, _temp_dir) = create_test_namespace().await;
2205
2206        // Create a table first
2207        let schema = create_test_schema();
2208        let ipc_data = create_test_ipc_data(&schema);
2209
2210        let mut create_request = CreateTableRequest::new();
2211        create_request.id = Some(vec!["test_table".to_string()]);
2212        namespace
2213            .create_table(create_request, bytes::Bytes::from(ipc_data))
2214            .await
2215            .unwrap();
2216
2217        // Describe the table
2218        let mut request = DescribeTableRequest::new();
2219        request.id = Some(vec!["test_table".to_string()]);
2220        let response = namespace.describe_table(request).await.unwrap();
2221
2222        assert!(response.location.is_some());
2223        assert!(response.location.unwrap().ends_with("test_table.lance"));
2224    }
2225
2226    #[tokio::test]
2227    async fn test_describe_nonexistent_table() {
2228        let (namespace, _temp_dir) = create_test_namespace().await;
2229
2230        let mut request = DescribeTableRequest::new();
2231        request.id = Some(vec!["nonexistent".to_string()]);
2232
2233        let result = namespace.describe_table(request).await;
2234        assert!(result.is_err());
2235        assert!(
2236            result
2237                .unwrap_err()
2238                .to_string()
2239                .contains("Table does not exist")
2240        );
2241    }
2242
2243    #[tokio::test]
2244    async fn test_table_exists() {
2245        let (namespace, _temp_dir) = create_test_namespace().await;
2246
2247        // Create a table
2248        let schema = create_test_schema();
2249        let ipc_data = create_test_ipc_data(&schema);
2250
2251        let mut create_request = CreateTableRequest::new();
2252        create_request.id = Some(vec!["existing_table".to_string()]);
2253        namespace
2254            .create_table(create_request, bytes::Bytes::from(ipc_data))
2255            .await
2256            .unwrap();
2257
2258        // Check existing table
2259        let mut request = TableExistsRequest::new();
2260        request.id = Some(vec!["existing_table".to_string()]);
2261        let result = namespace.table_exists(request).await;
2262        assert!(result.is_ok());
2263
2264        // Check non-existent table
2265        let mut request = TableExistsRequest::new();
2266        request.id = Some(vec!["nonexistent".to_string()]);
2267        let result = namespace.table_exists(request).await;
2268        assert!(result.is_err());
2269        assert!(
2270            result
2271                .unwrap_err()
2272                .to_string()
2273                .contains("Table does not exist")
2274        );
2275    }
2276
2277    #[tokio::test]
2278    async fn test_drop_table() {
2279        let (namespace, _temp_dir) = create_test_namespace().await;
2280
2281        // Create a table
2282        let schema = create_test_schema();
2283        let ipc_data = create_test_ipc_data(&schema);
2284
2285        let mut create_request = CreateTableRequest::new();
2286        create_request.id = Some(vec!["table_to_drop".to_string()]);
2287        namespace
2288            .create_table(create_request, bytes::Bytes::from(ipc_data))
2289            .await
2290            .unwrap();
2291
2292        // Verify it exists
2293        let mut exists_request = TableExistsRequest::new();
2294        exists_request.id = Some(vec!["table_to_drop".to_string()]);
2295        assert!(namespace.table_exists(exists_request.clone()).await.is_ok());
2296
2297        // Drop the table
2298        let mut drop_request = DropTableRequest::new();
2299        drop_request.id = Some(vec!["table_to_drop".to_string()]);
2300        let response = namespace.drop_table(drop_request).await.unwrap();
2301        assert!(response.location.is_some());
2302
2303        // Verify it no longer exists
2304        assert!(namespace.table_exists(exists_request).await.is_err());
2305    }
2306
2307    #[tokio::test]
2308    async fn test_drop_nonexistent_table() {
2309        let (namespace, _temp_dir) = create_test_namespace().await;
2310
2311        let mut request = DropTableRequest::new();
2312        request.id = Some(vec!["nonexistent".to_string()]);
2313
2314        // Should not fail when dropping non-existent table (idempotent)
2315        let result = namespace.drop_table(request).await;
2316        // The operation might succeed or fail depending on implementation
2317        // But it should not panic
2318        let _ = result;
2319    }
2320
2321    #[tokio::test]
2322    async fn test_root_namespace_operations() {
2323        let (namespace, _temp_dir) = create_test_namespace().await;
2324
2325        // Test list_namespaces - should return empty list for root
2326        let mut request = ListNamespacesRequest::new();
2327        request.id = Some(vec![]);
2328        let result = namespace.list_namespaces(request).await;
2329        assert!(result.is_ok());
2330        assert_eq!(result.unwrap().namespaces.len(), 0);
2331
2332        // Test describe_namespace - should succeed for root
2333        let mut request = DescribeNamespaceRequest::new();
2334        request.id = Some(vec![]);
2335        let result = namespace.describe_namespace(request).await;
2336        assert!(result.is_ok());
2337
2338        // Test namespace_exists - root always exists
2339        let mut request = NamespaceExistsRequest::new();
2340        request.id = Some(vec![]);
2341        let result = namespace.namespace_exists(request).await;
2342        assert!(result.is_ok());
2343
2344        // Test create_namespace - root cannot be created
2345        let mut request = CreateNamespaceRequest::new();
2346        request.id = Some(vec![]);
2347        let result = namespace.create_namespace(request).await;
2348        assert!(result.is_err());
2349        assert!(result.unwrap_err().to_string().contains("already exists"));
2350
2351        // Test drop_namespace - root cannot be dropped
2352        let mut request = DropNamespaceRequest::new();
2353        request.id = Some(vec![]);
2354        let result = namespace.drop_namespace(request).await;
2355        assert!(result.is_err());
2356        assert!(
2357            result
2358                .unwrap_err()
2359                .to_string()
2360                .contains("cannot be dropped")
2361        );
2362    }
2363
2364    #[tokio::test]
2365    async fn test_non_root_namespace_operations() {
2366        let (namespace, _temp_dir) = create_test_namespace().await;
2367
2368        // With manifest enabled (default), child namespaces are now supported
2369        // Test create_namespace for non-root - should succeed with manifest
2370        let mut request = CreateNamespaceRequest::new();
2371        request.id = Some(vec!["child".to_string()]);
2372        let result = namespace.create_namespace(request).await;
2373        assert!(
2374            result.is_ok(),
2375            "Child namespace creation should succeed with manifest enabled"
2376        );
2377
2378        // Test namespace_exists for non-root - should exist after creation
2379        let mut request = NamespaceExistsRequest::new();
2380        request.id = Some(vec!["child".to_string()]);
2381        let result = namespace.namespace_exists(request).await;
2382        assert!(
2383            result.is_ok(),
2384            "Child namespace should exist after creation"
2385        );
2386
2387        // Test drop_namespace for non-root - should succeed
2388        let mut request = DropNamespaceRequest::new();
2389        request.id = Some(vec!["child".to_string()]);
2390        let result = namespace.drop_namespace(request).await;
2391        assert!(
2392            result.is_ok(),
2393            "Child namespace drop should succeed with manifest enabled"
2394        );
2395
2396        // Verify namespace no longer exists
2397        let mut request = NamespaceExistsRequest::new();
2398        request.id = Some(vec!["child".to_string()]);
2399        let result = namespace.namespace_exists(request).await;
2400        assert!(
2401            result.is_err(),
2402            "Child namespace should not exist after drop"
2403        );
2404    }
2405
2406    #[tokio::test]
2407    async fn test_config_custom_root() {
2408        let temp_dir = TempStdDir::default();
2409        let custom_path = temp_dir.join("custom");
2410        std::fs::create_dir(&custom_path).unwrap();
2411
2412        let namespace = DirectoryNamespaceBuilder::new(custom_path.to_string_lossy().to_string())
2413            .build()
2414            .await
2415            .unwrap();
2416
2417        // Create test IPC data
2418        let schema = create_test_schema();
2419        let ipc_data = create_test_ipc_data(&schema);
2420
2421        // Create a table and verify location
2422        let mut request = CreateTableRequest::new();
2423        request.id = Some(vec!["test_table".to_string()]);
2424
2425        let response = namespace
2426            .create_table(request, bytes::Bytes::from(ipc_data))
2427            .await
2428            .unwrap();
2429
2430        assert!(response.location.unwrap().contains("custom"));
2431    }
2432
2433    #[tokio::test]
2434    async fn test_config_storage_options() {
2435        let temp_dir = TempStdDir::default();
2436
2437        let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
2438            .storage_option("option1", "value1")
2439            .storage_option("option2", "value2")
2440            .build()
2441            .await
2442            .unwrap();
2443
2444        // Create test IPC data
2445        let schema = create_test_schema();
2446        let ipc_data = create_test_ipc_data(&schema);
2447
2448        // Create a table and check storage options are included
2449        let mut request = CreateTableRequest::new();
2450        request.id = Some(vec!["test_table".to_string()]);
2451
2452        let response = namespace
2453            .create_table(request, bytes::Bytes::from(ipc_data))
2454            .await
2455            .unwrap();
2456
2457        let storage_options = response.storage_options.unwrap();
2458        assert_eq!(storage_options.get("option1"), Some(&"value1".to_string()));
2459        assert_eq!(storage_options.get("option2"), Some(&"value2".to_string()));
2460    }
2461
2462    /// When no credential vendor is configured, `describe_table` and
2463    /// `declare_table` must strip credential keys from storage options
2464    /// while preserving non-credential config (region, endpoint, etc.).
2465    #[tokio::test]
2466    async fn test_no_storage_options_without_vendor() {
2467        use lance_namespace::models::DeclareTableRequest;
2468
2469        let temp_dir = TempStdDir::default();
2470
2471        // No manifest, no credential vendor, but storage options with credentials
2472        let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
2473            .manifest_enabled(false)
2474            .storage_option("aws_access_key_id", "AKID")
2475            .storage_option("aws_secret_access_key", "SECRET")
2476            .storage_option("region", "us-east-1")
2477            .build()
2478            .await
2479            .unwrap();
2480
2481        let schema = create_test_schema();
2482        let ipc_data = create_test_ipc_data(&schema);
2483
2484        // create_table
2485        let mut create_req = CreateTableRequest::new();
2486        create_req.id = Some(vec!["t1".to_string()]);
2487        namespace
2488            .create_table(create_req, bytes::Bytes::from(ipc_data))
2489            .await
2490            .unwrap();
2491
2492        // describe_table should not return storage options without a vendor
2493        let mut desc_req = DescribeTableRequest::new();
2494        desc_req.id = Some(vec!["t1".to_string()]);
2495        let resp = namespace.describe_table(desc_req).await.unwrap();
2496        assert!(resp.storage_options.is_none());
2497
2498        // declare_table should not return storage options without a vendor
2499        let mut decl_req = DeclareTableRequest::new();
2500        decl_req.id = Some(vec!["t2".to_string()]);
2501        let resp = namespace.declare_table(decl_req).await.unwrap();
2502        assert!(resp.storage_options.is_none());
2503    }
2504
2505    /// Same test with manifest mode enabled.
2506    #[tokio::test]
2507    async fn test_no_storage_options_without_vendor_manifest() {
2508        let temp_dir = TempStdDir::default();
2509
2510        let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
2511            .storage_option("aws_access_key_id", "AKID")
2512            .storage_option("aws_secret_access_key", "SECRET")
2513            .storage_option("region", "us-east-1")
2514            .build()
2515            .await
2516            .unwrap();
2517
2518        let schema = create_test_schema();
2519        let ipc_data = create_test_ipc_data(&schema);
2520
2521        let mut create_req = CreateTableRequest::new();
2522        create_req.id = Some(vec!["t1".to_string()]);
2523        namespace
2524            .create_table(create_req, bytes::Bytes::from(ipc_data))
2525            .await
2526            .unwrap();
2527
2528        // describe_table through manifest should not return storage options without a vendor
2529        let mut desc_req = DescribeTableRequest::new();
2530        desc_req.id = Some(vec!["t1".to_string()]);
2531        let resp = namespace.describe_table(desc_req).await.unwrap();
2532        assert!(resp.storage_options.is_none());
2533    }
2534
2535    #[tokio::test]
2536    async fn test_from_properties_manifest_enabled() {
2537        let temp_dir = TempStdDir::default();
2538
2539        let mut properties = HashMap::new();
2540        properties.insert("root".to_string(), temp_dir.to_str().unwrap().to_string());
2541        properties.insert("manifest_enabled".to_string(), "true".to_string());
2542        properties.insert("dir_listing_enabled".to_string(), "false".to_string());
2543
2544        let builder = DirectoryNamespaceBuilder::from_properties(properties, None).unwrap();
2545        assert!(builder.manifest_enabled);
2546        assert!(!builder.dir_listing_enabled);
2547
2548        let namespace = builder.build().await.unwrap();
2549
2550        // Create test IPC data
2551        let schema = create_test_schema();
2552        let ipc_data = create_test_ipc_data(&schema);
2553
2554        // Create a table
2555        let mut request = CreateTableRequest::new();
2556        request.id = Some(vec!["test_table".to_string()]);
2557
2558        let response = namespace
2559            .create_table(request, bytes::Bytes::from(ipc_data))
2560            .await
2561            .unwrap();
2562
2563        assert!(response.location.is_some());
2564    }
2565
2566    #[tokio::test]
2567    async fn test_from_properties_dir_listing_enabled() {
2568        let temp_dir = TempStdDir::default();
2569
2570        let mut properties = HashMap::new();
2571        properties.insert("root".to_string(), temp_dir.to_str().unwrap().to_string());
2572        properties.insert("manifest_enabled".to_string(), "false".to_string());
2573        properties.insert("dir_listing_enabled".to_string(), "true".to_string());
2574
2575        let builder = DirectoryNamespaceBuilder::from_properties(properties, None).unwrap();
2576        assert!(!builder.manifest_enabled);
2577        assert!(builder.dir_listing_enabled);
2578
2579        let namespace = builder.build().await.unwrap();
2580
2581        // Create test IPC data
2582        let schema = create_test_schema();
2583        let ipc_data = create_test_ipc_data(&schema);
2584
2585        // Create a table
2586        let mut request = CreateTableRequest::new();
2587        request.id = Some(vec!["test_table".to_string()]);
2588
2589        let response = namespace
2590            .create_table(request, bytes::Bytes::from(ipc_data))
2591            .await
2592            .unwrap();
2593
2594        assert!(response.location.is_some());
2595    }
2596
2597    #[tokio::test]
2598    async fn test_from_properties_defaults() {
2599        let temp_dir = TempStdDir::default();
2600
2601        let mut properties = HashMap::new();
2602        properties.insert("root".to_string(), temp_dir.to_str().unwrap().to_string());
2603
2604        let builder = DirectoryNamespaceBuilder::from_properties(properties, None).unwrap();
2605        // Both should default to true
2606        assert!(builder.manifest_enabled);
2607        assert!(builder.dir_listing_enabled);
2608    }
2609
2610    #[tokio::test]
2611    async fn test_from_properties_with_storage_options() {
2612        let temp_dir = TempStdDir::default();
2613
2614        let mut properties = HashMap::new();
2615        properties.insert("root".to_string(), temp_dir.to_str().unwrap().to_string());
2616        properties.insert("manifest_enabled".to_string(), "true".to_string());
2617        properties.insert("storage.region".to_string(), "us-west-2".to_string());
2618        properties.insert("storage.bucket".to_string(), "my-bucket".to_string());
2619
2620        let builder = DirectoryNamespaceBuilder::from_properties(properties, None).unwrap();
2621        assert!(builder.manifest_enabled);
2622        assert!(builder.storage_options.is_some());
2623
2624        let storage_options = builder.storage_options.unwrap();
2625        assert_eq!(
2626            storage_options.get("region"),
2627            Some(&"us-west-2".to_string())
2628        );
2629        assert_eq!(
2630            storage_options.get("bucket"),
2631            Some(&"my-bucket".to_string())
2632        );
2633    }
2634
2635    #[tokio::test]
2636    async fn test_various_arrow_types() {
2637        let (namespace, _temp_dir) = create_test_namespace().await;
2638
2639        // Create schema with various types
2640        let fields = vec![
2641            JsonArrowField {
2642                name: "bool_col".to_string(),
2643                r#type: Box::new(JsonArrowDataType::new("bool".to_string())),
2644                nullable: true,
2645                metadata: None,
2646            },
2647            JsonArrowField {
2648                name: "int8_col".to_string(),
2649                r#type: Box::new(JsonArrowDataType::new("int8".to_string())),
2650                nullable: true,
2651                metadata: None,
2652            },
2653            JsonArrowField {
2654                name: "float64_col".to_string(),
2655                r#type: Box::new(JsonArrowDataType::new("float64".to_string())),
2656                nullable: true,
2657                metadata: None,
2658            },
2659            JsonArrowField {
2660                name: "binary_col".to_string(),
2661                r#type: Box::new(JsonArrowDataType::new("binary".to_string())),
2662                nullable: true,
2663                metadata: None,
2664            },
2665        ];
2666
2667        let schema = JsonArrowSchema {
2668            fields,
2669            metadata: None,
2670        };
2671
2672        // Create IPC data
2673        let ipc_data = create_test_ipc_data(&schema);
2674
2675        let mut request = CreateTableRequest::new();
2676        request.id = Some(vec!["complex_table".to_string()]);
2677
2678        let response = namespace
2679            .create_table(request, bytes::Bytes::from(ipc_data))
2680            .await
2681            .unwrap();
2682
2683        assert!(response.location.is_some());
2684    }
2685
2686    #[tokio::test]
2687    async fn test_connect_dir() {
2688        let temp_dir = TempStdDir::default();
2689
2690        let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
2691            .build()
2692            .await
2693            .unwrap();
2694
2695        // Test basic operation through the concrete type
2696        let mut request = ListTablesRequest::new();
2697        request.id = Some(vec![]);
2698        let response = namespace.list_tables(request).await.unwrap();
2699        assert_eq!(response.tables.len(), 0);
2700    }
2701
2702    #[tokio::test]
2703    async fn test_create_table_with_ipc_data() {
2704        use arrow::array::{Int32Array, StringArray};
2705        use arrow::ipc::writer::StreamWriter;
2706
2707        let (namespace, _temp_dir) = create_test_namespace().await;
2708
2709        // Create a schema with some fields
2710        let schema = create_test_schema();
2711
2712        // Create some test data that matches the schema
2713        let arrow_schema = convert_json_arrow_schema(&schema).unwrap();
2714        let arrow_schema = Arc::new(arrow_schema);
2715
2716        // Create a RecordBatch with actual data
2717        let id_array = Int32Array::from(vec![1, 2, 3]);
2718        let name_array = StringArray::from(vec!["Alice", "Bob", "Charlie"]);
2719        let batch = arrow::record_batch::RecordBatch::try_new(
2720            arrow_schema.clone(),
2721            vec![Arc::new(id_array), Arc::new(name_array)],
2722        )
2723        .unwrap();
2724
2725        // Write the batch to an IPC stream
2726        let mut buffer = Vec::new();
2727        {
2728            let mut writer = StreamWriter::try_new(&mut buffer, &arrow_schema).unwrap();
2729            writer.write(&batch).unwrap();
2730            writer.finish().unwrap();
2731        }
2732
2733        // Create table with the IPC data
2734        let mut request = CreateTableRequest::new();
2735        request.id = Some(vec!["test_table_with_data".to_string()]);
2736
2737        let response = namespace
2738            .create_table(request, Bytes::from(buffer))
2739            .await
2740            .unwrap();
2741
2742        assert_eq!(response.version, Some(1));
2743        assert!(
2744            response
2745                .location
2746                .unwrap()
2747                .contains("test_table_with_data.lance")
2748        );
2749
2750        // Verify table exists
2751        let mut exists_request = TableExistsRequest::new();
2752        exists_request.id = Some(vec!["test_table_with_data".to_string()]);
2753        namespace.table_exists(exists_request).await.unwrap();
2754    }
2755
2756    #[tokio::test]
2757    async fn test_child_namespace_create_and_list() {
2758        let (namespace, _temp_dir) = create_test_namespace().await;
2759
2760        // Create multiple child namespaces
2761        for i in 1..=3 {
2762            let mut create_req = CreateNamespaceRequest::new();
2763            create_req.id = Some(vec![format!("ns{}", i)]);
2764            let result = namespace.create_namespace(create_req).await;
2765            assert!(result.is_ok(), "Failed to create child namespace ns{}", i);
2766        }
2767
2768        // List child namespaces
2769        let list_req = ListNamespacesRequest {
2770            id: Some(vec![]),
2771            ..Default::default()
2772        };
2773        let result = namespace.list_namespaces(list_req).await;
2774        assert!(result.is_ok());
2775        let namespaces = result.unwrap().namespaces;
2776        assert_eq!(namespaces.len(), 3);
2777        assert!(namespaces.contains(&"ns1".to_string()));
2778        assert!(namespaces.contains(&"ns2".to_string()));
2779        assert!(namespaces.contains(&"ns3".to_string()));
2780    }
2781
2782    #[tokio::test]
2783    async fn test_nested_namespace_hierarchy() {
2784        let (namespace, _temp_dir) = create_test_namespace().await;
2785
2786        // Create parent namespace
2787        let mut create_req = CreateNamespaceRequest::new();
2788        create_req.id = Some(vec!["parent".to_string()]);
2789        namespace.create_namespace(create_req).await.unwrap();
2790
2791        // Create nested children
2792        let mut create_req = CreateNamespaceRequest::new();
2793        create_req.id = Some(vec!["parent".to_string(), "child1".to_string()]);
2794        namespace.create_namespace(create_req).await.unwrap();
2795
2796        let mut create_req = CreateNamespaceRequest::new();
2797        create_req.id = Some(vec!["parent".to_string(), "child2".to_string()]);
2798        namespace.create_namespace(create_req).await.unwrap();
2799
2800        // List children of parent
2801        let list_req = ListNamespacesRequest {
2802            id: Some(vec!["parent".to_string()]),
2803            ..Default::default()
2804        };
2805        let result = namespace.list_namespaces(list_req).await;
2806        assert!(result.is_ok());
2807        let children = result.unwrap().namespaces;
2808        assert_eq!(children.len(), 2);
2809        assert!(children.contains(&"child1".to_string()));
2810        assert!(children.contains(&"child2".to_string()));
2811
2812        // List root should only show parent
2813        let list_req = ListNamespacesRequest {
2814            id: Some(vec![]),
2815            ..Default::default()
2816        };
2817        let result = namespace.list_namespaces(list_req).await;
2818        assert!(result.is_ok());
2819        let root_namespaces = result.unwrap().namespaces;
2820        assert_eq!(root_namespaces.len(), 1);
2821        assert_eq!(root_namespaces[0], "parent");
2822    }
2823
2824    #[tokio::test]
2825    async fn test_table_in_child_namespace() {
2826        let (namespace, _temp_dir) = create_test_namespace().await;
2827
2828        // Create child namespace
2829        let mut create_ns_req = CreateNamespaceRequest::new();
2830        create_ns_req.id = Some(vec!["test_ns".to_string()]);
2831        namespace.create_namespace(create_ns_req).await.unwrap();
2832
2833        // Create table in child namespace
2834        let schema = create_test_schema();
2835        let ipc_data = create_test_ipc_data(&schema);
2836        let mut create_table_req = CreateTableRequest::new();
2837        create_table_req.id = Some(vec!["test_ns".to_string(), "table1".to_string()]);
2838        let result = namespace
2839            .create_table(create_table_req, bytes::Bytes::from(ipc_data))
2840            .await;
2841        assert!(result.is_ok(), "Failed to create table in child namespace");
2842
2843        // List tables in child namespace
2844        let list_req = ListTablesRequest {
2845            id: Some(vec!["test_ns".to_string()]),
2846            ..Default::default()
2847        };
2848        let result = namespace.list_tables(list_req).await;
2849        assert!(result.is_ok());
2850        let tables = result.unwrap().tables;
2851        assert_eq!(tables.len(), 1);
2852        assert_eq!(tables[0], "table1");
2853
2854        // Verify table exists
2855        let mut exists_req = TableExistsRequest::new();
2856        exists_req.id = Some(vec!["test_ns".to_string(), "table1".to_string()]);
2857        let result = namespace.table_exists(exists_req).await;
2858        assert!(result.is_ok());
2859
2860        // Describe table in child namespace
2861        let mut describe_req = DescribeTableRequest::new();
2862        describe_req.id = Some(vec!["test_ns".to_string(), "table1".to_string()]);
2863        let result = namespace.describe_table(describe_req).await;
2864        assert!(result.is_ok());
2865        let response = result.unwrap();
2866        assert!(response.location.is_some());
2867    }
2868
2869    #[tokio::test]
2870    async fn test_multiple_tables_in_child_namespace() {
2871        let (namespace, _temp_dir) = create_test_namespace().await;
2872
2873        // Create child namespace
2874        let mut create_ns_req = CreateNamespaceRequest::new();
2875        create_ns_req.id = Some(vec!["test_ns".to_string()]);
2876        namespace.create_namespace(create_ns_req).await.unwrap();
2877
2878        // Create multiple tables
2879        let schema = create_test_schema();
2880        let ipc_data = create_test_ipc_data(&schema);
2881        for i in 1..=3 {
2882            let mut create_table_req = CreateTableRequest::new();
2883            create_table_req.id = Some(vec!["test_ns".to_string(), format!("table{}", i)]);
2884            namespace
2885                .create_table(create_table_req, bytes::Bytes::from(ipc_data.clone()))
2886                .await
2887                .unwrap();
2888        }
2889
2890        // List tables
2891        let list_req = ListTablesRequest {
2892            id: Some(vec!["test_ns".to_string()]),
2893            ..Default::default()
2894        };
2895        let result = namespace.list_tables(list_req).await;
2896        assert!(result.is_ok());
2897        let tables = result.unwrap().tables;
2898        assert_eq!(tables.len(), 3);
2899        assert!(tables.contains(&"table1".to_string()));
2900        assert!(tables.contains(&"table2".to_string()));
2901        assert!(tables.contains(&"table3".to_string()));
2902    }
2903
2904    #[tokio::test]
2905    async fn test_drop_table_in_child_namespace() {
2906        let (namespace, _temp_dir) = create_test_namespace().await;
2907
2908        // Create child namespace
2909        let mut create_ns_req = CreateNamespaceRequest::new();
2910        create_ns_req.id = Some(vec!["test_ns".to_string()]);
2911        namespace.create_namespace(create_ns_req).await.unwrap();
2912
2913        // Create table
2914        let schema = create_test_schema();
2915        let ipc_data = create_test_ipc_data(&schema);
2916        let mut create_table_req = CreateTableRequest::new();
2917        create_table_req.id = Some(vec!["test_ns".to_string(), "table1".to_string()]);
2918        namespace
2919            .create_table(create_table_req, bytes::Bytes::from(ipc_data))
2920            .await
2921            .unwrap();
2922
2923        // Drop table
2924        let mut drop_req = DropTableRequest::new();
2925        drop_req.id = Some(vec!["test_ns".to_string(), "table1".to_string()]);
2926        let result = namespace.drop_table(drop_req).await;
2927        assert!(result.is_ok(), "Failed to drop table in child namespace");
2928
2929        // Verify table no longer exists
2930        let mut exists_req = TableExistsRequest::new();
2931        exists_req.id = Some(vec!["test_ns".to_string(), "table1".to_string()]);
2932        let result = namespace.table_exists(exists_req).await;
2933        assert!(result.is_err());
2934    }
2935
2936    #[tokio::test]
2937    async fn test_deeply_nested_namespace() {
2938        let (namespace, _temp_dir) = create_test_namespace().await;
2939
2940        // Create deeply nested namespace hierarchy
2941        let mut create_req = CreateNamespaceRequest::new();
2942        create_req.id = Some(vec!["level1".to_string()]);
2943        namespace.create_namespace(create_req).await.unwrap();
2944
2945        let mut create_req = CreateNamespaceRequest::new();
2946        create_req.id = Some(vec!["level1".to_string(), "level2".to_string()]);
2947        namespace.create_namespace(create_req).await.unwrap();
2948
2949        let mut create_req = CreateNamespaceRequest::new();
2950        create_req.id = Some(vec![
2951            "level1".to_string(),
2952            "level2".to_string(),
2953            "level3".to_string(),
2954        ]);
2955        namespace.create_namespace(create_req).await.unwrap();
2956
2957        // Create table in deeply nested namespace
2958        let schema = create_test_schema();
2959        let ipc_data = create_test_ipc_data(&schema);
2960        let mut create_table_req = CreateTableRequest::new();
2961        create_table_req.id = Some(vec![
2962            "level1".to_string(),
2963            "level2".to_string(),
2964            "level3".to_string(),
2965            "table1".to_string(),
2966        ]);
2967        let result = namespace
2968            .create_table(create_table_req, bytes::Bytes::from(ipc_data))
2969            .await;
2970        assert!(
2971            result.is_ok(),
2972            "Failed to create table in deeply nested namespace"
2973        );
2974
2975        // Verify table exists
2976        let mut exists_req = TableExistsRequest::new();
2977        exists_req.id = Some(vec![
2978            "level1".to_string(),
2979            "level2".to_string(),
2980            "level3".to_string(),
2981            "table1".to_string(),
2982        ]);
2983        let result = namespace.table_exists(exists_req).await;
2984        assert!(result.is_ok());
2985    }
2986
2987    #[tokio::test]
2988    async fn test_namespace_with_properties() {
2989        let (namespace, _temp_dir) = create_test_namespace().await;
2990
2991        // Create namespace with properties
2992        let mut properties = HashMap::new();
2993        properties.insert("owner".to_string(), "test_user".to_string());
2994        properties.insert("description".to_string(), "Test namespace".to_string());
2995
2996        let mut create_req = CreateNamespaceRequest::new();
2997        create_req.id = Some(vec!["test_ns".to_string()]);
2998        create_req.properties = Some(properties.clone());
2999        namespace.create_namespace(create_req).await.unwrap();
3000
3001        // Describe namespace and verify properties
3002        let describe_req = DescribeNamespaceRequest {
3003            id: Some(vec!["test_ns".to_string()]),
3004            ..Default::default()
3005        };
3006        let result = namespace.describe_namespace(describe_req).await;
3007        assert!(result.is_ok());
3008        let response = result.unwrap();
3009        assert!(response.properties.is_some());
3010        let props = response.properties.unwrap();
3011        assert_eq!(props.get("owner"), Some(&"test_user".to_string()));
3012        assert_eq!(
3013            props.get("description"),
3014            Some(&"Test namespace".to_string())
3015        );
3016    }
3017
3018    #[tokio::test]
3019    async fn test_cannot_drop_namespace_with_tables() {
3020        let (namespace, _temp_dir) = create_test_namespace().await;
3021
3022        // Create namespace
3023        let mut create_ns_req = CreateNamespaceRequest::new();
3024        create_ns_req.id = Some(vec!["test_ns".to_string()]);
3025        namespace.create_namespace(create_ns_req).await.unwrap();
3026
3027        // Create table in namespace
3028        let schema = create_test_schema();
3029        let ipc_data = create_test_ipc_data(&schema);
3030        let mut create_table_req = CreateTableRequest::new();
3031        create_table_req.id = Some(vec!["test_ns".to_string(), "table1".to_string()]);
3032        namespace
3033            .create_table(create_table_req, bytes::Bytes::from(ipc_data))
3034            .await
3035            .unwrap();
3036
3037        // Try to drop namespace - should fail
3038        let mut drop_req = DropNamespaceRequest::new();
3039        drop_req.id = Some(vec!["test_ns".to_string()]);
3040        let result = namespace.drop_namespace(drop_req).await;
3041        assert!(
3042            result.is_err(),
3043            "Should not be able to drop namespace with tables"
3044        );
3045    }
3046
3047    #[tokio::test]
3048    async fn test_isolation_between_namespaces() {
3049        let (namespace, _temp_dir) = create_test_namespace().await;
3050
3051        // Create two namespaces
3052        let mut create_req = CreateNamespaceRequest::new();
3053        create_req.id = Some(vec!["ns1".to_string()]);
3054        namespace.create_namespace(create_req).await.unwrap();
3055
3056        let mut create_req = CreateNamespaceRequest::new();
3057        create_req.id = Some(vec!["ns2".to_string()]);
3058        namespace.create_namespace(create_req).await.unwrap();
3059
3060        // Create table with same name in both namespaces
3061        let schema = create_test_schema();
3062        let ipc_data = create_test_ipc_data(&schema);
3063
3064        let mut create_table_req = CreateTableRequest::new();
3065        create_table_req.id = Some(vec!["ns1".to_string(), "table1".to_string()]);
3066        namespace
3067            .create_table(create_table_req, bytes::Bytes::from(ipc_data.clone()))
3068            .await
3069            .unwrap();
3070
3071        let mut create_table_req = CreateTableRequest::new();
3072        create_table_req.id = Some(vec!["ns2".to_string(), "table1".to_string()]);
3073        namespace
3074            .create_table(create_table_req, bytes::Bytes::from(ipc_data))
3075            .await
3076            .unwrap();
3077
3078        // List tables in each namespace
3079        let list_req = ListTablesRequest {
3080            id: Some(vec!["ns1".to_string()]),
3081            page_token: None,
3082            limit: None,
3083            ..Default::default()
3084        };
3085        let result = namespace.list_tables(list_req).await.unwrap();
3086        assert_eq!(result.tables.len(), 1);
3087        assert_eq!(result.tables[0], "table1");
3088
3089        let list_req = ListTablesRequest {
3090            id: Some(vec!["ns2".to_string()]),
3091            page_token: None,
3092            limit: None,
3093            ..Default::default()
3094        };
3095        let result = namespace.list_tables(list_req).await.unwrap();
3096        assert_eq!(result.tables.len(), 1);
3097        assert_eq!(result.tables[0], "table1");
3098
3099        // Drop table in ns1 shouldn't affect ns2
3100        let mut drop_req = DropTableRequest::new();
3101        drop_req.id = Some(vec!["ns1".to_string(), "table1".to_string()]);
3102        namespace.drop_table(drop_req).await.unwrap();
3103
3104        // Verify ns1 table is gone but ns2 table still exists
3105        let mut exists_req = TableExistsRequest::new();
3106        exists_req.id = Some(vec!["ns1".to_string(), "table1".to_string()]);
3107        assert!(namespace.table_exists(exists_req).await.is_err());
3108
3109        let mut exists_req = TableExistsRequest::new();
3110        exists_req.id = Some(vec!["ns2".to_string(), "table1".to_string()]);
3111        assert!(namespace.table_exists(exists_req).await.is_ok());
3112    }
3113
3114    #[tokio::test]
3115    async fn test_migrate_directory_tables() {
3116        let temp_dir = TempStdDir::default();
3117        let temp_path = temp_dir.to_str().unwrap();
3118
3119        // Step 1: Create tables in directory-only mode
3120        let dir_only_ns = DirectoryNamespaceBuilder::new(temp_path)
3121            .manifest_enabled(false)
3122            .dir_listing_enabled(true)
3123            .build()
3124            .await
3125            .unwrap();
3126
3127        // Create some tables
3128        let schema = create_test_schema();
3129        let ipc_data = create_test_ipc_data(&schema);
3130
3131        for i in 1..=3 {
3132            let mut create_req = CreateTableRequest::new();
3133            create_req.id = Some(vec![format!("table{}", i)]);
3134            dir_only_ns
3135                .create_table(create_req, bytes::Bytes::from(ipc_data.clone()))
3136                .await
3137                .unwrap();
3138        }
3139
3140        drop(dir_only_ns);
3141
3142        // Step 2: Create namespace with dual mode (manifest + directory listing)
3143        let dual_mode_ns = DirectoryNamespaceBuilder::new(temp_path)
3144            .manifest_enabled(true)
3145            .dir_listing_enabled(true)
3146            .build()
3147            .await
3148            .unwrap();
3149
3150        // Before migration, tables should be visible (via directory listing fallback)
3151        let mut list_req = ListTablesRequest::new();
3152        list_req.id = Some(vec![]);
3153        let tables = dual_mode_ns.list_tables(list_req).await.unwrap().tables;
3154        assert_eq!(tables.len(), 3);
3155
3156        // Run migration
3157        let migrated_count = dual_mode_ns.migrate().await.unwrap();
3158        assert_eq!(migrated_count, 3, "Should migrate all 3 tables");
3159
3160        // Verify tables are now in manifest
3161        let mut list_req = ListTablesRequest::new();
3162        list_req.id = Some(vec![]);
3163        let tables = dual_mode_ns.list_tables(list_req).await.unwrap().tables;
3164        assert_eq!(tables.len(), 3);
3165
3166        // Run migration again - should be idempotent
3167        let migrated_count = dual_mode_ns.migrate().await.unwrap();
3168        assert_eq!(
3169            migrated_count, 0,
3170            "Should not migrate already-migrated tables"
3171        );
3172
3173        drop(dual_mode_ns);
3174
3175        // Step 3: Create namespace with manifest-only mode
3176        let manifest_only_ns = DirectoryNamespaceBuilder::new(temp_path)
3177            .manifest_enabled(true)
3178            .dir_listing_enabled(false)
3179            .build()
3180            .await
3181            .unwrap();
3182
3183        // Tables should still be accessible (now from manifest only)
3184        let mut list_req = ListTablesRequest::new();
3185        list_req.id = Some(vec![]);
3186        let tables = manifest_only_ns.list_tables(list_req).await.unwrap().tables;
3187        assert_eq!(tables.len(), 3);
3188        assert!(tables.contains(&"table1".to_string()));
3189        assert!(tables.contains(&"table2".to_string()));
3190        assert!(tables.contains(&"table3".to_string()));
3191    }
3192
3193    #[tokio::test]
3194    async fn test_migrate_without_manifest() {
3195        let temp_dir = TempStdDir::default();
3196        let temp_path = temp_dir.to_str().unwrap();
3197
3198        // Create namespace without manifest
3199        let namespace = DirectoryNamespaceBuilder::new(temp_path)
3200            .manifest_enabled(false)
3201            .dir_listing_enabled(true)
3202            .build()
3203            .await
3204            .unwrap();
3205
3206        // migrate() should return 0 when manifest is not enabled
3207        let migrated_count = namespace.migrate().await.unwrap();
3208        assert_eq!(migrated_count, 0);
3209    }
3210
3211    #[tokio::test]
3212    async fn test_register_table() {
3213        use lance_namespace::models::{RegisterTableRequest, TableExistsRequest};
3214
3215        let temp_dir = TempStdDir::default();
3216        let temp_path = temp_dir.to_str().unwrap();
3217
3218        let namespace = DirectoryNamespaceBuilder::new(temp_path)
3219            .build()
3220            .await
3221            .unwrap();
3222
3223        // Create a physical table first using lance directly
3224        let schema = create_test_schema();
3225        let ipc_data = create_test_ipc_data(&schema);
3226
3227        let table_uri = format!("{}/external_table.lance", temp_path);
3228        let cursor = Cursor::new(ipc_data);
3229        let stream_reader = StreamReader::try_new(cursor, None).unwrap();
3230        let batches: Vec<_> = stream_reader
3231            .collect::<std::result::Result<Vec<_>, _>>()
3232            .unwrap();
3233        let schema = batches[0].schema();
3234        let batch_results: Vec<_> = batches.into_iter().map(Ok).collect();
3235        let reader = RecordBatchIterator::new(batch_results, schema);
3236        Dataset::write(Box::new(reader), &table_uri, None)
3237            .await
3238            .unwrap();
3239
3240        // Register the table
3241        let mut register_req = RegisterTableRequest::new("external_table.lance".to_string());
3242        register_req.id = Some(vec!["registered_table".to_string()]);
3243
3244        let response = namespace.register_table(register_req).await.unwrap();
3245        assert_eq!(response.location, Some("external_table.lance".to_string()));
3246
3247        // Verify table exists in namespace
3248        let mut exists_req = TableExistsRequest::new();
3249        exists_req.id = Some(vec!["registered_table".to_string()]);
3250        assert!(namespace.table_exists(exists_req).await.is_ok());
3251
3252        // Verify we can list the table
3253        let mut list_req = ListTablesRequest::new();
3254        list_req.id = Some(vec![]);
3255        let tables = namespace.list_tables(list_req).await.unwrap();
3256        assert!(tables.tables.contains(&"registered_table".to_string()));
3257    }
3258
3259    #[tokio::test]
3260    async fn test_register_table_duplicate_fails() {
3261        use lance_namespace::models::RegisterTableRequest;
3262
3263        let temp_dir = TempStdDir::default();
3264        let temp_path = temp_dir.to_str().unwrap();
3265
3266        let namespace = DirectoryNamespaceBuilder::new(temp_path)
3267            .build()
3268            .await
3269            .unwrap();
3270
3271        // Register a table
3272        let mut register_req = RegisterTableRequest::new("test_table.lance".to_string());
3273        register_req.id = Some(vec!["test_table".to_string()]);
3274
3275        namespace
3276            .register_table(register_req.clone())
3277            .await
3278            .unwrap();
3279
3280        // Try to register again - should fail
3281        let result = namespace.register_table(register_req).await;
3282        assert!(result.is_err());
3283        assert!(result.unwrap_err().to_string().contains("already exists"));
3284    }
3285
3286    #[tokio::test]
3287    async fn test_deregister_table() {
3288        use lance_namespace::models::{DeregisterTableRequest, TableExistsRequest};
3289
3290        let temp_dir = TempStdDir::default();
3291        let temp_path = temp_dir.to_str().unwrap();
3292
3293        // Create namespace with manifest-only mode (no directory listing fallback)
3294        // This ensures deregistered tables are truly invisible
3295        let namespace = DirectoryNamespaceBuilder::new(temp_path)
3296            .manifest_enabled(true)
3297            .dir_listing_enabled(false)
3298            .build()
3299            .await
3300            .unwrap();
3301
3302        // Create a table
3303        let schema = create_test_schema();
3304        let ipc_data = create_test_ipc_data(&schema);
3305
3306        let mut create_req = CreateTableRequest::new();
3307        create_req.id = Some(vec!["test_table".to_string()]);
3308        namespace
3309            .create_table(create_req, bytes::Bytes::from(ipc_data))
3310            .await
3311            .unwrap();
3312
3313        // Verify table exists
3314        let mut exists_req = TableExistsRequest::new();
3315        exists_req.id = Some(vec!["test_table".to_string()]);
3316        assert!(namespace.table_exists(exists_req.clone()).await.is_ok());
3317
3318        // Deregister the table
3319        let mut deregister_req = DeregisterTableRequest::new();
3320        deregister_req.id = Some(vec!["test_table".to_string()]);
3321        let response = namespace.deregister_table(deregister_req).await.unwrap();
3322
3323        // Should return location and id
3324        assert!(
3325            response.location.is_some(),
3326            "Deregister should return location"
3327        );
3328        let location = response.location.as_ref().unwrap();
3329        // Location should be a proper file:// URI with the temp path
3330        // Use uri_to_url to normalize the temp path to a URL for comparison
3331        let expected_url = lance_io::object_store::uri_to_url(temp_path)
3332            .expect("Failed to convert temp path to URL");
3333        let expected_prefix = expected_url.to_string();
3334        assert!(
3335            location.starts_with(&expected_prefix),
3336            "Location should start with '{}', got: {}",
3337            expected_prefix,
3338            location
3339        );
3340        assert!(
3341            location.contains("test_table"),
3342            "Location should contain table name: {}",
3343            location
3344        );
3345        assert_eq!(response.id, Some(vec!["test_table".to_string()]));
3346
3347        // Verify table no longer exists in namespace (removed from manifest)
3348        assert!(namespace.table_exists(exists_req).await.is_err());
3349
3350        // Verify physical data still exists at the returned location
3351        let dataset = Dataset::open(location).await;
3352        assert!(
3353            dataset.is_ok(),
3354            "Physical table data should still exist at {}",
3355            location
3356        );
3357    }
3358
3359    #[tokio::test]
3360    async fn test_deregister_table_in_child_namespace() {
3361        use lance_namespace::models::{
3362            CreateNamespaceRequest, DeregisterTableRequest, TableExistsRequest,
3363        };
3364
3365        let temp_dir = TempStdDir::default();
3366        let temp_path = temp_dir.to_str().unwrap();
3367
3368        let namespace = DirectoryNamespaceBuilder::new(temp_path)
3369            .build()
3370            .await
3371            .unwrap();
3372
3373        // Create child namespace
3374        let mut create_ns_req = CreateNamespaceRequest::new();
3375        create_ns_req.id = Some(vec!["test_ns".to_string()]);
3376        namespace.create_namespace(create_ns_req).await.unwrap();
3377
3378        // Create a table in the child namespace
3379        let schema = create_test_schema();
3380        let ipc_data = create_test_ipc_data(&schema);
3381
3382        let mut create_req = CreateTableRequest::new();
3383        create_req.id = Some(vec!["test_ns".to_string(), "test_table".to_string()]);
3384        namespace
3385            .create_table(create_req, bytes::Bytes::from(ipc_data))
3386            .await
3387            .unwrap();
3388
3389        // Deregister the table
3390        let mut deregister_req = DeregisterTableRequest::new();
3391        deregister_req.id = Some(vec!["test_ns".to_string(), "test_table".to_string()]);
3392        let response = namespace.deregister_table(deregister_req).await.unwrap();
3393
3394        // Should return location and id in child namespace
3395        assert!(
3396            response.location.is_some(),
3397            "Deregister should return location"
3398        );
3399        let location = response.location.as_ref().unwrap();
3400        // Location should be a proper file:// URI with the temp path
3401        // Use uri_to_url to normalize the temp path to a URL for comparison
3402        let expected_url = lance_io::object_store::uri_to_url(temp_path)
3403            .expect("Failed to convert temp path to URL");
3404        let expected_prefix = expected_url.to_string();
3405        assert!(
3406            location.starts_with(&expected_prefix),
3407            "Location should start with '{}', got: {}",
3408            expected_prefix,
3409            location
3410        );
3411        assert!(
3412            location.contains("test_ns") && location.contains("test_table"),
3413            "Location should contain namespace and table name: {}",
3414            location
3415        );
3416        assert_eq!(
3417            response.id,
3418            Some(vec!["test_ns".to_string(), "test_table".to_string()])
3419        );
3420
3421        // Verify table no longer exists
3422        let mut exists_req = TableExistsRequest::new();
3423        exists_req.id = Some(vec!["test_ns".to_string(), "test_table".to_string()]);
3424        assert!(namespace.table_exists(exists_req).await.is_err());
3425    }
3426
3427    #[tokio::test]
3428    async fn test_register_without_manifest_fails() {
3429        use lance_namespace::models::RegisterTableRequest;
3430
3431        let temp_dir = TempStdDir::default();
3432        let temp_path = temp_dir.to_str().unwrap();
3433
3434        // Create namespace without manifest
3435        let namespace = DirectoryNamespaceBuilder::new(temp_path)
3436            .manifest_enabled(false)
3437            .build()
3438            .await
3439            .unwrap();
3440
3441        // Try to register - should fail (register requires manifest)
3442        let mut register_req = RegisterTableRequest::new("test_table.lance".to_string());
3443        register_req.id = Some(vec!["test_table".to_string()]);
3444        let result = namespace.register_table(register_req).await;
3445        assert!(result.is_err());
3446        assert!(
3447            result
3448                .unwrap_err()
3449                .to_string()
3450                .contains("manifest mode is enabled")
3451        );
3452
3453        // Note: deregister_table now works in V1 mode via .lance-deregistered marker files
3454        // See test_deregister_table_v1_mode for that test case
3455    }
3456
3457    #[tokio::test]
3458    async fn test_register_table_rejects_absolute_uri() {
3459        use lance_namespace::models::RegisterTableRequest;
3460
3461        let temp_dir = TempStdDir::default();
3462        let temp_path = temp_dir.to_str().unwrap();
3463
3464        let namespace = DirectoryNamespaceBuilder::new(temp_path)
3465            .build()
3466            .await
3467            .unwrap();
3468
3469        // Try to register with absolute URI - should fail
3470        let mut register_req = RegisterTableRequest::new("s3://bucket/table.lance".to_string());
3471        register_req.id = Some(vec!["test_table".to_string()]);
3472        let result = namespace.register_table(register_req).await;
3473        assert!(result.is_err());
3474        let err_msg = result.unwrap_err().to_string();
3475        assert!(err_msg.contains("Absolute URIs are not allowed"));
3476    }
3477
3478    #[tokio::test]
3479    async fn test_register_table_rejects_absolute_path() {
3480        use lance_namespace::models::RegisterTableRequest;
3481
3482        let temp_dir = TempStdDir::default();
3483        let temp_path = temp_dir.to_str().unwrap();
3484
3485        let namespace = DirectoryNamespaceBuilder::new(temp_path)
3486            .build()
3487            .await
3488            .unwrap();
3489
3490        // Try to register with absolute path - should fail
3491        let mut register_req = RegisterTableRequest::new("/tmp/table.lance".to_string());
3492        register_req.id = Some(vec!["test_table".to_string()]);
3493        let result = namespace.register_table(register_req).await;
3494        assert!(result.is_err());
3495        let err_msg = result.unwrap_err().to_string();
3496        assert!(err_msg.contains("Absolute paths are not allowed"));
3497    }
3498
3499    #[tokio::test]
3500    async fn test_register_table_rejects_path_traversal() {
3501        use lance_namespace::models::RegisterTableRequest;
3502
3503        let temp_dir = TempStdDir::default();
3504        let temp_path = temp_dir.to_str().unwrap();
3505
3506        let namespace = DirectoryNamespaceBuilder::new(temp_path)
3507            .build()
3508            .await
3509            .unwrap();
3510
3511        // Try to register with path traversal - should fail
3512        let mut register_req = RegisterTableRequest::new("../outside/table.lance".to_string());
3513        register_req.id = Some(vec!["test_table".to_string()]);
3514        let result = namespace.register_table(register_req).await;
3515        assert!(result.is_err());
3516        let err_msg = result.unwrap_err().to_string();
3517        assert!(err_msg.contains("Path traversal is not allowed"));
3518    }
3519
3520    #[tokio::test]
3521    async fn test_namespace_write() {
3522        use arrow::array::Int32Array;
3523        use arrow::datatypes::{DataType, Field as ArrowField, Schema as ArrowSchema};
3524        use arrow::record_batch::{RecordBatch, RecordBatchIterator};
3525        use lance::dataset::{Dataset, WriteMode, WriteParams};
3526        use lance_namespace::LanceNamespace;
3527
3528        let (namespace, _temp_dir) = create_test_namespace().await;
3529        let namespace = Arc::new(namespace) as Arc<dyn LanceNamespace>;
3530
3531        // Use child namespace instead of root
3532        let table_id = vec!["test_ns".to_string(), "test_table".to_string()];
3533        let schema = Arc::new(ArrowSchema::new(vec![
3534            ArrowField::new("a", DataType::Int32, false),
3535            ArrowField::new("b", DataType::Int32, false),
3536        ]));
3537
3538        // Test 1: CREATE mode
3539        let data1 = RecordBatch::try_new(
3540            schema.clone(),
3541            vec![
3542                Arc::new(Int32Array::from(vec![1, 2, 3])),
3543                Arc::new(Int32Array::from(vec![10, 20, 30])),
3544            ],
3545        )
3546        .unwrap();
3547
3548        let reader1 = RecordBatchIterator::new(vec![data1].into_iter().map(Ok), schema.clone());
3549        let dataset =
3550            Dataset::write_into_namespace(reader1, namespace.clone(), table_id.clone(), None)
3551                .await
3552                .unwrap();
3553
3554        assert_eq!(dataset.count_rows(None).await.unwrap(), 3);
3555        assert_eq!(dataset.version().version, 1);
3556
3557        // Test 2: APPEND mode
3558        let data2 = RecordBatch::try_new(
3559            schema.clone(),
3560            vec![
3561                Arc::new(Int32Array::from(vec![4, 5])),
3562                Arc::new(Int32Array::from(vec![40, 50])),
3563            ],
3564        )
3565        .unwrap();
3566
3567        let params_append = WriteParams {
3568            mode: WriteMode::Append,
3569            ..Default::default()
3570        };
3571
3572        let reader2 = RecordBatchIterator::new(vec![data2].into_iter().map(Ok), schema.clone());
3573        let dataset = Dataset::write_into_namespace(
3574            reader2,
3575            namespace.clone(),
3576            table_id.clone(),
3577            Some(params_append),
3578        )
3579        .await
3580        .unwrap();
3581
3582        assert_eq!(dataset.count_rows(None).await.unwrap(), 5);
3583        assert_eq!(dataset.version().version, 2);
3584
3585        // Test 3: OVERWRITE mode
3586        let data3 = RecordBatch::try_new(
3587            schema.clone(),
3588            vec![
3589                Arc::new(Int32Array::from(vec![100, 200])),
3590                Arc::new(Int32Array::from(vec![1000, 2000])),
3591            ],
3592        )
3593        .unwrap();
3594
3595        let params_overwrite = WriteParams {
3596            mode: WriteMode::Overwrite,
3597            ..Default::default()
3598        };
3599
3600        let reader3 = RecordBatchIterator::new(vec![data3].into_iter().map(Ok), schema.clone());
3601        let dataset = Dataset::write_into_namespace(
3602            reader3,
3603            namespace.clone(),
3604            table_id.clone(),
3605            Some(params_overwrite),
3606        )
3607        .await
3608        .unwrap();
3609
3610        assert_eq!(dataset.count_rows(None).await.unwrap(), 2);
3611        assert_eq!(dataset.version().version, 3);
3612
3613        // Verify old data was replaced
3614        let result = dataset.scan().try_into_batch().await.unwrap();
3615        let a_col = result
3616            .column_by_name("a")
3617            .unwrap()
3618            .as_any()
3619            .downcast_ref::<Int32Array>()
3620            .unwrap();
3621        assert_eq!(a_col.values(), &[100, 200]);
3622    }
3623
3624    // ============================================================
3625    // Tests for declare_table
3626    // ============================================================
3627
3628    #[tokio::test]
3629    async fn test_declare_table_v1_mode() {
3630        use lance_namespace::models::{
3631            DeclareTableRequest, DescribeTableRequest, TableExistsRequest,
3632        };
3633
3634        let temp_dir = TempStdDir::default();
3635        let temp_path = temp_dir.to_str().unwrap();
3636
3637        // Create namespace in V1 mode (no manifest)
3638        let namespace = DirectoryNamespaceBuilder::new(temp_path)
3639            .manifest_enabled(false)
3640            .build()
3641            .await
3642            .unwrap();
3643
3644        // Declare a table
3645        let mut declare_req = DeclareTableRequest::new();
3646        declare_req.id = Some(vec!["test_table".to_string()]);
3647        let response = namespace.declare_table(declare_req).await.unwrap();
3648
3649        // Should return location
3650        assert!(response.location.is_some());
3651        let location = response.location.as_ref().unwrap();
3652        assert!(location.ends_with("test_table.lance"));
3653
3654        // Table should exist (via reserved file)
3655        let mut exists_req = TableExistsRequest::new();
3656        exists_req.id = Some(vec!["test_table".to_string()]);
3657        assert!(namespace.table_exists(exists_req).await.is_ok());
3658
3659        // Describe should work but return no version/schema (not written yet)
3660        let mut describe_req = DescribeTableRequest::new();
3661        describe_req.id = Some(vec!["test_table".to_string()]);
3662        let describe_response = namespace.describe_table(describe_req).await.unwrap();
3663        assert!(describe_response.location.is_some());
3664        assert!(describe_response.version.is_none()); // Not written yet
3665        assert!(describe_response.schema.is_none()); // Not written yet
3666    }
3667
3668    #[tokio::test]
3669    async fn test_declare_table_with_manifest() {
3670        use lance_namespace::models::{DeclareTableRequest, TableExistsRequest};
3671
3672        let temp_dir = TempStdDir::default();
3673        let temp_path = temp_dir.to_str().unwrap();
3674
3675        // Create namespace with manifest
3676        let namespace = DirectoryNamespaceBuilder::new(temp_path)
3677            .manifest_enabled(true)
3678            .dir_listing_enabled(false)
3679            .build()
3680            .await
3681            .unwrap();
3682
3683        // Declare a table
3684        let mut declare_req = DeclareTableRequest::new();
3685        declare_req.id = Some(vec!["test_table".to_string()]);
3686        let response = namespace.declare_table(declare_req).await.unwrap();
3687
3688        // Should return location
3689        assert!(response.location.is_some());
3690
3691        // Table should exist in manifest
3692        let mut exists_req = TableExistsRequest::new();
3693        exists_req.id = Some(vec!["test_table".to_string()]);
3694        assert!(namespace.table_exists(exists_req).await.is_ok());
3695    }
3696
3697    #[tokio::test]
3698    async fn test_declare_table_when_table_exists() {
3699        use lance_namespace::models::DeclareTableRequest;
3700
3701        let temp_dir = TempStdDir::default();
3702        let temp_path = temp_dir.to_str().unwrap();
3703
3704        let namespace = DirectoryNamespaceBuilder::new(temp_path)
3705            .manifest_enabled(false)
3706            .build()
3707            .await
3708            .unwrap();
3709
3710        // First create a table with actual data
3711        let schema = create_test_schema();
3712        let ipc_data = create_test_ipc_data(&schema);
3713        let mut create_req = CreateTableRequest::new();
3714        create_req.id = Some(vec!["test_table".to_string()]);
3715        namespace
3716            .create_table(create_req, bytes::Bytes::from(ipc_data))
3717            .await
3718            .unwrap();
3719
3720        // Try to declare the same table - should fail because it already has data
3721        let mut declare_req = DeclareTableRequest::new();
3722        declare_req.id = Some(vec!["test_table".to_string()]);
3723        let result = namespace.declare_table(declare_req).await;
3724        assert!(result.is_err());
3725    }
3726
3727    // ============================================================
3728    // Tests for deregister_table in V1 mode
3729    // ============================================================
3730
3731    #[tokio::test]
3732    async fn test_deregister_table_v1_mode() {
3733        use lance_namespace::models::{DeregisterTableRequest, TableExistsRequest};
3734
3735        let temp_dir = TempStdDir::default();
3736        let temp_path = temp_dir.to_str().unwrap();
3737
3738        // Create namespace in V1 mode (no manifest, with dir listing)
3739        let namespace = DirectoryNamespaceBuilder::new(temp_path)
3740            .manifest_enabled(false)
3741            .dir_listing_enabled(true)
3742            .build()
3743            .await
3744            .unwrap();
3745
3746        // Create a table with data
3747        let schema = create_test_schema();
3748        let ipc_data = create_test_ipc_data(&schema);
3749        let mut create_req = CreateTableRequest::new();
3750        create_req.id = Some(vec!["test_table".to_string()]);
3751        namespace
3752            .create_table(create_req, bytes::Bytes::from(ipc_data))
3753            .await
3754            .unwrap();
3755
3756        // Verify table exists
3757        let mut exists_req = TableExistsRequest::new();
3758        exists_req.id = Some(vec!["test_table".to_string()]);
3759        assert!(namespace.table_exists(exists_req.clone()).await.is_ok());
3760
3761        // Deregister the table
3762        let mut deregister_req = DeregisterTableRequest::new();
3763        deregister_req.id = Some(vec!["test_table".to_string()]);
3764        let response = namespace.deregister_table(deregister_req).await.unwrap();
3765
3766        // Should return location
3767        assert!(response.location.is_some());
3768        let location = response.location.as_ref().unwrap();
3769        assert!(location.contains("test_table"));
3770
3771        // Table should no longer exist (deregistered)
3772        let result = namespace.table_exists(exists_req).await;
3773        assert!(result.is_err());
3774        assert!(result.unwrap_err().to_string().contains("deregistered"));
3775
3776        // Physical data should still exist
3777        let dataset = Dataset::open(location).await;
3778        assert!(dataset.is_ok(), "Physical table data should still exist");
3779    }
3780
3781    #[tokio::test]
3782    async fn test_deregister_table_v1_already_deregistered() {
3783        use lance_namespace::models::DeregisterTableRequest;
3784
3785        let temp_dir = TempStdDir::default();
3786        let temp_path = temp_dir.to_str().unwrap();
3787
3788        let namespace = DirectoryNamespaceBuilder::new(temp_path)
3789            .manifest_enabled(false)
3790            .dir_listing_enabled(true)
3791            .build()
3792            .await
3793            .unwrap();
3794
3795        // Create a table
3796        let schema = create_test_schema();
3797        let ipc_data = create_test_ipc_data(&schema);
3798        let mut create_req = CreateTableRequest::new();
3799        create_req.id = Some(vec!["test_table".to_string()]);
3800        namespace
3801            .create_table(create_req, bytes::Bytes::from(ipc_data))
3802            .await
3803            .unwrap();
3804
3805        // Deregister once
3806        let mut deregister_req = DeregisterTableRequest::new();
3807        deregister_req.id = Some(vec!["test_table".to_string()]);
3808        namespace
3809            .deregister_table(deregister_req.clone())
3810            .await
3811            .unwrap();
3812
3813        // Try to deregister again - should fail
3814        let result = namespace.deregister_table(deregister_req).await;
3815        assert!(result.is_err());
3816        assert!(
3817            result
3818                .unwrap_err()
3819                .to_string()
3820                .contains("already deregistered")
3821        );
3822    }
3823
3824    // ============================================================
3825    // Tests for list_tables skipping deregistered tables
3826    // ============================================================
3827
3828    #[tokio::test]
3829    async fn test_list_tables_skips_deregistered_v1() {
3830        use lance_namespace::models::DeregisterTableRequest;
3831
3832        let temp_dir = TempStdDir::default();
3833        let temp_path = temp_dir.to_str().unwrap();
3834
3835        let namespace = DirectoryNamespaceBuilder::new(temp_path)
3836            .manifest_enabled(false)
3837            .dir_listing_enabled(true)
3838            .build()
3839            .await
3840            .unwrap();
3841
3842        // Create two tables
3843        let schema = create_test_schema();
3844        let ipc_data = create_test_ipc_data(&schema);
3845
3846        let mut create_req1 = CreateTableRequest::new();
3847        create_req1.id = Some(vec!["table1".to_string()]);
3848        namespace
3849            .create_table(create_req1, bytes::Bytes::from(ipc_data.clone()))
3850            .await
3851            .unwrap();
3852
3853        let mut create_req2 = CreateTableRequest::new();
3854        create_req2.id = Some(vec!["table2".to_string()]);
3855        namespace
3856            .create_table(create_req2, bytes::Bytes::from(ipc_data))
3857            .await
3858            .unwrap();
3859
3860        // List tables - should see both (root namespace = empty vec)
3861        let mut list_req = ListTablesRequest::new();
3862        list_req.id = Some(vec![]);
3863        let list_response = namespace.list_tables(list_req.clone()).await.unwrap();
3864        assert_eq!(list_response.tables.len(), 2);
3865
3866        // Deregister table1
3867        let mut deregister_req = DeregisterTableRequest::new();
3868        deregister_req.id = Some(vec!["table1".to_string()]);
3869        namespace.deregister_table(deregister_req).await.unwrap();
3870
3871        // List tables - should only see table2
3872        let list_response = namespace.list_tables(list_req).await.unwrap();
3873        assert_eq!(list_response.tables.len(), 1);
3874        assert!(list_response.tables.contains(&"table2".to_string()));
3875        assert!(!list_response.tables.contains(&"table1".to_string()));
3876    }
3877
3878    // ============================================================
3879    // Tests for describe_table and table_exists with deregistered tables
3880    // ============================================================
3881
3882    #[tokio::test]
3883    async fn test_describe_table_fails_for_deregistered_v1() {
3884        use lance_namespace::models::{DeregisterTableRequest, DescribeTableRequest};
3885
3886        let temp_dir = TempStdDir::default();
3887        let temp_path = temp_dir.to_str().unwrap();
3888
3889        let namespace = DirectoryNamespaceBuilder::new(temp_path)
3890            .manifest_enabled(false)
3891            .dir_listing_enabled(true)
3892            .build()
3893            .await
3894            .unwrap();
3895
3896        // Create a table
3897        let schema = create_test_schema();
3898        let ipc_data = create_test_ipc_data(&schema);
3899        let mut create_req = CreateTableRequest::new();
3900        create_req.id = Some(vec!["test_table".to_string()]);
3901        namespace
3902            .create_table(create_req, bytes::Bytes::from(ipc_data))
3903            .await
3904            .unwrap();
3905
3906        // Describe should work before deregistration
3907        let mut describe_req = DescribeTableRequest::new();
3908        describe_req.id = Some(vec!["test_table".to_string()]);
3909        assert!(namespace.describe_table(describe_req.clone()).await.is_ok());
3910
3911        // Deregister
3912        let mut deregister_req = DeregisterTableRequest::new();
3913        deregister_req.id = Some(vec!["test_table".to_string()]);
3914        namespace.deregister_table(deregister_req).await.unwrap();
3915
3916        // Describe should fail after deregistration
3917        let result = namespace.describe_table(describe_req).await;
3918        assert!(result.is_err());
3919        assert!(result.unwrap_err().to_string().contains("deregistered"));
3920    }
3921
3922    #[tokio::test]
3923    async fn test_table_exists_fails_for_deregistered_v1() {
3924        use lance_namespace::models::{DeregisterTableRequest, TableExistsRequest};
3925
3926        let temp_dir = TempStdDir::default();
3927        let temp_path = temp_dir.to_str().unwrap();
3928
3929        let namespace = DirectoryNamespaceBuilder::new(temp_path)
3930            .manifest_enabled(false)
3931            .dir_listing_enabled(true)
3932            .build()
3933            .await
3934            .unwrap();
3935
3936        // Create a table
3937        let schema = create_test_schema();
3938        let ipc_data = create_test_ipc_data(&schema);
3939        let mut create_req = CreateTableRequest::new();
3940        create_req.id = Some(vec!["test_table".to_string()]);
3941        namespace
3942            .create_table(create_req, bytes::Bytes::from(ipc_data))
3943            .await
3944            .unwrap();
3945
3946        // Table exists should work before deregistration
3947        let mut exists_req = TableExistsRequest::new();
3948        exists_req.id = Some(vec!["test_table".to_string()]);
3949        assert!(namespace.table_exists(exists_req.clone()).await.is_ok());
3950
3951        // Deregister
3952        let mut deregister_req = DeregisterTableRequest::new();
3953        deregister_req.id = Some(vec!["test_table".to_string()]);
3954        namespace.deregister_table(deregister_req).await.unwrap();
3955
3956        // Table exists should fail after deregistration
3957        let result = namespace.table_exists(exists_req).await;
3958        assert!(result.is_err());
3959        assert!(result.unwrap_err().to_string().contains("deregistered"));
3960    }
3961
3962    #[tokio::test]
3963    async fn test_atomic_table_status_check() {
3964        // This test verifies that the TableStatus check is atomic
3965        // by ensuring a single directory listing is used
3966
3967        let temp_dir = TempStdDir::default();
3968        let temp_path = temp_dir.to_str().unwrap();
3969
3970        let namespace = DirectoryNamespaceBuilder::new(temp_path)
3971            .manifest_enabled(false)
3972            .dir_listing_enabled(true)
3973            .build()
3974            .await
3975            .unwrap();
3976
3977        // Create a table
3978        let schema = create_test_schema();
3979        let ipc_data = create_test_ipc_data(&schema);
3980        let mut create_req = CreateTableRequest::new();
3981        create_req.id = Some(vec!["test_table".to_string()]);
3982        namespace
3983            .create_table(create_req, bytes::Bytes::from(ipc_data))
3984            .await
3985            .unwrap();
3986
3987        // Table status should show exists=true, is_deregistered=false
3988        let status = namespace.check_table_status("test_table").await;
3989        assert!(status.exists);
3990        assert!(!status.is_deregistered);
3991        assert!(!status.has_reserved_file);
3992    }
3993
3994    #[tokio::test]
3995    async fn test_table_version_tracking_enabled_managed_versioning() {
3996        use lance_namespace::models::DescribeTableRequest;
3997
3998        let temp_dir = TempStdDir::default();
3999        let temp_path = temp_dir.to_str().unwrap();
4000
4001        // Create namespace with table_version_tracking_enabled=true
4002        let namespace = DirectoryNamespaceBuilder::new(temp_path)
4003            .table_version_tracking_enabled(true)
4004            .build()
4005            .await
4006            .unwrap();
4007
4008        // Create a table
4009        let schema = create_test_schema();
4010        let ipc_data = create_test_ipc_data(&schema);
4011        let mut create_req = CreateTableRequest::new();
4012        create_req.id = Some(vec!["test_table".to_string()]);
4013        namespace
4014            .create_table(create_req, bytes::Bytes::from(ipc_data))
4015            .await
4016            .unwrap();
4017
4018        // Describe table should return managed_versioning=true
4019        let mut describe_req = DescribeTableRequest::new();
4020        describe_req.id = Some(vec!["test_table".to_string()]);
4021        let describe_resp = namespace.describe_table(describe_req).await.unwrap();
4022
4023        // managed_versioning should be true
4024        assert_eq!(
4025            describe_resp.managed_versioning,
4026            Some(true),
4027            "managed_versioning should be true when table_version_tracking_enabled=true"
4028        );
4029    }
4030
4031    #[tokio::test]
4032    async fn test_table_version_tracking_disabled_no_managed_versioning() {
4033        use lance_namespace::models::DescribeTableRequest;
4034
4035        let temp_dir = TempStdDir::default();
4036        let temp_path = temp_dir.to_str().unwrap();
4037
4038        // Create namespace with table_version_tracking_enabled=false (default)
4039        let namespace = DirectoryNamespaceBuilder::new(temp_path)
4040            .table_version_tracking_enabled(false)
4041            .build()
4042            .await
4043            .unwrap();
4044
4045        // Create a table
4046        let schema = create_test_schema();
4047        let ipc_data = create_test_ipc_data(&schema);
4048        let mut create_req = CreateTableRequest::new();
4049        create_req.id = Some(vec!["test_table".to_string()]);
4050        namespace
4051            .create_table(create_req, bytes::Bytes::from(ipc_data))
4052            .await
4053            .unwrap();
4054
4055        // Describe table should not have managed_versioning set
4056        let mut describe_req = DescribeTableRequest::new();
4057        describe_req.id = Some(vec!["test_table".to_string()]);
4058        let describe_resp = namespace.describe_table(describe_req).await.unwrap();
4059
4060        // managed_versioning should be None when table_version_tracking_enabled=false
4061        assert!(
4062            describe_resp.managed_versioning.is_none(),
4063            "managed_versioning should be None when table_version_tracking_enabled=false, got: {:?}",
4064            describe_resp.managed_versioning
4065        );
4066    }
4067
4068    #[tokio::test]
4069    #[cfg(not(windows))]
4070    async fn test_list_table_versions() {
4071        use arrow::array::{Int32Array, RecordBatchIterator};
4072        use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
4073        use arrow::record_batch::RecordBatch;
4074        use lance::dataset::{Dataset, WriteMode, WriteParams};
4075        use lance_namespace::models::{CreateNamespaceRequest, ListTableVersionsRequest};
4076
4077        let temp_dir = TempStrDir::default();
4078        let temp_path: &str = &temp_dir;
4079
4080        let namespace: Arc<dyn LanceNamespace> = Arc::new(
4081            DirectoryNamespaceBuilder::new(temp_path)
4082                .table_version_tracking_enabled(true)
4083                .build()
4084                .await
4085                .unwrap(),
4086        );
4087
4088        // Create parent namespace first
4089        let mut create_ns_req = CreateNamespaceRequest::new();
4090        create_ns_req.id = Some(vec!["workspace".to_string()]);
4091        namespace.create_namespace(create_ns_req).await.unwrap();
4092
4093        // Create a table using write_into_namespace (version 1)
4094        let table_id = vec!["workspace".to_string(), "test_table".to_string()];
4095        let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
4096            "id",
4097            DataType::Int32,
4098            false,
4099        )]));
4100        let batch = RecordBatch::try_new(
4101            arrow_schema.clone(),
4102            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
4103        )
4104        .unwrap();
4105        let batches = RecordBatchIterator::new(vec![Ok(batch.clone())], arrow_schema.clone());
4106        let write_params = WriteParams {
4107            mode: WriteMode::Create,
4108            ..Default::default()
4109        };
4110        let mut dataset = Dataset::write_into_namespace(
4111            batches,
4112            namespace.clone(),
4113            table_id.clone(),
4114            Some(write_params),
4115        )
4116        .await
4117        .unwrap();
4118
4119        // Append to create version 2
4120        let batch2 = RecordBatch::try_new(
4121            arrow_schema.clone(),
4122            vec![Arc::new(Int32Array::from(vec![100, 200]))],
4123        )
4124        .unwrap();
4125        let batches = RecordBatchIterator::new(vec![Ok(batch2)], arrow_schema.clone());
4126        dataset.append(batches, None).await.unwrap();
4127
4128        // Append to create version 3
4129        let batch3 = RecordBatch::try_new(
4130            arrow_schema.clone(),
4131            vec![Arc::new(Int32Array::from(vec![300, 400]))],
4132        )
4133        .unwrap();
4134        let batches = RecordBatchIterator::new(vec![Ok(batch3)], arrow_schema);
4135        dataset.append(batches, None).await.unwrap();
4136
4137        // List versions - should have versions 1, 2, and 3
4138        let mut list_req = ListTableVersionsRequest::new();
4139        list_req.id = Some(table_id.clone());
4140        let list_resp = namespace.list_table_versions(list_req).await.unwrap();
4141
4142        assert_eq!(
4143            list_resp.versions.len(),
4144            3,
4145            "Should have 3 versions, got: {:?}",
4146            list_resp.versions
4147        );
4148
4149        // Verify each version
4150        for expected_version in 1..=3 {
4151            let version = list_resp
4152                .versions
4153                .iter()
4154                .find(|v| v.version == expected_version)
4155                .unwrap_or_else(|| panic!("Expected version {}", expected_version));
4156
4157            assert!(
4158                !version.manifest_path.is_empty(),
4159                "manifest_path should be set for version {}",
4160                expected_version
4161            );
4162            assert!(
4163                version.manifest_path.contains(".manifest"),
4164                "manifest_path should contain .manifest for version {}",
4165                expected_version
4166            );
4167            assert!(
4168                version.manifest_size.is_some(),
4169                "manifest_size should be set for version {}",
4170                expected_version
4171            );
4172            assert!(
4173                version.manifest_size.unwrap() > 0,
4174                "manifest_size should be > 0 for version {}",
4175                expected_version
4176            );
4177            assert!(
4178                version.timestamp_millis.is_some(),
4179                "timestamp_millis should be set for version {}",
4180                expected_version
4181            );
4182        }
4183    }
4184
4185    #[tokio::test]
4186    #[cfg(not(windows))]
4187    async fn test_describe_table_version() {
4188        use arrow::array::{Int32Array, RecordBatchIterator};
4189        use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
4190        use arrow::record_batch::RecordBatch;
4191        use lance::dataset::{Dataset, WriteMode, WriteParams};
4192        use lance_namespace::models::{CreateNamespaceRequest, DescribeTableVersionRequest};
4193
4194        let temp_dir = TempStrDir::default();
4195        let temp_path: &str = &temp_dir;
4196
4197        let namespace: Arc<dyn LanceNamespace> = Arc::new(
4198            DirectoryNamespaceBuilder::new(temp_path)
4199                .table_version_tracking_enabled(true)
4200                .build()
4201                .await
4202                .unwrap(),
4203        );
4204
4205        // Create parent namespace first
4206        let mut create_ns_req = CreateNamespaceRequest::new();
4207        create_ns_req.id = Some(vec!["workspace".to_string()]);
4208        namespace.create_namespace(create_ns_req).await.unwrap();
4209
4210        // Create a table using write_into_namespace (version 1)
4211        let table_id = vec!["workspace".to_string(), "test_table".to_string()];
4212        let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
4213            "id",
4214            DataType::Int32,
4215            false,
4216        )]));
4217        let batch = RecordBatch::try_new(
4218            arrow_schema.clone(),
4219            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
4220        )
4221        .unwrap();
4222        let batches = RecordBatchIterator::new(vec![Ok(batch)], arrow_schema.clone());
4223        let write_params = WriteParams {
4224            mode: WriteMode::Create,
4225            ..Default::default()
4226        };
4227        let mut dataset = Dataset::write_into_namespace(
4228            batches,
4229            namespace.clone(),
4230            table_id.clone(),
4231            Some(write_params),
4232        )
4233        .await
4234        .unwrap();
4235
4236        // Append data to create version 2
4237        let batch2 = RecordBatch::try_new(
4238            arrow_schema.clone(),
4239            vec![Arc::new(Int32Array::from(vec![100, 200]))],
4240        )
4241        .unwrap();
4242        let batches = RecordBatchIterator::new(vec![Ok(batch2)], arrow_schema);
4243        dataset.append(batches, None).await.unwrap();
4244
4245        // Describe version 1
4246        let mut describe_req = DescribeTableVersionRequest::new();
4247        describe_req.id = Some(table_id.clone());
4248        describe_req.version = Some(1);
4249        let describe_resp = namespace
4250            .describe_table_version(describe_req)
4251            .await
4252            .unwrap();
4253
4254        let version = &describe_resp.version;
4255        assert_eq!(version.version, 1);
4256        assert!(version.timestamp_millis.is_some());
4257        assert!(
4258            !version.manifest_path.is_empty(),
4259            "manifest_path should be set"
4260        );
4261        assert!(
4262            version.manifest_path.contains(".manifest"),
4263            "manifest_path should contain .manifest"
4264        );
4265        assert!(
4266            version.manifest_size.is_some(),
4267            "manifest_size should be set"
4268        );
4269        assert!(
4270            version.manifest_size.unwrap() > 0,
4271            "manifest_size should be > 0"
4272        );
4273
4274        // Describe version 2
4275        let mut describe_req = DescribeTableVersionRequest::new();
4276        describe_req.id = Some(table_id.clone());
4277        describe_req.version = Some(2);
4278        let describe_resp = namespace
4279            .describe_table_version(describe_req)
4280            .await
4281            .unwrap();
4282
4283        let version = &describe_resp.version;
4284        assert_eq!(version.version, 2);
4285        assert!(version.timestamp_millis.is_some());
4286        assert!(
4287            !version.manifest_path.is_empty(),
4288            "manifest_path should be set"
4289        );
4290        assert!(
4291            version.manifest_size.is_some(),
4292            "manifest_size should be set"
4293        );
4294        assert!(
4295            version.manifest_size.unwrap() > 0,
4296            "manifest_size should be > 0"
4297        );
4298    }
4299
4300    #[tokio::test]
4301    #[cfg(not(windows))]
4302    async fn test_describe_table_version_latest() {
4303        use arrow::array::{Int32Array, RecordBatchIterator};
4304        use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
4305        use arrow::record_batch::RecordBatch;
4306        use lance::dataset::{Dataset, WriteMode, WriteParams};
4307        use lance_namespace::models::{CreateNamespaceRequest, DescribeTableVersionRequest};
4308
4309        let temp_dir = TempStrDir::default();
4310        let temp_path: &str = &temp_dir;
4311
4312        let namespace: Arc<dyn LanceNamespace> = Arc::new(
4313            DirectoryNamespaceBuilder::new(temp_path)
4314                .table_version_tracking_enabled(true)
4315                .build()
4316                .await
4317                .unwrap(),
4318        );
4319
4320        // Create parent namespace first
4321        let mut create_ns_req = CreateNamespaceRequest::new();
4322        create_ns_req.id = Some(vec!["workspace".to_string()]);
4323        namespace.create_namespace(create_ns_req).await.unwrap();
4324
4325        // Create a table using write_into_namespace (version 1)
4326        let table_id = vec!["workspace".to_string(), "test_table".to_string()];
4327        let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
4328            "id",
4329            DataType::Int32,
4330            false,
4331        )]));
4332        let batch = RecordBatch::try_new(
4333            arrow_schema.clone(),
4334            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
4335        )
4336        .unwrap();
4337        let batches = RecordBatchIterator::new(vec![Ok(batch)], arrow_schema.clone());
4338        let write_params = WriteParams {
4339            mode: WriteMode::Create,
4340            ..Default::default()
4341        };
4342        let mut dataset = Dataset::write_into_namespace(
4343            batches,
4344            namespace.clone(),
4345            table_id.clone(),
4346            Some(write_params),
4347        )
4348        .await
4349        .unwrap();
4350
4351        // Append to create version 2
4352        let batch2 = RecordBatch::try_new(
4353            arrow_schema.clone(),
4354            vec![Arc::new(Int32Array::from(vec![100, 200]))],
4355        )
4356        .unwrap();
4357        let batches = RecordBatchIterator::new(vec![Ok(batch2)], arrow_schema.clone());
4358        dataset.append(batches, None).await.unwrap();
4359
4360        // Append to create version 3
4361        let batch3 = RecordBatch::try_new(
4362            arrow_schema.clone(),
4363            vec![Arc::new(Int32Array::from(vec![300, 400]))],
4364        )
4365        .unwrap();
4366        let batches = RecordBatchIterator::new(vec![Ok(batch3)], arrow_schema);
4367        dataset.append(batches, None).await.unwrap();
4368
4369        // Describe latest version (no version specified)
4370        let mut describe_req = DescribeTableVersionRequest::new();
4371        describe_req.id = Some(table_id.clone());
4372        describe_req.version = None;
4373        let describe_resp = namespace
4374            .describe_table_version(describe_req)
4375            .await
4376            .unwrap();
4377
4378        // Should return version 3 as it's the latest
4379        assert_eq!(describe_resp.version.version, 3);
4380    }
4381
4382    #[tokio::test]
4383    #[cfg(not(windows))]
4384    async fn test_create_table_version() {
4385        use futures::TryStreamExt;
4386        use lance::dataset::builder::DatasetBuilder;
4387        use lance_namespace::models::CreateTableVersionRequest;
4388
4389        let temp_dir = TempStrDir::default();
4390        let temp_path: &str = &temp_dir;
4391
4392        let namespace: Arc<dyn LanceNamespace> = Arc::new(
4393            DirectoryNamespaceBuilder::new(temp_path)
4394                .table_version_tracking_enabled(true)
4395                .build()
4396                .await
4397                .unwrap(),
4398        );
4399
4400        // Create a table
4401        let schema = create_test_schema();
4402        let ipc_data = create_test_ipc_data(&schema);
4403        let mut create_req = CreateTableRequest::new();
4404        create_req.id = Some(vec!["test_table".to_string()]);
4405        namespace
4406            .create_table(create_req, bytes::Bytes::from(ipc_data))
4407            .await
4408            .unwrap();
4409
4410        // Open the dataset using from_namespace to get proper object_store and paths
4411        let table_id = vec!["test_table".to_string()];
4412        let dataset = DatasetBuilder::from_namespace(namespace.clone(), table_id.clone())
4413            .await
4414            .unwrap()
4415            .load()
4416            .await
4417            .unwrap();
4418
4419        // Use dataset's object_store to find and copy the manifest
4420        let versions_path = dataset.versions_dir();
4421        let manifest_metas: Vec<_> = dataset
4422            .object_store()
4423            .inner
4424            .list(Some(&versions_path))
4425            .try_collect()
4426            .await
4427            .unwrap();
4428
4429        let manifest_meta = manifest_metas
4430            .iter()
4431            .find(|m| {
4432                m.location
4433                    .filename()
4434                    .map(|f| f.ends_with(".manifest"))
4435                    .unwrap_or(false)
4436            })
4437            .expect("No manifest file found");
4438
4439        // Read the existing manifest data
4440        let manifest_data = dataset
4441            .object_store()
4442            .inner
4443            .get(&manifest_meta.location)
4444            .await
4445            .unwrap()
4446            .bytes()
4447            .await
4448            .unwrap();
4449
4450        // Write to a staging location using the dataset's object_store
4451        let staging_path = dataset.versions_dir().child("staging_manifest");
4452        dataset
4453            .object_store()
4454            .inner
4455            .put(&staging_path, manifest_data.into())
4456            .await
4457            .unwrap();
4458
4459        // Create version 2 from staging manifest
4460        // Use the same naming scheme as the existing dataset (V2)
4461        let mut create_version_req = CreateTableVersionRequest::new(2, staging_path.to_string());
4462        create_version_req.id = Some(table_id.clone());
4463        create_version_req.naming_scheme = Some("V2".to_string());
4464
4465        let result = namespace.create_table_version(create_version_req).await;
4466        assert!(
4467            result.is_ok(),
4468            "create_table_version should succeed: {:?}",
4469            result
4470        );
4471
4472        // Verify version 2 was created at the path returned in the response
4473        let response = result.unwrap();
4474        let version_info = response
4475            .version
4476            .expect("response should contain version info");
4477        let version_2_path = Path::from(version_info.manifest_path);
4478        let head_result = dataset.object_store().inner.head(&version_2_path).await;
4479        assert!(
4480            head_result.is_ok(),
4481            "Version 2 manifest should exist at {}",
4482            version_2_path
4483        );
4484
4485        // Verify the staging file has been deleted
4486        let staging_head_result = dataset.object_store().inner.head(&staging_path).await;
4487        assert!(
4488            staging_head_result.is_err(),
4489            "Staging manifest should have been deleted after create_table_version"
4490        );
4491    }
4492
4493    #[tokio::test]
4494    #[cfg(not(windows))]
4495    async fn test_create_table_version_conflict() {
4496        // create_table_version should fail if the version already exists.
4497        // Each version always writes to a new file location.
4498        use futures::TryStreamExt;
4499        use lance::dataset::builder::DatasetBuilder;
4500        use lance_namespace::models::CreateTableVersionRequest;
4501
4502        let temp_dir = TempStrDir::default();
4503        let temp_path: &str = &temp_dir;
4504
4505        let namespace: Arc<dyn LanceNamespace> = Arc::new(
4506            DirectoryNamespaceBuilder::new(temp_path)
4507                .table_version_tracking_enabled(true)
4508                .build()
4509                .await
4510                .unwrap(),
4511        );
4512
4513        // Create a table
4514        let schema = create_test_schema();
4515        let ipc_data = create_test_ipc_data(&schema);
4516        let mut create_req = CreateTableRequest::new();
4517        create_req.id = Some(vec!["test_table".to_string()]);
4518        namespace
4519            .create_table(create_req, bytes::Bytes::from(ipc_data))
4520            .await
4521            .unwrap();
4522
4523        // Open the dataset using from_namespace to get proper object_store and paths
4524        let table_id = vec!["test_table".to_string()];
4525        let dataset = DatasetBuilder::from_namespace(namespace.clone(), table_id.clone())
4526            .await
4527            .unwrap()
4528            .load()
4529            .await
4530            .unwrap();
4531
4532        // Use dataset's object_store to find and copy the manifest
4533        let versions_path = dataset.versions_dir();
4534        let manifest_metas: Vec<_> = dataset
4535            .object_store()
4536            .inner
4537            .list(Some(&versions_path))
4538            .try_collect()
4539            .await
4540            .unwrap();
4541
4542        let manifest_meta = manifest_metas
4543            .iter()
4544            .find(|m| {
4545                m.location
4546                    .filename()
4547                    .map(|f| f.ends_with(".manifest"))
4548                    .unwrap_or(false)
4549            })
4550            .expect("No manifest file found");
4551
4552        // Read the existing manifest data
4553        let manifest_data = dataset
4554            .object_store()
4555            .inner
4556            .get(&manifest_meta.location)
4557            .await
4558            .unwrap()
4559            .bytes()
4560            .await
4561            .unwrap();
4562
4563        // Write to a staging location using the dataset's object_store
4564        let staging_path = dataset.versions_dir().child("staging_manifest");
4565        dataset
4566            .object_store()
4567            .inner
4568            .put(&staging_path, manifest_data.into())
4569            .await
4570            .unwrap();
4571
4572        // First create version 2 (should succeed)
4573        let mut create_version_req = CreateTableVersionRequest::new(2, staging_path.to_string());
4574        create_version_req.id = Some(table_id.clone());
4575        create_version_req.naming_scheme = Some("V2".to_string());
4576        let first_result = namespace.create_table_version(create_version_req).await;
4577        assert!(
4578            first_result.is_ok(),
4579            "First create_table_version for version 2 should succeed: {:?}",
4580            first_result
4581        );
4582
4583        // Get the path from the response for verification
4584        let version_2_path = Path::from(
4585            first_result
4586                .unwrap()
4587                .version
4588                .expect("response should contain version info")
4589                .manifest_path,
4590        );
4591
4592        // Create version 2 again (should fail - conflict)
4593        let mut create_version_req = CreateTableVersionRequest::new(2, staging_path.to_string());
4594        create_version_req.id = Some(table_id.clone());
4595        create_version_req.naming_scheme = Some("V2".to_string());
4596
4597        let result = namespace.create_table_version(create_version_req).await;
4598        assert!(
4599            result.is_err(),
4600            "create_table_version should fail for existing version"
4601        );
4602
4603        // Verify version 2 still exists using the dataset's object_store
4604        let head_result = dataset.object_store().inner.head(&version_2_path).await;
4605        assert!(
4606            head_result.is_ok(),
4607            "Version 2 manifest should still exist at {}",
4608            version_2_path
4609        );
4610    }
4611
4612    #[tokio::test]
4613    async fn test_create_table_version_table_not_found() {
4614        use lance_namespace::models::CreateTableVersionRequest;
4615
4616        let temp_dir = TempStdDir::default();
4617        let temp_path = temp_dir.to_str().unwrap();
4618
4619        let namespace = DirectoryNamespaceBuilder::new(temp_path)
4620            .table_version_tracking_enabled(true)
4621            .build()
4622            .await
4623            .unwrap();
4624
4625        // Try to create version for non-existent table
4626        let mut create_version_req =
4627            CreateTableVersionRequest::new(1, "/some/staging/path".to_string());
4628        create_version_req.id = Some(vec!["non_existent_table".to_string()]);
4629
4630        let result = namespace.create_table_version(create_version_req).await;
4631        assert!(
4632            result.is_err(),
4633            "create_table_version should fail for non-existent table"
4634        );
4635        let err_msg = result.unwrap_err().to_string();
4636        assert!(
4637            err_msg.contains("does not exist"),
4638            "Error should mention table does not exist, got: {}",
4639            err_msg
4640        );
4641    }
4642
4643    /// End-to-end integration test module for table version tracking.
4644    mod e2e_table_version_tracking {
4645        use super::*;
4646        use std::sync::atomic::{AtomicUsize, Ordering};
4647
4648        /// Tracking wrapper around a namespace that counts method invocations.
4649        struct TrackingNamespace {
4650            inner: DirectoryNamespace,
4651            create_table_version_count: AtomicUsize,
4652            describe_table_version_count: AtomicUsize,
4653            list_table_versions_count: AtomicUsize,
4654        }
4655
4656        impl TrackingNamespace {
4657            fn new(inner: DirectoryNamespace) -> Self {
4658                Self {
4659                    inner,
4660                    create_table_version_count: AtomicUsize::new(0),
4661                    describe_table_version_count: AtomicUsize::new(0),
4662                    list_table_versions_count: AtomicUsize::new(0),
4663                }
4664            }
4665
4666            fn create_table_version_calls(&self) -> usize {
4667                self.create_table_version_count.load(Ordering::SeqCst)
4668            }
4669
4670            fn describe_table_version_calls(&self) -> usize {
4671                self.describe_table_version_count.load(Ordering::SeqCst)
4672            }
4673
4674            fn list_table_versions_calls(&self) -> usize {
4675                self.list_table_versions_count.load(Ordering::SeqCst)
4676            }
4677        }
4678
4679        impl std::fmt::Debug for TrackingNamespace {
4680            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
4681                f.debug_struct("TrackingNamespace")
4682                    .field(
4683                        "create_table_version_calls",
4684                        &self.create_table_version_calls(),
4685                    )
4686                    .finish()
4687            }
4688        }
4689
4690        #[async_trait]
4691        impl LanceNamespace for TrackingNamespace {
4692            async fn create_namespace(
4693                &self,
4694                request: CreateNamespaceRequest,
4695            ) -> Result<CreateNamespaceResponse> {
4696                self.inner.create_namespace(request).await
4697            }
4698
4699            async fn describe_namespace(
4700                &self,
4701                request: DescribeNamespaceRequest,
4702            ) -> Result<DescribeNamespaceResponse> {
4703                self.inner.describe_namespace(request).await
4704            }
4705
4706            async fn namespace_exists(&self, request: NamespaceExistsRequest) -> Result<()> {
4707                self.inner.namespace_exists(request).await
4708            }
4709
4710            async fn list_namespaces(
4711                &self,
4712                request: ListNamespacesRequest,
4713            ) -> Result<ListNamespacesResponse> {
4714                self.inner.list_namespaces(request).await
4715            }
4716
4717            async fn drop_namespace(
4718                &self,
4719                request: DropNamespaceRequest,
4720            ) -> Result<DropNamespaceResponse> {
4721                self.inner.drop_namespace(request).await
4722            }
4723
4724            async fn list_tables(&self, request: ListTablesRequest) -> Result<ListTablesResponse> {
4725                self.inner.list_tables(request).await
4726            }
4727
4728            async fn describe_table(
4729                &self,
4730                request: DescribeTableRequest,
4731            ) -> Result<DescribeTableResponse> {
4732                self.inner.describe_table(request).await
4733            }
4734
4735            async fn table_exists(&self, request: TableExistsRequest) -> Result<()> {
4736                self.inner.table_exists(request).await
4737            }
4738
4739            async fn drop_table(&self, request: DropTableRequest) -> Result<DropTableResponse> {
4740                self.inner.drop_table(request).await
4741            }
4742
4743            async fn create_table(
4744                &self,
4745                request: CreateTableRequest,
4746                request_data: Bytes,
4747            ) -> Result<CreateTableResponse> {
4748                self.inner.create_table(request, request_data).await
4749            }
4750
4751            async fn declare_table(
4752                &self,
4753                request: DeclareTableRequest,
4754            ) -> Result<DeclareTableResponse> {
4755                self.inner.declare_table(request).await
4756            }
4757
4758            async fn list_table_versions(
4759                &self,
4760                request: ListTableVersionsRequest,
4761            ) -> Result<ListTableVersionsResponse> {
4762                self.list_table_versions_count
4763                    .fetch_add(1, Ordering::SeqCst);
4764                self.inner.list_table_versions(request).await
4765            }
4766
4767            async fn create_table_version(
4768                &self,
4769                request: CreateTableVersionRequest,
4770            ) -> Result<CreateTableVersionResponse> {
4771                self.create_table_version_count
4772                    .fetch_add(1, Ordering::SeqCst);
4773                self.inner.create_table_version(request).await
4774            }
4775
4776            async fn describe_table_version(
4777                &self,
4778                request: DescribeTableVersionRequest,
4779            ) -> Result<DescribeTableVersionResponse> {
4780                self.describe_table_version_count
4781                    .fetch_add(1, Ordering::SeqCst);
4782                self.inner.describe_table_version(request).await
4783            }
4784
4785            async fn batch_delete_table_versions(
4786                &self,
4787                request: BatchDeleteTableVersionsRequest,
4788            ) -> Result<BatchDeleteTableVersionsResponse> {
4789                self.inner.batch_delete_table_versions(request).await
4790            }
4791
4792            fn namespace_id(&self) -> String {
4793                self.inner.namespace_id()
4794            }
4795        }
4796
4797        #[tokio::test]
4798        async fn test_describe_table_returns_managed_versioning() {
4799            use lance_namespace::models::{CreateNamespaceRequest, DescribeTableRequest};
4800
4801            let temp_dir = TempStdDir::default();
4802            let temp_path = temp_dir.to_str().unwrap();
4803
4804            // Create namespace with table_version_tracking_enabled and manifest_enabled
4805            let ns = DirectoryNamespaceBuilder::new(temp_path)
4806                .table_version_tracking_enabled(true)
4807                .manifest_enabled(true)
4808                .build()
4809                .await
4810                .unwrap();
4811
4812            // Create parent namespace
4813            let mut create_ns_req = CreateNamespaceRequest::new();
4814            create_ns_req.id = Some(vec!["workspace".to_string()]);
4815            ns.create_namespace(create_ns_req).await.unwrap();
4816
4817            // Create a table with multi-level ID (namespace + table)
4818            let schema = create_test_schema();
4819            let ipc_data = create_test_ipc_data(&schema);
4820            let mut create_req = CreateTableRequest::new();
4821            create_req.id = Some(vec!["workspace".to_string(), "test_table".to_string()]);
4822            ns.create_table(create_req, bytes::Bytes::from(ipc_data))
4823                .await
4824                .unwrap();
4825
4826            // Describe table should return managed_versioning=true
4827            let mut describe_req = DescribeTableRequest::new();
4828            describe_req.id = Some(vec!["workspace".to_string(), "test_table".to_string()]);
4829            let describe_resp = ns.describe_table(describe_req).await.unwrap();
4830
4831            // managed_versioning should be true
4832            assert_eq!(
4833                describe_resp.managed_versioning,
4834                Some(true),
4835                "managed_versioning should be true when table_version_tracking_enabled=true"
4836            );
4837        }
4838
4839        #[tokio::test]
4840        #[cfg(not(windows))]
4841        async fn test_external_manifest_store_invokes_namespace_apis() {
4842            use arrow::array::{Int32Array, StringArray};
4843            use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
4844            use arrow::record_batch::RecordBatch;
4845            use lance::Dataset;
4846            use lance::dataset::builder::DatasetBuilder;
4847            use lance::dataset::{WriteMode, WriteParams};
4848            use lance_namespace::models::CreateNamespaceRequest;
4849
4850            let temp_dir = TempStdDir::default();
4851            let temp_path = temp_dir.to_str().unwrap();
4852
4853            // Create namespace with table_version_tracking_enabled and manifest_enabled
4854            let inner_ns = DirectoryNamespaceBuilder::new(temp_path)
4855                .table_version_tracking_enabled(true)
4856                .manifest_enabled(true)
4857                .build()
4858                .await
4859                .unwrap();
4860
4861            let tracking_ns = Arc::new(TrackingNamespace::new(inner_ns));
4862            let ns: Arc<dyn LanceNamespace> = tracking_ns.clone();
4863
4864            // Create parent namespace
4865            let mut create_ns_req = CreateNamespaceRequest::new();
4866            create_ns_req.id = Some(vec!["workspace".to_string()]);
4867            ns.create_namespace(create_ns_req).await.unwrap();
4868
4869            // Create a table with multi-level ID (namespace + table)
4870            let table_id = vec!["workspace".to_string(), "test_table".to_string()];
4871
4872            // Create some initial data
4873            let arrow_schema = Arc::new(ArrowSchema::new(vec![
4874                Field::new("id", DataType::Int32, false),
4875                Field::new("name", DataType::Utf8, true),
4876            ]));
4877            let batch = RecordBatch::try_new(
4878                arrow_schema.clone(),
4879                vec![
4880                    Arc::new(Int32Array::from(vec![1, 2, 3])),
4881                    Arc::new(StringArray::from(vec!["a", "b", "c"])),
4882                ],
4883            )
4884            .unwrap();
4885
4886            // Create a table using write_into_namespace
4887            let batches = RecordBatchIterator::new(vec![Ok(batch.clone())], arrow_schema.clone());
4888            let write_params = WriteParams {
4889                mode: WriteMode::Create,
4890                ..Default::default()
4891            };
4892            let mut dataset = Dataset::write_into_namespace(
4893                batches,
4894                ns.clone(),
4895                table_id.clone(),
4896                Some(write_params),
4897            )
4898            .await
4899            .unwrap();
4900            assert_eq!(dataset.version().version, 1);
4901
4902            // Verify create_table_version was called once during initial write_into_namespace
4903            assert_eq!(
4904                tracking_ns.create_table_version_calls(),
4905                1,
4906                "create_table_version should have been called once during initial write_into_namespace"
4907            );
4908
4909            // Append data - this should call create_table_version again
4910            let append_batch = RecordBatch::try_new(
4911                arrow_schema.clone(),
4912                vec![
4913                    Arc::new(Int32Array::from(vec![4, 5, 6])),
4914                    Arc::new(StringArray::from(vec!["d", "e", "f"])),
4915                ],
4916            )
4917            .unwrap();
4918            let append_batches = RecordBatchIterator::new(vec![Ok(append_batch)], arrow_schema);
4919            dataset.append(append_batches, None).await.unwrap();
4920
4921            assert_eq!(
4922                tracking_ns.create_table_version_calls(),
4923                2,
4924                "create_table_version should have been called twice (once for create, once for append)"
4925            );
4926
4927            // checkout_latest should call list_table_versions exactly once
4928            let initial_list_calls = tracking_ns.list_table_versions_calls();
4929            let latest_dataset = DatasetBuilder::from_namespace(ns.clone(), table_id.clone())
4930                .await
4931                .unwrap()
4932                .load()
4933                .await
4934                .unwrap();
4935            assert_eq!(latest_dataset.version().version, 2);
4936            assert_eq!(
4937                tracking_ns.list_table_versions_calls(),
4938                initial_list_calls + 1,
4939                "list_table_versions should have been called exactly once during checkout_latest"
4940            );
4941
4942            // checkout to specific version should call describe_table_version exactly once
4943            let initial_describe_calls = tracking_ns.describe_table_version_calls();
4944            let v1_dataset = DatasetBuilder::from_namespace(ns.clone(), table_id.clone())
4945                .await
4946                .unwrap()
4947                .with_version(1)
4948                .load()
4949                .await
4950                .unwrap();
4951            assert_eq!(v1_dataset.version().version, 1);
4952            assert_eq!(
4953                tracking_ns.describe_table_version_calls(),
4954                initial_describe_calls + 1,
4955                "describe_table_version should have been called exactly once during checkout to version 1"
4956            );
4957        }
4958
4959        #[tokio::test]
4960        #[cfg(not(windows))]
4961        async fn test_dataset_commit_with_external_manifest_store() {
4962            use arrow::array::{Int32Array, StringArray};
4963            use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
4964            use arrow::record_batch::RecordBatch;
4965            use futures::TryStreamExt;
4966            use lance::dataset::{Dataset, WriteMode, WriteParams};
4967            use lance_namespace::models::CreateNamespaceRequest;
4968            use lance_table::io::commit::ManifestNamingScheme;
4969
4970            let temp_dir = TempStdDir::default();
4971            let temp_path = temp_dir.to_str().unwrap();
4972
4973            // Create namespace with table_version_tracking_enabled and manifest_enabled
4974            let inner_ns = DirectoryNamespaceBuilder::new(temp_path)
4975                .table_version_tracking_enabled(true)
4976                .manifest_enabled(true)
4977                .build()
4978                .await
4979                .unwrap();
4980
4981            let tracking_ns: Arc<dyn LanceNamespace> = Arc::new(TrackingNamespace::new(inner_ns));
4982
4983            // Create parent namespace
4984            let mut create_ns_req = CreateNamespaceRequest::new();
4985            create_ns_req.id = Some(vec!["workspace".to_string()]);
4986            tracking_ns.create_namespace(create_ns_req).await.unwrap();
4987
4988            // Create a table using write_into_namespace
4989            let table_id = vec!["workspace".to_string(), "test_table".to_string()];
4990            let arrow_schema = Arc::new(ArrowSchema::new(vec![
4991                Field::new("id", DataType::Int32, false),
4992                Field::new("name", DataType::Utf8, true),
4993            ]));
4994            let batch = RecordBatch::try_new(
4995                arrow_schema.clone(),
4996                vec![
4997                    Arc::new(Int32Array::from(vec![1, 2, 3])),
4998                    Arc::new(StringArray::from(vec!["a", "b", "c"])),
4999                ],
5000            )
5001            .unwrap();
5002            let batches = RecordBatchIterator::new(vec![Ok(batch)], arrow_schema.clone());
5003            let write_params = WriteParams {
5004                mode: WriteMode::Create,
5005                ..Default::default()
5006            };
5007            let dataset = Dataset::write_into_namespace(
5008                batches,
5009                tracking_ns.clone(),
5010                table_id.clone(),
5011                Some(write_params),
5012            )
5013            .await
5014            .unwrap();
5015            assert_eq!(dataset.version().version, 1);
5016
5017            // Append data using write_into_namespace (APPEND mode)
5018            let batch2 = RecordBatch::try_new(
5019                arrow_schema.clone(),
5020                vec![
5021                    Arc::new(Int32Array::from(vec![4, 5, 6])),
5022                    Arc::new(StringArray::from(vec!["d", "e", "f"])),
5023                ],
5024            )
5025            .unwrap();
5026            let batches = RecordBatchIterator::new(vec![Ok(batch2)], arrow_schema);
5027            let write_params = WriteParams {
5028                mode: WriteMode::Append,
5029                ..Default::default()
5030            };
5031            Dataset::write_into_namespace(
5032                batches,
5033                tracking_ns.clone(),
5034                table_id.clone(),
5035                Some(write_params),
5036            )
5037            .await
5038            .unwrap();
5039
5040            // Verify version 2 was created using the dataset's object_store
5041            // List manifests in the versions directory to find the V2 named manifest
5042            let manifest_metas: Vec<_> = dataset
5043                .object_store()
5044                .inner
5045                .list(Some(&dataset.versions_dir()))
5046                .try_collect()
5047                .await
5048                .unwrap();
5049            let version_2_found = manifest_metas.iter().any(|m| {
5050                m.location
5051                    .filename()
5052                    .map(|f| {
5053                        f.ends_with(".manifest")
5054                            && ManifestNamingScheme::V2.parse_version(f) == Some(2)
5055                    })
5056                    .unwrap_or(false)
5057            });
5058            assert!(
5059                version_2_found,
5060                "Version 2 manifest should exist in versions directory"
5061            );
5062        }
5063    }
5064
5065    /// Tests for multi-table transaction support via table_version_storage_enabled.
5066    mod multi_table_transactions {
5067        use super::*;
5068        use futures::TryStreamExt;
5069        use lance::dataset::builder::DatasetBuilder;
5070        use lance_namespace::models::CreateTableVersionRequest;
5071
5072        /// Helper to create a namespace with table_version_storage_enabled enabled
5073        async fn create_managed_namespace(temp_path: &str) -> Arc<DirectoryNamespace> {
5074            Arc::new(
5075                DirectoryNamespaceBuilder::new(temp_path)
5076                    .table_version_tracking_enabled(true)
5077                    .table_version_storage_enabled(true)
5078                    .manifest_enabled(true)
5079                    .build()
5080                    .await
5081                    .unwrap(),
5082            )
5083        }
5084
5085        /// Helper to create a table and get its staging manifest path
5086        async fn create_table_and_get_staging(
5087            namespace: Arc<dyn LanceNamespace>,
5088            table_name: &str,
5089        ) -> (Vec<String>, object_store::path::Path) {
5090            let schema = create_test_schema();
5091            let ipc_data = create_test_ipc_data(&schema);
5092            let mut create_req = CreateTableRequest::new();
5093            create_req.id = Some(vec![table_name.to_string()]);
5094            namespace
5095                .create_table(create_req, bytes::Bytes::from(ipc_data))
5096                .await
5097                .unwrap();
5098
5099            let table_id = vec![table_name.to_string()];
5100            let dataset = DatasetBuilder::from_namespace(namespace.clone(), table_id.clone())
5101                .await
5102                .unwrap()
5103                .load()
5104                .await
5105                .unwrap();
5106
5107            // Find existing manifest and create a staging copy
5108            let versions_path = dataset.versions_dir();
5109            let manifest_metas: Vec<_> = dataset
5110                .object_store()
5111                .inner
5112                .list(Some(&versions_path))
5113                .try_collect()
5114                .await
5115                .unwrap();
5116
5117            let manifest_meta = manifest_metas
5118                .iter()
5119                .find(|m| {
5120                    m.location
5121                        .filename()
5122                        .map(|f| f.ends_with(".manifest"))
5123                        .unwrap_or(false)
5124                })
5125                .expect("No manifest file found");
5126
5127            let manifest_data = dataset
5128                .object_store()
5129                .inner
5130                .get(&manifest_meta.location)
5131                .await
5132                .unwrap()
5133                .bytes()
5134                .await
5135                .unwrap();
5136
5137            let staging_path = dataset
5138                .versions_dir()
5139                .child(format!("staging_{}", table_name));
5140            dataset
5141                .object_store()
5142                .inner
5143                .put(&staging_path, manifest_data.into())
5144                .await
5145                .unwrap();
5146
5147            (table_id, staging_path)
5148        }
5149
5150        #[tokio::test]
5151        async fn test_table_version_storage_enabled_requires_manifest() {
5152            // table_version_storage_enabled=true requires manifest_enabled=true
5153            let temp_dir = TempStdDir::default();
5154            let temp_path = temp_dir.to_str().unwrap();
5155
5156            let result = DirectoryNamespaceBuilder::new(temp_path)
5157                .table_version_storage_enabled(true)
5158                .manifest_enabled(false)
5159                .build()
5160                .await;
5161
5162            assert!(
5163                result.is_err(),
5164                "Should fail when table_version_storage_enabled=true but manifest_enabled=false"
5165            );
5166        }
5167
5168        #[tokio::test]
5169        #[cfg(not(windows))]
5170        async fn test_create_table_version_records_in_manifest() {
5171            // When table_version_storage_enabled is enabled, single create_table_version
5172            // should also record the version in __manifest
5173            let temp_dir = TempStrDir::default();
5174            let temp_path: &str = &temp_dir;
5175
5176            let namespace = create_managed_namespace(temp_path).await;
5177            let ns: Arc<dyn LanceNamespace> = namespace.clone();
5178
5179            let (table_id, staging_path) =
5180                create_table_and_get_staging(ns.clone(), "table_managed").await;
5181
5182            // Create version 2
5183            let mut create_req = CreateTableVersionRequest::new(2, staging_path.to_string());
5184            create_req.id = Some(table_id.clone());
5185            create_req.naming_scheme = Some("V2".to_string());
5186            let response = namespace.create_table_version(create_req).await.unwrap();
5187
5188            assert!(response.version.is_some());
5189            let version = response.version.unwrap();
5190            assert_eq!(version.version, 2);
5191
5192            // Verify the version is recorded in __manifest by querying it
5193            let manifest_ns = namespace.manifest_ns.as_ref().unwrap();
5194            let table_id_str = manifest::ManifestNamespace::str_object_id(&table_id);
5195            let versions = manifest_ns
5196                .query_table_versions(&table_id_str, false, None)
5197                .await
5198                .unwrap();
5199
5200            assert!(
5201                !versions.is_empty(),
5202                "Version should be recorded in __manifest"
5203            );
5204            let (ver, _path) = &versions[0];
5205            assert_eq!(*ver, 2, "Recorded version should be 2");
5206        }
5207    }
5208}