Skip to main content

lance_namespace_impls/
dir.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Directory-based Lance Namespace implementation.
5//!
6//! This module provides a directory-based implementation of the Lance namespace
7//! that stores tables as Lance datasets in a filesystem directory structure.
8
9pub mod manifest;
10
11use arrow::record_batch::RecordBatchIterator;
12use arrow_ipc::reader::StreamReader;
13use async_trait::async_trait;
14use bytes::Bytes;
15use futures::TryStreamExt;
16use lance::dataset::builder::DatasetBuilder;
17use lance::dataset::{Dataset, WriteParams};
18use lance::session::Session;
19use lance_io::object_store::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry};
20use lance_table::io::commit::ManifestNamingScheme;
21use object_store::path::Path;
22use object_store::{Error as ObjectStoreError, ObjectStore as OSObjectStore, PutMode, PutOptions};
23use std::collections::HashMap;
24use std::io::Cursor;
25use std::sync::Arc;
26
27use crate::context::DynamicContextProvider;
28use lance_namespace::models::{
29    BatchDeleteTableVersionsRequest, BatchDeleteTableVersionsResponse, CreateNamespaceRequest,
30    CreateNamespaceResponse, CreateTableRequest, CreateTableResponse, CreateTableVersionRequest,
31    CreateTableVersionResponse, DeclareTableRequest, DeclareTableResponse,
32    DescribeNamespaceRequest, DescribeNamespaceResponse, DescribeTableRequest,
33    DescribeTableResponse, DescribeTableVersionRequest, DescribeTableVersionResponse,
34    DropNamespaceRequest, DropNamespaceResponse, DropTableRequest, DropTableResponse, Identity,
35    ListNamespacesRequest, ListNamespacesResponse, ListTableVersionsRequest,
36    ListTableVersionsResponse, ListTablesRequest, ListTablesResponse, NamespaceExistsRequest,
37    TableExistsRequest, TableVersion,
38};
39
40use lance_core::{Error, Result, box_error};
41use lance_namespace::LanceNamespace;
42use lance_namespace::schema::arrow_schema_to_json;
43
44use crate::credentials::{
45    CredentialVendor, create_credential_vendor_for_location, has_credential_vendor_config,
46};
47
/// Snapshot of a table directory's state, taken from a single directory listing.
///
/// All three flags are derived from the same listing, so callers never combine an
/// "exists" answer observed at one moment with a marker answer observed at another.
/// `TableStatus::default()` (all `false`) describes a table that was not found.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub(crate) struct TableStatus {
    /// Whether the table directory exists (the listing returned at least one entry).
    pub(crate) exists: bool,
    /// Whether the table directory contains a `.lance-deregistered` marker file.
    pub(crate) is_deregistered: bool,
    /// Whether the table directory contains a `.lance-reserved` marker file
    /// (the table was declared but no data has been written yet).
    pub(crate) has_reserved_file: bool,
}
60
61/// Builder for creating a DirectoryNamespace.
62///
63/// This builder provides a fluent API for configuring and establishing
64/// connections to directory-based Lance namespaces.
65///
66/// # Examples
67///
68/// ```no_run
69/// # use lance_namespace_impls::DirectoryNamespaceBuilder;
70/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
71/// // Create a local directory namespace
72/// let namespace = DirectoryNamespaceBuilder::new("/path/to/data")
73///     .build()
74///     .await?;
75/// # Ok(())
76/// # }
77/// ```
78///
79/// ```no_run
80/// # use lance_namespace_impls::DirectoryNamespaceBuilder;
81/// # use lance::session::Session;
82/// # use std::sync::Arc;
83/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
84/// // Create with custom storage options and session
85/// let session = Arc::new(Session::default());
86/// let namespace = DirectoryNamespaceBuilder::new("s3://bucket/path")
87///     .storage_option("region", "us-west-2")
88///     .storage_option("access_key_id", "key")
89///     .session(session)
90///     .build()
91///     .await?;
92/// # Ok(())
93/// # }
94/// ```
95#[derive(Clone)]
96pub struct DirectoryNamespaceBuilder {
97    root: String,
98    storage_options: Option<HashMap<String, String>>,
99    session: Option<Arc<Session>>,
100    manifest_enabled: bool,
101    dir_listing_enabled: bool,
102    inline_optimization_enabled: bool,
103    table_version_tracking_enabled: bool,
104    credential_vendor_properties: HashMap<String, String>,
105    context_provider: Option<Arc<dyn DynamicContextProvider>>,
106    commit_retries: Option<u32>,
107}
108
109impl std::fmt::Debug for DirectoryNamespaceBuilder {
110    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
111        f.debug_struct("DirectoryNamespaceBuilder")
112            .field("root", &self.root)
113            .field("storage_options", &self.storage_options)
114            .field("manifest_enabled", &self.manifest_enabled)
115            .field("dir_listing_enabled", &self.dir_listing_enabled)
116            .field(
117                "inline_optimization_enabled",
118                &self.inline_optimization_enabled,
119            )
120            .field(
121                "table_version_tracking_enabled",
122                &self.table_version_tracking_enabled,
123            )
124            .field(
125                "context_provider",
126                &self.context_provider.as_ref().map(|_| "Some(...)"),
127            )
128            .finish()
129    }
130}
131
132impl DirectoryNamespaceBuilder {
133    /// Create a new DirectoryNamespaceBuilder with the specified root path.
134    ///
135    /// # Arguments
136    ///
137    /// * `root` - Root directory path (local path or cloud URI like s3://bucket/path)
138    pub fn new(root: impl Into<String>) -> Self {
139        Self {
140            root: root.into().trim_end_matches('/').to_string(),
141            storage_options: None,
142            session: None,
143            manifest_enabled: true,
144            dir_listing_enabled: true, // Default to enabled for backwards compatibility
145            inline_optimization_enabled: true,
146            table_version_tracking_enabled: false, // Default to disabled
147            credential_vendor_properties: HashMap::new(),
148            context_provider: None,
149            commit_retries: None,
150        }
151    }
152
153    /// Enable or disable manifest-based listing.
154    ///
155    /// When enabled (default), the namespace uses a `__manifest` table to track tables.
156    /// When disabled, relies solely on directory scanning.
157    pub fn manifest_enabled(mut self, enabled: bool) -> Self {
158        self.manifest_enabled = enabled;
159        self
160    }
161
162    /// Enable or disable directory-based listing fallback.
163    ///
164    /// When enabled (default), falls back to directory scanning for tables not in the manifest.
165    /// When disabled, only consults the manifest table.
166    pub fn dir_listing_enabled(mut self, enabled: bool) -> Self {
167        self.dir_listing_enabled = enabled;
168        self
169    }
170
171    /// Enable or disable inline optimization of the __manifest table.
172    ///
173    /// When enabled (default), performs compaction and indexing on the __manifest table
174    /// after every write operation to maintain optimal performance.
175    /// When disabled, manual optimization must be performed separately.
176    pub fn inline_optimization_enabled(mut self, enabled: bool) -> Self {
177        self.inline_optimization_enabled = enabled;
178        self
179    }
180
181    /// Enable or disable table version tracking through the namespace.
182    ///
183    /// When enabled, `describe_table` returns `managed_versioning: true` to indicate
184    /// that commits should go through the namespace's table version APIs rather than
185    /// direct object store operations.
186    ///
187    /// When disabled (default), `managed_versioning` is not set.
188    pub fn table_version_tracking_enabled(mut self, enabled: bool) -> Self {
189        self.table_version_tracking_enabled = enabled;
190        self
191    }
192
193    /// Create a DirectoryNamespaceBuilder from properties HashMap.
194    ///
195    /// This method parses a properties map into builder configuration.
196    /// It expects:
197    /// - `root`: The root directory path (required)
198    /// - `manifest_enabled`: Enable manifest-based table tracking (optional, default: true)
199    /// - `dir_listing_enabled`: Enable directory listing for table discovery (optional, default: true)
200    /// - `inline_optimization_enabled`: Enable inline optimization of __manifest table (optional, default: true)
201    /// - `storage.*`: Storage options (optional, prefix will be stripped)
202    ///
203    /// Credential vendor properties (prefixed with `credential_vendor.`, prefix is stripped):
204    /// - `credential_vendor.enabled`: Set to "true" to enable credential vending (required)
205    /// - `credential_vendor.permission`: Permission level: read, write, or admin (default: read)
206    ///
207    /// AWS-specific properties (for s3:// locations):
208    /// - `credential_vendor.aws_role_arn`: AWS IAM role ARN (required for AWS)
209    /// - `credential_vendor.aws_external_id`: AWS external ID (optional)
210    /// - `credential_vendor.aws_region`: AWS region (optional)
211    /// - `credential_vendor.aws_role_session_name`: AWS role session name (optional)
212    /// - `credential_vendor.aws_duration_millis`: Credential duration in ms (default: 3600000, range: 15min-12hrs)
213    ///
214    /// GCP-specific properties (for gs:// locations):
215    /// - `credential_vendor.gcp_service_account`: Service account to impersonate (optional)
216    ///
217    /// Note: GCP uses Application Default Credentials (ADC). To use a service account key file,
218    /// set the `GOOGLE_APPLICATION_CREDENTIALS` environment variable before starting.
219    /// GCP token duration cannot be configured; it's determined by the STS endpoint (typically 1 hour).
220    ///
221    /// Azure-specific properties (for az:// locations):
222    /// - `credential_vendor.azure_account_name`: Azure storage account name (required for Azure)
223    /// - `credential_vendor.azure_tenant_id`: Azure tenant ID (optional)
224    /// - `credential_vendor.azure_duration_millis`: Credential duration in ms (default: 3600000, up to 7 days)
225    ///
226    /// # Arguments
227    ///
228    /// * `properties` - Configuration properties
229    /// * `session` - Optional Lance session to reuse object store registry
230    ///
231    /// # Returns
232    ///
233    /// Returns a `DirectoryNamespaceBuilder` instance.
234    ///
235    /// # Errors
236    ///
237    /// Returns an error if the `root` property is missing.
238    ///
239    /// # Examples
240    ///
241    /// ```no_run
242    /// # use lance_namespace_impls::DirectoryNamespaceBuilder;
243    /// # use std::collections::HashMap;
244    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
245    /// let mut properties = HashMap::new();
246    /// properties.insert("root".to_string(), "/path/to/data".to_string());
247    /// properties.insert("manifest_enabled".to_string(), "true".to_string());
248    /// properties.insert("dir_listing_enabled".to_string(), "false".to_string());
249    /// properties.insert("storage.region".to_string(), "us-west-2".to_string());
250    ///
251    /// let namespace = DirectoryNamespaceBuilder::from_properties(properties, None)?
252    ///     .build()
253    ///     .await?;
254    /// # Ok(())
255    /// # }
256    /// ```
257    pub fn from_properties(
258        properties: HashMap<String, String>,
259        session: Option<Arc<Session>>,
260    ) -> Result<Self> {
261        // Extract root from properties (required)
262        let root = properties.get("root").cloned().ok_or_else(|| {
263            Error::namespace_source(
264                "Missing required property 'root' for directory namespace".into(),
265            )
266        })?;
267
268        // Extract storage options (properties prefixed with "storage.")
269        let storage_options: HashMap<String, String> = properties
270            .iter()
271            .filter_map(|(k, v)| {
272                k.strip_prefix("storage.")
273                    .map(|key| (key.to_string(), v.clone()))
274            })
275            .collect();
276
277        let storage_options = if storage_options.is_empty() {
278            None
279        } else {
280            Some(storage_options)
281        };
282
283        // Extract manifest_enabled (default: true)
284        let manifest_enabled = properties
285            .get("manifest_enabled")
286            .and_then(|v| v.parse::<bool>().ok())
287            .unwrap_or(true);
288
289        // Extract dir_listing_enabled (default: true)
290        let dir_listing_enabled = properties
291            .get("dir_listing_enabled")
292            .and_then(|v| v.parse::<bool>().ok())
293            .unwrap_or(true);
294
295        // Extract inline_optimization_enabled (default: true)
296        let inline_optimization_enabled = properties
297            .get("inline_optimization_enabled")
298            .and_then(|v| v.parse::<bool>().ok())
299            .unwrap_or(true);
300
301        // Extract table_version_tracking_enabled (default: false)
302        let table_version_tracking_enabled = properties
303            .get("table_version_tracking_enabled")
304            .and_then(|v| v.parse::<bool>().ok())
305            .unwrap_or(false);
306
307        // Extract credential vendor properties (properties prefixed with "credential_vendor.")
308        // The prefix is stripped to get short property names
309        // The build() method will check if enabled=true before creating the vendor
310        let credential_vendor_properties: HashMap<String, String> = properties
311            .iter()
312            .filter_map(|(k, v)| {
313                k.strip_prefix("credential_vendor.")
314                    .map(|key| (key.to_string(), v.clone()))
315            })
316            .collect();
317
318        let commit_retries = properties
319            .get("commit_retries")
320            .and_then(|v| v.parse::<u32>().ok());
321
322        Ok(Self {
323            root: root.trim_end_matches('/').to_string(),
324            storage_options,
325            session,
326            manifest_enabled,
327            dir_listing_enabled,
328            inline_optimization_enabled,
329            table_version_tracking_enabled,
330            credential_vendor_properties,
331            context_provider: None,
332            commit_retries,
333        })
334    }
335
336    /// Add a storage option.
337    ///
338    /// # Arguments
339    ///
340    /// * `key` - Storage option key (e.g., "region", "access_key_id")
341    /// * `value` - Storage option value
342    pub fn storage_option(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
343        self.storage_options
344            .get_or_insert_with(HashMap::new)
345            .insert(key.into(), value.into());
346        self
347    }
348
349    /// Add multiple storage options.
350    ///
351    /// # Arguments
352    ///
353    /// * `options` - HashMap of storage options to add
354    pub fn storage_options(mut self, options: HashMap<String, String>) -> Self {
355        self.storage_options
356            .get_or_insert_with(HashMap::new)
357            .extend(options);
358        self
359    }
360
361    /// Set the Lance session to use for this namespace.
362    ///
363    /// When a session is provided, the namespace will reuse the session's
364    /// object store registry, allowing multiple namespaces and datasets
365    /// to share the same underlying storage connections.
366    ///
367    /// # Arguments
368    ///
369    /// * `session` - Arc-wrapped Lance session
370    pub fn session(mut self, session: Arc<Session>) -> Self {
371        self.session = Some(session);
372        self
373    }
374
375    /// Set the number of retries for commit operations on the manifest table.
376    /// If not set, defaults to [`lance_table::io::commit::CommitConfig`] default (20).
377    pub fn commit_retries(mut self, retries: u32) -> Self {
378        self.commit_retries = Some(retries);
379        self
380    }
381
382    /// Add a credential vendor property.
383    ///
384    /// Use short property names without the `credential_vendor.` prefix.
385    /// Common properties: `enabled`, `permission`.
386    /// AWS properties: `aws_role_arn`, `aws_external_id`, `aws_region`, `aws_role_session_name`, `aws_duration_millis`.
387    /// GCP properties: `gcp_service_account`.
388    /// Azure properties: `azure_account_name`, `azure_tenant_id`, `azure_duration_millis`.
389    ///
390    /// # Arguments
391    ///
392    /// * `key` - Property key (e.g., "enabled", "aws_role_arn")
393    /// * `value` - Property value
394    ///
395    /// # Example
396    ///
397    /// ```no_run
398    /// # use lance_namespace_impls::DirectoryNamespaceBuilder;
399    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
400    /// let namespace = DirectoryNamespaceBuilder::new("s3://my-bucket/data")
401    ///     .credential_vendor_property("enabled", "true")
402    ///     .credential_vendor_property("aws_role_arn", "arn:aws:iam::123456789012:role/MyRole")
403    ///     .credential_vendor_property("permission", "read")
404    ///     .build()
405    ///     .await?;
406    /// # Ok(())
407    /// # }
408    /// ```
409    pub fn credential_vendor_property(
410        mut self,
411        key: impl Into<String>,
412        value: impl Into<String>,
413    ) -> Self {
414        self.credential_vendor_properties
415            .insert(key.into(), value.into());
416        self
417    }
418
419    /// Add multiple credential vendor properties.
420    ///
421    /// Use short property names without the `credential_vendor.` prefix.
422    ///
423    /// # Arguments
424    ///
425    /// * `properties` - HashMap of credential vendor properties to add
426    pub fn credential_vendor_properties(mut self, properties: HashMap<String, String>) -> Self {
427        self.credential_vendor_properties.extend(properties);
428        self
429    }
430
431    /// Set a dynamic context provider for per-request context.
432    ///
433    /// The provider can be used to generate additional context for operations.
434    /// For DirectoryNamespace, the context is stored but not directly used
435    /// in operations (unlike RestNamespace where it's converted to HTTP headers).
436    ///
437    /// # Arguments
438    ///
439    /// * `provider` - The context provider implementation
440    pub fn context_provider(mut self, provider: Arc<dyn DynamicContextProvider>) -> Self {
441        self.context_provider = Some(provider);
442        self
443    }
444
445    /// Build the DirectoryNamespace.
446    ///
447    /// # Returns
448    ///
449    /// Returns a `DirectoryNamespace` instance.
450    ///
451    /// # Errors
452    ///
453    /// Returns an error if:
454    /// - The root path is invalid
455    /// - Connection to the storage backend fails
456    /// - Storage options are invalid
457    pub async fn build(self) -> Result<DirectoryNamespace> {
458        let (object_store, base_path) =
459            Self::initialize_object_store(&self.root, &self.storage_options, &self.session).await?;
460
461        let manifest_ns = if self.manifest_enabled {
462            match manifest::ManifestNamespace::from_directory(
463                self.root.clone(),
464                self.storage_options.clone(),
465                self.session.clone(),
466                object_store.clone(),
467                base_path.clone(),
468                self.dir_listing_enabled,
469                self.inline_optimization_enabled,
470                self.commit_retries,
471            )
472            .await
473            {
474                Ok(ns) => Some(Arc::new(ns)),
475                Err(e) => {
476                    // Failed to initialize manifest namespace, fall back to directory listing only
477                    log::warn!(
478                        "Failed to initialize manifest namespace, falling back to directory listing only: {}",
479                        e
480                    );
481                    None
482                }
483            }
484        } else {
485            None
486        };
487
488        // Create credential vendor once during initialization if enabled
489        let credential_vendor = if has_credential_vendor_config(&self.credential_vendor_properties)
490        {
491            create_credential_vendor_for_location(&self.root, &self.credential_vendor_properties)
492                .await?
493                .map(Arc::from)
494        } else {
495            None
496        };
497
498        Ok(DirectoryNamespace {
499            root: self.root,
500            storage_options: self.storage_options,
501            session: self.session,
502            object_store,
503            base_path,
504            manifest_ns,
505            dir_listing_enabled: self.dir_listing_enabled,
506            table_version_tracking_enabled: self.table_version_tracking_enabled,
507            credential_vendor,
508            context_provider: self.context_provider,
509        })
510    }
511
512    /// Initialize the Lance ObjectStore based on the configuration
513    async fn initialize_object_store(
514        root: &str,
515        storage_options: &Option<HashMap<String, String>>,
516        session: &Option<Arc<Session>>,
517    ) -> Result<(Arc<ObjectStore>, Path)> {
518        // Build ObjectStoreParams from storage options
519        let accessor = storage_options.clone().map(|opts| {
520            Arc::new(lance_io::object_store::StorageOptionsAccessor::with_static_options(opts))
521        });
522        let params = ObjectStoreParams {
523            storage_options_accessor: accessor,
524            ..Default::default()
525        };
526
527        // Use object store registry from session if provided, otherwise create a new one
528        let registry = if let Some(session) = session {
529            session.store_registry()
530        } else {
531            Arc::new(ObjectStoreRegistry::default())
532        };
533
534        // Use Lance's object store factory to create from URI
535        let (object_store, base_path) = ObjectStore::from_uri_and_params(registry, root, &params)
536            .await
537            .map_err(|e| {
538                Error::namespace_source(format!("Failed to create object store: {}", e).into())
539            })?;
540
541        Ok((object_store, base_path))
542    }
543}
544
545/// Directory-based implementation of Lance Namespace.
546///
547/// This implementation stores tables as Lance datasets in a directory structure.
548/// It supports local filesystems and cloud storage backends through Lance's object store.
549///
550/// ## Manifest-based Listing
551///
552/// When `manifest_enabled=true`, the namespace uses a special `__manifest` Lance table to track tables
553/// instead of scanning the filesystem. This provides:
554/// - Better performance for listing operations
555/// - Ability to track table metadata
556/// - Foundation for future features like namespaces and table renaming
557///
558/// When `dir_listing_enabled=true`, the namespace falls back to directory scanning for tables not
559/// found in the manifest, enabling gradual migration.
560///
561/// ## Credential Vending
562///
563/// When credential vendor properties are configured, `describe_table` will vend temporary
564/// credentials based on the table location URI. The vendor type is auto-selected:
565/// - `s3://` locations use AWS STS AssumeRole
566/// - `gs://` locations use GCP OAuth2 tokens
567/// - `az://` locations use Azure SAS tokens
568pub struct DirectoryNamespace {
569    root: String,
570    storage_options: Option<HashMap<String, String>>,
571    #[allow(dead_code)]
572    session: Option<Arc<Session>>,
573    object_store: Arc<ObjectStore>,
574    base_path: Path,
575    manifest_ns: Option<Arc<manifest::ManifestNamespace>>,
576    dir_listing_enabled: bool,
577    /// When true, `describe_table` returns `managed_versioning: true` to indicate
578    /// commits should go through namespace table version APIs.
579    table_version_tracking_enabled: bool,
580    /// Credential vendor created once during initialization.
581    /// Used to vend temporary credentials for table access.
582    credential_vendor: Option<Arc<dyn CredentialVendor>>,
583    /// Dynamic context provider for per-request context.
584    /// Stored but not directly used in operations (available for future extensions).
585    #[allow(dead_code)]
586    context_provider: Option<Arc<dyn DynamicContextProvider>>,
587}
588
589impl std::fmt::Debug for DirectoryNamespace {
590    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
591        write!(f, "{}", self.namespace_id())
592    }
593}
594
595impl std::fmt::Display for DirectoryNamespace {
596    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
597        write!(f, "{}", self.namespace_id())
598    }
599}
600
601impl DirectoryNamespace {
/// Sort `names` and keep a single page of them, in place.
///
/// The names are sorted lexicographically. Every name `<=` `page_token` is then
/// dropped, because the token is an exclusive "start after" cursor. Finally, at most
/// `limit` names are kept.
///
/// # Arguments
/// * `names` - Table names to sort and page in place
/// * `page_token` - Exclusive cursor: only names strictly greater than it survive
/// * `limit` - Maximum number of names to keep; a negative value means "no limit"
fn apply_pagination(names: &mut Vec<String>, page_token: Option<String>, limit: Option<i32>) {
    names.sort_unstable();

    if let Some(cursor) = page_token.as_deref() {
        // Sorted input means every name `<= cursor` forms a prefix of the list.
        let first_kept = names.partition_point(|name| name.as_str() <= cursor);
        names.drain(..first_kept);
    }

    // `try_from` rejects negative limits, which therefore leave the list untouched.
    if let Some(max_len) = limit.and_then(|l| usize::try_from(l).ok()) {
        names.truncate(max_len);
    }
}
633
634    /// List tables using directory scanning (fallback method)
635    async fn list_directory_tables(&self) -> Result<Vec<String>> {
636        let mut tables = Vec::new();
637        let entries = self
638            .object_store
639            .read_dir(self.base_path.clone())
640            .await
641            .map_err(|e| {
642                Error::io_source(box_error(std::io::Error::other(format!(
643                    "Failed to list directory: {}",
644                    e
645                ))))
646            })?;
647
648        for entry in entries {
649            let path = entry.trim_end_matches('/');
650            if !path.ends_with(".lance") {
651                continue;
652            }
653
654            let table_name = &path[..path.len() - 6];
655
656            // Use atomic check to skip deregistered tables and declared-but-not-written tables
657            let status = self.check_table_status(table_name).await;
658            if status.is_deregistered || status.has_reserved_file {
659                continue;
660            }
661
662            tables.push(table_name.to_string());
663        }
664
665        Ok(tables)
666    }
667
668    /// Validate that the namespace ID represents the root namespace
669    fn validate_root_namespace_id(id: &Option<Vec<String>>) -> Result<()> {
670        if let Some(id) = id
671            && !id.is_empty()
672        {
673            return Err(Error::namespace_source(format!(
674                "Directory namespace only supports root namespace operations, but got namespace ID: {:?}. Expected empty ID.",
675                id
676            ).into()));
677        }
678        Ok(())
679    }
680
681    /// Extract table name from table ID
682    fn table_name_from_id(id: &Option<Vec<String>>) -> Result<String> {
683        let id = id.as_ref().ok_or_else(|| {
684            Error::namespace_source("Directory namespace table ID cannot be empty".into())
685        })?;
686
687        if id.len() != 1 {
688            return Err(Error::namespace_source(format!(
689                "Multi-level table IDs are only supported when manifest mode is enabled, but got: {:?}",
690                id
691            )
692            .into()));
693        }
694
695        Ok(id[0].clone())
696    }
697
698    async fn resolve_table_location(&self, id: &Option<Vec<String>>) -> Result<String> {
699        let mut describe_req = DescribeTableRequest::new();
700        describe_req.id = id.clone();
701        describe_req.load_detailed_metadata = Some(false);
702
703        let describe_resp = self.describe_table(describe_req).await?;
704
705        describe_resp.location.ok_or_else(|| {
706            Error::namespace_source(format!("Table location not found for: {:?}", id).into())
707        })
708    }
709
710    fn table_full_uri(&self, table_name: &str) -> String {
711        format!("{}/{}.lance", &self.root, table_name)
712    }
713
714    fn uri_to_object_store_path(uri: &str) -> Path {
715        let path_str = if let Some(rest) = uri.strip_prefix("file://") {
716            rest
717        } else if let Some(rest) = uri.strip_prefix("s3://") {
718            rest.split_once('/').map(|(_, p)| p).unwrap_or(rest)
719        } else if let Some(rest) = uri.strip_prefix("gs://") {
720            rest.split_once('/').map(|(_, p)| p).unwrap_or(rest)
721        } else if let Some(rest) = uri.strip_prefix("az://") {
722            rest.split_once('/').map(|(_, p)| p).unwrap_or(rest)
723        } else {
724            uri
725        };
726        Path::from(path_str)
727    }
728
729    /// Get the object store path for a table (relative to base_path)
730    fn table_path(&self, table_name: &str) -> Path {
731        self.base_path
732            .child(format!("{}.lance", table_name).as_str())
733    }
734
735    /// Get the reserved file path for a table
736    fn table_reserved_file_path(&self, table_name: &str) -> Path {
737        self.base_path
738            .child(format!("{}.lance", table_name).as_str())
739            .child(".lance-reserved")
740    }
741
742    /// Get the deregistered marker file path for a table
743    fn table_deregistered_file_path(&self, table_name: &str) -> Path {
744        self.base_path
745            .child(format!("{}.lance", table_name).as_str())
746            .child(".lance-deregistered")
747    }
748
749    /// Atomically check table existence and deregistration status.
750    ///
751    /// This performs a single directory listing to get a consistent snapshot of the
752    /// table's state, avoiding race conditions between checking existence and
753    /// checking deregistration status.
754    pub(crate) async fn check_table_status(&self, table_name: &str) -> TableStatus {
755        let table_path = self.table_path(table_name);
756        match self.object_store.read_dir(table_path).await {
757            Ok(entries) => {
758                let exists = !entries.is_empty();
759                let is_deregistered = entries.iter().any(|e| e.ends_with(".lance-deregistered"));
760                let has_reserved_file = entries.iter().any(|e| e.ends_with(".lance-reserved"));
761                TableStatus {
762                    exists,
763                    is_deregistered,
764                    has_reserved_file,
765                }
766            }
767            Err(_) => TableStatus {
768                exists: false,
769                is_deregistered: false,
770                has_reserved_file: false,
771            },
772        }
773    }
774
775    async fn put_marker_file_atomic(
776        &self,
777        path: &Path,
778        file_description: &str,
779    ) -> std::result::Result<(), String> {
780        let put_opts = PutOptions {
781            mode: PutMode::Create,
782            ..Default::default()
783        };
784
785        match self
786            .object_store
787            .inner
788            .put_opts(path, bytes::Bytes::new().into(), put_opts)
789            .await
790        {
791            Ok(_) => Ok(()),
792            Err(ObjectStoreError::AlreadyExists { .. })
793            | Err(ObjectStoreError::Precondition { .. }) => {
794                Err(format!("{} already exists", file_description))
795            }
796            Err(e) => Err(format!("Failed to create {}: {}", file_description, e)),
797        }
798    }
799
800    /// Get storage options for a table, using credential vending if configured.
801    ///
802    /// If credential vendor properties are configured and the table location matches
803    /// a supported cloud provider, this will create an appropriate vendor and vend
804    /// temporary credentials scoped to the table location. Otherwise, returns the
805    /// static storage options.
806    ///
807    /// The vendor type is auto-selected based on the table URI:
808    /// - `s3://` locations use AWS STS AssumeRole
809    /// - `gs://` locations use GCP OAuth2 tokens
810    /// - `az://` locations use Azure SAS tokens
811    ///
812    /// The permission level (Read, Write, Admin) is configured at namespace
813    /// initialization time via the `credential_vendor_permission` property.
814    ///
815    /// # Arguments
816    ///
817    /// * `table_uri` - The full URI of the table
818    /// * `identity` - Optional identity from the request for identity-based credential vending
819    async fn get_storage_options_for_table(
820        &self,
821        table_uri: &str,
822        identity: Option<&Identity>,
823    ) -> Result<Option<HashMap<String, String>>> {
824        if let Some(ref vendor) = self.credential_vendor {
825            let vended = vendor.vend_credentials(table_uri, identity).await?;
826            return Ok(Some(vended.storage_options));
827        }
828        Ok(self.storage_options.clone())
829    }
830
831    /// Migrate directory-based tables to the manifest.
832    ///
833    /// This is a one-time migration operation that:
834    /// 1. Scans the directory for existing `.lance` tables
835    /// 2. Registers any unmigrated tables in the manifest
836    /// 3. Returns the count of tables that were migrated
837    ///
838    /// This method is safe to run multiple times - it will skip tables that are already
839    /// registered in the manifest.
840    ///
841    /// # Usage
842    ///
843    /// After creating tables in directory-only mode or dual mode, you can migrate them
844    /// to the manifest to enable manifest-only mode:
845    ///
846    /// ```no_run
847    /// # use lance_namespace_impls::DirectoryNamespaceBuilder;
848    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
849    /// // Create namespace with dual mode (manifest + directory listing)
850    /// let namespace = DirectoryNamespaceBuilder::new("/path/to/data")
851    ///     .manifest_enabled(true)
852    ///     .dir_listing_enabled(true)
853    ///     .build()
854    ///     .await?;
855    ///
856    /// // ... tables are created and used ...
857    ///
858    /// // Migrate existing directory tables to manifest
859    /// let migrated_count = namespace.migrate().await?;
860    /// println!("Migrated {} tables", migrated_count);
861    ///
862    /// // Now you can disable directory listing for better performance:
863    /// // (requires rebuilding the namespace)
864    /// let namespace = DirectoryNamespaceBuilder::new("/path/to/data")
865    ///     .manifest_enabled(true)
866    ///     .dir_listing_enabled(false)  // All tables now in manifest
867    ///     .build()
868    ///     .await?;
869    /// # Ok(())
870    /// # }
871    /// ```
872    ///
873    /// # Returns
874    ///
875    /// Returns the number of tables that were migrated to the manifest.
876    ///
877    /// # Errors
878    ///
879    /// Returns an error if:
880    /// - Manifest is not enabled
881    /// - Directory listing fails
882    /// - Manifest registration fails
883    pub async fn migrate(&self) -> Result<usize> {
884        // We only care about tables in the root namespace
885        let Some(ref manifest_ns) = self.manifest_ns else {
886            return Ok(0); // No manifest, nothing to migrate
887        };
888
889        // Get all table locations already in the manifest
890        let manifest_locations = manifest_ns.list_manifest_table_locations().await?;
891
892        // Get all tables from directory
893        let dir_tables = self.list_directory_tables().await?;
894
895        // Register each directory table that doesn't have an overlapping location
896        // If a directory name already exists in the manifest,
897        // that means the table must have already been migrated or created
898        // in the manifest, so we can skip it.
899        let mut migrated_count = 0;
900        for table_name in dir_tables {
901            // For root namespace tables, the directory name is "table_name.lance"
902            let dir_name = format!("{}.lance", table_name);
903            if !manifest_locations.contains(&dir_name) {
904                manifest_ns.register_table(&table_name, dir_name).await?;
905                migrated_count += 1;
906            }
907        }
908
909        Ok(migrated_count)
910    }
911}
912
913#[async_trait]
914impl LanceNamespace for DirectoryNamespace {
915    async fn list_namespaces(
916        &self,
917        request: ListNamespacesRequest,
918    ) -> Result<ListNamespacesResponse> {
919        if let Some(ref manifest_ns) = self.manifest_ns {
920            return manifest_ns.list_namespaces(request).await;
921        }
922
923        Self::validate_root_namespace_id(&request.id)?;
924        Ok(ListNamespacesResponse::new(vec![]))
925    }
926
927    async fn describe_namespace(
928        &self,
929        request: DescribeNamespaceRequest,
930    ) -> Result<DescribeNamespaceResponse> {
931        if let Some(ref manifest_ns) = self.manifest_ns {
932            return manifest_ns.describe_namespace(request).await;
933        }
934
935        Self::validate_root_namespace_id(&request.id)?;
936        #[allow(clippy::needless_update)]
937        Ok(DescribeNamespaceResponse {
938            properties: Some(HashMap::new()),
939            ..Default::default()
940        })
941    }
942
943    async fn create_namespace(
944        &self,
945        request: CreateNamespaceRequest,
946    ) -> Result<CreateNamespaceResponse> {
947        if let Some(ref manifest_ns) = self.manifest_ns {
948            return manifest_ns.create_namespace(request).await;
949        }
950
951        if request.id.is_none() || request.id.as_ref().unwrap().is_empty() {
952            return Err(Error::namespace_source(
953                "Root namespace already exists and cannot be created".into(),
954            ));
955        }
956
957        Err(Error::not_supported_source(
958            "Child namespaces are only supported when manifest mode is enabled".into(),
959        ))
960    }
961
962    async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<DropNamespaceResponse> {
963        if let Some(ref manifest_ns) = self.manifest_ns {
964            return manifest_ns.drop_namespace(request).await;
965        }
966
967        if request.id.is_none() || request.id.as_ref().unwrap().is_empty() {
968            return Err(Error::namespace_source(
969                "Root namespace cannot be dropped".into(),
970            ));
971        }
972
973        Err(Error::not_supported_source(
974            "Child namespaces are only supported when manifest mode is enabled".into(),
975        ))
976    }
977
978    async fn namespace_exists(&self, request: NamespaceExistsRequest) -> Result<()> {
979        if let Some(ref manifest_ns) = self.manifest_ns {
980            return manifest_ns.namespace_exists(request).await;
981        }
982
983        if request.id.is_none() || request.id.as_ref().unwrap().is_empty() {
984            return Ok(());
985        }
986
987        Err(Error::namespace_source(
988            "Child namespaces are only supported when manifest mode is enabled".into(),
989        ))
990    }
991
992    async fn list_tables(&self, request: ListTablesRequest) -> Result<ListTablesResponse> {
993        // Validate that namespace ID is provided
994        let namespace_id = request
995            .id
996            .as_ref()
997            .ok_or_else(|| Error::invalid_input_source("Namespace ID is required".into()))?;
998
999        // For child namespaces, always delegate to manifest (if enabled)
1000        if !namespace_id.is_empty() {
1001            if let Some(ref manifest_ns) = self.manifest_ns {
1002                return manifest_ns.list_tables(request).await;
1003            }
1004            return Err(Error::not_supported_source(
1005                "Child namespaces are only supported when manifest mode is enabled".into(),
1006            ));
1007        }
1008
1009        // When only manifest is enabled (no directory listing), delegate directly to manifest
1010        if let Some(ref manifest_ns) = self.manifest_ns
1011            && !self.dir_listing_enabled
1012        {
1013            return manifest_ns.list_tables(request).await;
1014        }
1015
1016        // When both manifest and directory listing are enabled, we need to merge and deduplicate
1017        let mut tables = if self.manifest_ns.is_some() && self.dir_listing_enabled {
1018            // Get all manifest table locations (for deduplication)
1019            let manifest_locations = if let Some(ref manifest_ns) = self.manifest_ns {
1020                manifest_ns.list_manifest_table_locations().await?
1021            } else {
1022                std::collections::HashSet::new()
1023            };
1024
1025            // Get all manifest tables (without pagination for merging)
1026            let mut manifest_request = request.clone();
1027            manifest_request.limit = None;
1028            manifest_request.page_token = None;
1029            let manifest_tables = if let Some(ref manifest_ns) = self.manifest_ns {
1030                let manifest_response = manifest_ns.list_tables(manifest_request).await?;
1031                manifest_response.tables
1032            } else {
1033                vec![]
1034            };
1035
1036            // Start with all manifest table names
1037            // Add directory tables that aren't already in the manifest (by location)
1038            let mut all_tables: Vec<String> = manifest_tables;
1039            let dir_tables = self.list_directory_tables().await?;
1040            for table_name in dir_tables {
1041                // Check if this table's location is already in the manifest
1042                // Manifest stores full URIs, so we need to check both formats
1043                let full_location = format!("{}/{}.lance", self.root, table_name);
1044                let relative_location = format!("{}.lance", table_name);
1045                if !manifest_locations.contains(&full_location)
1046                    && !manifest_locations.contains(&relative_location)
1047                {
1048                    all_tables.push(table_name);
1049                }
1050            }
1051
1052            all_tables
1053        } else {
1054            self.list_directory_tables().await?
1055        };
1056
1057        // Apply sorting and pagination
1058        Self::apply_pagination(&mut tables, request.page_token, request.limit);
1059        let response = ListTablesResponse::new(tables);
1060        Ok(response)
1061    }
1062
1063    async fn describe_table(&self, request: DescribeTableRequest) -> Result<DescribeTableResponse> {
1064        if let Some(ref manifest_ns) = self.manifest_ns {
1065            match manifest_ns.describe_table(request.clone()).await {
1066                Ok(mut response) => {
1067                    // Only apply identity-based credential vending when explicitly requested
1068                    if request.vend_credentials == Some(true) && self.credential_vendor.is_some() {
1069                        if let Some(ref table_uri) = response.table_uri {
1070                            let identity = request.identity.as_deref();
1071                            response.storage_options = self
1072                                .get_storage_options_for_table(table_uri, identity)
1073                                .await?;
1074                        }
1075                    } else if request.vend_credentials == Some(false) {
1076                        response.storage_options = None;
1077                    }
1078                    // Set managed_versioning flag when table_version_tracking_enabled
1079                    if self.table_version_tracking_enabled {
1080                        response.managed_versioning = Some(true);
1081                    }
1082                    return Ok(response);
1083                }
1084                Err(_)
1085                    if self.dir_listing_enabled
1086                        && request.id.as_ref().is_some_and(|id| id.len() == 1) =>
1087                {
1088                    // Fall through to directory check only for single-level IDs
1089                }
1090                Err(e) => return Err(e),
1091            }
1092        }
1093
1094        let table_name = Self::table_name_from_id(&request.id)?;
1095        let table_uri = self.table_full_uri(&table_name);
1096
1097        // Atomically check table existence and deregistration status
1098        let status = self.check_table_status(&table_name).await;
1099
1100        if !status.exists {
1101            return Err(Error::namespace_source(
1102                format!("Table does not exist: {}", table_name).into(),
1103            ));
1104        }
1105
1106        if status.is_deregistered {
1107            return Err(Error::namespace_source(
1108                format!("Table is deregistered: {}", table_name).into(),
1109            ));
1110        }
1111
1112        let load_detailed_metadata = request.load_detailed_metadata.unwrap_or(false);
1113        // For backwards compatibility, only skip vending credentials when explicitly set to false
1114        let vend_credentials = request.vend_credentials.unwrap_or(true);
1115        let identity = request.identity.as_deref();
1116
1117        // If not loading detailed metadata, return minimal response with just location
1118        if !load_detailed_metadata {
1119            let storage_options = if vend_credentials {
1120                self.get_storage_options_for_table(&table_uri, identity)
1121                    .await?
1122            } else {
1123                None
1124            };
1125            return Ok(DescribeTableResponse {
1126                table: Some(table_name),
1127                namespace: request.id.as_ref().map(|id| {
1128                    if id.len() > 1 {
1129                        id[..id.len() - 1].to_vec()
1130                    } else {
1131                        vec![]
1132                    }
1133                }),
1134                location: Some(table_uri.clone()),
1135                table_uri: Some(table_uri),
1136                storage_options,
1137                managed_versioning: if self.table_version_tracking_enabled {
1138                    Some(true)
1139                } else {
1140                    None
1141                },
1142                ..Default::default()
1143            });
1144        }
1145
1146        // Try to load the dataset to get real information
1147        // Use DatasetBuilder with storage options to support S3 with custom endpoints
1148        let mut builder = DatasetBuilder::from_uri(&table_uri);
1149        if let Some(opts) = &self.storage_options {
1150            builder = builder.with_storage_options(opts.clone());
1151        }
1152        if let Some(sess) = &self.session {
1153            builder = builder.with_session(sess.clone());
1154        }
1155        match builder.load().await {
1156            Ok(mut dataset) => {
1157                // If a specific version is requested, checkout that version
1158                if let Some(requested_version) = request.version {
1159                    dataset = dataset.checkout_version(requested_version as u64).await?;
1160                }
1161
1162                let version_info = dataset.version();
1163                let lance_schema = dataset.schema();
1164                let arrow_schema: arrow_schema::Schema = lance_schema.into();
1165                let json_schema = arrow_schema_to_json(&arrow_schema)?;
1166                let storage_options = if vend_credentials {
1167                    self.get_storage_options_for_table(&table_uri, identity)
1168                        .await?
1169                } else {
1170                    None
1171                };
1172
1173                // Convert BTreeMap to HashMap for the response
1174                let metadata: std::collections::HashMap<String, String> =
1175                    version_info.metadata.into_iter().collect();
1176
1177                Ok(DescribeTableResponse {
1178                    table: Some(table_name),
1179                    namespace: request.id.as_ref().map(|id| {
1180                        if id.len() > 1 {
1181                            id[..id.len() - 1].to_vec()
1182                        } else {
1183                            vec![]
1184                        }
1185                    }),
1186                    version: Some(version_info.version as i64),
1187                    location: Some(table_uri.clone()),
1188                    table_uri: Some(table_uri),
1189                    schema: Some(Box::new(json_schema)),
1190                    storage_options,
1191                    metadata: Some(metadata),
1192                    managed_versioning: if self.table_version_tracking_enabled {
1193                        Some(true)
1194                    } else {
1195                        None
1196                    },
1197                    ..Default::default()
1198                })
1199            }
1200            Err(err) => {
1201                // Use the reserved file status from the atomic check
1202                if status.has_reserved_file {
1203                    let storage_options = if vend_credentials {
1204                        self.get_storage_options_for_table(&table_uri, identity)
1205                            .await?
1206                    } else {
1207                        None
1208                    };
1209                    Ok(DescribeTableResponse {
1210                        table: Some(table_name),
1211                        namespace: request.id.as_ref().map(|id| {
1212                            if id.len() > 1 {
1213                                id[..id.len() - 1].to_vec()
1214                            } else {
1215                                vec![]
1216                            }
1217                        }),
1218                        location: Some(table_uri.clone()),
1219                        table_uri: Some(table_uri),
1220                        storage_options,
1221                        managed_versioning: if self.table_version_tracking_enabled {
1222                            Some(true)
1223                        } else {
1224                            None
1225                        },
1226                        ..Default::default()
1227                    })
1228                } else {
1229                    Err(Error::namespace_source(
1230                        format!(
1231                            "Table directory exists but cannot load dataset {}: {:?}",
1232                            table_name, err
1233                        )
1234                        .into(),
1235                    ))
1236                }
1237            }
1238        }
1239    }
1240
1241    async fn table_exists(&self, request: TableExistsRequest) -> Result<()> {
1242        if let Some(ref manifest_ns) = self.manifest_ns {
1243            match manifest_ns.table_exists(request.clone()).await {
1244                Ok(()) => return Ok(()),
1245                Err(_) if self.dir_listing_enabled => {
1246                    // Fall through to directory check
1247                }
1248                Err(e) => return Err(e),
1249            }
1250        }
1251
1252        let table_name = Self::table_name_from_id(&request.id)?;
1253
1254        // Atomically check table existence and deregistration status
1255        let status = self.check_table_status(&table_name).await;
1256
1257        if !status.exists {
1258            return Err(Error::namespace_source(
1259                format!("Table does not exist: {}", table_name).into(),
1260            ));
1261        }
1262
1263        if status.is_deregistered {
1264            return Err(Error::namespace_source(
1265                format!("Table is deregistered: {}", table_name).into(),
1266            ));
1267        }
1268
1269        Ok(())
1270    }
1271
1272    async fn drop_table(&self, request: DropTableRequest) -> Result<DropTableResponse> {
1273        if let Some(ref manifest_ns) = self.manifest_ns {
1274            return manifest_ns.drop_table(request).await;
1275        }
1276
1277        let table_name = Self::table_name_from_id(&request.id)?;
1278        let table_uri = self.table_full_uri(&table_name);
1279        let table_path = self.table_path(&table_name);
1280
1281        self.object_store
1282            .remove_dir_all(table_path)
1283            .await
1284            .map_err(|e| {
1285                Error::namespace_source(
1286                    format!("Failed to drop table {}: {}", table_name, e).into(),
1287                )
1288            })?;
1289
1290        Ok(DropTableResponse {
1291            id: request.id,
1292            location: Some(table_uri),
1293            ..Default::default()
1294        })
1295    }
1296
1297    async fn create_table(
1298        &self,
1299        request: CreateTableRequest,
1300        request_data: Bytes,
1301    ) -> Result<CreateTableResponse> {
1302        if let Some(ref manifest_ns) = self.manifest_ns {
1303            return manifest_ns.create_table(request, request_data).await;
1304        }
1305
1306        let table_name = Self::table_name_from_id(&request.id)?;
1307        let table_uri = self.table_full_uri(&table_name);
1308        if request_data.is_empty() {
1309            return Err(Error::namespace_source(
1310                "Request data (Arrow IPC stream) is required for create_table".into(),
1311            ));
1312        }
1313
1314        // Parse the Arrow IPC stream from request_data
1315        let cursor = Cursor::new(request_data.to_vec());
1316        let stream_reader = StreamReader::try_new(cursor, None).map_err(|e| {
1317            Error::namespace_source(format!("Invalid Arrow IPC stream: {}", e).into())
1318        })?;
1319        let arrow_schema = stream_reader.schema();
1320
1321        // Collect all batches from the stream
1322        let mut batches = Vec::new();
1323        for batch_result in stream_reader {
1324            batches.push(batch_result.map_err(|e| {
1325                Error::namespace_source(
1326                    format!("Failed to read batch from IPC stream: {}", e).into(),
1327                )
1328            })?);
1329        }
1330
1331        // Create RecordBatchReader from the batches
1332        let reader = if batches.is_empty() {
1333            let batch = arrow::record_batch::RecordBatch::new_empty(arrow_schema.clone());
1334            let batches = vec![Ok(batch)];
1335            RecordBatchIterator::new(batches, arrow_schema.clone())
1336        } else {
1337            let batch_results: Vec<_> = batches.into_iter().map(Ok).collect();
1338            RecordBatchIterator::new(batch_results, arrow_schema)
1339        };
1340
1341        let store_params = self.storage_options.as_ref().map(|opts| ObjectStoreParams {
1342            storage_options_accessor: Some(Arc::new(
1343                lance_io::object_store::StorageOptionsAccessor::with_static_options(opts.clone()),
1344            )),
1345            ..Default::default()
1346        });
1347
1348        let write_params = WriteParams {
1349            mode: lance::dataset::WriteMode::Create,
1350            store_params,
1351            ..Default::default()
1352        };
1353
1354        // Create the Lance dataset using the actual Lance API
1355        Dataset::write(reader, &table_uri, Some(write_params))
1356            .await
1357            .map_err(|e| {
1358                Error::namespace_source(format!("Failed to create Lance dataset: {}", e).into())
1359            })?;
1360
1361        Ok(CreateTableResponse {
1362            version: Some(1),
1363            location: Some(table_uri),
1364            storage_options: self.storage_options.clone(),
1365            ..Default::default()
1366        })
1367    }
1368
1369    async fn declare_table(&self, request: DeclareTableRequest) -> Result<DeclareTableResponse> {
1370        if let Some(ref manifest_ns) = self.manifest_ns {
1371            let mut response = manifest_ns.declare_table(request.clone()).await?;
1372            // Only apply identity-based credential vending when explicitly requested
1373            if request.vend_credentials == Some(true) && self.credential_vendor.is_some() {
1374                if let Some(ref location) = response.location {
1375                    let identity = request.identity.as_deref();
1376                    response.storage_options = self
1377                        .get_storage_options_for_table(location, identity)
1378                        .await?;
1379                }
1380            } else if request.vend_credentials == Some(false) {
1381                response.storage_options = None;
1382            }
1383            // Set managed_versioning when table_version_tracking_enabled
1384            if self.table_version_tracking_enabled {
1385                response.managed_versioning = Some(true);
1386            }
1387            return Ok(response);
1388        }
1389
1390        let table_name = Self::table_name_from_id(&request.id)?;
1391        let table_uri = self.table_full_uri(&table_name);
1392
1393        // Validate location if provided
1394        if let Some(location) = &request.location {
1395            let location = location.trim_end_matches('/');
1396            if location != table_uri {
1397                return Err(Error::namespace_source(
1398                    format!(
1399                        "Cannot declare table {} at location {}, must be at location {}",
1400                        table_name, location, table_uri
1401                    )
1402                    .into(),
1403                ));
1404            }
1405        }
1406
1407        // Check if table already has data (created via create_table).
1408        // The atomic put only prevents races between concurrent declare_table calls,
1409        // not between declare_table and existing data.
1410        let status = self.check_table_status(&table_name).await;
1411        if status.exists && !status.has_reserved_file {
1412            // Table has data but no reserved file - it was created with data
1413            return Err(Error::namespace_source(
1414                format!("Table already exists: {}", table_name).into(),
1415            ));
1416        }
1417
1418        // Atomically create the .lance-reserved file to mark the table as declared.
1419        // This uses put_if_not_exists semantics to avoid race conditions between
1420        // concurrent declare_table calls.
1421        let reserved_file_path = self.table_reserved_file_path(&table_name);
1422
1423        self.put_marker_file_atomic(&reserved_file_path, &format!("table {}", table_name))
1424            .await
1425            .map_err(|e| Error::namespace_source(e.into()))?;
1426
1427        // For backwards compatibility, only skip vending credentials when explicitly set to false
1428        let vend_credentials = request.vend_credentials.unwrap_or(true);
1429        let identity = request.identity.as_deref();
1430        let storage_options = if vend_credentials {
1431            self.get_storage_options_for_table(&table_uri, identity)
1432                .await?
1433        } else {
1434            None
1435        };
1436
1437        Ok(DeclareTableResponse {
1438            location: Some(table_uri),
1439            storage_options,
1440            managed_versioning: if self.table_version_tracking_enabled {
1441                Some(true)
1442            } else {
1443                None
1444            },
1445            ..Default::default()
1446        })
1447    }
1448
1449    async fn register_table(
1450        &self,
1451        request: lance_namespace::models::RegisterTableRequest,
1452    ) -> Result<lance_namespace::models::RegisterTableResponse> {
1453        // If manifest is enabled, delegate to manifest namespace
1454        if let Some(ref manifest_ns) = self.manifest_ns {
1455            return LanceNamespace::register_table(manifest_ns.as_ref(), request).await;
1456        }
1457
1458        // Without manifest, register_table is not supported
1459        Err(Error::not_supported_source(
1460            "register_table is only supported when manifest mode is enabled".into(),
1461        ))
1462    }
1463
1464    async fn deregister_table(
1465        &self,
1466        request: lance_namespace::models::DeregisterTableRequest,
1467    ) -> Result<lance_namespace::models::DeregisterTableResponse> {
1468        // If manifest is enabled, delegate to manifest namespace
1469        if let Some(ref manifest_ns) = self.manifest_ns {
1470            return LanceNamespace::deregister_table(manifest_ns.as_ref(), request).await;
1471        }
1472
1473        // V1 mode: create a .lance-deregistered marker file in the table directory
1474        let table_name = Self::table_name_from_id(&request.id)?;
1475        let table_uri = self.table_full_uri(&table_name);
1476
1477        // Check table existence and deregistration status.
1478        // This provides better error messages for common cases.
1479        let status = self.check_table_status(&table_name).await;
1480
1481        if !status.exists {
1482            return Err(Error::namespace_source(
1483                format!("Table does not exist: {}", table_name).into(),
1484            ));
1485        }
1486
1487        if status.is_deregistered {
1488            return Err(Error::namespace_source(
1489                format!("Table is already deregistered: {}", table_name).into(),
1490            ));
1491        }
1492
1493        // Atomically create the .lance-deregistered marker file.
1494        // This uses put_if_not_exists semantics to prevent race conditions
1495        // when multiple processes try to deregister the same table concurrently.
1496        // If a race occurs and another process already created the file,
1497        // we'll get an AlreadyExists error which we convert to a proper message.
1498        let deregistered_path = self.table_deregistered_file_path(&table_name);
1499        self.put_marker_file_atomic(
1500            &deregistered_path,
1501            &format!("deregistration marker for table {}", table_name),
1502        )
1503        .await
1504        .map_err(|e| {
1505            // Convert "already exists" to "already deregistered" for better UX
1506            let message = if e.contains("already exists") {
1507                format!("Table is already deregistered: {}", table_name)
1508            } else {
1509                e
1510            };
1511            Error::namespace_source(message.into())
1512        })?;
1513
1514        Ok(lance_namespace::models::DeregisterTableResponse {
1515            id: request.id,
1516            location: Some(table_uri),
1517            ..Default::default()
1518        })
1519    }
1520
1521    async fn list_table_versions(
1522        &self,
1523        request: ListTableVersionsRequest,
1524    ) -> Result<ListTableVersionsResponse> {
1525        let table_uri = self.resolve_table_location(&request.id).await?;
1526
1527        let table_path = Self::uri_to_object_store_path(&table_uri);
1528        let versions_dir = table_path.child("_versions");
1529        let manifest_metas: Vec<_> = self
1530            .object_store
1531            .read_dir_all(&versions_dir, None)
1532            .try_collect()
1533            .await
1534            .map_err(|e| {
1535                Error::namespace_source(
1536                    format!(
1537                        "Failed to list manifest files for table at '{}': {}",
1538                        table_uri, e
1539                    )
1540                    .into(),
1541                )
1542            })?;
1543
1544        let is_v2_naming = manifest_metas
1545            .first()
1546            .is_some_and(|meta| meta.location.filename().is_some_and(|f| f.len() == 29));
1547
1548        let mut table_versions: Vec<TableVersion> = manifest_metas
1549            .into_iter()
1550            .filter_map(|meta| {
1551                let filename = meta.location.filename()?;
1552                let version_str = filename.strip_suffix(".manifest")?;
1553                if version_str.starts_with('d') {
1554                    return None;
1555                }
1556                let file_version: u64 = version_str.parse().ok()?;
1557
1558                let actual_version = if file_version > u64::MAX / 2 {
1559                    u64::MAX - file_version
1560                } else {
1561                    file_version
1562                };
1563
1564                // Use full path from object_store (relative to object store root)
1565                Some(TableVersion {
1566                    version: actual_version as i64,
1567                    manifest_path: meta.location.to_string(),
1568                    manifest_size: Some(meta.size as i64),
1569                    e_tag: meta.e_tag,
1570                    timestamp_millis: Some(meta.last_modified.timestamp_millis()),
1571                    metadata: None,
1572                })
1573            })
1574            .collect();
1575
1576        let list_is_ordered = self.object_store.list_is_lexically_ordered;
1577        let want_descending = request.descending == Some(true);
1578
1579        let needs_sort = if list_is_ordered {
1580            if is_v2_naming {
1581                !want_descending
1582            } else {
1583                want_descending
1584            }
1585        } else {
1586            true
1587        };
1588
1589        if needs_sort {
1590            if want_descending {
1591                table_versions.sort_by(|a, b| b.version.cmp(&a.version));
1592            } else {
1593                table_versions.sort_by(|a, b| a.version.cmp(&b.version));
1594            }
1595        }
1596
1597        if let Some(limit) = request.limit {
1598            table_versions.truncate(limit as usize);
1599        }
1600
1601        Ok(ListTableVersionsResponse {
1602            versions: table_versions,
1603            page_token: None,
1604        })
1605    }
1606
1607    async fn create_table_version(
1608        &self,
1609        request: CreateTableVersionRequest,
1610    ) -> Result<CreateTableVersionResponse> {
1611        let table_uri = self.resolve_table_location(&request.id).await?;
1612
1613        let staging_manifest_path = &request.manifest_path;
1614        let version = request.version as u64;
1615
1616        let table_path = Self::uri_to_object_store_path(&table_uri);
1617
1618        // Determine naming scheme from request, default to V2
1619        let naming_scheme = match request.naming_scheme.as_deref() {
1620            Some("V1") => ManifestNamingScheme::V1,
1621            _ => ManifestNamingScheme::V2,
1622        };
1623
1624        // Compute final path using the naming scheme
1625        let final_path = naming_scheme.manifest_path(&table_path, version);
1626
1627        let staging_path = Self::uri_to_object_store_path(staging_manifest_path);
1628        let manifest_data = self
1629            .object_store
1630            .inner
1631            .get(&staging_path)
1632            .await
1633            .map_err(|e| {
1634                Error::namespace_source(
1635                    format!(
1636                        "Failed to read staging manifest at '{}': {}",
1637                        staging_manifest_path, e
1638                    )
1639                    .into(),
1640                )
1641            })?
1642            .bytes()
1643            .await
1644            .map_err(|e| {
1645                Error::namespace_source(
1646                    format!(
1647                        "Failed to read staging manifest bytes at '{}': {}",
1648                        staging_manifest_path, e
1649                    )
1650                    .into(),
1651                )
1652            })?;
1653
1654        let manifest_size = manifest_data.len() as i64;
1655
1656        let put_result = self
1657            .object_store
1658            .inner
1659            .put_opts(
1660                &final_path,
1661                manifest_data.into(),
1662                PutOptions {
1663                    mode: PutMode::Create,
1664                    ..Default::default()
1665                },
1666            )
1667            .await
1668            .map_err(|e| match e {
1669                object_store::Error::AlreadyExists { .. }
1670                | object_store::Error::Precondition { .. } => Error::namespace_source(
1671                    format!(
1672                        "Version {} already exists for table at '{}'",
1673                        version, table_uri
1674                    )
1675                    .into(),
1676                ),
1677                _ => Error::namespace_source(
1678                    format!(
1679                        "Failed to create version {} for table at '{}': {}",
1680                        version, table_uri, e
1681                    )
1682                    .into(),
1683                ),
1684            })?;
1685
1686        // Delete the staging manifest after successful copy
1687        if let Err(e) = self.object_store.inner.delete(&staging_path).await {
1688            log::warn!(
1689                "Failed to delete staging manifest at '{}': {:?}",
1690                staging_path,
1691                e
1692            );
1693        }
1694
1695        Ok(CreateTableVersionResponse {
1696            transaction_id: None,
1697            version: Some(Box::new(TableVersion {
1698                version: version as i64,
1699                manifest_path: final_path.to_string(),
1700                manifest_size: Some(manifest_size),
1701                e_tag: put_result.e_tag,
1702                timestamp_millis: None,
1703                metadata: None,
1704            })),
1705        })
1706    }
1707
1708    async fn describe_table_version(
1709        &self,
1710        request: DescribeTableVersionRequest,
1711    ) -> Result<DescribeTableVersionResponse> {
1712        let table_uri = self.resolve_table_location(&request.id).await?;
1713
1714        // Use DatasetBuilder with storage options to support S3 with custom endpoints
1715        let mut builder = DatasetBuilder::from_uri(&table_uri);
1716        if let Some(opts) = &self.storage_options {
1717            builder = builder.with_storage_options(opts.clone());
1718        }
1719        if let Some(sess) = &self.session {
1720            builder = builder.with_session(sess.clone());
1721        }
1722        let mut dataset = builder.load().await.map_err(|e| {
1723            Error::namespace_source(
1724                format!("Failed to open table at '{}': {}", table_uri, e).into(),
1725            )
1726        })?;
1727
1728        if let Some(version) = request.version {
1729            dataset = dataset
1730                .checkout_version(version as u64)
1731                .await
1732                .map_err(|e| {
1733                    Error::namespace_source(
1734                        format!(
1735                            "Failed to checkout version {} for table at '{}': {}",
1736                            version, table_uri, e
1737                        )
1738                        .into(),
1739                    )
1740                })?;
1741        }
1742
1743        let version_info = dataset.version();
1744        let manifest_location = dataset.manifest_location();
1745        let metadata: std::collections::HashMap<String, String> =
1746            version_info.metadata.into_iter().collect();
1747
1748        let table_version = TableVersion {
1749            version: version_info.version as i64,
1750            manifest_path: manifest_location.path.to_string(),
1751            manifest_size: manifest_location.size.map(|s| s as i64),
1752            e_tag: manifest_location.e_tag.clone(),
1753            timestamp_millis: Some(version_info.timestamp.timestamp_millis()),
1754            metadata: if metadata.is_empty() {
1755                None
1756            } else {
1757                Some(metadata)
1758            },
1759        };
1760
1761        Ok(DescribeTableVersionResponse {
1762            version: Box::new(table_version),
1763        })
1764    }
1765
1766    async fn batch_delete_table_versions(
1767        &self,
1768        request: BatchDeleteTableVersionsRequest,
1769    ) -> Result<BatchDeleteTableVersionsResponse> {
1770        let table_uri = self.resolve_table_location(&request.id).await?;
1771
1772        let table_path = Self::uri_to_object_store_path(&table_uri);
1773        let table_path_str = table_path.as_ref();
1774        let versions_dir_path = Path::from(format!("{}_versions", table_path_str));
1775
1776        let mut deleted_count = 0i64;
1777
1778        for range in &request.ranges {
1779            let start = range.start_version as u64;
1780            let end = if range.end_version > 0 {
1781                range.end_version as u64
1782            } else {
1783                start
1784            };
1785
1786            for version in start..=end {
1787                let version_path = versions_dir_path.child(format!("{}.manifest", version));
1788                match self.object_store.inner.delete(&version_path).await {
1789                    Ok(_) => {
1790                        deleted_count += 1;
1791                    }
1792                    Err(object_store::Error::NotFound { .. }) => {}
1793                    Err(e) => {
1794                        return Err(Error::namespace_source(
1795                            format!(
1796                                "Failed to delete version {} for table at '{}': {}",
1797                                version, table_uri, e
1798                            )
1799                            .into(),
1800                        ));
1801                    }
1802                }
1803            }
1804        }
1805
1806        Ok(BatchDeleteTableVersionsResponse {
1807            deleted_count: Some(deleted_count),
1808            transaction_id: None,
1809        })
1810    }
1811
1812    fn namespace_id(&self) -> String {
1813        format!("DirectoryNamespace {{ root: {:?} }}", self.root)
1814    }
1815}
1816
1817#[cfg(test)]
1818mod tests {
1819    use super::*;
1820    use arrow_ipc::reader::StreamReader;
1821    use lance::dataset::Dataset;
1822    use lance_core::utils::tempfile::{TempStdDir, TempStrDir};
1823    use lance_namespace::models::{
1824        CreateTableRequest, JsonArrowDataType, JsonArrowField, JsonArrowSchema, ListTablesRequest,
1825    };
1826    use lance_namespace::schema::convert_json_arrow_schema;
1827    use std::io::Cursor;
1828    use std::sync::Arc;
1829
1830    /// Helper to create a test DirectoryNamespace with a temporary directory
1831    async fn create_test_namespace() -> (DirectoryNamespace, TempStdDir) {
1832        let temp_dir = TempStdDir::default();
1833
1834        let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
1835            .build()
1836            .await
1837            .unwrap();
1838        (namespace, temp_dir)
1839    }
1840
1841    /// Helper to create test IPC data from a schema
1842    fn create_test_ipc_data(schema: &JsonArrowSchema) -> Vec<u8> {
1843        use arrow::ipc::writer::StreamWriter;
1844
1845        let arrow_schema = convert_json_arrow_schema(schema).unwrap();
1846        let arrow_schema = Arc::new(arrow_schema);
1847        let batch = arrow::record_batch::RecordBatch::new_empty(arrow_schema.clone());
1848        let mut buffer = Vec::new();
1849        {
1850            let mut writer = StreamWriter::try_new(&mut buffer, &arrow_schema).unwrap();
1851            writer.write(&batch).unwrap();
1852            writer.finish().unwrap();
1853        }
1854        buffer
1855    }
1856
1857    /// Helper to create a simple test schema
1858    fn create_test_schema() -> JsonArrowSchema {
1859        let int_type = JsonArrowDataType::new("int32".to_string());
1860        let string_type = JsonArrowDataType::new("utf8".to_string());
1861
1862        let id_field = JsonArrowField {
1863            name: "id".to_string(),
1864            r#type: Box::new(int_type),
1865            nullable: false,
1866            metadata: None,
1867        };
1868
1869        let name_field = JsonArrowField {
1870            name: "name".to_string(),
1871            r#type: Box::new(string_type),
1872            nullable: true,
1873            metadata: None,
1874        };
1875
1876        JsonArrowSchema {
1877            fields: vec![id_field, name_field],
1878            metadata: None,
1879        }
1880    }
1881
1882    #[tokio::test]
1883    async fn test_create_table() {
1884        let (namespace, _temp_dir) = create_test_namespace().await;
1885
1886        // Create test IPC data
1887        let schema = create_test_schema();
1888        let ipc_data = create_test_ipc_data(&schema);
1889
1890        let mut request = CreateTableRequest::new();
1891        request.id = Some(vec!["test_table".to_string()]);
1892
1893        let response = namespace
1894            .create_table(request, bytes::Bytes::from(ipc_data))
1895            .await
1896            .unwrap();
1897
1898        assert!(response.location.is_some());
1899        assert!(response.location.unwrap().ends_with("test_table.lance"));
1900        assert_eq!(response.version, Some(1));
1901    }
1902
1903    #[tokio::test]
1904    async fn test_create_table_without_data() {
1905        let (namespace, _temp_dir) = create_test_namespace().await;
1906
1907        let mut request = CreateTableRequest::new();
1908        request.id = Some(vec!["test_table".to_string()]);
1909
1910        let result = namespace.create_table(request, bytes::Bytes::new()).await;
1911        assert!(result.is_err());
1912        assert!(
1913            result
1914                .unwrap_err()
1915                .to_string()
1916                .contains("Arrow IPC stream) is required")
1917        );
1918    }
1919
1920    #[tokio::test]
1921    async fn test_create_table_with_invalid_id() {
1922        let (namespace, _temp_dir) = create_test_namespace().await;
1923
1924        // Create test IPC data
1925        let schema = create_test_schema();
1926        let ipc_data = create_test_ipc_data(&schema);
1927
1928        // Test with empty ID
1929        let mut request = CreateTableRequest::new();
1930        request.id = Some(vec![]);
1931
1932        let result = namespace
1933            .create_table(request, bytes::Bytes::from(ipc_data.clone()))
1934            .await;
1935        assert!(result.is_err());
1936
1937        // Test with multi-level ID - should now work with manifest enabled
1938        // First create the parent namespace
1939        let mut create_ns_req = CreateNamespaceRequest::new();
1940        create_ns_req.id = Some(vec!["test_namespace".to_string()]);
1941        namespace.create_namespace(create_ns_req).await.unwrap();
1942
1943        // Now create table in the namespace
1944        let mut request = CreateTableRequest::new();
1945        request.id = Some(vec!["test_namespace".to_string(), "table".to_string()]);
1946
1947        let result = namespace
1948            .create_table(request, bytes::Bytes::from(ipc_data))
1949            .await;
1950        // Should succeed with manifest enabled
1951        assert!(
1952            result.is_ok(),
1953            "Multi-level table IDs should work with manifest enabled"
1954        );
1955    }
1956
1957    #[tokio::test]
1958    async fn test_list_tables() {
1959        let (namespace, _temp_dir) = create_test_namespace().await;
1960
1961        // Initially, no tables
1962        let mut request = ListTablesRequest::new();
1963        request.id = Some(vec![]);
1964        let response = namespace.list_tables(request).await.unwrap();
1965        assert_eq!(response.tables.len(), 0);
1966
1967        // Create test IPC data
1968        let schema = create_test_schema();
1969        let ipc_data = create_test_ipc_data(&schema);
1970
1971        // Create a table
1972        let mut create_request = CreateTableRequest::new();
1973        create_request.id = Some(vec!["table1".to_string()]);
1974        namespace
1975            .create_table(create_request, bytes::Bytes::from(ipc_data.clone()))
1976            .await
1977            .unwrap();
1978
1979        // Create another table
1980        let mut create_request = CreateTableRequest::new();
1981        create_request.id = Some(vec!["table2".to_string()]);
1982        namespace
1983            .create_table(create_request, bytes::Bytes::from(ipc_data))
1984            .await
1985            .unwrap();
1986
1987        // List tables should return both
1988        let mut request = ListTablesRequest::new();
1989        request.id = Some(vec![]);
1990        let response = namespace.list_tables(request).await.unwrap();
1991        let tables = response.tables;
1992        assert_eq!(tables.len(), 2);
1993        assert!(tables.contains(&"table1".to_string()));
1994        assert!(tables.contains(&"table2".to_string()));
1995    }
1996
1997    #[tokio::test]
1998    async fn test_list_tables_with_namespace_id() {
1999        let (namespace, _temp_dir) = create_test_namespace().await;
2000
2001        // First create a child namespace
2002        let mut create_ns_req = CreateNamespaceRequest::new();
2003        create_ns_req.id = Some(vec!["test_namespace".to_string()]);
2004        namespace.create_namespace(create_ns_req).await.unwrap();
2005
2006        // Now list tables in the child namespace
2007        let mut request = ListTablesRequest::new();
2008        request.id = Some(vec!["test_namespace".to_string()]);
2009
2010        let result = namespace.list_tables(request).await;
2011        // Should succeed (with manifest enabled) and return empty list (no tables yet)
2012        assert!(
2013            result.is_ok(),
2014            "list_tables should work with child namespace when manifest is enabled"
2015        );
2016        let response = result.unwrap();
2017        assert_eq!(
2018            response.tables.len(),
2019            0,
2020            "Namespace should have no tables yet"
2021        );
2022    }
2023
2024    #[tokio::test]
2025    async fn test_describe_table() {
2026        let (namespace, _temp_dir) = create_test_namespace().await;
2027
2028        // Create a table first
2029        let schema = create_test_schema();
2030        let ipc_data = create_test_ipc_data(&schema);
2031
2032        let mut create_request = CreateTableRequest::new();
2033        create_request.id = Some(vec!["test_table".to_string()]);
2034        namespace
2035            .create_table(create_request, bytes::Bytes::from(ipc_data))
2036            .await
2037            .unwrap();
2038
2039        // Describe the table
2040        let mut request = DescribeTableRequest::new();
2041        request.id = Some(vec!["test_table".to_string()]);
2042        let response = namespace.describe_table(request).await.unwrap();
2043
2044        assert!(response.location.is_some());
2045        assert!(response.location.unwrap().ends_with("test_table.lance"));
2046    }
2047
2048    #[tokio::test]
2049    async fn test_describe_nonexistent_table() {
2050        let (namespace, _temp_dir) = create_test_namespace().await;
2051
2052        let mut request = DescribeTableRequest::new();
2053        request.id = Some(vec!["nonexistent".to_string()]);
2054
2055        let result = namespace.describe_table(request).await;
2056        assert!(result.is_err());
2057        assert!(
2058            result
2059                .unwrap_err()
2060                .to_string()
2061                .contains("Table does not exist")
2062        );
2063    }
2064
2065    #[tokio::test]
2066    async fn test_table_exists() {
2067        let (namespace, _temp_dir) = create_test_namespace().await;
2068
2069        // Create a table
2070        let schema = create_test_schema();
2071        let ipc_data = create_test_ipc_data(&schema);
2072
2073        let mut create_request = CreateTableRequest::new();
2074        create_request.id = Some(vec!["existing_table".to_string()]);
2075        namespace
2076            .create_table(create_request, bytes::Bytes::from(ipc_data))
2077            .await
2078            .unwrap();
2079
2080        // Check existing table
2081        let mut request = TableExistsRequest::new();
2082        request.id = Some(vec!["existing_table".to_string()]);
2083        let result = namespace.table_exists(request).await;
2084        assert!(result.is_ok());
2085
2086        // Check non-existent table
2087        let mut request = TableExistsRequest::new();
2088        request.id = Some(vec!["nonexistent".to_string()]);
2089        let result = namespace.table_exists(request).await;
2090        assert!(result.is_err());
2091        assert!(
2092            result
2093                .unwrap_err()
2094                .to_string()
2095                .contains("Table does not exist")
2096        );
2097    }
2098
2099    #[tokio::test]
2100    async fn test_drop_table() {
2101        let (namespace, _temp_dir) = create_test_namespace().await;
2102
2103        // Create a table
2104        let schema = create_test_schema();
2105        let ipc_data = create_test_ipc_data(&schema);
2106
2107        let mut create_request = CreateTableRequest::new();
2108        create_request.id = Some(vec!["table_to_drop".to_string()]);
2109        namespace
2110            .create_table(create_request, bytes::Bytes::from(ipc_data))
2111            .await
2112            .unwrap();
2113
2114        // Verify it exists
2115        let mut exists_request = TableExistsRequest::new();
2116        exists_request.id = Some(vec!["table_to_drop".to_string()]);
2117        assert!(namespace.table_exists(exists_request.clone()).await.is_ok());
2118
2119        // Drop the table
2120        let mut drop_request = DropTableRequest::new();
2121        drop_request.id = Some(vec!["table_to_drop".to_string()]);
2122        let response = namespace.drop_table(drop_request).await.unwrap();
2123        assert!(response.location.is_some());
2124
2125        // Verify it no longer exists
2126        assert!(namespace.table_exists(exists_request).await.is_err());
2127    }
2128
2129    #[tokio::test]
2130    async fn test_drop_nonexistent_table() {
2131        let (namespace, _temp_dir) = create_test_namespace().await;
2132
2133        let mut request = DropTableRequest::new();
2134        request.id = Some(vec!["nonexistent".to_string()]);
2135
2136        // Should not fail when dropping non-existent table (idempotent)
2137        let result = namespace.drop_table(request).await;
2138        // The operation might succeed or fail depending on implementation
2139        // But it should not panic
2140        let _ = result;
2141    }
2142
2143    #[tokio::test]
2144    async fn test_root_namespace_operations() {
2145        let (namespace, _temp_dir) = create_test_namespace().await;
2146
2147        // Test list_namespaces - should return empty list for root
2148        let mut request = ListNamespacesRequest::new();
2149        request.id = Some(vec![]);
2150        let result = namespace.list_namespaces(request).await;
2151        assert!(result.is_ok());
2152        assert_eq!(result.unwrap().namespaces.len(), 0);
2153
2154        // Test describe_namespace - should succeed for root
2155        let mut request = DescribeNamespaceRequest::new();
2156        request.id = Some(vec![]);
2157        let result = namespace.describe_namespace(request).await;
2158        assert!(result.is_ok());
2159
2160        // Test namespace_exists - root always exists
2161        let mut request = NamespaceExistsRequest::new();
2162        request.id = Some(vec![]);
2163        let result = namespace.namespace_exists(request).await;
2164        assert!(result.is_ok());
2165
2166        // Test create_namespace - root cannot be created
2167        let mut request = CreateNamespaceRequest::new();
2168        request.id = Some(vec![]);
2169        let result = namespace.create_namespace(request).await;
2170        assert!(result.is_err());
2171        assert!(result.unwrap_err().to_string().contains("already exists"));
2172
2173        // Test drop_namespace - root cannot be dropped
2174        let mut request = DropNamespaceRequest::new();
2175        request.id = Some(vec![]);
2176        let result = namespace.drop_namespace(request).await;
2177        assert!(result.is_err());
2178        assert!(
2179            result
2180                .unwrap_err()
2181                .to_string()
2182                .contains("cannot be dropped")
2183        );
2184    }
2185
2186    #[tokio::test]
2187    async fn test_non_root_namespace_operations() {
2188        let (namespace, _temp_dir) = create_test_namespace().await;
2189
2190        // With manifest enabled (default), child namespaces are now supported
2191        // Test create_namespace for non-root - should succeed with manifest
2192        let mut request = CreateNamespaceRequest::new();
2193        request.id = Some(vec!["child".to_string()]);
2194        let result = namespace.create_namespace(request).await;
2195        assert!(
2196            result.is_ok(),
2197            "Child namespace creation should succeed with manifest enabled"
2198        );
2199
2200        // Test namespace_exists for non-root - should exist after creation
2201        let mut request = NamespaceExistsRequest::new();
2202        request.id = Some(vec!["child".to_string()]);
2203        let result = namespace.namespace_exists(request).await;
2204        assert!(
2205            result.is_ok(),
2206            "Child namespace should exist after creation"
2207        );
2208
2209        // Test drop_namespace for non-root - should succeed
2210        let mut request = DropNamespaceRequest::new();
2211        request.id = Some(vec!["child".to_string()]);
2212        let result = namespace.drop_namespace(request).await;
2213        assert!(
2214            result.is_ok(),
2215            "Child namespace drop should succeed with manifest enabled"
2216        );
2217
2218        // Verify namespace no longer exists
2219        let mut request = NamespaceExistsRequest::new();
2220        request.id = Some(vec!["child".to_string()]);
2221        let result = namespace.namespace_exists(request).await;
2222        assert!(
2223            result.is_err(),
2224            "Child namespace should not exist after drop"
2225        );
2226    }
2227
2228    #[tokio::test]
2229    async fn test_config_custom_root() {
2230        let temp_dir = TempStdDir::default();
2231        let custom_path = temp_dir.join("custom");
2232        std::fs::create_dir(&custom_path).unwrap();
2233
2234        let namespace = DirectoryNamespaceBuilder::new(custom_path.to_string_lossy().to_string())
2235            .build()
2236            .await
2237            .unwrap();
2238
2239        // Create test IPC data
2240        let schema = create_test_schema();
2241        let ipc_data = create_test_ipc_data(&schema);
2242
2243        // Create a table and verify location
2244        let mut request = CreateTableRequest::new();
2245        request.id = Some(vec!["test_table".to_string()]);
2246
2247        let response = namespace
2248            .create_table(request, bytes::Bytes::from(ipc_data))
2249            .await
2250            .unwrap();
2251
2252        assert!(response.location.unwrap().contains("custom"));
2253    }
2254
2255    #[tokio::test]
2256    async fn test_config_storage_options() {
2257        let temp_dir = TempStdDir::default();
2258
2259        let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
2260            .storage_option("option1", "value1")
2261            .storage_option("option2", "value2")
2262            .build()
2263            .await
2264            .unwrap();
2265
2266        // Create test IPC data
2267        let schema = create_test_schema();
2268        let ipc_data = create_test_ipc_data(&schema);
2269
2270        // Create a table and check storage options are included
2271        let mut request = CreateTableRequest::new();
2272        request.id = Some(vec!["test_table".to_string()]);
2273
2274        let response = namespace
2275            .create_table(request, bytes::Bytes::from(ipc_data))
2276            .await
2277            .unwrap();
2278
2279        let storage_options = response.storage_options.unwrap();
2280        assert_eq!(storage_options.get("option1"), Some(&"value1".to_string()));
2281        assert_eq!(storage_options.get("option2"), Some(&"value2".to_string()));
2282    }
2283
2284    #[tokio::test]
2285    async fn test_from_properties_manifest_enabled() {
2286        let temp_dir = TempStdDir::default();
2287
2288        let mut properties = HashMap::new();
2289        properties.insert("root".to_string(), temp_dir.to_str().unwrap().to_string());
2290        properties.insert("manifest_enabled".to_string(), "true".to_string());
2291        properties.insert("dir_listing_enabled".to_string(), "false".to_string());
2292
2293        let builder = DirectoryNamespaceBuilder::from_properties(properties, None).unwrap();
2294        assert!(builder.manifest_enabled);
2295        assert!(!builder.dir_listing_enabled);
2296
2297        let namespace = builder.build().await.unwrap();
2298
2299        // Create test IPC data
2300        let schema = create_test_schema();
2301        let ipc_data = create_test_ipc_data(&schema);
2302
2303        // Create a table
2304        let mut request = CreateTableRequest::new();
2305        request.id = Some(vec!["test_table".to_string()]);
2306
2307        let response = namespace
2308            .create_table(request, bytes::Bytes::from(ipc_data))
2309            .await
2310            .unwrap();
2311
2312        assert!(response.location.is_some());
2313    }
2314
2315    #[tokio::test]
2316    async fn test_from_properties_dir_listing_enabled() {
2317        let temp_dir = TempStdDir::default();
2318
2319        let mut properties = HashMap::new();
2320        properties.insert("root".to_string(), temp_dir.to_str().unwrap().to_string());
2321        properties.insert("manifest_enabled".to_string(), "false".to_string());
2322        properties.insert("dir_listing_enabled".to_string(), "true".to_string());
2323
2324        let builder = DirectoryNamespaceBuilder::from_properties(properties, None).unwrap();
2325        assert!(!builder.manifest_enabled);
2326        assert!(builder.dir_listing_enabled);
2327
2328        let namespace = builder.build().await.unwrap();
2329
2330        // Create test IPC data
2331        let schema = create_test_schema();
2332        let ipc_data = create_test_ipc_data(&schema);
2333
2334        // Create a table
2335        let mut request = CreateTableRequest::new();
2336        request.id = Some(vec!["test_table".to_string()]);
2337
2338        let response = namespace
2339            .create_table(request, bytes::Bytes::from(ipc_data))
2340            .await
2341            .unwrap();
2342
2343        assert!(response.location.is_some());
2344    }
2345
2346    #[tokio::test]
2347    async fn test_from_properties_defaults() {
2348        let temp_dir = TempStdDir::default();
2349
2350        let mut properties = HashMap::new();
2351        properties.insert("root".to_string(), temp_dir.to_str().unwrap().to_string());
2352
2353        let builder = DirectoryNamespaceBuilder::from_properties(properties, None).unwrap();
2354        // Both should default to true
2355        assert!(builder.manifest_enabled);
2356        assert!(builder.dir_listing_enabled);
2357    }
2358
2359    #[tokio::test]
2360    async fn test_from_properties_with_storage_options() {
2361        let temp_dir = TempStdDir::default();
2362
2363        let mut properties = HashMap::new();
2364        properties.insert("root".to_string(), temp_dir.to_str().unwrap().to_string());
2365        properties.insert("manifest_enabled".to_string(), "true".to_string());
2366        properties.insert("storage.region".to_string(), "us-west-2".to_string());
2367        properties.insert("storage.bucket".to_string(), "my-bucket".to_string());
2368
2369        let builder = DirectoryNamespaceBuilder::from_properties(properties, None).unwrap();
2370        assert!(builder.manifest_enabled);
2371        assert!(builder.storage_options.is_some());
2372
2373        let storage_options = builder.storage_options.unwrap();
2374        assert_eq!(
2375            storage_options.get("region"),
2376            Some(&"us-west-2".to_string())
2377        );
2378        assert_eq!(
2379            storage_options.get("bucket"),
2380            Some(&"my-bucket".to_string())
2381        );
2382    }
2383
2384    #[tokio::test]
2385    async fn test_various_arrow_types() {
2386        let (namespace, _temp_dir) = create_test_namespace().await;
2387
2388        // Create schema with various types
2389        let fields = vec![
2390            JsonArrowField {
2391                name: "bool_col".to_string(),
2392                r#type: Box::new(JsonArrowDataType::new("bool".to_string())),
2393                nullable: true,
2394                metadata: None,
2395            },
2396            JsonArrowField {
2397                name: "int8_col".to_string(),
2398                r#type: Box::new(JsonArrowDataType::new("int8".to_string())),
2399                nullable: true,
2400                metadata: None,
2401            },
2402            JsonArrowField {
2403                name: "float64_col".to_string(),
2404                r#type: Box::new(JsonArrowDataType::new("float64".to_string())),
2405                nullable: true,
2406                metadata: None,
2407            },
2408            JsonArrowField {
2409                name: "binary_col".to_string(),
2410                r#type: Box::new(JsonArrowDataType::new("binary".to_string())),
2411                nullable: true,
2412                metadata: None,
2413            },
2414        ];
2415
2416        let schema = JsonArrowSchema {
2417            fields,
2418            metadata: None,
2419        };
2420
2421        // Create IPC data
2422        let ipc_data = create_test_ipc_data(&schema);
2423
2424        let mut request = CreateTableRequest::new();
2425        request.id = Some(vec!["complex_table".to_string()]);
2426
2427        let response = namespace
2428            .create_table(request, bytes::Bytes::from(ipc_data))
2429            .await
2430            .unwrap();
2431
2432        assert!(response.location.is_some());
2433    }
2434
2435    #[tokio::test]
2436    async fn test_connect_dir() {
2437        let temp_dir = TempStdDir::default();
2438
2439        let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
2440            .build()
2441            .await
2442            .unwrap();
2443
2444        // Test basic operation through the concrete type
2445        let mut request = ListTablesRequest::new();
2446        request.id = Some(vec![]);
2447        let response = namespace.list_tables(request).await.unwrap();
2448        assert_eq!(response.tables.len(), 0);
2449    }
2450
2451    #[tokio::test]
2452    async fn test_create_table_with_ipc_data() {
2453        use arrow::array::{Int32Array, StringArray};
2454        use arrow::ipc::writer::StreamWriter;
2455
2456        let (namespace, _temp_dir) = create_test_namespace().await;
2457
2458        // Create a schema with some fields
2459        let schema = create_test_schema();
2460
2461        // Create some test data that matches the schema
2462        let arrow_schema = convert_json_arrow_schema(&schema).unwrap();
2463        let arrow_schema = Arc::new(arrow_schema);
2464
2465        // Create a RecordBatch with actual data
2466        let id_array = Int32Array::from(vec![1, 2, 3]);
2467        let name_array = StringArray::from(vec!["Alice", "Bob", "Charlie"]);
2468        let batch = arrow::record_batch::RecordBatch::try_new(
2469            arrow_schema.clone(),
2470            vec![Arc::new(id_array), Arc::new(name_array)],
2471        )
2472        .unwrap();
2473
2474        // Write the batch to an IPC stream
2475        let mut buffer = Vec::new();
2476        {
2477            let mut writer = StreamWriter::try_new(&mut buffer, &arrow_schema).unwrap();
2478            writer.write(&batch).unwrap();
2479            writer.finish().unwrap();
2480        }
2481
2482        // Create table with the IPC data
2483        let mut request = CreateTableRequest::new();
2484        request.id = Some(vec!["test_table_with_data".to_string()]);
2485
2486        let response = namespace
2487            .create_table(request, Bytes::from(buffer))
2488            .await
2489            .unwrap();
2490
2491        assert_eq!(response.version, Some(1));
2492        assert!(
2493            response
2494                .location
2495                .unwrap()
2496                .contains("test_table_with_data.lance")
2497        );
2498
2499        // Verify table exists
2500        let mut exists_request = TableExistsRequest::new();
2501        exists_request.id = Some(vec!["test_table_with_data".to_string()]);
2502        namespace.table_exists(exists_request).await.unwrap();
2503    }
2504
2505    #[tokio::test]
2506    async fn test_child_namespace_create_and_list() {
2507        let (namespace, _temp_dir) = create_test_namespace().await;
2508
2509        // Create multiple child namespaces
2510        for i in 1..=3 {
2511            let mut create_req = CreateNamespaceRequest::new();
2512            create_req.id = Some(vec![format!("ns{}", i)]);
2513            let result = namespace.create_namespace(create_req).await;
2514            assert!(result.is_ok(), "Failed to create child namespace ns{}", i);
2515        }
2516
2517        // List child namespaces
2518        let list_req = ListNamespacesRequest {
2519            id: Some(vec![]),
2520            ..Default::default()
2521        };
2522        let result = namespace.list_namespaces(list_req).await;
2523        assert!(result.is_ok());
2524        let namespaces = result.unwrap().namespaces;
2525        assert_eq!(namespaces.len(), 3);
2526        assert!(namespaces.contains(&"ns1".to_string()));
2527        assert!(namespaces.contains(&"ns2".to_string()));
2528        assert!(namespaces.contains(&"ns3".to_string()));
2529    }
2530
2531    #[tokio::test]
2532    async fn test_nested_namespace_hierarchy() {
2533        let (namespace, _temp_dir) = create_test_namespace().await;
2534
2535        // Create parent namespace
2536        let mut create_req = CreateNamespaceRequest::new();
2537        create_req.id = Some(vec!["parent".to_string()]);
2538        namespace.create_namespace(create_req).await.unwrap();
2539
2540        // Create nested children
2541        let mut create_req = CreateNamespaceRequest::new();
2542        create_req.id = Some(vec!["parent".to_string(), "child1".to_string()]);
2543        namespace.create_namespace(create_req).await.unwrap();
2544
2545        let mut create_req = CreateNamespaceRequest::new();
2546        create_req.id = Some(vec!["parent".to_string(), "child2".to_string()]);
2547        namespace.create_namespace(create_req).await.unwrap();
2548
2549        // List children of parent
2550        let list_req = ListNamespacesRequest {
2551            id: Some(vec!["parent".to_string()]),
2552            ..Default::default()
2553        };
2554        let result = namespace.list_namespaces(list_req).await;
2555        assert!(result.is_ok());
2556        let children = result.unwrap().namespaces;
2557        assert_eq!(children.len(), 2);
2558        assert!(children.contains(&"child1".to_string()));
2559        assert!(children.contains(&"child2".to_string()));
2560
2561        // List root should only show parent
2562        let list_req = ListNamespacesRequest {
2563            id: Some(vec![]),
2564            ..Default::default()
2565        };
2566        let result = namespace.list_namespaces(list_req).await;
2567        assert!(result.is_ok());
2568        let root_namespaces = result.unwrap().namespaces;
2569        assert_eq!(root_namespaces.len(), 1);
2570        assert_eq!(root_namespaces[0], "parent");
2571    }
2572
2573    #[tokio::test]
2574    async fn test_table_in_child_namespace() {
2575        let (namespace, _temp_dir) = create_test_namespace().await;
2576
2577        // Create child namespace
2578        let mut create_ns_req = CreateNamespaceRequest::new();
2579        create_ns_req.id = Some(vec!["test_ns".to_string()]);
2580        namespace.create_namespace(create_ns_req).await.unwrap();
2581
2582        // Create table in child namespace
2583        let schema = create_test_schema();
2584        let ipc_data = create_test_ipc_data(&schema);
2585        let mut create_table_req = CreateTableRequest::new();
2586        create_table_req.id = Some(vec!["test_ns".to_string(), "table1".to_string()]);
2587        let result = namespace
2588            .create_table(create_table_req, bytes::Bytes::from(ipc_data))
2589            .await;
2590        assert!(result.is_ok(), "Failed to create table in child namespace");
2591
2592        // List tables in child namespace
2593        let list_req = ListTablesRequest {
2594            id: Some(vec!["test_ns".to_string()]),
2595            ..Default::default()
2596        };
2597        let result = namespace.list_tables(list_req).await;
2598        assert!(result.is_ok());
2599        let tables = result.unwrap().tables;
2600        assert_eq!(tables.len(), 1);
2601        assert_eq!(tables[0], "table1");
2602
2603        // Verify table exists
2604        let mut exists_req = TableExistsRequest::new();
2605        exists_req.id = Some(vec!["test_ns".to_string(), "table1".to_string()]);
2606        let result = namespace.table_exists(exists_req).await;
2607        assert!(result.is_ok());
2608
2609        // Describe table in child namespace
2610        let mut describe_req = DescribeTableRequest::new();
2611        describe_req.id = Some(vec!["test_ns".to_string(), "table1".to_string()]);
2612        let result = namespace.describe_table(describe_req).await;
2613        assert!(result.is_ok());
2614        let response = result.unwrap();
2615        assert!(response.location.is_some());
2616    }
2617
2618    #[tokio::test]
2619    async fn test_multiple_tables_in_child_namespace() {
2620        let (namespace, _temp_dir) = create_test_namespace().await;
2621
2622        // Create child namespace
2623        let mut create_ns_req = CreateNamespaceRequest::new();
2624        create_ns_req.id = Some(vec!["test_ns".to_string()]);
2625        namespace.create_namespace(create_ns_req).await.unwrap();
2626
2627        // Create multiple tables
2628        let schema = create_test_schema();
2629        let ipc_data = create_test_ipc_data(&schema);
2630        for i in 1..=3 {
2631            let mut create_table_req = CreateTableRequest::new();
2632            create_table_req.id = Some(vec!["test_ns".to_string(), format!("table{}", i)]);
2633            namespace
2634                .create_table(create_table_req, bytes::Bytes::from(ipc_data.clone()))
2635                .await
2636                .unwrap();
2637        }
2638
2639        // List tables
2640        let list_req = ListTablesRequest {
2641            id: Some(vec!["test_ns".to_string()]),
2642            ..Default::default()
2643        };
2644        let result = namespace.list_tables(list_req).await;
2645        assert!(result.is_ok());
2646        let tables = result.unwrap().tables;
2647        assert_eq!(tables.len(), 3);
2648        assert!(tables.contains(&"table1".to_string()));
2649        assert!(tables.contains(&"table2".to_string()));
2650        assert!(tables.contains(&"table3".to_string()));
2651    }
2652
2653    #[tokio::test]
2654    async fn test_drop_table_in_child_namespace() {
2655        let (namespace, _temp_dir) = create_test_namespace().await;
2656
2657        // Create child namespace
2658        let mut create_ns_req = CreateNamespaceRequest::new();
2659        create_ns_req.id = Some(vec!["test_ns".to_string()]);
2660        namespace.create_namespace(create_ns_req).await.unwrap();
2661
2662        // Create table
2663        let schema = create_test_schema();
2664        let ipc_data = create_test_ipc_data(&schema);
2665        let mut create_table_req = CreateTableRequest::new();
2666        create_table_req.id = Some(vec!["test_ns".to_string(), "table1".to_string()]);
2667        namespace
2668            .create_table(create_table_req, bytes::Bytes::from(ipc_data))
2669            .await
2670            .unwrap();
2671
2672        // Drop table
2673        let mut drop_req = DropTableRequest::new();
2674        drop_req.id = Some(vec!["test_ns".to_string(), "table1".to_string()]);
2675        let result = namespace.drop_table(drop_req).await;
2676        assert!(result.is_ok(), "Failed to drop table in child namespace");
2677
2678        // Verify table no longer exists
2679        let mut exists_req = TableExistsRequest::new();
2680        exists_req.id = Some(vec!["test_ns".to_string(), "table1".to_string()]);
2681        let result = namespace.table_exists(exists_req).await;
2682        assert!(result.is_err());
2683    }
2684
2685    #[tokio::test]
2686    async fn test_deeply_nested_namespace() {
2687        let (namespace, _temp_dir) = create_test_namespace().await;
2688
2689        // Create deeply nested namespace hierarchy
2690        let mut create_req = CreateNamespaceRequest::new();
2691        create_req.id = Some(vec!["level1".to_string()]);
2692        namespace.create_namespace(create_req).await.unwrap();
2693
2694        let mut create_req = CreateNamespaceRequest::new();
2695        create_req.id = Some(vec!["level1".to_string(), "level2".to_string()]);
2696        namespace.create_namespace(create_req).await.unwrap();
2697
2698        let mut create_req = CreateNamespaceRequest::new();
2699        create_req.id = Some(vec![
2700            "level1".to_string(),
2701            "level2".to_string(),
2702            "level3".to_string(),
2703        ]);
2704        namespace.create_namespace(create_req).await.unwrap();
2705
2706        // Create table in deeply nested namespace
2707        let schema = create_test_schema();
2708        let ipc_data = create_test_ipc_data(&schema);
2709        let mut create_table_req = CreateTableRequest::new();
2710        create_table_req.id = Some(vec![
2711            "level1".to_string(),
2712            "level2".to_string(),
2713            "level3".to_string(),
2714            "table1".to_string(),
2715        ]);
2716        let result = namespace
2717            .create_table(create_table_req, bytes::Bytes::from(ipc_data))
2718            .await;
2719        assert!(
2720            result.is_ok(),
2721            "Failed to create table in deeply nested namespace"
2722        );
2723
2724        // Verify table exists
2725        let mut exists_req = TableExistsRequest::new();
2726        exists_req.id = Some(vec![
2727            "level1".to_string(),
2728            "level2".to_string(),
2729            "level3".to_string(),
2730            "table1".to_string(),
2731        ]);
2732        let result = namespace.table_exists(exists_req).await;
2733        assert!(result.is_ok());
2734    }
2735
2736    #[tokio::test]
2737    async fn test_namespace_with_properties() {
2738        let (namespace, _temp_dir) = create_test_namespace().await;
2739
2740        // Create namespace with properties
2741        let mut properties = HashMap::new();
2742        properties.insert("owner".to_string(), "test_user".to_string());
2743        properties.insert("description".to_string(), "Test namespace".to_string());
2744
2745        let mut create_req = CreateNamespaceRequest::new();
2746        create_req.id = Some(vec!["test_ns".to_string()]);
2747        create_req.properties = Some(properties.clone());
2748        namespace.create_namespace(create_req).await.unwrap();
2749
2750        // Describe namespace and verify properties
2751        let describe_req = DescribeNamespaceRequest {
2752            id: Some(vec!["test_ns".to_string()]),
2753            ..Default::default()
2754        };
2755        let result = namespace.describe_namespace(describe_req).await;
2756        assert!(result.is_ok());
2757        let response = result.unwrap();
2758        assert!(response.properties.is_some());
2759        let props = response.properties.unwrap();
2760        assert_eq!(props.get("owner"), Some(&"test_user".to_string()));
2761        assert_eq!(
2762            props.get("description"),
2763            Some(&"Test namespace".to_string())
2764        );
2765    }
2766
2767    #[tokio::test]
2768    async fn test_cannot_drop_namespace_with_tables() {
2769        let (namespace, _temp_dir) = create_test_namespace().await;
2770
2771        // Create namespace
2772        let mut create_ns_req = CreateNamespaceRequest::new();
2773        create_ns_req.id = Some(vec!["test_ns".to_string()]);
2774        namespace.create_namespace(create_ns_req).await.unwrap();
2775
2776        // Create table in namespace
2777        let schema = create_test_schema();
2778        let ipc_data = create_test_ipc_data(&schema);
2779        let mut create_table_req = CreateTableRequest::new();
2780        create_table_req.id = Some(vec!["test_ns".to_string(), "table1".to_string()]);
2781        namespace
2782            .create_table(create_table_req, bytes::Bytes::from(ipc_data))
2783            .await
2784            .unwrap();
2785
2786        // Try to drop namespace - should fail
2787        let mut drop_req = DropNamespaceRequest::new();
2788        drop_req.id = Some(vec!["test_ns".to_string()]);
2789        let result = namespace.drop_namespace(drop_req).await;
2790        assert!(
2791            result.is_err(),
2792            "Should not be able to drop namespace with tables"
2793        );
2794    }
2795
2796    #[tokio::test]
2797    async fn test_isolation_between_namespaces() {
2798        let (namespace, _temp_dir) = create_test_namespace().await;
2799
2800        // Create two namespaces
2801        let mut create_req = CreateNamespaceRequest::new();
2802        create_req.id = Some(vec!["ns1".to_string()]);
2803        namespace.create_namespace(create_req).await.unwrap();
2804
2805        let mut create_req = CreateNamespaceRequest::new();
2806        create_req.id = Some(vec!["ns2".to_string()]);
2807        namespace.create_namespace(create_req).await.unwrap();
2808
2809        // Create table with same name in both namespaces
2810        let schema = create_test_schema();
2811        let ipc_data = create_test_ipc_data(&schema);
2812
2813        let mut create_table_req = CreateTableRequest::new();
2814        create_table_req.id = Some(vec!["ns1".to_string(), "table1".to_string()]);
2815        namespace
2816            .create_table(create_table_req, bytes::Bytes::from(ipc_data.clone()))
2817            .await
2818            .unwrap();
2819
2820        let mut create_table_req = CreateTableRequest::new();
2821        create_table_req.id = Some(vec!["ns2".to_string(), "table1".to_string()]);
2822        namespace
2823            .create_table(create_table_req, bytes::Bytes::from(ipc_data))
2824            .await
2825            .unwrap();
2826
2827        // List tables in each namespace
2828        let list_req = ListTablesRequest {
2829            id: Some(vec!["ns1".to_string()]),
2830            page_token: None,
2831            limit: None,
2832            ..Default::default()
2833        };
2834        let result = namespace.list_tables(list_req).await.unwrap();
2835        assert_eq!(result.tables.len(), 1);
2836        assert_eq!(result.tables[0], "table1");
2837
2838        let list_req = ListTablesRequest {
2839            id: Some(vec!["ns2".to_string()]),
2840            page_token: None,
2841            limit: None,
2842            ..Default::default()
2843        };
2844        let result = namespace.list_tables(list_req).await.unwrap();
2845        assert_eq!(result.tables.len(), 1);
2846        assert_eq!(result.tables[0], "table1");
2847
2848        // Drop table in ns1 shouldn't affect ns2
2849        let mut drop_req = DropTableRequest::new();
2850        drop_req.id = Some(vec!["ns1".to_string(), "table1".to_string()]);
2851        namespace.drop_table(drop_req).await.unwrap();
2852
2853        // Verify ns1 table is gone but ns2 table still exists
2854        let mut exists_req = TableExistsRequest::new();
2855        exists_req.id = Some(vec!["ns1".to_string(), "table1".to_string()]);
2856        assert!(namespace.table_exists(exists_req).await.is_err());
2857
2858        let mut exists_req = TableExistsRequest::new();
2859        exists_req.id = Some(vec!["ns2".to_string(), "table1".to_string()]);
2860        assert!(namespace.table_exists(exists_req).await.is_ok());
2861    }
2862
2863    #[tokio::test]
2864    async fn test_migrate_directory_tables() {
2865        let temp_dir = TempStdDir::default();
2866        let temp_path = temp_dir.to_str().unwrap();
2867
2868        // Step 1: Create tables in directory-only mode
2869        let dir_only_ns = DirectoryNamespaceBuilder::new(temp_path)
2870            .manifest_enabled(false)
2871            .dir_listing_enabled(true)
2872            .build()
2873            .await
2874            .unwrap();
2875
2876        // Create some tables
2877        let schema = create_test_schema();
2878        let ipc_data = create_test_ipc_data(&schema);
2879
2880        for i in 1..=3 {
2881            let mut create_req = CreateTableRequest::new();
2882            create_req.id = Some(vec![format!("table{}", i)]);
2883            dir_only_ns
2884                .create_table(create_req, bytes::Bytes::from(ipc_data.clone()))
2885                .await
2886                .unwrap();
2887        }
2888
2889        drop(dir_only_ns);
2890
2891        // Step 2: Create namespace with dual mode (manifest + directory listing)
2892        let dual_mode_ns = DirectoryNamespaceBuilder::new(temp_path)
2893            .manifest_enabled(true)
2894            .dir_listing_enabled(true)
2895            .build()
2896            .await
2897            .unwrap();
2898
2899        // Before migration, tables should be visible (via directory listing fallback)
2900        let mut list_req = ListTablesRequest::new();
2901        list_req.id = Some(vec![]);
2902        let tables = dual_mode_ns.list_tables(list_req).await.unwrap().tables;
2903        assert_eq!(tables.len(), 3);
2904
2905        // Run migration
2906        let migrated_count = dual_mode_ns.migrate().await.unwrap();
2907        assert_eq!(migrated_count, 3, "Should migrate all 3 tables");
2908
2909        // Verify tables are now in manifest
2910        let mut list_req = ListTablesRequest::new();
2911        list_req.id = Some(vec![]);
2912        let tables = dual_mode_ns.list_tables(list_req).await.unwrap().tables;
2913        assert_eq!(tables.len(), 3);
2914
2915        // Run migration again - should be idempotent
2916        let migrated_count = dual_mode_ns.migrate().await.unwrap();
2917        assert_eq!(
2918            migrated_count, 0,
2919            "Should not migrate already-migrated tables"
2920        );
2921
2922        drop(dual_mode_ns);
2923
2924        // Step 3: Create namespace with manifest-only mode
2925        let manifest_only_ns = DirectoryNamespaceBuilder::new(temp_path)
2926            .manifest_enabled(true)
2927            .dir_listing_enabled(false)
2928            .build()
2929            .await
2930            .unwrap();
2931
2932        // Tables should still be accessible (now from manifest only)
2933        let mut list_req = ListTablesRequest::new();
2934        list_req.id = Some(vec![]);
2935        let tables = manifest_only_ns.list_tables(list_req).await.unwrap().tables;
2936        assert_eq!(tables.len(), 3);
2937        assert!(tables.contains(&"table1".to_string()));
2938        assert!(tables.contains(&"table2".to_string()));
2939        assert!(tables.contains(&"table3".to_string()));
2940    }
2941
2942    #[tokio::test]
2943    async fn test_migrate_without_manifest() {
2944        let temp_dir = TempStdDir::default();
2945        let temp_path = temp_dir.to_str().unwrap();
2946
2947        // Create namespace without manifest
2948        let namespace = DirectoryNamespaceBuilder::new(temp_path)
2949            .manifest_enabled(false)
2950            .dir_listing_enabled(true)
2951            .build()
2952            .await
2953            .unwrap();
2954
2955        // migrate() should return 0 when manifest is not enabled
2956        let migrated_count = namespace.migrate().await.unwrap();
2957        assert_eq!(migrated_count, 0);
2958    }
2959
2960    #[tokio::test]
2961    async fn test_register_table() {
2962        use lance_namespace::models::{RegisterTableRequest, TableExistsRequest};
2963
2964        let temp_dir = TempStdDir::default();
2965        let temp_path = temp_dir.to_str().unwrap();
2966
2967        let namespace = DirectoryNamespaceBuilder::new(temp_path)
2968            .build()
2969            .await
2970            .unwrap();
2971
2972        // Create a physical table first using lance directly
2973        let schema = create_test_schema();
2974        let ipc_data = create_test_ipc_data(&schema);
2975
2976        let table_uri = format!("{}/external_table.lance", temp_path);
2977        let cursor = Cursor::new(ipc_data);
2978        let stream_reader = StreamReader::try_new(cursor, None).unwrap();
2979        let batches: Vec<_> = stream_reader
2980            .collect::<std::result::Result<Vec<_>, _>>()
2981            .unwrap();
2982        let schema = batches[0].schema();
2983        let batch_results: Vec<_> = batches.into_iter().map(Ok).collect();
2984        let reader = RecordBatchIterator::new(batch_results, schema);
2985        Dataset::write(Box::new(reader), &table_uri, None)
2986            .await
2987            .unwrap();
2988
2989        // Register the table
2990        let mut register_req = RegisterTableRequest::new("external_table.lance".to_string());
2991        register_req.id = Some(vec!["registered_table".to_string()]);
2992
2993        let response = namespace.register_table(register_req).await.unwrap();
2994        assert_eq!(response.location, Some("external_table.lance".to_string()));
2995
2996        // Verify table exists in namespace
2997        let mut exists_req = TableExistsRequest::new();
2998        exists_req.id = Some(vec!["registered_table".to_string()]);
2999        assert!(namespace.table_exists(exists_req).await.is_ok());
3000
3001        // Verify we can list the table
3002        let mut list_req = ListTablesRequest::new();
3003        list_req.id = Some(vec![]);
3004        let tables = namespace.list_tables(list_req).await.unwrap();
3005        assert!(tables.tables.contains(&"registered_table".to_string()));
3006    }
3007
3008    #[tokio::test]
3009    async fn test_register_table_duplicate_fails() {
3010        use lance_namespace::models::RegisterTableRequest;
3011
3012        let temp_dir = TempStdDir::default();
3013        let temp_path = temp_dir.to_str().unwrap();
3014
3015        let namespace = DirectoryNamespaceBuilder::new(temp_path)
3016            .build()
3017            .await
3018            .unwrap();
3019
3020        // Register a table
3021        let mut register_req = RegisterTableRequest::new("test_table.lance".to_string());
3022        register_req.id = Some(vec!["test_table".to_string()]);
3023
3024        namespace
3025            .register_table(register_req.clone())
3026            .await
3027            .unwrap();
3028
3029        // Try to register again - should fail
3030        let result = namespace.register_table(register_req).await;
3031        assert!(result.is_err());
3032        assert!(result.unwrap_err().to_string().contains("already exists"));
3033    }
3034
3035    #[tokio::test]
3036    async fn test_deregister_table() {
3037        use lance_namespace::models::{DeregisterTableRequest, TableExistsRequest};
3038
3039        let temp_dir = TempStdDir::default();
3040        let temp_path = temp_dir.to_str().unwrap();
3041
3042        // Create namespace with manifest-only mode (no directory listing fallback)
3043        // This ensures deregistered tables are truly invisible
3044        let namespace = DirectoryNamespaceBuilder::new(temp_path)
3045            .manifest_enabled(true)
3046            .dir_listing_enabled(false)
3047            .build()
3048            .await
3049            .unwrap();
3050
3051        // Create a table
3052        let schema = create_test_schema();
3053        let ipc_data = create_test_ipc_data(&schema);
3054
3055        let mut create_req = CreateTableRequest::new();
3056        create_req.id = Some(vec!["test_table".to_string()]);
3057        namespace
3058            .create_table(create_req, bytes::Bytes::from(ipc_data))
3059            .await
3060            .unwrap();
3061
3062        // Verify table exists
3063        let mut exists_req = TableExistsRequest::new();
3064        exists_req.id = Some(vec!["test_table".to_string()]);
3065        assert!(namespace.table_exists(exists_req.clone()).await.is_ok());
3066
3067        // Deregister the table
3068        let mut deregister_req = DeregisterTableRequest::new();
3069        deregister_req.id = Some(vec!["test_table".to_string()]);
3070        let response = namespace.deregister_table(deregister_req).await.unwrap();
3071
3072        // Should return location and id
3073        assert!(
3074            response.location.is_some(),
3075            "Deregister should return location"
3076        );
3077        let location = response.location.as_ref().unwrap();
3078        // Location should be a proper file:// URI with the temp path
3079        // Use uri_to_url to normalize the temp path to a URL for comparison
3080        let expected_url = lance_io::object_store::uri_to_url(temp_path)
3081            .expect("Failed to convert temp path to URL");
3082        let expected_prefix = expected_url.to_string();
3083        assert!(
3084            location.starts_with(&expected_prefix),
3085            "Location should start with '{}', got: {}",
3086            expected_prefix,
3087            location
3088        );
3089        assert!(
3090            location.contains("test_table"),
3091            "Location should contain table name: {}",
3092            location
3093        );
3094        assert_eq!(response.id, Some(vec!["test_table".to_string()]));
3095
3096        // Verify table no longer exists in namespace (removed from manifest)
3097        assert!(namespace.table_exists(exists_req).await.is_err());
3098
3099        // Verify physical data still exists at the returned location
3100        let dataset = Dataset::open(location).await;
3101        assert!(
3102            dataset.is_ok(),
3103            "Physical table data should still exist at {}",
3104            location
3105        );
3106    }
3107
3108    #[tokio::test]
3109    async fn test_deregister_table_in_child_namespace() {
3110        use lance_namespace::models::{
3111            CreateNamespaceRequest, DeregisterTableRequest, TableExistsRequest,
3112        };
3113
3114        let temp_dir = TempStdDir::default();
3115        let temp_path = temp_dir.to_str().unwrap();
3116
3117        let namespace = DirectoryNamespaceBuilder::new(temp_path)
3118            .build()
3119            .await
3120            .unwrap();
3121
3122        // Create child namespace
3123        let mut create_ns_req = CreateNamespaceRequest::new();
3124        create_ns_req.id = Some(vec!["test_ns".to_string()]);
3125        namespace.create_namespace(create_ns_req).await.unwrap();
3126
3127        // Create a table in the child namespace
3128        let schema = create_test_schema();
3129        let ipc_data = create_test_ipc_data(&schema);
3130
3131        let mut create_req = CreateTableRequest::new();
3132        create_req.id = Some(vec!["test_ns".to_string(), "test_table".to_string()]);
3133        namespace
3134            .create_table(create_req, bytes::Bytes::from(ipc_data))
3135            .await
3136            .unwrap();
3137
3138        // Deregister the table
3139        let mut deregister_req = DeregisterTableRequest::new();
3140        deregister_req.id = Some(vec!["test_ns".to_string(), "test_table".to_string()]);
3141        let response = namespace.deregister_table(deregister_req).await.unwrap();
3142
3143        // Should return location and id in child namespace
3144        assert!(
3145            response.location.is_some(),
3146            "Deregister should return location"
3147        );
3148        let location = response.location.as_ref().unwrap();
3149        // Location should be a proper file:// URI with the temp path
3150        // Use uri_to_url to normalize the temp path to a URL for comparison
3151        let expected_url = lance_io::object_store::uri_to_url(temp_path)
3152            .expect("Failed to convert temp path to URL");
3153        let expected_prefix = expected_url.to_string();
3154        assert!(
3155            location.starts_with(&expected_prefix),
3156            "Location should start with '{}', got: {}",
3157            expected_prefix,
3158            location
3159        );
3160        assert!(
3161            location.contains("test_ns") && location.contains("test_table"),
3162            "Location should contain namespace and table name: {}",
3163            location
3164        );
3165        assert_eq!(
3166            response.id,
3167            Some(vec!["test_ns".to_string(), "test_table".to_string()])
3168        );
3169
3170        // Verify table no longer exists
3171        let mut exists_req = TableExistsRequest::new();
3172        exists_req.id = Some(vec!["test_ns".to_string(), "test_table".to_string()]);
3173        assert!(namespace.table_exists(exists_req).await.is_err());
3174    }
3175
3176    #[tokio::test]
3177    async fn test_register_without_manifest_fails() {
3178        use lance_namespace::models::RegisterTableRequest;
3179
3180        let temp_dir = TempStdDir::default();
3181        let temp_path = temp_dir.to_str().unwrap();
3182
3183        // Create namespace without manifest
3184        let namespace = DirectoryNamespaceBuilder::new(temp_path)
3185            .manifest_enabled(false)
3186            .build()
3187            .await
3188            .unwrap();
3189
3190        // Try to register - should fail (register requires manifest)
3191        let mut register_req = RegisterTableRequest::new("test_table.lance".to_string());
3192        register_req.id = Some(vec!["test_table".to_string()]);
3193        let result = namespace.register_table(register_req).await;
3194        assert!(result.is_err());
3195        assert!(
3196            result
3197                .unwrap_err()
3198                .to_string()
3199                .contains("manifest mode is enabled")
3200        );
3201
3202        // Note: deregister_table now works in V1 mode via .lance-deregistered marker files
3203        // See test_deregister_table_v1_mode for that test case
3204    }
3205
3206    #[tokio::test]
3207    async fn test_register_table_rejects_absolute_uri() {
3208        use lance_namespace::models::RegisterTableRequest;
3209
3210        let temp_dir = TempStdDir::default();
3211        let temp_path = temp_dir.to_str().unwrap();
3212
3213        let namespace = DirectoryNamespaceBuilder::new(temp_path)
3214            .build()
3215            .await
3216            .unwrap();
3217
3218        // Try to register with absolute URI - should fail
3219        let mut register_req = RegisterTableRequest::new("s3://bucket/table.lance".to_string());
3220        register_req.id = Some(vec!["test_table".to_string()]);
3221        let result = namespace.register_table(register_req).await;
3222        assert!(result.is_err());
3223        let err_msg = result.unwrap_err().to_string();
3224        assert!(err_msg.contains("Absolute URIs are not allowed"));
3225    }
3226
3227    #[tokio::test]
3228    async fn test_register_table_rejects_absolute_path() {
3229        use lance_namespace::models::RegisterTableRequest;
3230
3231        let temp_dir = TempStdDir::default();
3232        let temp_path = temp_dir.to_str().unwrap();
3233
3234        let namespace = DirectoryNamespaceBuilder::new(temp_path)
3235            .build()
3236            .await
3237            .unwrap();
3238
3239        // Try to register with absolute path - should fail
3240        let mut register_req = RegisterTableRequest::new("/tmp/table.lance".to_string());
3241        register_req.id = Some(vec!["test_table".to_string()]);
3242        let result = namespace.register_table(register_req).await;
3243        assert!(result.is_err());
3244        let err_msg = result.unwrap_err().to_string();
3245        assert!(err_msg.contains("Absolute paths are not allowed"));
3246    }
3247
3248    #[tokio::test]
3249    async fn test_register_table_rejects_path_traversal() {
3250        use lance_namespace::models::RegisterTableRequest;
3251
3252        let temp_dir = TempStdDir::default();
3253        let temp_path = temp_dir.to_str().unwrap();
3254
3255        let namespace = DirectoryNamespaceBuilder::new(temp_path)
3256            .build()
3257            .await
3258            .unwrap();
3259
3260        // Try to register with path traversal - should fail
3261        let mut register_req = RegisterTableRequest::new("../outside/table.lance".to_string());
3262        register_req.id = Some(vec!["test_table".to_string()]);
3263        let result = namespace.register_table(register_req).await;
3264        assert!(result.is_err());
3265        let err_msg = result.unwrap_err().to_string();
3266        assert!(err_msg.contains("Path traversal is not allowed"));
3267    }
3268
3269    #[tokio::test]
3270    async fn test_namespace_write() {
3271        use arrow::array::Int32Array;
3272        use arrow::datatypes::{DataType, Field as ArrowField, Schema as ArrowSchema};
3273        use arrow::record_batch::{RecordBatch, RecordBatchIterator};
3274        use lance::dataset::{Dataset, WriteMode, WriteParams};
3275        use lance_namespace::LanceNamespace;
3276
3277        let (namespace, _temp_dir) = create_test_namespace().await;
3278        let namespace = Arc::new(namespace) as Arc<dyn LanceNamespace>;
3279
3280        // Use child namespace instead of root
3281        let table_id = vec!["test_ns".to_string(), "test_table".to_string()];
3282        let schema = Arc::new(ArrowSchema::new(vec![
3283            ArrowField::new("a", DataType::Int32, false),
3284            ArrowField::new("b", DataType::Int32, false),
3285        ]));
3286
3287        // Test 1: CREATE mode
3288        let data1 = RecordBatch::try_new(
3289            schema.clone(),
3290            vec![
3291                Arc::new(Int32Array::from(vec![1, 2, 3])),
3292                Arc::new(Int32Array::from(vec![10, 20, 30])),
3293            ],
3294        )
3295        .unwrap();
3296
3297        let reader1 = RecordBatchIterator::new(vec![data1].into_iter().map(Ok), schema.clone());
3298        let dataset =
3299            Dataset::write_into_namespace(reader1, namespace.clone(), table_id.clone(), None)
3300                .await
3301                .unwrap();
3302
3303        assert_eq!(dataset.count_rows(None).await.unwrap(), 3);
3304        assert_eq!(dataset.version().version, 1);
3305
3306        // Test 2: APPEND mode
3307        let data2 = RecordBatch::try_new(
3308            schema.clone(),
3309            vec![
3310                Arc::new(Int32Array::from(vec![4, 5])),
3311                Arc::new(Int32Array::from(vec![40, 50])),
3312            ],
3313        )
3314        .unwrap();
3315
3316        let params_append = WriteParams {
3317            mode: WriteMode::Append,
3318            ..Default::default()
3319        };
3320
3321        let reader2 = RecordBatchIterator::new(vec![data2].into_iter().map(Ok), schema.clone());
3322        let dataset = Dataset::write_into_namespace(
3323            reader2,
3324            namespace.clone(),
3325            table_id.clone(),
3326            Some(params_append),
3327        )
3328        .await
3329        .unwrap();
3330
3331        assert_eq!(dataset.count_rows(None).await.unwrap(), 5);
3332        assert_eq!(dataset.version().version, 2);
3333
3334        // Test 3: OVERWRITE mode
3335        let data3 = RecordBatch::try_new(
3336            schema.clone(),
3337            vec![
3338                Arc::new(Int32Array::from(vec![100, 200])),
3339                Arc::new(Int32Array::from(vec![1000, 2000])),
3340            ],
3341        )
3342        .unwrap();
3343
3344        let params_overwrite = WriteParams {
3345            mode: WriteMode::Overwrite,
3346            ..Default::default()
3347        };
3348
3349        let reader3 = RecordBatchIterator::new(vec![data3].into_iter().map(Ok), schema.clone());
3350        let dataset = Dataset::write_into_namespace(
3351            reader3,
3352            namespace.clone(),
3353            table_id.clone(),
3354            Some(params_overwrite),
3355        )
3356        .await
3357        .unwrap();
3358
3359        assert_eq!(dataset.count_rows(None).await.unwrap(), 2);
3360        assert_eq!(dataset.version().version, 3);
3361
3362        // Verify old data was replaced
3363        let result = dataset.scan().try_into_batch().await.unwrap();
3364        let a_col = result
3365            .column_by_name("a")
3366            .unwrap()
3367            .as_any()
3368            .downcast_ref::<Int32Array>()
3369            .unwrap();
3370        assert_eq!(a_col.values(), &[100, 200]);
3371    }
3372
3373    // ============================================================
3374    // Tests for declare_table
3375    // ============================================================
3376
3377    #[tokio::test]
3378    async fn test_declare_table_v1_mode() {
3379        use lance_namespace::models::{
3380            DeclareTableRequest, DescribeTableRequest, TableExistsRequest,
3381        };
3382
3383        let temp_dir = TempStdDir::default();
3384        let temp_path = temp_dir.to_str().unwrap();
3385
3386        // Create namespace in V1 mode (no manifest)
3387        let namespace = DirectoryNamespaceBuilder::new(temp_path)
3388            .manifest_enabled(false)
3389            .build()
3390            .await
3391            .unwrap();
3392
3393        // Declare a table
3394        let mut declare_req = DeclareTableRequest::new();
3395        declare_req.id = Some(vec!["test_table".to_string()]);
3396        let response = namespace.declare_table(declare_req).await.unwrap();
3397
3398        // Should return location
3399        assert!(response.location.is_some());
3400        let location = response.location.as_ref().unwrap();
3401        assert!(location.ends_with("test_table.lance"));
3402
3403        // Table should exist (via reserved file)
3404        let mut exists_req = TableExistsRequest::new();
3405        exists_req.id = Some(vec!["test_table".to_string()]);
3406        assert!(namespace.table_exists(exists_req).await.is_ok());
3407
3408        // Describe should work but return no version/schema (not written yet)
3409        let mut describe_req = DescribeTableRequest::new();
3410        describe_req.id = Some(vec!["test_table".to_string()]);
3411        let describe_response = namespace.describe_table(describe_req).await.unwrap();
3412        assert!(describe_response.location.is_some());
3413        assert!(describe_response.version.is_none()); // Not written yet
3414        assert!(describe_response.schema.is_none()); // Not written yet
3415    }
3416
3417    #[tokio::test]
3418    async fn test_declare_table_with_manifest() {
3419        use lance_namespace::models::{DeclareTableRequest, TableExistsRequest};
3420
3421        let temp_dir = TempStdDir::default();
3422        let temp_path = temp_dir.to_str().unwrap();
3423
3424        // Create namespace with manifest
3425        let namespace = DirectoryNamespaceBuilder::new(temp_path)
3426            .manifest_enabled(true)
3427            .dir_listing_enabled(false)
3428            .build()
3429            .await
3430            .unwrap();
3431
3432        // Declare a table
3433        let mut declare_req = DeclareTableRequest::new();
3434        declare_req.id = Some(vec!["test_table".to_string()]);
3435        let response = namespace.declare_table(declare_req).await.unwrap();
3436
3437        // Should return location
3438        assert!(response.location.is_some());
3439
3440        // Table should exist in manifest
3441        let mut exists_req = TableExistsRequest::new();
3442        exists_req.id = Some(vec!["test_table".to_string()]);
3443        assert!(namespace.table_exists(exists_req).await.is_ok());
3444    }
3445
3446    #[tokio::test]
3447    async fn test_declare_table_when_table_exists() {
3448        use lance_namespace::models::DeclareTableRequest;
3449
3450        let temp_dir = TempStdDir::default();
3451        let temp_path = temp_dir.to_str().unwrap();
3452
3453        let namespace = DirectoryNamespaceBuilder::new(temp_path)
3454            .manifest_enabled(false)
3455            .build()
3456            .await
3457            .unwrap();
3458
3459        // First create a table with actual data
3460        let schema = create_test_schema();
3461        let ipc_data = create_test_ipc_data(&schema);
3462        let mut create_req = CreateTableRequest::new();
3463        create_req.id = Some(vec!["test_table".to_string()]);
3464        namespace
3465            .create_table(create_req, bytes::Bytes::from(ipc_data))
3466            .await
3467            .unwrap();
3468
3469        // Try to declare the same table - should fail because it already has data
3470        let mut declare_req = DeclareTableRequest::new();
3471        declare_req.id = Some(vec!["test_table".to_string()]);
3472        let result = namespace.declare_table(declare_req).await;
3473        assert!(result.is_err());
3474    }
3475
3476    // ============================================================
3477    // Tests for deregister_table in V1 mode
3478    // ============================================================
3479
3480    #[tokio::test]
3481    async fn test_deregister_table_v1_mode() {
3482        use lance_namespace::models::{DeregisterTableRequest, TableExistsRequest};
3483
3484        let temp_dir = TempStdDir::default();
3485        let temp_path = temp_dir.to_str().unwrap();
3486
3487        // Create namespace in V1 mode (no manifest, with dir listing)
3488        let namespace = DirectoryNamespaceBuilder::new(temp_path)
3489            .manifest_enabled(false)
3490            .dir_listing_enabled(true)
3491            .build()
3492            .await
3493            .unwrap();
3494
3495        // Create a table with data
3496        let schema = create_test_schema();
3497        let ipc_data = create_test_ipc_data(&schema);
3498        let mut create_req = CreateTableRequest::new();
3499        create_req.id = Some(vec!["test_table".to_string()]);
3500        namespace
3501            .create_table(create_req, bytes::Bytes::from(ipc_data))
3502            .await
3503            .unwrap();
3504
3505        // Verify table exists
3506        let mut exists_req = TableExistsRequest::new();
3507        exists_req.id = Some(vec!["test_table".to_string()]);
3508        assert!(namespace.table_exists(exists_req.clone()).await.is_ok());
3509
3510        // Deregister the table
3511        let mut deregister_req = DeregisterTableRequest::new();
3512        deregister_req.id = Some(vec!["test_table".to_string()]);
3513        let response = namespace.deregister_table(deregister_req).await.unwrap();
3514
3515        // Should return location
3516        assert!(response.location.is_some());
3517        let location = response.location.as_ref().unwrap();
3518        assert!(location.contains("test_table"));
3519
3520        // Table should no longer exist (deregistered)
3521        let result = namespace.table_exists(exists_req).await;
3522        assert!(result.is_err());
3523        assert!(result.unwrap_err().to_string().contains("deregistered"));
3524
3525        // Physical data should still exist
3526        let dataset = Dataset::open(location).await;
3527        assert!(dataset.is_ok(), "Physical table data should still exist");
3528    }
3529
3530    #[tokio::test]
3531    async fn test_deregister_table_v1_already_deregistered() {
3532        use lance_namespace::models::DeregisterTableRequest;
3533
3534        let temp_dir = TempStdDir::default();
3535        let temp_path = temp_dir.to_str().unwrap();
3536
3537        let namespace = DirectoryNamespaceBuilder::new(temp_path)
3538            .manifest_enabled(false)
3539            .dir_listing_enabled(true)
3540            .build()
3541            .await
3542            .unwrap();
3543
3544        // Create a table
3545        let schema = create_test_schema();
3546        let ipc_data = create_test_ipc_data(&schema);
3547        let mut create_req = CreateTableRequest::new();
3548        create_req.id = Some(vec!["test_table".to_string()]);
3549        namespace
3550            .create_table(create_req, bytes::Bytes::from(ipc_data))
3551            .await
3552            .unwrap();
3553
3554        // Deregister once
3555        let mut deregister_req = DeregisterTableRequest::new();
3556        deregister_req.id = Some(vec!["test_table".to_string()]);
3557        namespace
3558            .deregister_table(deregister_req.clone())
3559            .await
3560            .unwrap();
3561
3562        // Try to deregister again - should fail
3563        let result = namespace.deregister_table(deregister_req).await;
3564        assert!(result.is_err());
3565        assert!(
3566            result
3567                .unwrap_err()
3568                .to_string()
3569                .contains("already deregistered")
3570        );
3571    }
3572
3573    // ============================================================
3574    // Tests for list_tables skipping deregistered tables
3575    // ============================================================
3576
3577    #[tokio::test]
3578    async fn test_list_tables_skips_deregistered_v1() {
3579        use lance_namespace::models::DeregisterTableRequest;
3580
3581        let temp_dir = TempStdDir::default();
3582        let temp_path = temp_dir.to_str().unwrap();
3583
3584        let namespace = DirectoryNamespaceBuilder::new(temp_path)
3585            .manifest_enabled(false)
3586            .dir_listing_enabled(true)
3587            .build()
3588            .await
3589            .unwrap();
3590
3591        // Create two tables
3592        let schema = create_test_schema();
3593        let ipc_data = create_test_ipc_data(&schema);
3594
3595        let mut create_req1 = CreateTableRequest::new();
3596        create_req1.id = Some(vec!["table1".to_string()]);
3597        namespace
3598            .create_table(create_req1, bytes::Bytes::from(ipc_data.clone()))
3599            .await
3600            .unwrap();
3601
3602        let mut create_req2 = CreateTableRequest::new();
3603        create_req2.id = Some(vec!["table2".to_string()]);
3604        namespace
3605            .create_table(create_req2, bytes::Bytes::from(ipc_data))
3606            .await
3607            .unwrap();
3608
3609        // List tables - should see both (root namespace = empty vec)
3610        let mut list_req = ListTablesRequest::new();
3611        list_req.id = Some(vec![]);
3612        let list_response = namespace.list_tables(list_req.clone()).await.unwrap();
3613        assert_eq!(list_response.tables.len(), 2);
3614
3615        // Deregister table1
3616        let mut deregister_req = DeregisterTableRequest::new();
3617        deregister_req.id = Some(vec!["table1".to_string()]);
3618        namespace.deregister_table(deregister_req).await.unwrap();
3619
3620        // List tables - should only see table2
3621        let list_response = namespace.list_tables(list_req).await.unwrap();
3622        assert_eq!(list_response.tables.len(), 1);
3623        assert!(list_response.tables.contains(&"table2".to_string()));
3624        assert!(!list_response.tables.contains(&"table1".to_string()));
3625    }
3626
3627    // ============================================================
3628    // Tests for describe_table and table_exists with deregistered tables
3629    // ============================================================
3630
3631    #[tokio::test]
3632    async fn test_describe_table_fails_for_deregistered_v1() {
3633        use lance_namespace::models::{DeregisterTableRequest, DescribeTableRequest};
3634
3635        let temp_dir = TempStdDir::default();
3636        let temp_path = temp_dir.to_str().unwrap();
3637
3638        let namespace = DirectoryNamespaceBuilder::new(temp_path)
3639            .manifest_enabled(false)
3640            .dir_listing_enabled(true)
3641            .build()
3642            .await
3643            .unwrap();
3644
3645        // Create a table
3646        let schema = create_test_schema();
3647        let ipc_data = create_test_ipc_data(&schema);
3648        let mut create_req = CreateTableRequest::new();
3649        create_req.id = Some(vec!["test_table".to_string()]);
3650        namespace
3651            .create_table(create_req, bytes::Bytes::from(ipc_data))
3652            .await
3653            .unwrap();
3654
3655        // Describe should work before deregistration
3656        let mut describe_req = DescribeTableRequest::new();
3657        describe_req.id = Some(vec!["test_table".to_string()]);
3658        assert!(namespace.describe_table(describe_req.clone()).await.is_ok());
3659
3660        // Deregister
3661        let mut deregister_req = DeregisterTableRequest::new();
3662        deregister_req.id = Some(vec!["test_table".to_string()]);
3663        namespace.deregister_table(deregister_req).await.unwrap();
3664
3665        // Describe should fail after deregistration
3666        let result = namespace.describe_table(describe_req).await;
3667        assert!(result.is_err());
3668        assert!(result.unwrap_err().to_string().contains("deregistered"));
3669    }
3670
3671    #[tokio::test]
3672    async fn test_table_exists_fails_for_deregistered_v1() {
3673        use lance_namespace::models::{DeregisterTableRequest, TableExistsRequest};
3674
3675        let temp_dir = TempStdDir::default();
3676        let temp_path = temp_dir.to_str().unwrap();
3677
3678        let namespace = DirectoryNamespaceBuilder::new(temp_path)
3679            .manifest_enabled(false)
3680            .dir_listing_enabled(true)
3681            .build()
3682            .await
3683            .unwrap();
3684
3685        // Create a table
3686        let schema = create_test_schema();
3687        let ipc_data = create_test_ipc_data(&schema);
3688        let mut create_req = CreateTableRequest::new();
3689        create_req.id = Some(vec!["test_table".to_string()]);
3690        namespace
3691            .create_table(create_req, bytes::Bytes::from(ipc_data))
3692            .await
3693            .unwrap();
3694
3695        // Table exists should work before deregistration
3696        let mut exists_req = TableExistsRequest::new();
3697        exists_req.id = Some(vec!["test_table".to_string()]);
3698        assert!(namespace.table_exists(exists_req.clone()).await.is_ok());
3699
3700        // Deregister
3701        let mut deregister_req = DeregisterTableRequest::new();
3702        deregister_req.id = Some(vec!["test_table".to_string()]);
3703        namespace.deregister_table(deregister_req).await.unwrap();
3704
3705        // Table exists should fail after deregistration
3706        let result = namespace.table_exists(exists_req).await;
3707        assert!(result.is_err());
3708        assert!(result.unwrap_err().to_string().contains("deregistered"));
3709    }
3710
3711    #[tokio::test]
3712    async fn test_atomic_table_status_check() {
3713        // This test verifies that the TableStatus check is atomic
3714        // by ensuring a single directory listing is used
3715
3716        let temp_dir = TempStdDir::default();
3717        let temp_path = temp_dir.to_str().unwrap();
3718
3719        let namespace = DirectoryNamespaceBuilder::new(temp_path)
3720            .manifest_enabled(false)
3721            .dir_listing_enabled(true)
3722            .build()
3723            .await
3724            .unwrap();
3725
3726        // Create a table
3727        let schema = create_test_schema();
3728        let ipc_data = create_test_ipc_data(&schema);
3729        let mut create_req = CreateTableRequest::new();
3730        create_req.id = Some(vec!["test_table".to_string()]);
3731        namespace
3732            .create_table(create_req, bytes::Bytes::from(ipc_data))
3733            .await
3734            .unwrap();
3735
3736        // Table status should show exists=true, is_deregistered=false
3737        let status = namespace.check_table_status("test_table").await;
3738        assert!(status.exists);
3739        assert!(!status.is_deregistered);
3740        assert!(!status.has_reserved_file);
3741    }
3742
3743    #[tokio::test]
3744    async fn test_table_version_tracking_enabled_managed_versioning() {
3745        use lance_namespace::models::DescribeTableRequest;
3746
3747        let temp_dir = TempStdDir::default();
3748        let temp_path = temp_dir.to_str().unwrap();
3749
3750        // Create namespace with table_version_tracking_enabled=true
3751        let namespace = DirectoryNamespaceBuilder::new(temp_path)
3752            .table_version_tracking_enabled(true)
3753            .build()
3754            .await
3755            .unwrap();
3756
3757        // Create a table
3758        let schema = create_test_schema();
3759        let ipc_data = create_test_ipc_data(&schema);
3760        let mut create_req = CreateTableRequest::new();
3761        create_req.id = Some(vec!["test_table".to_string()]);
3762        namespace
3763            .create_table(create_req, bytes::Bytes::from(ipc_data))
3764            .await
3765            .unwrap();
3766
3767        // Describe table should return managed_versioning=true
3768        let mut describe_req = DescribeTableRequest::new();
3769        describe_req.id = Some(vec!["test_table".to_string()]);
3770        let describe_resp = namespace.describe_table(describe_req).await.unwrap();
3771
3772        // managed_versioning should be true
3773        assert_eq!(
3774            describe_resp.managed_versioning,
3775            Some(true),
3776            "managed_versioning should be true when table_version_tracking_enabled=true"
3777        );
3778    }
3779
3780    #[tokio::test]
3781    async fn test_table_version_tracking_disabled_no_managed_versioning() {
3782        use lance_namespace::models::DescribeTableRequest;
3783
3784        let temp_dir = TempStdDir::default();
3785        let temp_path = temp_dir.to_str().unwrap();
3786
3787        // Create namespace with table_version_tracking_enabled=false (default)
3788        let namespace = DirectoryNamespaceBuilder::new(temp_path)
3789            .table_version_tracking_enabled(false)
3790            .build()
3791            .await
3792            .unwrap();
3793
3794        // Create a table
3795        let schema = create_test_schema();
3796        let ipc_data = create_test_ipc_data(&schema);
3797        let mut create_req = CreateTableRequest::new();
3798        create_req.id = Some(vec!["test_table".to_string()]);
3799        namespace
3800            .create_table(create_req, bytes::Bytes::from(ipc_data))
3801            .await
3802            .unwrap();
3803
3804        // Describe table should not have managed_versioning set
3805        let mut describe_req = DescribeTableRequest::new();
3806        describe_req.id = Some(vec!["test_table".to_string()]);
3807        let describe_resp = namespace.describe_table(describe_req).await.unwrap();
3808
3809        // managed_versioning should be None when table_version_tracking_enabled=false
3810        assert!(
3811            describe_resp.managed_versioning.is_none(),
3812            "managed_versioning should be None when table_version_tracking_enabled=false, got: {:?}",
3813            describe_resp.managed_versioning
3814        );
3815    }
3816
3817    #[tokio::test]
3818    #[cfg(not(windows))]
3819    async fn test_list_table_versions() {
3820        use arrow::array::{Int32Array, RecordBatchIterator};
3821        use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
3822        use arrow::record_batch::RecordBatch;
3823        use lance::dataset::{Dataset, WriteMode, WriteParams};
3824        use lance_namespace::models::{CreateNamespaceRequest, ListTableVersionsRequest};
3825
3826        let temp_dir = TempStrDir::default();
3827        let temp_path: &str = &temp_dir;
3828
3829        let namespace: Arc<dyn LanceNamespace> = Arc::new(
3830            DirectoryNamespaceBuilder::new(temp_path)
3831                .table_version_tracking_enabled(true)
3832                .build()
3833                .await
3834                .unwrap(),
3835        );
3836
3837        // Create parent namespace first
3838        let mut create_ns_req = CreateNamespaceRequest::new();
3839        create_ns_req.id = Some(vec!["workspace".to_string()]);
3840        namespace.create_namespace(create_ns_req).await.unwrap();
3841
3842        // Create a table using write_into_namespace (version 1)
3843        let table_id = vec!["workspace".to_string(), "test_table".to_string()];
3844        let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
3845            "id",
3846            DataType::Int32,
3847            false,
3848        )]));
3849        let batch = RecordBatch::try_new(
3850            arrow_schema.clone(),
3851            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
3852        )
3853        .unwrap();
3854        let batches = RecordBatchIterator::new(vec![Ok(batch.clone())], arrow_schema.clone());
3855        let write_params = WriteParams {
3856            mode: WriteMode::Create,
3857            ..Default::default()
3858        };
3859        let mut dataset = Dataset::write_into_namespace(
3860            batches,
3861            namespace.clone(),
3862            table_id.clone(),
3863            Some(write_params),
3864        )
3865        .await
3866        .unwrap();
3867
3868        // Append to create version 2
3869        let batch2 = RecordBatch::try_new(
3870            arrow_schema.clone(),
3871            vec![Arc::new(Int32Array::from(vec![100, 200]))],
3872        )
3873        .unwrap();
3874        let batches = RecordBatchIterator::new(vec![Ok(batch2)], arrow_schema.clone());
3875        dataset.append(batches, None).await.unwrap();
3876
3877        // Append to create version 3
3878        let batch3 = RecordBatch::try_new(
3879            arrow_schema.clone(),
3880            vec![Arc::new(Int32Array::from(vec![300, 400]))],
3881        )
3882        .unwrap();
3883        let batches = RecordBatchIterator::new(vec![Ok(batch3)], arrow_schema);
3884        dataset.append(batches, None).await.unwrap();
3885
3886        // List versions - should have versions 1, 2, and 3
3887        let mut list_req = ListTableVersionsRequest::new();
3888        list_req.id = Some(table_id.clone());
3889        let list_resp = namespace.list_table_versions(list_req).await.unwrap();
3890
3891        assert_eq!(
3892            list_resp.versions.len(),
3893            3,
3894            "Should have 3 versions, got: {:?}",
3895            list_resp.versions
3896        );
3897
3898        // Verify each version
3899        for expected_version in 1..=3 {
3900            let version = list_resp
3901                .versions
3902                .iter()
3903                .find(|v| v.version == expected_version)
3904                .unwrap_or_else(|| panic!("Expected version {}", expected_version));
3905
3906            assert!(
3907                !version.manifest_path.is_empty(),
3908                "manifest_path should be set for version {}",
3909                expected_version
3910            );
3911            assert!(
3912                version.manifest_path.contains(".manifest"),
3913                "manifest_path should contain .manifest for version {}",
3914                expected_version
3915            );
3916            assert!(
3917                version.manifest_size.is_some(),
3918                "manifest_size should be set for version {}",
3919                expected_version
3920            );
3921            assert!(
3922                version.manifest_size.unwrap() > 0,
3923                "manifest_size should be > 0 for version {}",
3924                expected_version
3925            );
3926            assert!(
3927                version.timestamp_millis.is_some(),
3928                "timestamp_millis should be set for version {}",
3929                expected_version
3930            );
3931        }
3932    }
3933
3934    #[tokio::test]
3935    #[cfg(not(windows))]
3936    async fn test_describe_table_version() {
3937        use arrow::array::{Int32Array, RecordBatchIterator};
3938        use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
3939        use arrow::record_batch::RecordBatch;
3940        use lance::dataset::{Dataset, WriteMode, WriteParams};
3941        use lance_namespace::models::{CreateNamespaceRequest, DescribeTableVersionRequest};
3942
3943        let temp_dir = TempStrDir::default();
3944        let temp_path: &str = &temp_dir;
3945
3946        let namespace: Arc<dyn LanceNamespace> = Arc::new(
3947            DirectoryNamespaceBuilder::new(temp_path)
3948                .table_version_tracking_enabled(true)
3949                .build()
3950                .await
3951                .unwrap(),
3952        );
3953
3954        // Create parent namespace first
3955        let mut create_ns_req = CreateNamespaceRequest::new();
3956        create_ns_req.id = Some(vec!["workspace".to_string()]);
3957        namespace.create_namespace(create_ns_req).await.unwrap();
3958
3959        // Create a table using write_into_namespace (version 1)
3960        let table_id = vec!["workspace".to_string(), "test_table".to_string()];
3961        let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
3962            "id",
3963            DataType::Int32,
3964            false,
3965        )]));
3966        let batch = RecordBatch::try_new(
3967            arrow_schema.clone(),
3968            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
3969        )
3970        .unwrap();
3971        let batches = RecordBatchIterator::new(vec![Ok(batch)], arrow_schema.clone());
3972        let write_params = WriteParams {
3973            mode: WriteMode::Create,
3974            ..Default::default()
3975        };
3976        let mut dataset = Dataset::write_into_namespace(
3977            batches,
3978            namespace.clone(),
3979            table_id.clone(),
3980            Some(write_params),
3981        )
3982        .await
3983        .unwrap();
3984
3985        // Append data to create version 2
3986        let batch2 = RecordBatch::try_new(
3987            arrow_schema.clone(),
3988            vec![Arc::new(Int32Array::from(vec![100, 200]))],
3989        )
3990        .unwrap();
3991        let batches = RecordBatchIterator::new(vec![Ok(batch2)], arrow_schema);
3992        dataset.append(batches, None).await.unwrap();
3993
3994        // Describe version 1
3995        let mut describe_req = DescribeTableVersionRequest::new();
3996        describe_req.id = Some(table_id.clone());
3997        describe_req.version = Some(1);
3998        let describe_resp = namespace
3999            .describe_table_version(describe_req)
4000            .await
4001            .unwrap();
4002
4003        let version = &describe_resp.version;
4004        assert_eq!(version.version, 1);
4005        assert!(version.timestamp_millis.is_some());
4006        assert!(
4007            !version.manifest_path.is_empty(),
4008            "manifest_path should be set"
4009        );
4010        assert!(
4011            version.manifest_path.contains(".manifest"),
4012            "manifest_path should contain .manifest"
4013        );
4014        assert!(
4015            version.manifest_size.is_some(),
4016            "manifest_size should be set"
4017        );
4018        assert!(
4019            version.manifest_size.unwrap() > 0,
4020            "manifest_size should be > 0"
4021        );
4022
4023        // Describe version 2
4024        let mut describe_req = DescribeTableVersionRequest::new();
4025        describe_req.id = Some(table_id.clone());
4026        describe_req.version = Some(2);
4027        let describe_resp = namespace
4028            .describe_table_version(describe_req)
4029            .await
4030            .unwrap();
4031
4032        let version = &describe_resp.version;
4033        assert_eq!(version.version, 2);
4034        assert!(version.timestamp_millis.is_some());
4035        assert!(
4036            !version.manifest_path.is_empty(),
4037            "manifest_path should be set"
4038        );
4039        assert!(
4040            version.manifest_size.is_some(),
4041            "manifest_size should be set"
4042        );
4043        assert!(
4044            version.manifest_size.unwrap() > 0,
4045            "manifest_size should be > 0"
4046        );
4047    }
4048
4049    #[tokio::test]
4050    #[cfg(not(windows))]
4051    async fn test_describe_table_version_latest() {
4052        use arrow::array::{Int32Array, RecordBatchIterator};
4053        use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
4054        use arrow::record_batch::RecordBatch;
4055        use lance::dataset::{Dataset, WriteMode, WriteParams};
4056        use lance_namespace::models::{CreateNamespaceRequest, DescribeTableVersionRequest};
4057
4058        let temp_dir = TempStrDir::default();
4059        let temp_path: &str = &temp_dir;
4060
4061        let namespace: Arc<dyn LanceNamespace> = Arc::new(
4062            DirectoryNamespaceBuilder::new(temp_path)
4063                .table_version_tracking_enabled(true)
4064                .build()
4065                .await
4066                .unwrap(),
4067        );
4068
4069        // Create parent namespace first
4070        let mut create_ns_req = CreateNamespaceRequest::new();
4071        create_ns_req.id = Some(vec!["workspace".to_string()]);
4072        namespace.create_namespace(create_ns_req).await.unwrap();
4073
4074        // Create a table using write_into_namespace (version 1)
4075        let table_id = vec!["workspace".to_string(), "test_table".to_string()];
4076        let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
4077            "id",
4078            DataType::Int32,
4079            false,
4080        )]));
4081        let batch = RecordBatch::try_new(
4082            arrow_schema.clone(),
4083            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
4084        )
4085        .unwrap();
4086        let batches = RecordBatchIterator::new(vec![Ok(batch)], arrow_schema.clone());
4087        let write_params = WriteParams {
4088            mode: WriteMode::Create,
4089            ..Default::default()
4090        };
4091        let mut dataset = Dataset::write_into_namespace(
4092            batches,
4093            namespace.clone(),
4094            table_id.clone(),
4095            Some(write_params),
4096        )
4097        .await
4098        .unwrap();
4099
4100        // Append to create version 2
4101        let batch2 = RecordBatch::try_new(
4102            arrow_schema.clone(),
4103            vec![Arc::new(Int32Array::from(vec![100, 200]))],
4104        )
4105        .unwrap();
4106        let batches = RecordBatchIterator::new(vec![Ok(batch2)], arrow_schema.clone());
4107        dataset.append(batches, None).await.unwrap();
4108
4109        // Append to create version 3
4110        let batch3 = RecordBatch::try_new(
4111            arrow_schema.clone(),
4112            vec![Arc::new(Int32Array::from(vec![300, 400]))],
4113        )
4114        .unwrap();
4115        let batches = RecordBatchIterator::new(vec![Ok(batch3)], arrow_schema);
4116        dataset.append(batches, None).await.unwrap();
4117
4118        // Describe latest version (no version specified)
4119        let mut describe_req = DescribeTableVersionRequest::new();
4120        describe_req.id = Some(table_id.clone());
4121        describe_req.version = None;
4122        let describe_resp = namespace
4123            .describe_table_version(describe_req)
4124            .await
4125            .unwrap();
4126
4127        // Should return version 3 as it's the latest
4128        assert_eq!(describe_resp.version.version, 3);
4129    }
4130
4131    #[tokio::test]
4132    #[cfg(not(windows))]
4133    async fn test_create_table_version() {
4134        use futures::TryStreamExt;
4135        use lance::dataset::builder::DatasetBuilder;
4136        use lance_namespace::models::CreateTableVersionRequest;
4137
4138        let temp_dir = TempStrDir::default();
4139        let temp_path: &str = &temp_dir;
4140
4141        let namespace: Arc<dyn LanceNamespace> = Arc::new(
4142            DirectoryNamespaceBuilder::new(temp_path)
4143                .table_version_tracking_enabled(true)
4144                .build()
4145                .await
4146                .unwrap(),
4147        );
4148
4149        // Create a table
4150        let schema = create_test_schema();
4151        let ipc_data = create_test_ipc_data(&schema);
4152        let mut create_req = CreateTableRequest::new();
4153        create_req.id = Some(vec!["test_table".to_string()]);
4154        namespace
4155            .create_table(create_req, bytes::Bytes::from(ipc_data))
4156            .await
4157            .unwrap();
4158
4159        // Open the dataset using from_namespace to get proper object_store and paths
4160        let table_id = vec!["test_table".to_string()];
4161        let dataset = DatasetBuilder::from_namespace(namespace.clone(), table_id.clone())
4162            .await
4163            .unwrap()
4164            .load()
4165            .await
4166            .unwrap();
4167
4168        // Use dataset's object_store to find and copy the manifest
4169        let versions_path = dataset.versions_dir();
4170        let manifest_metas: Vec<_> = dataset
4171            .object_store()
4172            .inner
4173            .list(Some(&versions_path))
4174            .try_collect()
4175            .await
4176            .unwrap();
4177
4178        let manifest_meta = manifest_metas
4179            .iter()
4180            .find(|m| {
4181                m.location
4182                    .filename()
4183                    .map(|f| f.ends_with(".manifest"))
4184                    .unwrap_or(false)
4185            })
4186            .expect("No manifest file found");
4187
4188        // Read the existing manifest data
4189        let manifest_data = dataset
4190            .object_store()
4191            .inner
4192            .get(&manifest_meta.location)
4193            .await
4194            .unwrap()
4195            .bytes()
4196            .await
4197            .unwrap();
4198
4199        // Write to a staging location using the dataset's object_store
4200        let staging_path = dataset.versions_dir().child("staging_manifest");
4201        dataset
4202            .object_store()
4203            .inner
4204            .put(&staging_path, manifest_data.into())
4205            .await
4206            .unwrap();
4207
4208        // Create version 2 from staging manifest
4209        // Use the same naming scheme as the existing dataset (V2)
4210        let mut create_version_req = CreateTableVersionRequest::new(2, staging_path.to_string());
4211        create_version_req.id = Some(table_id.clone());
4212        create_version_req.naming_scheme = Some("V2".to_string());
4213
4214        let result = namespace.create_table_version(create_version_req).await;
4215        assert!(
4216            result.is_ok(),
4217            "create_table_version should succeed: {:?}",
4218            result
4219        );
4220
4221        // Verify version 2 was created at the path returned in the response
4222        let response = result.unwrap();
4223        let version_info = response
4224            .version
4225            .expect("response should contain version info");
4226        let version_2_path = Path::from(version_info.manifest_path);
4227        let head_result = dataset.object_store().inner.head(&version_2_path).await;
4228        assert!(
4229            head_result.is_ok(),
4230            "Version 2 manifest should exist at {}",
4231            version_2_path
4232        );
4233
4234        // Verify the staging file has been deleted
4235        let staging_head_result = dataset.object_store().inner.head(&staging_path).await;
4236        assert!(
4237            staging_head_result.is_err(),
4238            "Staging manifest should have been deleted after create_table_version"
4239        );
4240    }
4241
4242    #[tokio::test]
4243    #[cfg(not(windows))]
4244    async fn test_create_table_version_conflict() {
4245        // create_table_version should fail if the version already exists.
4246        // Each version always writes to a new file location.
4247        use futures::TryStreamExt;
4248        use lance::dataset::builder::DatasetBuilder;
4249        use lance_namespace::models::CreateTableVersionRequest;
4250
4251        let temp_dir = TempStrDir::default();
4252        let temp_path: &str = &temp_dir;
4253
4254        let namespace: Arc<dyn LanceNamespace> = Arc::new(
4255            DirectoryNamespaceBuilder::new(temp_path)
4256                .table_version_tracking_enabled(true)
4257                .build()
4258                .await
4259                .unwrap(),
4260        );
4261
4262        // Create a table
4263        let schema = create_test_schema();
4264        let ipc_data = create_test_ipc_data(&schema);
4265        let mut create_req = CreateTableRequest::new();
4266        create_req.id = Some(vec!["test_table".to_string()]);
4267        namespace
4268            .create_table(create_req, bytes::Bytes::from(ipc_data))
4269            .await
4270            .unwrap();
4271
4272        // Open the dataset using from_namespace to get proper object_store and paths
4273        let table_id = vec!["test_table".to_string()];
4274        let dataset = DatasetBuilder::from_namespace(namespace.clone(), table_id.clone())
4275            .await
4276            .unwrap()
4277            .load()
4278            .await
4279            .unwrap();
4280
4281        // Use dataset's object_store to find and copy the manifest
4282        let versions_path = dataset.versions_dir();
4283        let manifest_metas: Vec<_> = dataset
4284            .object_store()
4285            .inner
4286            .list(Some(&versions_path))
4287            .try_collect()
4288            .await
4289            .unwrap();
4290
4291        let manifest_meta = manifest_metas
4292            .iter()
4293            .find(|m| {
4294                m.location
4295                    .filename()
4296                    .map(|f| f.ends_with(".manifest"))
4297                    .unwrap_or(false)
4298            })
4299            .expect("No manifest file found");
4300
4301        // Read the existing manifest data
4302        let manifest_data = dataset
4303            .object_store()
4304            .inner
4305            .get(&manifest_meta.location)
4306            .await
4307            .unwrap()
4308            .bytes()
4309            .await
4310            .unwrap();
4311
4312        // Write to a staging location using the dataset's object_store
4313        let staging_path = dataset.versions_dir().child("staging_manifest");
4314        dataset
4315            .object_store()
4316            .inner
4317            .put(&staging_path, manifest_data.into())
4318            .await
4319            .unwrap();
4320
4321        // First create version 2 (should succeed)
4322        let mut create_version_req = CreateTableVersionRequest::new(2, staging_path.to_string());
4323        create_version_req.id = Some(table_id.clone());
4324        create_version_req.naming_scheme = Some("V2".to_string());
4325        let first_result = namespace.create_table_version(create_version_req).await;
4326        assert!(
4327            first_result.is_ok(),
4328            "First create_table_version for version 2 should succeed: {:?}",
4329            first_result
4330        );
4331
4332        // Get the path from the response for verification
4333        let version_2_path = Path::from(
4334            first_result
4335                .unwrap()
4336                .version
4337                .expect("response should contain version info")
4338                .manifest_path,
4339        );
4340
4341        // Create version 2 again (should fail - conflict)
4342        let mut create_version_req = CreateTableVersionRequest::new(2, staging_path.to_string());
4343        create_version_req.id = Some(table_id.clone());
4344        create_version_req.naming_scheme = Some("V2".to_string());
4345
4346        let result = namespace.create_table_version(create_version_req).await;
4347        assert!(
4348            result.is_err(),
4349            "create_table_version should fail for existing version"
4350        );
4351
4352        // Verify version 2 still exists using the dataset's object_store
4353        let head_result = dataset.object_store().inner.head(&version_2_path).await;
4354        assert!(
4355            head_result.is_ok(),
4356            "Version 2 manifest should still exist at {}",
4357            version_2_path
4358        );
4359    }
4360
4361    #[tokio::test]
4362    async fn test_create_table_version_table_not_found() {
4363        use lance_namespace::models::CreateTableVersionRequest;
4364
4365        let temp_dir = TempStdDir::default();
4366        let temp_path = temp_dir.to_str().unwrap();
4367
4368        let namespace = DirectoryNamespaceBuilder::new(temp_path)
4369            .table_version_tracking_enabled(true)
4370            .build()
4371            .await
4372            .unwrap();
4373
4374        // Try to create version for non-existent table
4375        let mut create_version_req =
4376            CreateTableVersionRequest::new(1, "/some/staging/path".to_string());
4377        create_version_req.id = Some(vec!["non_existent_table".to_string()]);
4378
4379        let result = namespace.create_table_version(create_version_req).await;
4380        assert!(
4381            result.is_err(),
4382            "create_table_version should fail for non-existent table"
4383        );
4384        let err_msg = result.unwrap_err().to_string();
4385        assert!(
4386            err_msg.contains("does not exist"),
4387            "Error should mention table does not exist, got: {}",
4388            err_msg
4389        );
4390    }
4391
4392    /// End-to-end integration test module for table version tracking.
4393    mod e2e_table_version_tracking {
4394        use super::*;
4395        use std::sync::atomic::{AtomicUsize, Ordering};
4396
4397        /// Tracking wrapper around a namespace that counts method invocations.
4398        struct TrackingNamespace {
4399            inner: DirectoryNamespace,
4400            create_table_version_count: AtomicUsize,
4401            describe_table_version_count: AtomicUsize,
4402            list_table_versions_count: AtomicUsize,
4403        }
4404
4405        impl TrackingNamespace {
4406            fn new(inner: DirectoryNamespace) -> Self {
4407                Self {
4408                    inner,
4409                    create_table_version_count: AtomicUsize::new(0),
4410                    describe_table_version_count: AtomicUsize::new(0),
4411                    list_table_versions_count: AtomicUsize::new(0),
4412                }
4413            }
4414
4415            fn create_table_version_calls(&self) -> usize {
4416                self.create_table_version_count.load(Ordering::SeqCst)
4417            }
4418
4419            fn describe_table_version_calls(&self) -> usize {
4420                self.describe_table_version_count.load(Ordering::SeqCst)
4421            }
4422
4423            fn list_table_versions_calls(&self) -> usize {
4424                self.list_table_versions_count.load(Ordering::SeqCst)
4425            }
4426        }
4427
4428        impl std::fmt::Debug for TrackingNamespace {
4429            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
4430                f.debug_struct("TrackingNamespace")
4431                    .field(
4432                        "create_table_version_calls",
4433                        &self.create_table_version_calls(),
4434                    )
4435                    .finish()
4436            }
4437        }
4438
4439        #[async_trait]
4440        impl LanceNamespace for TrackingNamespace {
4441            async fn create_namespace(
4442                &self,
4443                request: CreateNamespaceRequest,
4444            ) -> Result<CreateNamespaceResponse> {
4445                self.inner.create_namespace(request).await
4446            }
4447
4448            async fn describe_namespace(
4449                &self,
4450                request: DescribeNamespaceRequest,
4451            ) -> Result<DescribeNamespaceResponse> {
4452                self.inner.describe_namespace(request).await
4453            }
4454
4455            async fn namespace_exists(&self, request: NamespaceExistsRequest) -> Result<()> {
4456                self.inner.namespace_exists(request).await
4457            }
4458
4459            async fn list_namespaces(
4460                &self,
4461                request: ListNamespacesRequest,
4462            ) -> Result<ListNamespacesResponse> {
4463                self.inner.list_namespaces(request).await
4464            }
4465
4466            async fn drop_namespace(
4467                &self,
4468                request: DropNamespaceRequest,
4469            ) -> Result<DropNamespaceResponse> {
4470                self.inner.drop_namespace(request).await
4471            }
4472
4473            async fn list_tables(&self, request: ListTablesRequest) -> Result<ListTablesResponse> {
4474                self.inner.list_tables(request).await
4475            }
4476
4477            async fn describe_table(
4478                &self,
4479                request: DescribeTableRequest,
4480            ) -> Result<DescribeTableResponse> {
4481                self.inner.describe_table(request).await
4482            }
4483
4484            async fn table_exists(&self, request: TableExistsRequest) -> Result<()> {
4485                self.inner.table_exists(request).await
4486            }
4487
4488            async fn drop_table(&self, request: DropTableRequest) -> Result<DropTableResponse> {
4489                self.inner.drop_table(request).await
4490            }
4491
4492            async fn create_table(
4493                &self,
4494                request: CreateTableRequest,
4495                request_data: Bytes,
4496            ) -> Result<CreateTableResponse> {
4497                self.inner.create_table(request, request_data).await
4498            }
4499
4500            async fn declare_table(
4501                &self,
4502                request: DeclareTableRequest,
4503            ) -> Result<DeclareTableResponse> {
4504                self.inner.declare_table(request).await
4505            }
4506
4507            async fn list_table_versions(
4508                &self,
4509                request: ListTableVersionsRequest,
4510            ) -> Result<ListTableVersionsResponse> {
4511                self.list_table_versions_count
4512                    .fetch_add(1, Ordering::SeqCst);
4513                self.inner.list_table_versions(request).await
4514            }
4515
4516            async fn create_table_version(
4517                &self,
4518                request: CreateTableVersionRequest,
4519            ) -> Result<CreateTableVersionResponse> {
4520                self.create_table_version_count
4521                    .fetch_add(1, Ordering::SeqCst);
4522                self.inner.create_table_version(request).await
4523            }
4524
4525            async fn describe_table_version(
4526                &self,
4527                request: DescribeTableVersionRequest,
4528            ) -> Result<DescribeTableVersionResponse> {
4529                self.describe_table_version_count
4530                    .fetch_add(1, Ordering::SeqCst);
4531                self.inner.describe_table_version(request).await
4532            }
4533
4534            async fn batch_delete_table_versions(
4535                &self,
4536                request: BatchDeleteTableVersionsRequest,
4537            ) -> Result<BatchDeleteTableVersionsResponse> {
4538                self.inner.batch_delete_table_versions(request).await
4539            }
4540
4541            fn namespace_id(&self) -> String {
4542                self.inner.namespace_id()
4543            }
4544        }
4545
4546        #[tokio::test]
4547        async fn test_describe_table_returns_managed_versioning() {
4548            use lance_namespace::models::{CreateNamespaceRequest, DescribeTableRequest};
4549
4550            let temp_dir = TempStdDir::default();
4551            let temp_path = temp_dir.to_str().unwrap();
4552
4553            // Create namespace with table_version_tracking_enabled and manifest_enabled
4554            let ns = DirectoryNamespaceBuilder::new(temp_path)
4555                .table_version_tracking_enabled(true)
4556                .manifest_enabled(true)
4557                .build()
4558                .await
4559                .unwrap();
4560
4561            // Create parent namespace
4562            let mut create_ns_req = CreateNamespaceRequest::new();
4563            create_ns_req.id = Some(vec!["workspace".to_string()]);
4564            ns.create_namespace(create_ns_req).await.unwrap();
4565
4566            // Create a table with multi-level ID (namespace + table)
4567            let schema = create_test_schema();
4568            let ipc_data = create_test_ipc_data(&schema);
4569            let mut create_req = CreateTableRequest::new();
4570            create_req.id = Some(vec!["workspace".to_string(), "test_table".to_string()]);
4571            ns.create_table(create_req, bytes::Bytes::from(ipc_data))
4572                .await
4573                .unwrap();
4574
4575            // Describe table should return managed_versioning=true
4576            let mut describe_req = DescribeTableRequest::new();
4577            describe_req.id = Some(vec!["workspace".to_string(), "test_table".to_string()]);
4578            let describe_resp = ns.describe_table(describe_req).await.unwrap();
4579
4580            // managed_versioning should be true
4581            assert_eq!(
4582                describe_resp.managed_versioning,
4583                Some(true),
4584                "managed_versioning should be true when table_version_tracking_enabled=true"
4585            );
4586        }
4587
4588        #[tokio::test]
4589        #[cfg(not(windows))]
4590        async fn test_external_manifest_store_invokes_namespace_apis() {
4591            use arrow::array::{Int32Array, StringArray};
4592            use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
4593            use arrow::record_batch::RecordBatch;
4594            use lance::Dataset;
4595            use lance::dataset::builder::DatasetBuilder;
4596            use lance::dataset::{WriteMode, WriteParams};
4597            use lance_namespace::models::CreateNamespaceRequest;
4598
4599            let temp_dir = TempStdDir::default();
4600            let temp_path = temp_dir.to_str().unwrap();
4601
4602            // Create namespace with table_version_tracking_enabled and manifest_enabled
4603            let inner_ns = DirectoryNamespaceBuilder::new(temp_path)
4604                .table_version_tracking_enabled(true)
4605                .manifest_enabled(true)
4606                .build()
4607                .await
4608                .unwrap();
4609
4610            let tracking_ns = Arc::new(TrackingNamespace::new(inner_ns));
4611            let ns: Arc<dyn LanceNamespace> = tracking_ns.clone();
4612
4613            // Create parent namespace
4614            let mut create_ns_req = CreateNamespaceRequest::new();
4615            create_ns_req.id = Some(vec!["workspace".to_string()]);
4616            ns.create_namespace(create_ns_req).await.unwrap();
4617
4618            // Create a table with multi-level ID (namespace + table)
4619            let table_id = vec!["workspace".to_string(), "test_table".to_string()];
4620
4621            // Create some initial data
4622            let arrow_schema = Arc::new(ArrowSchema::new(vec![
4623                Field::new("id", DataType::Int32, false),
4624                Field::new("name", DataType::Utf8, true),
4625            ]));
4626            let batch = RecordBatch::try_new(
4627                arrow_schema.clone(),
4628                vec![
4629                    Arc::new(Int32Array::from(vec![1, 2, 3])),
4630                    Arc::new(StringArray::from(vec!["a", "b", "c"])),
4631                ],
4632            )
4633            .unwrap();
4634
4635            // Create a table using write_into_namespace
4636            let batches = RecordBatchIterator::new(vec![Ok(batch.clone())], arrow_schema.clone());
4637            let write_params = WriteParams {
4638                mode: WriteMode::Create,
4639                ..Default::default()
4640            };
4641            let mut dataset = Dataset::write_into_namespace(
4642                batches,
4643                ns.clone(),
4644                table_id.clone(),
4645                Some(write_params),
4646            )
4647            .await
4648            .unwrap();
4649            assert_eq!(dataset.version().version, 1);
4650
4651            // Verify create_table_version was called once during initial write_into_namespace
4652            assert_eq!(
4653                tracking_ns.create_table_version_calls(),
4654                1,
4655                "create_table_version should have been called once during initial write_into_namespace"
4656            );
4657
4658            // Append data - this should call create_table_version again
4659            let append_batch = RecordBatch::try_new(
4660                arrow_schema.clone(),
4661                vec![
4662                    Arc::new(Int32Array::from(vec![4, 5, 6])),
4663                    Arc::new(StringArray::from(vec!["d", "e", "f"])),
4664                ],
4665            )
4666            .unwrap();
4667            let append_batches = RecordBatchIterator::new(vec![Ok(append_batch)], arrow_schema);
4668            dataset.append(append_batches, None).await.unwrap();
4669
4670            assert_eq!(
4671                tracking_ns.create_table_version_calls(),
4672                2,
4673                "create_table_version should have been called twice (once for create, once for append)"
4674            );
4675
4676            // checkout_latest should call list_table_versions exactly once
4677            let initial_list_calls = tracking_ns.list_table_versions_calls();
4678            let latest_dataset = DatasetBuilder::from_namespace(ns.clone(), table_id.clone())
4679                .await
4680                .unwrap()
4681                .load()
4682                .await
4683                .unwrap();
4684            assert_eq!(latest_dataset.version().version, 2);
4685            assert_eq!(
4686                tracking_ns.list_table_versions_calls(),
4687                initial_list_calls + 1,
4688                "list_table_versions should have been called exactly once during checkout_latest"
4689            );
4690
4691            // checkout to specific version should call describe_table_version exactly once
4692            let initial_describe_calls = tracking_ns.describe_table_version_calls();
4693            let v1_dataset = DatasetBuilder::from_namespace(ns.clone(), table_id.clone())
4694                .await
4695                .unwrap()
4696                .with_version(1)
4697                .load()
4698                .await
4699                .unwrap();
4700            assert_eq!(v1_dataset.version().version, 1);
4701            assert_eq!(
4702                tracking_ns.describe_table_version_calls(),
4703                initial_describe_calls + 1,
4704                "describe_table_version should have been called exactly once during checkout to version 1"
4705            );
4706        }
4707
4708        #[tokio::test]
4709        #[cfg(not(windows))]
4710        async fn test_dataset_commit_with_external_manifest_store() {
4711            use arrow::array::{Int32Array, StringArray};
4712            use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
4713            use arrow::record_batch::RecordBatch;
4714            use futures::TryStreamExt;
4715            use lance::dataset::{Dataset, WriteMode, WriteParams};
4716            use lance_namespace::models::CreateNamespaceRequest;
4717            use lance_table::io::commit::ManifestNamingScheme;
4718
4719            let temp_dir = TempStdDir::default();
4720            let temp_path = temp_dir.to_str().unwrap();
4721
4722            // Create namespace with table_version_tracking_enabled and manifest_enabled
4723            let inner_ns = DirectoryNamespaceBuilder::new(temp_path)
4724                .table_version_tracking_enabled(true)
4725                .manifest_enabled(true)
4726                .build()
4727                .await
4728                .unwrap();
4729
4730            let tracking_ns: Arc<dyn LanceNamespace> = Arc::new(TrackingNamespace::new(inner_ns));
4731
4732            // Create parent namespace
4733            let mut create_ns_req = CreateNamespaceRequest::new();
4734            create_ns_req.id = Some(vec!["workspace".to_string()]);
4735            tracking_ns.create_namespace(create_ns_req).await.unwrap();
4736
4737            // Create a table using write_into_namespace
4738            let table_id = vec!["workspace".to_string(), "test_table".to_string()];
4739            let arrow_schema = Arc::new(ArrowSchema::new(vec![
4740                Field::new("id", DataType::Int32, false),
4741                Field::new("name", DataType::Utf8, true),
4742            ]));
4743            let batch = RecordBatch::try_new(
4744                arrow_schema.clone(),
4745                vec![
4746                    Arc::new(Int32Array::from(vec![1, 2, 3])),
4747                    Arc::new(StringArray::from(vec!["a", "b", "c"])),
4748                ],
4749            )
4750            .unwrap();
4751            let batches = RecordBatchIterator::new(vec![Ok(batch)], arrow_schema.clone());
4752            let write_params = WriteParams {
4753                mode: WriteMode::Create,
4754                ..Default::default()
4755            };
4756            let dataset = Dataset::write_into_namespace(
4757                batches,
4758                tracking_ns.clone(),
4759                table_id.clone(),
4760                Some(write_params),
4761            )
4762            .await
4763            .unwrap();
4764            assert_eq!(dataset.version().version, 1);
4765
4766            // Append data using write_into_namespace (APPEND mode)
4767            let batch2 = RecordBatch::try_new(
4768                arrow_schema.clone(),
4769                vec![
4770                    Arc::new(Int32Array::from(vec![4, 5, 6])),
4771                    Arc::new(StringArray::from(vec!["d", "e", "f"])),
4772                ],
4773            )
4774            .unwrap();
4775            let batches = RecordBatchIterator::new(vec![Ok(batch2)], arrow_schema);
4776            let write_params = WriteParams {
4777                mode: WriteMode::Append,
4778                ..Default::default()
4779            };
4780            Dataset::write_into_namespace(
4781                batches,
4782                tracking_ns.clone(),
4783                table_id.clone(),
4784                Some(write_params),
4785            )
4786            .await
4787            .unwrap();
4788
4789            // Verify version 2 was created using the dataset's object_store
4790            // List manifests in the versions directory to find the V2 named manifest
4791            let manifest_metas: Vec<_> = dataset
4792                .object_store()
4793                .inner
4794                .list(Some(&dataset.versions_dir()))
4795                .try_collect()
4796                .await
4797                .unwrap();
4798            let version_2_found = manifest_metas.iter().any(|m| {
4799                m.location
4800                    .filename()
4801                    .map(|f| {
4802                        f.ends_with(".manifest")
4803                            && ManifestNamingScheme::V2.parse_version(f) == Some(2)
4804                    })
4805                    .unwrap_or(false)
4806            });
4807            assert!(
4808                version_2_found,
4809                "Version 2 manifest should exist in versions directory"
4810            );
4811        }
4812    }
4813}