lance_namespace_impls/
dir.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Directory-based Lance Namespace implementation.
5//!
6//! This module provides a directory-based implementation of the Lance namespace
7//! that stores tables as Lance datasets in a filesystem directory structure.
8
9pub mod manifest;
10
11use arrow::record_batch::RecordBatchIterator;
12use arrow_ipc::reader::StreamReader;
13use async_trait::async_trait;
14use bytes::Bytes;
15use lance::dataset::{Dataset, WriteParams};
16use lance::session::Session;
17use lance_io::object_store::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry};
18use object_store::path::Path;
19use std::collections::HashMap;
20use std::io::Cursor;
21use std::sync::Arc;
22
23use lance_namespace::models::{
24    CreateEmptyTableRequest, CreateEmptyTableResponse, CreateNamespaceRequest,
25    CreateNamespaceResponse, CreateTableRequest, CreateTableResponse, DescribeNamespaceRequest,
26    DescribeNamespaceResponse, DescribeTableRequest, DescribeTableResponse, DropNamespaceRequest,
27    DropNamespaceResponse, DropTableRequest, DropTableResponse, ListNamespacesRequest,
28    ListNamespacesResponse, ListTablesRequest, ListTablesResponse, NamespaceExistsRequest,
29    TableExistsRequest,
30};
31
32use lance_core::{box_error, Error, Result};
33use lance_namespace::schema::arrow_schema_to_json;
34use lance_namespace::LanceNamespace;
35
36/// Builder for creating a DirectoryNamespace.
37///
38/// This builder provides a fluent API for configuring and establishing
39/// connections to directory-based Lance namespaces.
40///
41/// # Examples
42///
43/// ```no_run
44/// # use lance_namespace_impls::DirectoryNamespaceBuilder;
45/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
46/// // Create a local directory namespace
47/// let namespace = DirectoryNamespaceBuilder::new("/path/to/data")
48///     .build()
49///     .await?;
50/// # Ok(())
51/// # }
52/// ```
53///
54/// ```no_run
55/// # use lance_namespace_impls::DirectoryNamespaceBuilder;
56/// # use lance::session::Session;
57/// # use std::sync::Arc;
58/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
59/// // Create with custom storage options and session
60/// let session = Arc::new(Session::default());
61/// let namespace = DirectoryNamespaceBuilder::new("s3://bucket/path")
62///     .storage_option("region", "us-west-2")
63///     .storage_option("access_key_id", "key")
64///     .session(session)
65///     .build()
66///     .await?;
67/// # Ok(())
68/// # }
69/// ```
70#[derive(Debug, Clone)]
71pub struct DirectoryNamespaceBuilder {
    /// Root location of the namespace. `new` and `from_properties` strip
    /// trailing `/` characters before storing it.
72    root: String,
    /// Options copied into `ObjectStoreParams` by `build`. Stays `None` until
    /// an option is added (or `storage.*` properties are supplied).
73    storage_options: Option<HashMap<String, String>>,
    /// Optional session; when present, `build` reuses its object store
    /// registry instead of creating a default one.
74    session: Option<Arc<Session>>,
    /// When true (the default), `build` tries to set up the manifest namespace.
75    manifest_enabled: bool,
    /// Defaults to true; handed to the manifest namespace and to the built
    /// `DirectoryNamespace`.
76    dir_listing_enabled: bool,
    /// Defaults to true; only forwarded to `ManifestNamespace::from_directory`.
77    inline_optimization_enabled: bool,
78}
79
80impl DirectoryNamespaceBuilder {
81    /// Create a new DirectoryNamespaceBuilder with the specified root path.
82    ///
83    /// # Arguments
84    ///
85    /// * `root` - Root directory path (local path or cloud URI like s3://bucket/path)
86    pub fn new(root: impl Into<String>) -> Self {
87        Self {
88            root: root.into().trim_end_matches('/').to_string(),
89            storage_options: None,
90            session: None,
91            manifest_enabled: true,
92            dir_listing_enabled: true, // Default to enabled for backwards compatibility
93            inline_optimization_enabled: true,
94        }
95    }
96
97    /// Enable or disable manifest-based listing.
98    ///
99    /// When enabled (default), the namespace uses a `__manifest` table to track tables.
100    /// When disabled, relies solely on directory scanning.
101    pub fn manifest_enabled(mut self, enabled: bool) -> Self {
102        self.manifest_enabled = enabled;
103        self
104    }
105
106    /// Enable or disable directory-based listing fallback.
107    ///
108    /// When enabled (default), falls back to directory scanning for tables not in the manifest.
109    /// When disabled, only consults the manifest table.
110    pub fn dir_listing_enabled(mut self, enabled: bool) -> Self {
111        self.dir_listing_enabled = enabled;
112        self
113    }
114
115    /// Enable or disable inline optimization of the __manifest table.
116    ///
117    /// When enabled (default), performs compaction and indexing on the __manifest table
118    /// after every write operation to maintain optimal performance.
119    /// When disabled, manual optimization must be performed separately.
120    pub fn inline_optimization_enabled(mut self, enabled: bool) -> Self {
121        self.inline_optimization_enabled = enabled;
122        self
123    }
124
125    /// Create a DirectoryNamespaceBuilder from properties HashMap.
126    ///
127    /// This method parses a properties map into builder configuration.
128    /// It expects:
129    /// - `root`: The root directory path (required)
130    /// - `manifest_enabled`: Enable manifest-based table tracking (optional, default: true)
131    /// - `dir_listing_enabled`: Enable directory listing for table discovery (optional, default: true)
132    /// - `inline_optimization_enabled`: Enable inline optimization of __manifest table (optional, default: true)
133    /// - `storage.*`: Storage options (optional, prefix will be stripped)
134    ///
135    /// # Arguments
136    ///
137    /// * `properties` - Configuration properties
138    /// * `session` - Optional Lance session to reuse object store registry
139    ///
140    /// # Returns
141    ///
142    /// Returns a `DirectoryNamespaceBuilder` instance.
143    ///
144    /// # Errors
145    ///
146    /// Returns an error if the `root` property is missing.
147    ///
148    /// # Examples
149    ///
150    /// ```no_run
151    /// # use lance_namespace_impls::DirectoryNamespaceBuilder;
152    /// # use std::collections::HashMap;
153    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
154    /// let mut properties = HashMap::new();
155    /// properties.insert("root".to_string(), "/path/to/data".to_string());
156    /// properties.insert("manifest_enabled".to_string(), "true".to_string());
157    /// properties.insert("dir_listing_enabled".to_string(), "false".to_string());
158    /// properties.insert("storage.region".to_string(), "us-west-2".to_string());
159    ///
160    /// let namespace = DirectoryNamespaceBuilder::from_properties(properties, None)?
161    ///     .build()
162    ///     .await?;
163    /// # Ok(())
164    /// # }
165    /// ```
166    pub fn from_properties(
167        properties: HashMap<String, String>,
168        session: Option<Arc<Session>>,
169    ) -> Result<Self> {
170        // Extract root from properties (required)
171        let root = properties
172            .get("root")
173            .cloned()
174            .ok_or_else(|| Error::Namespace {
175                source: "Missing required property 'root' for directory namespace".into(),
176                location: snafu::location!(),
177            })?;
178
179        // Extract storage options (properties prefixed with "storage.")
180        let storage_options: HashMap<String, String> = properties
181            .iter()
182            .filter_map(|(k, v)| {
183                k.strip_prefix("storage.")
184                    .map(|key| (key.to_string(), v.clone()))
185            })
186            .collect();
187
188        let storage_options = if storage_options.is_empty() {
189            None
190        } else {
191            Some(storage_options)
192        };
193
194        // Extract manifest_enabled (default: true)
195        let manifest_enabled = properties
196            .get("manifest_enabled")
197            .and_then(|v| v.parse::<bool>().ok())
198            .unwrap_or(true);
199
200        // Extract dir_listing_enabled (default: true)
201        let dir_listing_enabled = properties
202            .get("dir_listing_enabled")
203            .and_then(|v| v.parse::<bool>().ok())
204            .unwrap_or(true);
205
206        // Extract inline_optimization_enabled (default: true)
207        let inline_optimization_enabled = properties
208            .get("inline_optimization_enabled")
209            .and_then(|v| v.parse::<bool>().ok())
210            .unwrap_or(true);
211
212        Ok(Self {
213            root: root.trim_end_matches('/').to_string(),
214            storage_options,
215            session,
216            manifest_enabled,
217            dir_listing_enabled,
218            inline_optimization_enabled,
219        })
220    }
221
222    /// Add a storage option.
223    ///
224    /// # Arguments
225    ///
226    /// * `key` - Storage option key (e.g., "region", "access_key_id")
227    /// * `value` - Storage option value
228    pub fn storage_option(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
229        self.storage_options
230            .get_or_insert_with(HashMap::new)
231            .insert(key.into(), value.into());
232        self
233    }
234
235    /// Add multiple storage options.
236    ///
237    /// # Arguments
238    ///
239    /// * `options` - HashMap of storage options to add
240    pub fn storage_options(mut self, options: HashMap<String, String>) -> Self {
241        self.storage_options
242            .get_or_insert_with(HashMap::new)
243            .extend(options);
244        self
245    }
246
247    /// Set the Lance session to use for this namespace.
248    ///
249    /// When a session is provided, the namespace will reuse the session's
250    /// object store registry, allowing multiple namespaces and datasets
251    /// to share the same underlying storage connections.
252    ///
253    /// # Arguments
254    ///
255    /// * `session` - Arc-wrapped Lance session
256    pub fn session(mut self, session: Arc<Session>) -> Self {
257        self.session = Some(session);
258        self
259    }
260
261    /// Build the DirectoryNamespace.
262    ///
263    /// # Returns
264    ///
265    /// Returns a `DirectoryNamespace` instance.
266    ///
267    /// # Errors
268    ///
269    /// Returns an error if:
270    /// - The root path is invalid
271    /// - Connection to the storage backend fails
272    /// - Storage options are invalid
273    pub async fn build(self) -> Result<DirectoryNamespace> {
274        let (object_store, base_path) =
275            Self::initialize_object_store(&self.root, &self.storage_options, &self.session).await?;
276
277        let manifest_ns = if self.manifest_enabled {
278            match manifest::ManifestNamespace::from_directory(
279                self.root.clone(),
280                self.storage_options.clone(),
281                self.session.clone(),
282                object_store.clone(),
283                base_path.clone(),
284                self.dir_listing_enabled,
285                self.inline_optimization_enabled,
286            )
287            .await
288            {
289                Ok(ns) => Some(Arc::new(ns)),
290                Err(e) => {
291                    // Failed to initialize manifest namespace, fall back to directory listing only
292                    log::warn!(
293                        "Failed to initialize manifest namespace, falling back to directory listing only: {}",
294                        e
295                    );
296                    None
297                }
298            }
299        } else {
300            None
301        };
302
303        Ok(DirectoryNamespace {
304            root: self.root,
305            storage_options: self.storage_options,
306            session: self.session,
307            object_store,
308            base_path,
309            manifest_ns,
310            dir_listing_enabled: self.dir_listing_enabled,
311        })
312    }
313
314    /// Initialize the Lance ObjectStore based on the configuration
315    async fn initialize_object_store(
316        root: &str,
317        storage_options: &Option<HashMap<String, String>>,
318        session: &Option<Arc<Session>>,
319    ) -> Result<(Arc<ObjectStore>, Path)> {
320        // Build ObjectStoreParams from storage options
321        let params = ObjectStoreParams {
322            storage_options: storage_options.clone(),
323            ..Default::default()
324        };
325
326        // Use object store registry from session if provided, otherwise create a new one
327        let registry = if let Some(session) = session {
328            session.store_registry()
329        } else {
330            Arc::new(ObjectStoreRegistry::default())
331        };
332
333        // Use Lance's object store factory to create from URI
334        let (object_store, base_path) = ObjectStore::from_uri_and_params(registry, root, &params)
335            .await
336            .map_err(|e| Error::Namespace {
337                source: format!("Failed to create object store: {}", e).into(),
338                location: snafu::location!(),
339            })?;
340
341        Ok((object_store, base_path))
342    }
343}
344
345/// Directory-based implementation of Lance Namespace.
346///
347/// This implementation stores tables as Lance datasets in a directory structure.
348/// It supports local filesystems and cloud storage backends through Lance's object store.
349///
350/// ## Manifest-based Listing
351///
352/// When `manifest_enabled=true`, the namespace uses a special `__manifest` Lance table to track tables
353/// instead of scanning the filesystem. This provides:
354/// - Better performance for listing operations
355/// - Ability to track table metadata
356/// - Foundation for future features like namespaces and table renaming
357///
358/// When `dir_listing_enabled=true`, the namespace falls back to directory scanning for tables not
359/// found in the manifest, enabling gradual migration.
360pub struct DirectoryNamespace {
    /// Root location (trailing `/` already stripped by the builder); used to
    /// build the table URIs returned in responses.
361    root: String,
    /// Storage options the namespace was built with; echoed back in
    /// `DescribeTableResponse`.
362    storage_options: Option<HashMap<String, String>>,
    // Stored by the builder. NOTE(review): not read anywhere in the visible
    // part of this file, which is presumably why dead_code is allowed — confirm.
363    #[allow(dead_code)]
364    session: Option<Arc<Session>>,
    /// Object store used for directory listing and existence checks.
365    object_store: Arc<ObjectStore>,
    /// Base path inside the object store; table directories are resolved as
    /// `<name>.lance` children of it.
366    base_path: Path,
    /// Manifest-backed namespace. `None` when the manifest is disabled or its
    /// initialization failed during `build`.
367    manifest_ns: Option<Arc<manifest::ManifestNamespace>>,
    /// Whether directory scanning is used as a fallback next to the manifest.
368    dir_listing_enabled: bool,
369}
370
371impl std::fmt::Debug for DirectoryNamespace {
372    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
373        write!(f, "{}", self.namespace_id())
374    }
375}
376
377impl std::fmt::Display for DirectoryNamespace {
378    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
379        write!(f, "{}", self.namespace_id())
380    }
381}
382
383impl DirectoryNamespace {
384    /// Apply pagination to a list of table names
385    ///
386    /// Sorts the list alphabetically and applies pagination using page_token (start_after) and limit.
387    ///
388    /// # Arguments
389    /// * `names` - The vector of table names to paginate
390    /// * `page_token` - Skip items until finding one greater than this value (start_after semantics)
391    /// * `limit` - Maximum number of items to keep
392    fn apply_pagination(names: &mut Vec<String>, page_token: Option<String>, limit: Option<i32>) {
393        // Sort alphabetically for consistent ordering
394        names.sort();
395
396        // Apply page_token filtering (start_after semantics)
397        if let Some(start_after) = page_token {
398            if let Some(index) = names
399                .iter()
400                .position(|name| name.as_str() > start_after.as_str())
401            {
402                names.drain(0..index);
403            } else {
404                names.clear();
405            }
406        }
407
408        // Apply limit
409        if let Some(limit) = limit {
410            if limit >= 0 {
411                names.truncate(limit as usize);
412            }
413        }
414    }
415
416    /// List tables using directory scanning (fallback method)
417    async fn list_directory_tables(&self) -> Result<Vec<String>> {
418        let mut tables = Vec::new();
419        let entries = self
420            .object_store
421            .read_dir(self.base_path.clone())
422            .await
423            .map_err(|e| Error::IO {
424                source: box_error(std::io::Error::other(format!(
425                    "Failed to list directory: {}",
426                    e
427                ))),
428                location: snafu::location!(),
429            })?;
430
431        for entry in entries {
432            let path = entry.trim_end_matches('/');
433            if !path.ends_with(".lance") {
434                continue;
435            }
436
437            let table_name = &path[..path.len() - 6];
438            tables.push(table_name.to_string());
439        }
440
441        Ok(tables)
442    }
443
444    /// Validate that the namespace ID represents the root namespace
445    fn validate_root_namespace_id(id: &Option<Vec<String>>) -> Result<()> {
446        if let Some(id) = id {
447            if !id.is_empty() {
448                return Err(Error::Namespace {
449                    source: format!(
450                        "Directory namespace only supports root namespace operations, but got namespace ID: {:?}. Expected empty ID.",
451                        id
452                    ).into(),
453                    location: snafu::location!(),
454                });
455            }
456        }
457        Ok(())
458    }
459
460    /// Extract table name from table ID
461    fn table_name_from_id(id: &Option<Vec<String>>) -> Result<String> {
462        let id = id.as_ref().ok_or_else(|| Error::Namespace {
463            source: "Directory namespace table ID cannot be empty".into(),
464            location: snafu::location!(),
465        })?;
466
467        if id.len() != 1 {
468            return Err(Error::Namespace {
469                source: format!(
470                    "Multi-level table IDs are only supported when manifest mode is enabled, but got: {:?}",
471                    id
472                )
473                .into(),
474                location: snafu::location!(),
475            });
476        }
477
478        Ok(id[0].clone())
479    }
480
481    /// Get the full URI path for a table (for returning in responses)
482    fn table_full_uri(&self, table_name: &str) -> String {
483        format!("{}/{}.lance", &self.root, table_name)
484    }
485
486    /// Get the object store path for a table (relative to base_path)
487    fn table_path(&self, table_name: &str) -> Path {
488        self.base_path
489            .child(format!("{}.lance", table_name).as_str())
490    }
491
492    /// Get the reserved file path for a table
493    fn table_reserved_file_path(&self, table_name: &str) -> Path {
494        self.base_path
495            .child(format!("{}.lance", table_name).as_str())
496            .child(".lance-reserved")
497    }
498
499    /// Migrate directory-based tables to the manifest.
500    ///
501    /// This is a one-time migration operation that:
502    /// 1. Scans the directory for existing `.lance` tables
503    /// 2. Registers any unmigrated tables in the manifest
504    /// 3. Returns the count of tables that were migrated
505    ///
506    /// This method is safe to run multiple times - it will skip tables that are already
507    /// registered in the manifest.
508    ///
509    /// # Usage
510    ///
511    /// After creating tables in directory-only mode or dual mode, you can migrate them
512    /// to the manifest to enable manifest-only mode:
513    ///
514    /// ```no_run
515    /// # use lance_namespace_impls::DirectoryNamespaceBuilder;
516    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
517    /// // Create namespace with dual mode (manifest + directory listing)
518    /// let namespace = DirectoryNamespaceBuilder::new("/path/to/data")
519    ///     .manifest_enabled(true)
520    ///     .dir_listing_enabled(true)
521    ///     .build()
522    ///     .await?;
523    ///
524    /// // ... tables are created and used ...
525    ///
526    /// // Migrate existing directory tables to manifest
527    /// let migrated_count = namespace.migrate().await?;
528    /// println!("Migrated {} tables", migrated_count);
529    ///
530    /// // Now you can disable directory listing for better performance:
531    /// // (requires rebuilding the namespace)
532    /// let namespace = DirectoryNamespaceBuilder::new("/path/to/data")
533    ///     .manifest_enabled(true)
534    ///     .dir_listing_enabled(false)  // All tables now in manifest
535    ///     .build()
536    ///     .await?;
537    /// # Ok(())
538    /// # }
539    /// ```
540    ///
541    /// # Returns
542    ///
543    /// Returns the number of tables that were migrated to the manifest.
544    ///
545    /// # Errors
546    ///
547    /// Returns an error if:
548    /// - Reading the table locations already recorded in the manifest fails
549    /// - Directory listing fails
550    /// - Manifest registration fails
551    pub async fn migrate(&self) -> Result<usize> {
552        // We only care about tables in the root namespace
553        let Some(ref manifest_ns) = self.manifest_ns else {
554            return Ok(0); // No manifest, nothing to migrate
555        };
556
557        // Get all table locations already in the manifest
558        let manifest_locations = manifest_ns.list_manifest_table_locations().await?;
559
560        // Get all tables from directory
561        let dir_tables = self.list_directory_tables().await?;
562
563        // Register each directory table that doesn't have an overlapping location
564        // If a directory name already exists in the manifest,
565        // that means the table must have already been migrated or created
566        // in the manifest, so we can skip it.
567        let mut migrated_count = 0;
568        for table_name in dir_tables {
569            // For root namespace tables, the directory name is "table_name.lance"
570            let dir_name = format!("{}.lance", table_name);
571            if !manifest_locations.contains(&dir_name) {
572                manifest_ns.register_table(&table_name, dir_name).await?;
573                migrated_count += 1;
574            }
575        }
576
577        Ok(migrated_count)
578    }
579}
580
581#[async_trait]
582impl LanceNamespace for DirectoryNamespace {
583    async fn list_namespaces(
584        &self,
585        request: ListNamespacesRequest,
586    ) -> Result<ListNamespacesResponse> {
587        if let Some(ref manifest_ns) = self.manifest_ns {
588            return manifest_ns.list_namespaces(request).await;
589        }
590
591        Self::validate_root_namespace_id(&request.id)?;
592        Ok(ListNamespacesResponse::new(vec![]))
593    }
594
595    async fn describe_namespace(
596        &self,
597        request: DescribeNamespaceRequest,
598    ) -> Result<DescribeNamespaceResponse> {
599        if let Some(ref manifest_ns) = self.manifest_ns {
600            return manifest_ns.describe_namespace(request).await;
601        }
602
603        Self::validate_root_namespace_id(&request.id)?;
604        Ok(DescribeNamespaceResponse {
605            properties: Some(HashMap::new()),
606        })
607    }
608
609    async fn create_namespace(
610        &self,
611        request: CreateNamespaceRequest,
612    ) -> Result<CreateNamespaceResponse> {
613        if let Some(ref manifest_ns) = self.manifest_ns {
614            return manifest_ns.create_namespace(request).await;
615        }
616
617        if request.id.is_none() || request.id.as_ref().unwrap().is_empty() {
618            return Err(Error::Namespace {
619                source: "Root namespace already exists and cannot be created".into(),
620                location: snafu::location!(),
621            });
622        }
623
624        Err(Error::NotSupported {
625            source: "Child namespaces are only supported when manifest mode is enabled".into(),
626            location: snafu::location!(),
627        })
628    }
629
630    async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<DropNamespaceResponse> {
631        if let Some(ref manifest_ns) = self.manifest_ns {
632            return manifest_ns.drop_namespace(request).await;
633        }
634
635        if request.id.is_none() || request.id.as_ref().unwrap().is_empty() {
636            return Err(Error::Namespace {
637                source: "Root namespace cannot be dropped".into(),
638                location: snafu::location!(),
639            });
640        }
641
642        Err(Error::NotSupported {
643            source: "Child namespaces are only supported when manifest mode is enabled".into(),
644            location: snafu::location!(),
645        })
646    }
647
648    async fn namespace_exists(&self, request: NamespaceExistsRequest) -> Result<()> {
649        if let Some(ref manifest_ns) = self.manifest_ns {
650            return manifest_ns.namespace_exists(request).await;
651        }
652
653        if request.id.is_none() || request.id.as_ref().unwrap().is_empty() {
654            return Ok(());
655        }
656
657        Err(Error::Namespace {
658            source: "Child namespaces are only supported when manifest mode is enabled".into(),
659            location: snafu::location!(),
660        })
661    }
662
663    async fn list_tables(&self, request: ListTablesRequest) -> Result<ListTablesResponse> {
664        // Validate that namespace ID is provided
665        let namespace_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
666            source: "Namespace ID is required".into(),
667            location: snafu::location!(),
668        })?;
669
670        // For child namespaces, always delegate to manifest (if enabled)
671        if !namespace_id.is_empty() {
672            if let Some(ref manifest_ns) = self.manifest_ns {
673                return manifest_ns.list_tables(request).await;
674            }
675            return Err(Error::NotSupported {
676                source: "Child namespaces are only supported when manifest mode is enabled".into(),
677                location: snafu::location!(),
678            });
679        }
680
681        // When only manifest is enabled (no directory listing), delegate directly to manifest
682        if let Some(ref manifest_ns) = self.manifest_ns {
683            if !self.dir_listing_enabled {
684                return manifest_ns.list_tables(request).await;
685            }
686        }
687
688        // When both manifest and directory listing are enabled, we need to merge and deduplicate
689        let mut tables = if self.manifest_ns.is_some() && self.dir_listing_enabled {
690            // Get all manifest table locations (for deduplication)
691            let manifest_locations = if let Some(ref manifest_ns) = self.manifest_ns {
692                manifest_ns.list_manifest_table_locations().await?
693            } else {
694                std::collections::HashSet::new()
695            };
696
697            // Get all manifest tables (without pagination for merging)
698            let mut manifest_request = request.clone();
699            manifest_request.limit = None;
700            manifest_request.page_token = None;
701            let manifest_tables = if let Some(ref manifest_ns) = self.manifest_ns {
702                let manifest_response = manifest_ns.list_tables(manifest_request).await?;
703                manifest_response.tables
704            } else {
705                vec![]
706            };
707
708            // Start with all manifest table names
709            // Add directory tables that aren't already in the manifest (by location)
710            let mut all_tables: Vec<String> = manifest_tables;
711            let dir_tables = self.list_directory_tables().await?;
712            for table_name in dir_tables {
713                // Check if this table's location is already in the manifest
714                // Manifest stores full URIs, so we need to check both formats
715                let full_location = format!("{}/{}.lance", self.root, table_name);
716                let relative_location = format!("{}.lance", table_name);
717                if !manifest_locations.contains(&full_location)
718                    && !manifest_locations.contains(&relative_location)
719                {
720                    all_tables.push(table_name);
721                }
722            }
723
724            all_tables
725        } else {
726            self.list_directory_tables().await?
727        };
728
729        // Apply sorting and pagination
730        Self::apply_pagination(&mut tables, request.page_token, request.limit);
731        let response = ListTablesResponse::new(tables);
732        Ok(response)
733    }
734
735    async fn describe_table(&self, request: DescribeTableRequest) -> Result<DescribeTableResponse> {
736        if let Some(ref manifest_ns) = self.manifest_ns {
737            match manifest_ns.describe_table(request.clone()).await {
738                Ok(response) => return Ok(response),
739                Err(_)
740                    if self.dir_listing_enabled
741                        && request.id.as_ref().is_some_and(|id| id.len() == 1) =>
742                {
743                    // Fall through to directory check only for single-level IDs
744                }
745                Err(e) => return Err(e),
746            }
747        }
748
749        let table_name = Self::table_name_from_id(&request.id)?;
750        let table_uri = self.table_full_uri(&table_name);
751
752        let table_path = self.table_path(&table_name);
753        let dir_exists = self
754            .object_store
755            .read_dir(table_path)
756            .await
757            .map(|entries| !entries.is_empty())
758            .unwrap_or(false);
759
760        if !dir_exists {
761            return Err(Error::Namespace {
762                source: format!("Table does not exist: {}", table_name).into(),
763                location: snafu::location!(),
764            });
765        }
766
767        // Try to load the dataset to get real information
768        match Dataset::open(&table_uri).await {
769            Ok(mut dataset) => {
770                // If a specific version is requested, checkout that version
771                if let Some(requested_version) = request.version {
772                    dataset = dataset.checkout_version(requested_version as u64).await?;
773                }
774
775                let version = dataset.version().version;
776                let lance_schema = dataset.schema();
777                let arrow_schema: arrow_schema::Schema = lance_schema.into();
778                let json_schema = arrow_schema_to_json(&arrow_schema)?;
779                Ok(DescribeTableResponse {
780                    version: Some(version as i64),
781                    location: Some(table_uri),
782                    schema: Some(Box::new(json_schema)),
783                    properties: None,
784                    storage_options: self.storage_options.clone(),
785                })
786            }
787            Err(err) => {
788                let reserved_file_path = self.table_reserved_file_path(&table_name);
789                if self
790                    .object_store
791                    .exists(&reserved_file_path)
792                    .await
793                    .unwrap_or(false)
794                {
795                    Ok(DescribeTableResponse {
796                        version: None,
797                        location: Some(table_uri),
798                        schema: None,
799                        properties: None,
800                        storage_options: self.storage_options.clone(),
801                    })
802                } else {
803                    Err(Error::Namespace {
804                        source: format!(
805                            "Table directory exists but cannot load dataset {}: {:?}",
806                            table_name, err
807                        )
808                        .into(),
809                        location: snafu::location!(),
810                    })
811                }
812            }
813        }
814    }
815
816    async fn table_exists(&self, request: TableExistsRequest) -> Result<()> {
817        if let Some(ref manifest_ns) = self.manifest_ns {
818            match manifest_ns.table_exists(request.clone()).await {
819                Ok(()) => return Ok(()),
820                Err(_) if self.dir_listing_enabled => {
821                    // Fall through to directory check
822                }
823                Err(e) => return Err(e),
824            }
825        }
826
827        let table_name = Self::table_name_from_id(&request.id)?;
828        let table_path = self.table_path(&table_name);
829        let table_exists = self
830            .object_store
831            .read_dir(table_path)
832            .await
833            .map(|entries| !entries.is_empty())
834            .unwrap_or(false);
835
836        if !table_exists {
837            return Err(Error::Namespace {
838                source: format!("Table does not exist: {}", table_name).into(),
839                location: snafu::location!(),
840            });
841        }
842
843        Ok(())
844    }
845
846    async fn drop_table(&self, request: DropTableRequest) -> Result<DropTableResponse> {
847        if let Some(ref manifest_ns) = self.manifest_ns {
848            return manifest_ns.drop_table(request).await;
849        }
850
851        let table_name = Self::table_name_from_id(&request.id)?;
852        let table_uri = self.table_full_uri(&table_name);
853        let table_path = self.table_path(&table_name);
854
855        self.object_store
856            .remove_dir_all(table_path)
857            .await
858            .map_err(|e| Error::Namespace {
859                source: format!("Failed to drop table {}: {}", table_name, e).into(),
860                location: snafu::location!(),
861            })?;
862
863        Ok(DropTableResponse {
864            id: request.id,
865            location: Some(table_uri),
866            properties: None,
867            transaction_id: None,
868        })
869    }
870
871    async fn create_table(
872        &self,
873        request: CreateTableRequest,
874        request_data: Bytes,
875    ) -> Result<CreateTableResponse> {
876        if let Some(ref manifest_ns) = self.manifest_ns {
877            return manifest_ns.create_table(request, request_data).await;
878        }
879
880        let table_name = Self::table_name_from_id(&request.id)?;
881        let table_uri = self.table_full_uri(&table_name);
882        if request_data.is_empty() {
883            return Err(Error::Namespace {
884                source: "Request data (Arrow IPC stream) is required for create_table".into(),
885                location: snafu::location!(),
886            });
887        }
888
889        // Validate location if provided
890        if let Some(location) = &request.location {
891            let location = location.trim_end_matches('/');
892            if location != table_uri {
893                return Err(Error::Namespace {
894                    source: format!(
895                        "Cannot create table {} at location {}, must be at location {}",
896                        table_name, location, table_uri
897                    )
898                    .into(),
899                    location: snafu::location!(),
900                });
901            }
902        }
903
904        // Parse the Arrow IPC stream from request_data
905        let cursor = Cursor::new(request_data.to_vec());
906        let stream_reader = StreamReader::try_new(cursor, None).map_err(|e| Error::Namespace {
907            source: format!("Invalid Arrow IPC stream: {}", e).into(),
908            location: snafu::location!(),
909        })?;
910        let arrow_schema = stream_reader.schema();
911
912        // Collect all batches from the stream
913        let mut batches = Vec::new();
914        for batch_result in stream_reader {
915            batches.push(batch_result.map_err(|e| Error::Namespace {
916                source: format!("Failed to read batch from IPC stream: {}", e).into(),
917                location: snafu::location!(),
918            })?);
919        }
920
921        // Create RecordBatchReader from the batches
922        let reader = if batches.is_empty() {
923            let batch = arrow::record_batch::RecordBatch::new_empty(arrow_schema.clone());
924            let batches = vec![Ok(batch)];
925            RecordBatchIterator::new(batches, arrow_schema.clone())
926        } else {
927            let batch_results: Vec<_> = batches.into_iter().map(Ok).collect();
928            RecordBatchIterator::new(batch_results, arrow_schema)
929        };
930
931        let store_params = self.storage_options.as_ref().map(|opts| ObjectStoreParams {
932            storage_options: Some(opts.clone()),
933            ..Default::default()
934        });
935
936        let write_params = WriteParams {
937            mode: lance::dataset::WriteMode::Create,
938            store_params,
939            ..Default::default()
940        };
941
942        // Create the Lance dataset using the actual Lance API
943        Dataset::write(reader, &table_uri, Some(write_params))
944            .await
945            .map_err(|e| Error::Namespace {
946                source: format!("Failed to create Lance dataset: {}", e).into(),
947                location: snafu::location!(),
948            })?;
949
950        Ok(CreateTableResponse {
951            version: Some(1),
952            location: Some(table_uri),
953            properties: None,
954            storage_options: self.storage_options.clone(),
955        })
956    }
957
958    async fn create_empty_table(
959        &self,
960        request: CreateEmptyTableRequest,
961    ) -> Result<CreateEmptyTableResponse> {
962        if let Some(ref manifest_ns) = self.manifest_ns {
963            return manifest_ns.create_empty_table(request).await;
964        }
965
966        let table_name = Self::table_name_from_id(&request.id)?;
967        let table_uri = self.table_full_uri(&table_name);
968
969        // Validate location if provided
970        if let Some(location) = &request.location {
971            let location = location.trim_end_matches('/');
972            if location != table_uri {
973                return Err(Error::Namespace {
974                    source: format!(
975                        "Cannot create table {} at location {}, must be at location {}",
976                        table_name, location, table_uri
977                    )
978                    .into(),
979                    location: snafu::location!(),
980                });
981            }
982        }
983
984        // Create the .lance-reserved file to mark the table as existing
985        let reserved_file_path = self.table_reserved_file_path(&table_name);
986
987        self.object_store
988            .create(&reserved_file_path)
989            .await
990            .map_err(|e| Error::Namespace {
991                source: format!(
992                    "Failed to create .lance-reserved file for table {}: {}",
993                    table_name, e
994                )
995                .into(),
996                location: snafu::location!(),
997            })?
998            .shutdown()
999            .await
1000            .map_err(|e| Error::Namespace {
1001                source: format!(
1002                    "Failed to finalize .lance-reserved file for table {}: {}",
1003                    table_name, e
1004                )
1005                .into(),
1006                location: snafu::location!(),
1007            })?;
1008
1009        Ok(CreateEmptyTableResponse {
1010            location: Some(table_uri),
1011            properties: None,
1012            storage_options: self.storage_options.clone(),
1013        })
1014    }
1015
1016    async fn register_table(
1017        &self,
1018        request: lance_namespace::models::RegisterTableRequest,
1019    ) -> Result<lance_namespace::models::RegisterTableResponse> {
1020        // If manifest is enabled, delegate to manifest namespace
1021        if let Some(ref manifest_ns) = self.manifest_ns {
1022            return LanceNamespace::register_table(manifest_ns.as_ref(), request).await;
1023        }
1024
1025        // Without manifest, register_table is not supported
1026        Err(Error::NotSupported {
1027            source: "register_table is only supported when manifest mode is enabled".into(),
1028            location: snafu::location!(),
1029        })
1030    }
1031
1032    async fn deregister_table(
1033        &self,
1034        request: lance_namespace::models::DeregisterTableRequest,
1035    ) -> Result<lance_namespace::models::DeregisterTableResponse> {
1036        // If manifest is enabled, delegate to manifest namespace
1037        if let Some(ref manifest_ns) = self.manifest_ns {
1038            return LanceNamespace::deregister_table(manifest_ns.as_ref(), request).await;
1039        }
1040
1041        // Without manifest, deregister_table is not supported
1042        Err(Error::NotSupported {
1043            source: "deregister_table is only supported when manifest mode is enabled".into(),
1044            location: snafu::location!(),
1045        })
1046    }
1047
1048    fn namespace_id(&self) -> String {
1049        format!("DirectoryNamespace {{ root: {:?} }}", self.root)
1050    }
1051}
1052
1053#[cfg(test)]
1054mod tests {
1055    use super::*;
1056    use arrow_ipc::reader::StreamReader;
1057    use lance::dataset::Dataset;
1058    use lance_core::utils::tempfile::TempStdDir;
1059    use lance_namespace::models::{
1060        CreateTableRequest, JsonArrowDataType, JsonArrowField, JsonArrowSchema, ListTablesRequest,
1061    };
1062    use lance_namespace::schema::convert_json_arrow_schema;
1063    use std::io::Cursor;
1064    use std::sync::Arc;
1065
1066    /// Helper to create a test DirectoryNamespace with a temporary directory
1067    async fn create_test_namespace() -> (DirectoryNamespace, TempStdDir) {
1068        let temp_dir = TempStdDir::default();
1069
1070        let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
1071            .build()
1072            .await
1073            .unwrap();
1074        (namespace, temp_dir)
1075    }
1076
1077    /// Helper to create test IPC data from a schema
1078    fn create_test_ipc_data(schema: &JsonArrowSchema) -> Vec<u8> {
1079        use arrow::ipc::writer::StreamWriter;
1080
1081        let arrow_schema = convert_json_arrow_schema(schema).unwrap();
1082        let arrow_schema = Arc::new(arrow_schema);
1083        let batch = arrow::record_batch::RecordBatch::new_empty(arrow_schema.clone());
1084        let mut buffer = Vec::new();
1085        {
1086            let mut writer = StreamWriter::try_new(&mut buffer, &arrow_schema).unwrap();
1087            writer.write(&batch).unwrap();
1088            writer.finish().unwrap();
1089        }
1090        buffer
1091    }
1092
1093    /// Helper to create a simple test schema
1094    fn create_test_schema() -> JsonArrowSchema {
1095        let int_type = JsonArrowDataType::new("int32".to_string());
1096        let string_type = JsonArrowDataType::new("utf8".to_string());
1097
1098        let id_field = JsonArrowField {
1099            name: "id".to_string(),
1100            r#type: Box::new(int_type),
1101            nullable: false,
1102            metadata: None,
1103        };
1104
1105        let name_field = JsonArrowField {
1106            name: "name".to_string(),
1107            r#type: Box::new(string_type),
1108            nullable: true,
1109            metadata: None,
1110        };
1111
1112        JsonArrowSchema {
1113            fields: vec![id_field, name_field],
1114            metadata: None,
1115        }
1116    }
1117
1118    #[tokio::test]
1119    async fn test_create_table() {
1120        let (namespace, _temp_dir) = create_test_namespace().await;
1121
1122        // Create test IPC data
1123        let schema = create_test_schema();
1124        let ipc_data = create_test_ipc_data(&schema);
1125
1126        let mut request = CreateTableRequest::new();
1127        request.id = Some(vec!["test_table".to_string()]);
1128
1129        let response = namespace
1130            .create_table(request, bytes::Bytes::from(ipc_data))
1131            .await
1132            .unwrap();
1133
1134        assert!(response.location.is_some());
1135        assert!(response.location.unwrap().ends_with("test_table.lance"));
1136        assert_eq!(response.version, Some(1));
1137    }
1138
1139    #[tokio::test]
1140    async fn test_create_table_without_data() {
1141        let (namespace, _temp_dir) = create_test_namespace().await;
1142
1143        let mut request = CreateTableRequest::new();
1144        request.id = Some(vec!["test_table".to_string()]);
1145
1146        let result = namespace.create_table(request, bytes::Bytes::new()).await;
1147        assert!(result.is_err());
1148        assert!(result
1149            .unwrap_err()
1150            .to_string()
1151            .contains("Arrow IPC stream) is required"));
1152    }
1153
1154    #[tokio::test]
1155    async fn test_create_table_with_invalid_id() {
1156        let (namespace, _temp_dir) = create_test_namespace().await;
1157
1158        // Create test IPC data
1159        let schema = create_test_schema();
1160        let ipc_data = create_test_ipc_data(&schema);
1161
1162        // Test with empty ID
1163        let mut request = CreateTableRequest::new();
1164        request.id = Some(vec![]);
1165
1166        let result = namespace
1167            .create_table(request, bytes::Bytes::from(ipc_data.clone()))
1168            .await;
1169        assert!(result.is_err());
1170
1171        // Test with multi-level ID - should now work with manifest enabled
1172        // First create the parent namespace
1173        let mut create_ns_req = CreateNamespaceRequest::new();
1174        create_ns_req.id = Some(vec!["test_namespace".to_string()]);
1175        namespace.create_namespace(create_ns_req).await.unwrap();
1176
1177        // Now create table in the namespace
1178        let mut request = CreateTableRequest::new();
1179        request.id = Some(vec!["test_namespace".to_string(), "table".to_string()]);
1180
1181        let result = namespace
1182            .create_table(request, bytes::Bytes::from(ipc_data))
1183            .await;
1184        // Should succeed with manifest enabled
1185        assert!(
1186            result.is_ok(),
1187            "Multi-level table IDs should work with manifest enabled"
1188        );
1189    }
1190
1191    #[tokio::test]
1192    async fn test_create_table_with_wrong_location() {
1193        let (namespace, _temp_dir) = create_test_namespace().await;
1194
1195        // Create test IPC data
1196        let schema = create_test_schema();
1197        let ipc_data = create_test_ipc_data(&schema);
1198
1199        let mut request = CreateTableRequest::new();
1200        request.id = Some(vec!["test_table".to_string()]);
1201        request.location = Some("/wrong/path/table.lance".to_string());
1202
1203        let result = namespace
1204            .create_table(request, bytes::Bytes::from(ipc_data))
1205            .await;
1206        assert!(result.is_err());
1207        assert!(result
1208            .unwrap_err()
1209            .to_string()
1210            .contains("must be at location"));
1211    }
1212
1213    #[tokio::test]
1214    async fn test_list_tables() {
1215        let (namespace, _temp_dir) = create_test_namespace().await;
1216
1217        // Initially, no tables
1218        let mut request = ListTablesRequest::new();
1219        request.id = Some(vec![]);
1220        let response = namespace.list_tables(request).await.unwrap();
1221        assert_eq!(response.tables.len(), 0);
1222
1223        // Create test IPC data
1224        let schema = create_test_schema();
1225        let ipc_data = create_test_ipc_data(&schema);
1226
1227        // Create a table
1228        let mut create_request = CreateTableRequest::new();
1229        create_request.id = Some(vec!["table1".to_string()]);
1230        namespace
1231            .create_table(create_request, bytes::Bytes::from(ipc_data.clone()))
1232            .await
1233            .unwrap();
1234
1235        // Create another table
1236        let mut create_request = CreateTableRequest::new();
1237        create_request.id = Some(vec!["table2".to_string()]);
1238        namespace
1239            .create_table(create_request, bytes::Bytes::from(ipc_data))
1240            .await
1241            .unwrap();
1242
1243        // List tables should return both
1244        let mut request = ListTablesRequest::new();
1245        request.id = Some(vec![]);
1246        let response = namespace.list_tables(request).await.unwrap();
1247        let tables = response.tables;
1248        assert_eq!(tables.len(), 2);
1249        assert!(tables.contains(&"table1".to_string()));
1250        assert!(tables.contains(&"table2".to_string()));
1251    }
1252
1253    #[tokio::test]
1254    async fn test_list_tables_with_namespace_id() {
1255        let (namespace, _temp_dir) = create_test_namespace().await;
1256
1257        // First create a child namespace
1258        let mut create_ns_req = CreateNamespaceRequest::new();
1259        create_ns_req.id = Some(vec!["test_namespace".to_string()]);
1260        namespace.create_namespace(create_ns_req).await.unwrap();
1261
1262        // Now list tables in the child namespace
1263        let mut request = ListTablesRequest::new();
1264        request.id = Some(vec!["test_namespace".to_string()]);
1265
1266        let result = namespace.list_tables(request).await;
1267        // Should succeed (with manifest enabled) and return empty list (no tables yet)
1268        assert!(
1269            result.is_ok(),
1270            "list_tables should work with child namespace when manifest is enabled"
1271        );
1272        let response = result.unwrap();
1273        assert_eq!(
1274            response.tables.len(),
1275            0,
1276            "Namespace should have no tables yet"
1277        );
1278    }
1279
1280    #[tokio::test]
1281    async fn test_describe_table() {
1282        let (namespace, _temp_dir) = create_test_namespace().await;
1283
1284        // Create a table first
1285        let schema = create_test_schema();
1286        let ipc_data = create_test_ipc_data(&schema);
1287
1288        let mut create_request = CreateTableRequest::new();
1289        create_request.id = Some(vec!["test_table".to_string()]);
1290        namespace
1291            .create_table(create_request, bytes::Bytes::from(ipc_data))
1292            .await
1293            .unwrap();
1294
1295        // Describe the table
1296        let mut request = DescribeTableRequest::new();
1297        request.id = Some(vec!["test_table".to_string()]);
1298        let response = namespace.describe_table(request).await.unwrap();
1299
1300        assert!(response.location.is_some());
1301        assert!(response.location.unwrap().ends_with("test_table.lance"));
1302    }
1303
1304    #[tokio::test]
1305    async fn test_describe_nonexistent_table() {
1306        let (namespace, _temp_dir) = create_test_namespace().await;
1307
1308        let mut request = DescribeTableRequest::new();
1309        request.id = Some(vec!["nonexistent".to_string()]);
1310
1311        let result = namespace.describe_table(request).await;
1312        assert!(result.is_err());
1313        assert!(result
1314            .unwrap_err()
1315            .to_string()
1316            .contains("Table does not exist"));
1317    }
1318
1319    #[tokio::test]
1320    async fn test_table_exists() {
1321        let (namespace, _temp_dir) = create_test_namespace().await;
1322
1323        // Create a table
1324        let schema = create_test_schema();
1325        let ipc_data = create_test_ipc_data(&schema);
1326
1327        let mut create_request = CreateTableRequest::new();
1328        create_request.id = Some(vec!["existing_table".to_string()]);
1329        namespace
1330            .create_table(create_request, bytes::Bytes::from(ipc_data))
1331            .await
1332            .unwrap();
1333
1334        // Check existing table
1335        let mut request = TableExistsRequest::new();
1336        request.id = Some(vec!["existing_table".to_string()]);
1337        let result = namespace.table_exists(request).await;
1338        assert!(result.is_ok());
1339
1340        // Check non-existent table
1341        let mut request = TableExistsRequest::new();
1342        request.id = Some(vec!["nonexistent".to_string()]);
1343        let result = namespace.table_exists(request).await;
1344        assert!(result.is_err());
1345        assert!(result
1346            .unwrap_err()
1347            .to_string()
1348            .contains("Table does not exist"));
1349    }
1350
1351    #[tokio::test]
1352    async fn test_drop_table() {
1353        let (namespace, _temp_dir) = create_test_namespace().await;
1354
1355        // Create a table
1356        let schema = create_test_schema();
1357        let ipc_data = create_test_ipc_data(&schema);
1358
1359        let mut create_request = CreateTableRequest::new();
1360        create_request.id = Some(vec!["table_to_drop".to_string()]);
1361        namespace
1362            .create_table(create_request, bytes::Bytes::from(ipc_data))
1363            .await
1364            .unwrap();
1365
1366        // Verify it exists
1367        let mut exists_request = TableExistsRequest::new();
1368        exists_request.id = Some(vec!["table_to_drop".to_string()]);
1369        assert!(namespace.table_exists(exists_request.clone()).await.is_ok());
1370
1371        // Drop the table
1372        let mut drop_request = DropTableRequest::new();
1373        drop_request.id = Some(vec!["table_to_drop".to_string()]);
1374        let response = namespace.drop_table(drop_request).await.unwrap();
1375        assert!(response.location.is_some());
1376
1377        // Verify it no longer exists
1378        assert!(namespace.table_exists(exists_request).await.is_err());
1379    }
1380
1381    #[tokio::test]
1382    async fn test_drop_nonexistent_table() {
1383        let (namespace, _temp_dir) = create_test_namespace().await;
1384
1385        let mut request = DropTableRequest::new();
1386        request.id = Some(vec!["nonexistent".to_string()]);
1387
1388        // Should not fail when dropping non-existent table (idempotent)
1389        let result = namespace.drop_table(request).await;
1390        // The operation might succeed or fail depending on implementation
1391        // But it should not panic
1392        let _ = result;
1393    }
1394
1395    #[tokio::test]
1396    async fn test_root_namespace_operations() {
1397        let (namespace, _temp_dir) = create_test_namespace().await;
1398
1399        // Test list_namespaces - should return empty list for root
1400        let mut request = ListNamespacesRequest::new();
1401        request.id = Some(vec![]);
1402        let result = namespace.list_namespaces(request).await;
1403        assert!(result.is_ok());
1404        assert_eq!(result.unwrap().namespaces.len(), 0);
1405
1406        // Test describe_namespace - should succeed for root
1407        let mut request = DescribeNamespaceRequest::new();
1408        request.id = Some(vec![]);
1409        let result = namespace.describe_namespace(request).await;
1410        assert!(result.is_ok());
1411
1412        // Test namespace_exists - root always exists
1413        let mut request = NamespaceExistsRequest::new();
1414        request.id = Some(vec![]);
1415        let result = namespace.namespace_exists(request).await;
1416        assert!(result.is_ok());
1417
1418        // Test create_namespace - root cannot be created
1419        let mut request = CreateNamespaceRequest::new();
1420        request.id = Some(vec![]);
1421        let result = namespace.create_namespace(request).await;
1422        assert!(result.is_err());
1423        assert!(result.unwrap_err().to_string().contains("already exists"));
1424
1425        // Test drop_namespace - root cannot be dropped
1426        let mut request = DropNamespaceRequest::new();
1427        request.id = Some(vec![]);
1428        let result = namespace.drop_namespace(request).await;
1429        assert!(result.is_err());
1430        assert!(result
1431            .unwrap_err()
1432            .to_string()
1433            .contains("cannot be dropped"));
1434    }
1435
1436    #[tokio::test]
1437    async fn test_non_root_namespace_operations() {
1438        let (namespace, _temp_dir) = create_test_namespace().await;
1439
1440        // With manifest enabled (default), child namespaces are now supported
1441        // Test create_namespace for non-root - should succeed with manifest
1442        let mut request = CreateNamespaceRequest::new();
1443        request.id = Some(vec!["child".to_string()]);
1444        let result = namespace.create_namespace(request).await;
1445        assert!(
1446            result.is_ok(),
1447            "Child namespace creation should succeed with manifest enabled"
1448        );
1449
1450        // Test namespace_exists for non-root - should exist after creation
1451        let mut request = NamespaceExistsRequest::new();
1452        request.id = Some(vec!["child".to_string()]);
1453        let result = namespace.namespace_exists(request).await;
1454        assert!(
1455            result.is_ok(),
1456            "Child namespace should exist after creation"
1457        );
1458
1459        // Test drop_namespace for non-root - should succeed
1460        let mut request = DropNamespaceRequest::new();
1461        request.id = Some(vec!["child".to_string()]);
1462        let result = namespace.drop_namespace(request).await;
1463        assert!(
1464            result.is_ok(),
1465            "Child namespace drop should succeed with manifest enabled"
1466        );
1467
1468        // Verify namespace no longer exists
1469        let mut request = NamespaceExistsRequest::new();
1470        request.id = Some(vec!["child".to_string()]);
1471        let result = namespace.namespace_exists(request).await;
1472        assert!(
1473            result.is_err(),
1474            "Child namespace should not exist after drop"
1475        );
1476    }
1477
1478    #[tokio::test]
1479    async fn test_config_custom_root() {
1480        let temp_dir = TempStdDir::default();
1481        let custom_path = temp_dir.join("custom");
1482        std::fs::create_dir(&custom_path).unwrap();
1483
1484        let namespace = DirectoryNamespaceBuilder::new(custom_path.to_string_lossy().to_string())
1485            .build()
1486            .await
1487            .unwrap();
1488
1489        // Create test IPC data
1490        let schema = create_test_schema();
1491        let ipc_data = create_test_ipc_data(&schema);
1492
1493        // Create a table and verify location
1494        let mut request = CreateTableRequest::new();
1495        request.id = Some(vec!["test_table".to_string()]);
1496
1497        let response = namespace
1498            .create_table(request, bytes::Bytes::from(ipc_data))
1499            .await
1500            .unwrap();
1501
1502        assert!(response.location.unwrap().contains("custom"));
1503    }
1504
1505    #[tokio::test]
1506    async fn test_config_storage_options() {
1507        let temp_dir = TempStdDir::default();
1508
1509        let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
1510            .storage_option("option1", "value1")
1511            .storage_option("option2", "value2")
1512            .build()
1513            .await
1514            .unwrap();
1515
1516        // Create test IPC data
1517        let schema = create_test_schema();
1518        let ipc_data = create_test_ipc_data(&schema);
1519
1520        // Create a table and check storage options are included
1521        let mut request = CreateTableRequest::new();
1522        request.id = Some(vec!["test_table".to_string()]);
1523
1524        let response = namespace
1525            .create_table(request, bytes::Bytes::from(ipc_data))
1526            .await
1527            .unwrap();
1528
1529        let storage_options = response.storage_options.unwrap();
1530        assert_eq!(storage_options.get("option1"), Some(&"value1".to_string()));
1531        assert_eq!(storage_options.get("option2"), Some(&"value2".to_string()));
1532    }
1533
1534    #[tokio::test]
1535    async fn test_from_properties_manifest_enabled() {
1536        let temp_dir = TempStdDir::default();
1537
1538        let mut properties = HashMap::new();
1539        properties.insert("root".to_string(), temp_dir.to_str().unwrap().to_string());
1540        properties.insert("manifest_enabled".to_string(), "true".to_string());
1541        properties.insert("dir_listing_enabled".to_string(), "false".to_string());
1542
1543        let builder = DirectoryNamespaceBuilder::from_properties(properties, None).unwrap();
1544        assert!(builder.manifest_enabled);
1545        assert!(!builder.dir_listing_enabled);
1546
1547        let namespace = builder.build().await.unwrap();
1548
1549        // Create test IPC data
1550        let schema = create_test_schema();
1551        let ipc_data = create_test_ipc_data(&schema);
1552
1553        // Create a table
1554        let mut request = CreateTableRequest::new();
1555        request.id = Some(vec!["test_table".to_string()]);
1556
1557        let response = namespace
1558            .create_table(request, bytes::Bytes::from(ipc_data))
1559            .await
1560            .unwrap();
1561
1562        assert!(response.location.is_some());
1563    }
1564
1565    #[tokio::test]
1566    async fn test_from_properties_dir_listing_enabled() {
1567        let temp_dir = TempStdDir::default();
1568
1569        let mut properties = HashMap::new();
1570        properties.insert("root".to_string(), temp_dir.to_str().unwrap().to_string());
1571        properties.insert("manifest_enabled".to_string(), "false".to_string());
1572        properties.insert("dir_listing_enabled".to_string(), "true".to_string());
1573
1574        let builder = DirectoryNamespaceBuilder::from_properties(properties, None).unwrap();
1575        assert!(!builder.manifest_enabled);
1576        assert!(builder.dir_listing_enabled);
1577
1578        let namespace = builder.build().await.unwrap();
1579
1580        // Create test IPC data
1581        let schema = create_test_schema();
1582        let ipc_data = create_test_ipc_data(&schema);
1583
1584        // Create a table
1585        let mut request = CreateTableRequest::new();
1586        request.id = Some(vec!["test_table".to_string()]);
1587
1588        let response = namespace
1589            .create_table(request, bytes::Bytes::from(ipc_data))
1590            .await
1591            .unwrap();
1592
1593        assert!(response.location.is_some());
1594    }
1595
1596    #[tokio::test]
1597    async fn test_from_properties_defaults() {
1598        let temp_dir = TempStdDir::default();
1599
1600        let mut properties = HashMap::new();
1601        properties.insert("root".to_string(), temp_dir.to_str().unwrap().to_string());
1602
1603        let builder = DirectoryNamespaceBuilder::from_properties(properties, None).unwrap();
1604        // Both should default to true
1605        assert!(builder.manifest_enabled);
1606        assert!(builder.dir_listing_enabled);
1607    }
1608
1609    #[tokio::test]
1610    async fn test_from_properties_with_storage_options() {
1611        let temp_dir = TempStdDir::default();
1612
1613        let mut properties = HashMap::new();
1614        properties.insert("root".to_string(), temp_dir.to_str().unwrap().to_string());
1615        properties.insert("manifest_enabled".to_string(), "true".to_string());
1616        properties.insert("storage.region".to_string(), "us-west-2".to_string());
1617        properties.insert("storage.bucket".to_string(), "my-bucket".to_string());
1618
1619        let builder = DirectoryNamespaceBuilder::from_properties(properties, None).unwrap();
1620        assert!(builder.manifest_enabled);
1621        assert!(builder.storage_options.is_some());
1622
1623        let storage_options = builder.storage_options.unwrap();
1624        assert_eq!(
1625            storage_options.get("region"),
1626            Some(&"us-west-2".to_string())
1627        );
1628        assert_eq!(
1629            storage_options.get("bucket"),
1630            Some(&"my-bucket".to_string())
1631        );
1632    }
1633
1634    #[tokio::test]
1635    async fn test_various_arrow_types() {
1636        let (namespace, _temp_dir) = create_test_namespace().await;
1637
1638        // Create schema with various types
1639        let fields = vec![
1640            JsonArrowField {
1641                name: "bool_col".to_string(),
1642                r#type: Box::new(JsonArrowDataType::new("bool".to_string())),
1643                nullable: true,
1644                metadata: None,
1645            },
1646            JsonArrowField {
1647                name: "int8_col".to_string(),
1648                r#type: Box::new(JsonArrowDataType::new("int8".to_string())),
1649                nullable: true,
1650                metadata: None,
1651            },
1652            JsonArrowField {
1653                name: "float64_col".to_string(),
1654                r#type: Box::new(JsonArrowDataType::new("float64".to_string())),
1655                nullable: true,
1656                metadata: None,
1657            },
1658            JsonArrowField {
1659                name: "binary_col".to_string(),
1660                r#type: Box::new(JsonArrowDataType::new("binary".to_string())),
1661                nullable: true,
1662                metadata: None,
1663            },
1664        ];
1665
1666        let schema = JsonArrowSchema {
1667            fields,
1668            metadata: None,
1669        };
1670
1671        // Create IPC data
1672        let ipc_data = create_test_ipc_data(&schema);
1673
1674        let mut request = CreateTableRequest::new();
1675        request.id = Some(vec!["complex_table".to_string()]);
1676
1677        let response = namespace
1678            .create_table(request, bytes::Bytes::from(ipc_data))
1679            .await
1680            .unwrap();
1681
1682        assert!(response.location.is_some());
1683    }
1684
1685    #[tokio::test]
1686    async fn test_connect_dir() {
1687        let temp_dir = TempStdDir::default();
1688
1689        let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
1690            .build()
1691            .await
1692            .unwrap();
1693
1694        // Test basic operation through the concrete type
1695        let mut request = ListTablesRequest::new();
1696        request.id = Some(vec![]);
1697        let response = namespace.list_tables(request).await.unwrap();
1698        assert_eq!(response.tables.len(), 0);
1699    }
1700
1701    #[tokio::test]
1702    async fn test_create_table_with_ipc_data() {
1703        use arrow::array::{Int32Array, StringArray};
1704        use arrow::ipc::writer::StreamWriter;
1705
1706        let (namespace, _temp_dir) = create_test_namespace().await;
1707
1708        // Create a schema with some fields
1709        let schema = create_test_schema();
1710
1711        // Create some test data that matches the schema
1712        let arrow_schema = convert_json_arrow_schema(&schema).unwrap();
1713        let arrow_schema = Arc::new(arrow_schema);
1714
1715        // Create a RecordBatch with actual data
1716        let id_array = Int32Array::from(vec![1, 2, 3]);
1717        let name_array = StringArray::from(vec!["Alice", "Bob", "Charlie"]);
1718        let batch = arrow::record_batch::RecordBatch::try_new(
1719            arrow_schema.clone(),
1720            vec![Arc::new(id_array), Arc::new(name_array)],
1721        )
1722        .unwrap();
1723
1724        // Write the batch to an IPC stream
1725        let mut buffer = Vec::new();
1726        {
1727            let mut writer = StreamWriter::try_new(&mut buffer, &arrow_schema).unwrap();
1728            writer.write(&batch).unwrap();
1729            writer.finish().unwrap();
1730        }
1731
1732        // Create table with the IPC data
1733        let mut request = CreateTableRequest::new();
1734        request.id = Some(vec!["test_table_with_data".to_string()]);
1735
1736        let response = namespace
1737            .create_table(request, Bytes::from(buffer))
1738            .await
1739            .unwrap();
1740
1741        assert_eq!(response.version, Some(1));
1742        assert!(response
1743            .location
1744            .unwrap()
1745            .contains("test_table_with_data.lance"));
1746
1747        // Verify table exists
1748        let mut exists_request = TableExistsRequest::new();
1749        exists_request.id = Some(vec!["test_table_with_data".to_string()]);
1750        namespace.table_exists(exists_request).await.unwrap();
1751    }
1752
1753    #[tokio::test]
1754    async fn test_create_empty_table() {
1755        let (namespace, temp_dir) = create_test_namespace().await;
1756
1757        let mut request = CreateEmptyTableRequest::new();
1758        request.id = Some(vec!["empty_table".to_string()]);
1759
1760        let response = namespace.create_empty_table(request).await.unwrap();
1761
1762        assert!(response.location.is_some());
1763        assert!(response.location.unwrap().ends_with("empty_table.lance"));
1764
1765        // Verify the .lance-reserved file was created in the correct location
1766        let table_dir = temp_dir.join("empty_table.lance");
1767        assert!(table_dir.exists());
1768        assert!(table_dir.is_dir());
1769
1770        let reserved_file = table_dir.join(".lance-reserved");
1771        assert!(reserved_file.exists());
1772        assert!(reserved_file.is_file());
1773
1774        // Verify file is empty
1775        let metadata = std::fs::metadata(&reserved_file).unwrap();
1776        assert_eq!(metadata.len(), 0);
1777
1778        // Verify table exists by checking for .lance-reserved file
1779        let mut exists_request = TableExistsRequest::new();
1780        exists_request.id = Some(vec!["empty_table".to_string()]);
1781        namespace.table_exists(exists_request).await.unwrap();
1782
1783        // List tables should include the empty table
1784        let mut list_request = ListTablesRequest::new();
1785        list_request.id = Some(vec![]);
1786        let list_response = namespace.list_tables(list_request).await.unwrap();
1787        assert!(list_response.tables.contains(&"empty_table".to_string()));
1788
1789        // Verify describe table works for empty table
1790        let mut describe_request = DescribeTableRequest::new();
1791        describe_request.id = Some(vec!["empty_table".to_string()]);
1792        let describe_response = namespace.describe_table(describe_request).await.unwrap();
1793        assert!(describe_response.location.is_some());
1794        assert!(describe_response.location.unwrap().contains("empty_table"));
1795    }
1796
1797    #[tokio::test]
1798    async fn test_create_empty_table_with_wrong_location() {
1799        let (namespace, _temp_dir) = create_test_namespace().await;
1800
1801        let mut request = CreateEmptyTableRequest::new();
1802        request.id = Some(vec!["test_table".to_string()]);
1803        request.location = Some("/wrong/path/table.lance".to_string());
1804
1805        let result = namespace.create_empty_table(request).await;
1806        assert!(result.is_err());
1807        assert!(result
1808            .unwrap_err()
1809            .to_string()
1810            .contains("must be at location"));
1811    }
1812
1813    #[tokio::test]
1814    async fn test_create_empty_table_then_drop() {
1815        let (namespace, temp_dir) = create_test_namespace().await;
1816
1817        // Create an empty table
1818        let mut create_request = CreateEmptyTableRequest::new();
1819        create_request.id = Some(vec!["empty_table_to_drop".to_string()]);
1820
1821        let create_response = namespace.create_empty_table(create_request).await.unwrap();
1822        assert!(create_response.location.is_some());
1823
1824        // Verify it exists
1825        let table_dir = temp_dir.join("empty_table_to_drop.lance");
1826        assert!(table_dir.exists());
1827        let reserved_file = table_dir.join(".lance-reserved");
1828        assert!(reserved_file.exists());
1829
1830        // Drop the table
1831        let mut drop_request = DropTableRequest::new();
1832        drop_request.id = Some(vec!["empty_table_to_drop".to_string()]);
1833        let drop_response = namespace.drop_table(drop_request).await.unwrap();
1834        assert!(drop_response.location.is_some());
1835
1836        // Verify table directory was removed
1837        assert!(!table_dir.exists());
1838        assert!(!reserved_file.exists());
1839
1840        // Verify table no longer exists
1841        let mut exists_request = TableExistsRequest::new();
1842        exists_request.id = Some(vec!["empty_table_to_drop".to_string()]);
1843        let exists_result = namespace.table_exists(exists_request).await;
1844        assert!(exists_result.is_err());
1845    }
1846
1847    #[tokio::test]
1848    async fn test_child_namespace_create_and_list() {
1849        let (namespace, _temp_dir) = create_test_namespace().await;
1850
1851        // Create multiple child namespaces
1852        for i in 1..=3 {
1853            let mut create_req = CreateNamespaceRequest::new();
1854            create_req.id = Some(vec![format!("ns{}", i)]);
1855            let result = namespace.create_namespace(create_req).await;
1856            assert!(result.is_ok(), "Failed to create child namespace ns{}", i);
1857        }
1858
1859        // List child namespaces
1860        let list_req = ListNamespacesRequest {
1861            id: Some(vec![]),
1862            page_token: None,
1863            limit: None,
1864        };
1865        let result = namespace.list_namespaces(list_req).await;
1866        assert!(result.is_ok());
1867        let namespaces = result.unwrap().namespaces;
1868        assert_eq!(namespaces.len(), 3);
1869        assert!(namespaces.contains(&"ns1".to_string()));
1870        assert!(namespaces.contains(&"ns2".to_string()));
1871        assert!(namespaces.contains(&"ns3".to_string()));
1872    }
1873
1874    #[tokio::test]
1875    async fn test_nested_namespace_hierarchy() {
1876        let (namespace, _temp_dir) = create_test_namespace().await;
1877
1878        // Create parent namespace
1879        let mut create_req = CreateNamespaceRequest::new();
1880        create_req.id = Some(vec!["parent".to_string()]);
1881        namespace.create_namespace(create_req).await.unwrap();
1882
1883        // Create nested children
1884        let mut create_req = CreateNamespaceRequest::new();
1885        create_req.id = Some(vec!["parent".to_string(), "child1".to_string()]);
1886        namespace.create_namespace(create_req).await.unwrap();
1887
1888        let mut create_req = CreateNamespaceRequest::new();
1889        create_req.id = Some(vec!["parent".to_string(), "child2".to_string()]);
1890        namespace.create_namespace(create_req).await.unwrap();
1891
1892        // List children of parent
1893        let list_req = ListNamespacesRequest {
1894            id: Some(vec!["parent".to_string()]),
1895            page_token: None,
1896            limit: None,
1897        };
1898        let result = namespace.list_namespaces(list_req).await;
1899        assert!(result.is_ok());
1900        let children = result.unwrap().namespaces;
1901        assert_eq!(children.len(), 2);
1902        assert!(children.contains(&"child1".to_string()));
1903        assert!(children.contains(&"child2".to_string()));
1904
1905        // List root should only show parent
1906        let list_req = ListNamespacesRequest {
1907            id: Some(vec![]),
1908            page_token: None,
1909            limit: None,
1910        };
1911        let result = namespace.list_namespaces(list_req).await;
1912        assert!(result.is_ok());
1913        let root_namespaces = result.unwrap().namespaces;
1914        assert_eq!(root_namespaces.len(), 1);
1915        assert_eq!(root_namespaces[0], "parent");
1916    }
1917
1918    #[tokio::test]
1919    async fn test_table_in_child_namespace() {
1920        let (namespace, _temp_dir) = create_test_namespace().await;
1921
1922        // Create child namespace
1923        let mut create_ns_req = CreateNamespaceRequest::new();
1924        create_ns_req.id = Some(vec!["test_ns".to_string()]);
1925        namespace.create_namespace(create_ns_req).await.unwrap();
1926
1927        // Create table in child namespace
1928        let schema = create_test_schema();
1929        let ipc_data = create_test_ipc_data(&schema);
1930        let mut create_table_req = CreateTableRequest::new();
1931        create_table_req.id = Some(vec!["test_ns".to_string(), "table1".to_string()]);
1932        let result = namespace
1933            .create_table(create_table_req, bytes::Bytes::from(ipc_data))
1934            .await;
1935        assert!(result.is_ok(), "Failed to create table in child namespace");
1936
1937        // List tables in child namespace
1938        let list_req = ListTablesRequest {
1939            id: Some(vec!["test_ns".to_string()]),
1940            page_token: None,
1941            limit: None,
1942        };
1943        let result = namespace.list_tables(list_req).await;
1944        assert!(result.is_ok());
1945        let tables = result.unwrap().tables;
1946        assert_eq!(tables.len(), 1);
1947        assert_eq!(tables[0], "table1");
1948
1949        // Verify table exists
1950        let mut exists_req = TableExistsRequest::new();
1951        exists_req.id = Some(vec!["test_ns".to_string(), "table1".to_string()]);
1952        let result = namespace.table_exists(exists_req).await;
1953        assert!(result.is_ok());
1954
1955        // Describe table in child namespace
1956        let mut describe_req = DescribeTableRequest::new();
1957        describe_req.id = Some(vec!["test_ns".to_string(), "table1".to_string()]);
1958        let result = namespace.describe_table(describe_req).await;
1959        assert!(result.is_ok());
1960        let response = result.unwrap();
1961        assert!(response.location.is_some());
1962    }
1963
1964    #[tokio::test]
1965    async fn test_multiple_tables_in_child_namespace() {
1966        let (namespace, _temp_dir) = create_test_namespace().await;
1967
1968        // Create child namespace
1969        let mut create_ns_req = CreateNamespaceRequest::new();
1970        create_ns_req.id = Some(vec!["test_ns".to_string()]);
1971        namespace.create_namespace(create_ns_req).await.unwrap();
1972
1973        // Create multiple tables
1974        let schema = create_test_schema();
1975        let ipc_data = create_test_ipc_data(&schema);
1976        for i in 1..=3 {
1977            let mut create_table_req = CreateTableRequest::new();
1978            create_table_req.id = Some(vec!["test_ns".to_string(), format!("table{}", i)]);
1979            namespace
1980                .create_table(create_table_req, bytes::Bytes::from(ipc_data.clone()))
1981                .await
1982                .unwrap();
1983        }
1984
1985        // List tables
1986        let list_req = ListTablesRequest {
1987            id: Some(vec!["test_ns".to_string()]),
1988            page_token: None,
1989            limit: None,
1990        };
1991        let result = namespace.list_tables(list_req).await;
1992        assert!(result.is_ok());
1993        let tables = result.unwrap().tables;
1994        assert_eq!(tables.len(), 3);
1995        assert!(tables.contains(&"table1".to_string()));
1996        assert!(tables.contains(&"table2".to_string()));
1997        assert!(tables.contains(&"table3".to_string()));
1998    }
1999
2000    #[tokio::test]
2001    async fn test_drop_table_in_child_namespace() {
2002        let (namespace, _temp_dir) = create_test_namespace().await;
2003
2004        // Create child namespace
2005        let mut create_ns_req = CreateNamespaceRequest::new();
2006        create_ns_req.id = Some(vec!["test_ns".to_string()]);
2007        namespace.create_namespace(create_ns_req).await.unwrap();
2008
2009        // Create table
2010        let schema = create_test_schema();
2011        let ipc_data = create_test_ipc_data(&schema);
2012        let mut create_table_req = CreateTableRequest::new();
2013        create_table_req.id = Some(vec!["test_ns".to_string(), "table1".to_string()]);
2014        namespace
2015            .create_table(create_table_req, bytes::Bytes::from(ipc_data))
2016            .await
2017            .unwrap();
2018
2019        // Drop table
2020        let mut drop_req = DropTableRequest::new();
2021        drop_req.id = Some(vec!["test_ns".to_string(), "table1".to_string()]);
2022        let result = namespace.drop_table(drop_req).await;
2023        assert!(result.is_ok(), "Failed to drop table in child namespace");
2024
2025        // Verify table no longer exists
2026        let mut exists_req = TableExistsRequest::new();
2027        exists_req.id = Some(vec!["test_ns".to_string(), "table1".to_string()]);
2028        let result = namespace.table_exists(exists_req).await;
2029        assert!(result.is_err());
2030    }
2031
2032    #[tokio::test]
2033    async fn test_empty_table_in_child_namespace() {
2034        let (namespace, _temp_dir) = create_test_namespace().await;
2035
2036        // Create child namespace
2037        let mut create_ns_req = CreateNamespaceRequest::new();
2038        create_ns_req.id = Some(vec!["test_ns".to_string()]);
2039        namespace.create_namespace(create_ns_req).await.unwrap();
2040
2041        // Create empty table
2042        let mut create_empty_req = CreateEmptyTableRequest::new();
2043        create_empty_req.id = Some(vec!["test_ns".to_string(), "empty_table".to_string()]);
2044        let result = namespace.create_empty_table(create_empty_req).await;
2045        assert!(
2046            result.is_ok(),
2047            "Failed to create empty table in child namespace"
2048        );
2049
2050        // Verify table exists
2051        let mut exists_req = TableExistsRequest::new();
2052        exists_req.id = Some(vec!["test_ns".to_string(), "empty_table".to_string()]);
2053        let result = namespace.table_exists(exists_req).await;
2054        assert!(result.is_ok());
2055    }
2056
2057    #[tokio::test]
2058    async fn test_deeply_nested_namespace() {
2059        let (namespace, _temp_dir) = create_test_namespace().await;
2060
2061        // Create deeply nested namespace hierarchy
2062        let mut create_req = CreateNamespaceRequest::new();
2063        create_req.id = Some(vec!["level1".to_string()]);
2064        namespace.create_namespace(create_req).await.unwrap();
2065
2066        let mut create_req = CreateNamespaceRequest::new();
2067        create_req.id = Some(vec!["level1".to_string(), "level2".to_string()]);
2068        namespace.create_namespace(create_req).await.unwrap();
2069
2070        let mut create_req = CreateNamespaceRequest::new();
2071        create_req.id = Some(vec![
2072            "level1".to_string(),
2073            "level2".to_string(),
2074            "level3".to_string(),
2075        ]);
2076        namespace.create_namespace(create_req).await.unwrap();
2077
2078        // Create table in deeply nested namespace
2079        let schema = create_test_schema();
2080        let ipc_data = create_test_ipc_data(&schema);
2081        let mut create_table_req = CreateTableRequest::new();
2082        create_table_req.id = Some(vec![
2083            "level1".to_string(),
2084            "level2".to_string(),
2085            "level3".to_string(),
2086            "table1".to_string(),
2087        ]);
2088        let result = namespace
2089            .create_table(create_table_req, bytes::Bytes::from(ipc_data))
2090            .await;
2091        assert!(
2092            result.is_ok(),
2093            "Failed to create table in deeply nested namespace"
2094        );
2095
2096        // Verify table exists
2097        let mut exists_req = TableExistsRequest::new();
2098        exists_req.id = Some(vec![
2099            "level1".to_string(),
2100            "level2".to_string(),
2101            "level3".to_string(),
2102            "table1".to_string(),
2103        ]);
2104        let result = namespace.table_exists(exists_req).await;
2105        assert!(result.is_ok());
2106    }
2107
2108    #[tokio::test]
2109    async fn test_namespace_with_properties() {
2110        let (namespace, _temp_dir) = create_test_namespace().await;
2111
2112        // Create namespace with properties
2113        let mut properties = HashMap::new();
2114        properties.insert("owner".to_string(), "test_user".to_string());
2115        properties.insert("description".to_string(), "Test namespace".to_string());
2116
2117        let mut create_req = CreateNamespaceRequest::new();
2118        create_req.id = Some(vec!["test_ns".to_string()]);
2119        create_req.properties = Some(properties.clone());
2120        namespace.create_namespace(create_req).await.unwrap();
2121
2122        // Describe namespace and verify properties
2123        let describe_req = DescribeNamespaceRequest {
2124            id: Some(vec!["test_ns".to_string()]),
2125        };
2126        let result = namespace.describe_namespace(describe_req).await;
2127        assert!(result.is_ok());
2128        let response = result.unwrap();
2129        assert!(response.properties.is_some());
2130        let props = response.properties.unwrap();
2131        assert_eq!(props.get("owner"), Some(&"test_user".to_string()));
2132        assert_eq!(
2133            props.get("description"),
2134            Some(&"Test namespace".to_string())
2135        );
2136    }
2137
2138    #[tokio::test]
2139    async fn test_cannot_drop_namespace_with_tables() {
2140        let (namespace, _temp_dir) = create_test_namespace().await;
2141
2142        // Create namespace
2143        let mut create_ns_req = CreateNamespaceRequest::new();
2144        create_ns_req.id = Some(vec!["test_ns".to_string()]);
2145        namespace.create_namespace(create_ns_req).await.unwrap();
2146
2147        // Create table in namespace
2148        let schema = create_test_schema();
2149        let ipc_data = create_test_ipc_data(&schema);
2150        let mut create_table_req = CreateTableRequest::new();
2151        create_table_req.id = Some(vec!["test_ns".to_string(), "table1".to_string()]);
2152        namespace
2153            .create_table(create_table_req, bytes::Bytes::from(ipc_data))
2154            .await
2155            .unwrap();
2156
2157        // Try to drop namespace - should fail
2158        let mut drop_req = DropNamespaceRequest::new();
2159        drop_req.id = Some(vec!["test_ns".to_string()]);
2160        let result = namespace.drop_namespace(drop_req).await;
2161        assert!(
2162            result.is_err(),
2163            "Should not be able to drop namespace with tables"
2164        );
2165    }
2166
2167    #[tokio::test]
2168    async fn test_isolation_between_namespaces() {
2169        let (namespace, _temp_dir) = create_test_namespace().await;
2170
2171        // Create two namespaces
2172        let mut create_req = CreateNamespaceRequest::new();
2173        create_req.id = Some(vec!["ns1".to_string()]);
2174        namespace.create_namespace(create_req).await.unwrap();
2175
2176        let mut create_req = CreateNamespaceRequest::new();
2177        create_req.id = Some(vec!["ns2".to_string()]);
2178        namespace.create_namespace(create_req).await.unwrap();
2179
2180        // Create table with same name in both namespaces
2181        let schema = create_test_schema();
2182        let ipc_data = create_test_ipc_data(&schema);
2183
2184        let mut create_table_req = CreateTableRequest::new();
2185        create_table_req.id = Some(vec!["ns1".to_string(), "table1".to_string()]);
2186        namespace
2187            .create_table(create_table_req, bytes::Bytes::from(ipc_data.clone()))
2188            .await
2189            .unwrap();
2190
2191        let mut create_table_req = CreateTableRequest::new();
2192        create_table_req.id = Some(vec!["ns2".to_string(), "table1".to_string()]);
2193        namespace
2194            .create_table(create_table_req, bytes::Bytes::from(ipc_data))
2195            .await
2196            .unwrap();
2197
2198        // List tables in each namespace
2199        let list_req = ListTablesRequest {
2200            id: Some(vec!["ns1".to_string()]),
2201            page_token: None,
2202            limit: None,
2203        };
2204        let result = namespace.list_tables(list_req).await.unwrap();
2205        assert_eq!(result.tables.len(), 1);
2206        assert_eq!(result.tables[0], "table1");
2207
2208        let list_req = ListTablesRequest {
2209            id: Some(vec!["ns2".to_string()]),
2210            page_token: None,
2211            limit: None,
2212        };
2213        let result = namespace.list_tables(list_req).await.unwrap();
2214        assert_eq!(result.tables.len(), 1);
2215        assert_eq!(result.tables[0], "table1");
2216
2217        // Drop table in ns1 shouldn't affect ns2
2218        let mut drop_req = DropTableRequest::new();
2219        drop_req.id = Some(vec!["ns1".to_string(), "table1".to_string()]);
2220        namespace.drop_table(drop_req).await.unwrap();
2221
2222        // Verify ns1 table is gone but ns2 table still exists
2223        let mut exists_req = TableExistsRequest::new();
2224        exists_req.id = Some(vec!["ns1".to_string(), "table1".to_string()]);
2225        assert!(namespace.table_exists(exists_req).await.is_err());
2226
2227        let mut exists_req = TableExistsRequest::new();
2228        exists_req.id = Some(vec!["ns2".to_string(), "table1".to_string()]);
2229        assert!(namespace.table_exists(exists_req).await.is_ok());
2230    }
2231
2232    #[tokio::test]
2233    async fn test_migrate_directory_tables() {
2234        let temp_dir = TempStdDir::default();
2235        let temp_path = temp_dir.to_str().unwrap();
2236
2237        // Step 1: Create tables in directory-only mode
2238        let dir_only_ns = DirectoryNamespaceBuilder::new(temp_path)
2239            .manifest_enabled(false)
2240            .dir_listing_enabled(true)
2241            .build()
2242            .await
2243            .unwrap();
2244
2245        // Create some tables
2246        let schema = create_test_schema();
2247        let ipc_data = create_test_ipc_data(&schema);
2248
2249        for i in 1..=3 {
2250            let mut create_req = CreateTableRequest::new();
2251            create_req.id = Some(vec![format!("table{}", i)]);
2252            dir_only_ns
2253                .create_table(create_req, bytes::Bytes::from(ipc_data.clone()))
2254                .await
2255                .unwrap();
2256        }
2257
2258        drop(dir_only_ns);
2259
2260        // Step 2: Create namespace with dual mode (manifest + directory listing)
2261        let dual_mode_ns = DirectoryNamespaceBuilder::new(temp_path)
2262            .manifest_enabled(true)
2263            .dir_listing_enabled(true)
2264            .build()
2265            .await
2266            .unwrap();
2267
2268        // Before migration, tables should be visible (via directory listing fallback)
2269        let mut list_req = ListTablesRequest::new();
2270        list_req.id = Some(vec![]);
2271        let tables = dual_mode_ns.list_tables(list_req).await.unwrap().tables;
2272        assert_eq!(tables.len(), 3);
2273
2274        // Run migration
2275        let migrated_count = dual_mode_ns.migrate().await.unwrap();
2276        assert_eq!(migrated_count, 3, "Should migrate all 3 tables");
2277
2278        // Verify tables are now in manifest
2279        let mut list_req = ListTablesRequest::new();
2280        list_req.id = Some(vec![]);
2281        let tables = dual_mode_ns.list_tables(list_req).await.unwrap().tables;
2282        assert_eq!(tables.len(), 3);
2283
2284        // Run migration again - should be idempotent
2285        let migrated_count = dual_mode_ns.migrate().await.unwrap();
2286        assert_eq!(
2287            migrated_count, 0,
2288            "Should not migrate already-migrated tables"
2289        );
2290
2291        drop(dual_mode_ns);
2292
2293        // Step 3: Create namespace with manifest-only mode
2294        let manifest_only_ns = DirectoryNamespaceBuilder::new(temp_path)
2295            .manifest_enabled(true)
2296            .dir_listing_enabled(false)
2297            .build()
2298            .await
2299            .unwrap();
2300
2301        // Tables should still be accessible (now from manifest only)
2302        let mut list_req = ListTablesRequest::new();
2303        list_req.id = Some(vec![]);
2304        let tables = manifest_only_ns.list_tables(list_req).await.unwrap().tables;
2305        assert_eq!(tables.len(), 3);
2306        assert!(tables.contains(&"table1".to_string()));
2307        assert!(tables.contains(&"table2".to_string()));
2308        assert!(tables.contains(&"table3".to_string()));
2309    }
2310
2311    #[tokio::test]
2312    async fn test_migrate_without_manifest() {
2313        let temp_dir = TempStdDir::default();
2314        let temp_path = temp_dir.to_str().unwrap();
2315
2316        // Create namespace without manifest
2317        let namespace = DirectoryNamespaceBuilder::new(temp_path)
2318            .manifest_enabled(false)
2319            .dir_listing_enabled(true)
2320            .build()
2321            .await
2322            .unwrap();
2323
2324        // migrate() should return 0 when manifest is not enabled
2325        let migrated_count = namespace.migrate().await.unwrap();
2326        assert_eq!(migrated_count, 0);
2327    }
2328
2329    #[tokio::test]
2330    async fn test_register_table() {
2331        use lance_namespace::models::{RegisterTableRequest, TableExistsRequest};
2332
2333        let temp_dir = TempStdDir::default();
2334        let temp_path = temp_dir.to_str().unwrap();
2335
2336        let namespace = DirectoryNamespaceBuilder::new(temp_path)
2337            .build()
2338            .await
2339            .unwrap();
2340
2341        // Create a physical table first using lance directly
2342        let schema = create_test_schema();
2343        let ipc_data = create_test_ipc_data(&schema);
2344
2345        let table_uri = format!("{}/external_table.lance", temp_path);
2346        let cursor = Cursor::new(ipc_data);
2347        let stream_reader = StreamReader::try_new(cursor, None).unwrap();
2348        let batches: Vec<_> = stream_reader
2349            .collect::<std::result::Result<Vec<_>, _>>()
2350            .unwrap();
2351        let schema = batches[0].schema();
2352        let batch_results: Vec<_> = batches.into_iter().map(Ok).collect();
2353        let reader = RecordBatchIterator::new(batch_results, schema);
2354        Dataset::write(Box::new(reader), &table_uri, None)
2355            .await
2356            .unwrap();
2357
2358        // Register the table
2359        let mut register_req = RegisterTableRequest::new("external_table.lance".to_string());
2360        register_req.id = Some(vec!["registered_table".to_string()]);
2361
2362        let response = namespace.register_table(register_req).await.unwrap();
2363        assert_eq!(response.location, "external_table.lance");
2364
2365        // Verify table exists in namespace
2366        let mut exists_req = TableExistsRequest::new();
2367        exists_req.id = Some(vec!["registered_table".to_string()]);
2368        assert!(namespace.table_exists(exists_req).await.is_ok());
2369
2370        // Verify we can list the table
2371        let mut list_req = ListTablesRequest::new();
2372        list_req.id = Some(vec![]);
2373        let tables = namespace.list_tables(list_req).await.unwrap();
2374        assert!(tables.tables.contains(&"registered_table".to_string()));
2375    }
2376
2377    #[tokio::test]
2378    async fn test_register_table_duplicate_fails() {
2379        use lance_namespace::models::RegisterTableRequest;
2380
2381        let temp_dir = TempStdDir::default();
2382        let temp_path = temp_dir.to_str().unwrap();
2383
2384        let namespace = DirectoryNamespaceBuilder::new(temp_path)
2385            .build()
2386            .await
2387            .unwrap();
2388
2389        // Register a table
2390        let mut register_req = RegisterTableRequest::new("test_table.lance".to_string());
2391        register_req.id = Some(vec!["test_table".to_string()]);
2392
2393        namespace
2394            .register_table(register_req.clone())
2395            .await
2396            .unwrap();
2397
2398        // Try to register again - should fail
2399        let result = namespace.register_table(register_req).await;
2400        assert!(result.is_err());
2401        assert!(result.unwrap_err().to_string().contains("already exists"));
2402    }
2403
2404    #[tokio::test]
2405    async fn test_deregister_table() {
2406        use lance_namespace::models::{DeregisterTableRequest, TableExistsRequest};
2407
2408        let temp_dir = TempStdDir::default();
2409        let temp_path = temp_dir.to_str().unwrap();
2410
2411        // Create namespace with manifest-only mode (no directory listing fallback)
2412        // This ensures deregistered tables are truly invisible
2413        let namespace = DirectoryNamespaceBuilder::new(temp_path)
2414            .manifest_enabled(true)
2415            .dir_listing_enabled(false)
2416            .build()
2417            .await
2418            .unwrap();
2419
2420        // Create a table
2421        let schema = create_test_schema();
2422        let ipc_data = create_test_ipc_data(&schema);
2423
2424        let mut create_req = CreateTableRequest::new();
2425        create_req.id = Some(vec!["test_table".to_string()]);
2426        namespace
2427            .create_table(create_req, bytes::Bytes::from(ipc_data))
2428            .await
2429            .unwrap();
2430
2431        // Verify table exists
2432        let mut exists_req = TableExistsRequest::new();
2433        exists_req.id = Some(vec!["test_table".to_string()]);
2434        assert!(namespace.table_exists(exists_req.clone()).await.is_ok());
2435
2436        // Deregister the table
2437        let mut deregister_req = DeregisterTableRequest::new();
2438        deregister_req.id = Some(vec!["test_table".to_string()]);
2439        let response = namespace.deregister_table(deregister_req).await.unwrap();
2440
2441        // Should return location and id
2442        assert!(
2443            response.location.is_some(),
2444            "Deregister should return location"
2445        );
2446        let location = response.location.as_ref().unwrap();
2447        // Location should be a proper file:// URI with the temp path
2448        // Use uri_to_url to normalize the temp path to a URL for comparison
2449        let expected_url = lance_io::object_store::uri_to_url(temp_path)
2450            .expect("Failed to convert temp path to URL");
2451        let expected_prefix = expected_url.to_string();
2452        assert!(
2453            location.starts_with(&expected_prefix),
2454            "Location should start with '{}', got: {}",
2455            expected_prefix,
2456            location
2457        );
2458        assert!(
2459            location.contains("test_table"),
2460            "Location should contain table name: {}",
2461            location
2462        );
2463        assert_eq!(response.id, Some(vec!["test_table".to_string()]));
2464
2465        // Verify table no longer exists in namespace (removed from manifest)
2466        assert!(namespace.table_exists(exists_req).await.is_err());
2467
2468        // Verify physical data still exists at the returned location
2469        let dataset = Dataset::open(location).await;
2470        assert!(
2471            dataset.is_ok(),
2472            "Physical table data should still exist at {}",
2473            location
2474        );
2475    }
2476
2477    #[tokio::test]
2478    async fn test_deregister_table_in_child_namespace() {
2479        use lance_namespace::models::{
2480            CreateNamespaceRequest, DeregisterTableRequest, TableExistsRequest,
2481        };
2482
2483        let temp_dir = TempStdDir::default();
2484        let temp_path = temp_dir.to_str().unwrap();
2485
2486        let namespace = DirectoryNamespaceBuilder::new(temp_path)
2487            .build()
2488            .await
2489            .unwrap();
2490
2491        // Create child namespace
2492        let mut create_ns_req = CreateNamespaceRequest::new();
2493        create_ns_req.id = Some(vec!["test_ns".to_string()]);
2494        namespace.create_namespace(create_ns_req).await.unwrap();
2495
2496        // Create a table in the child namespace
2497        let schema = create_test_schema();
2498        let ipc_data = create_test_ipc_data(&schema);
2499
2500        let mut create_req = CreateTableRequest::new();
2501        create_req.id = Some(vec!["test_ns".to_string(), "test_table".to_string()]);
2502        namespace
2503            .create_table(create_req, bytes::Bytes::from(ipc_data))
2504            .await
2505            .unwrap();
2506
2507        // Deregister the table
2508        let mut deregister_req = DeregisterTableRequest::new();
2509        deregister_req.id = Some(vec!["test_ns".to_string(), "test_table".to_string()]);
2510        let response = namespace.deregister_table(deregister_req).await.unwrap();
2511
2512        // Should return location and id in child namespace
2513        assert!(
2514            response.location.is_some(),
2515            "Deregister should return location"
2516        );
2517        let location = response.location.as_ref().unwrap();
2518        // Location should be a proper file:// URI with the temp path
2519        // Use uri_to_url to normalize the temp path to a URL for comparison
2520        let expected_url = lance_io::object_store::uri_to_url(temp_path)
2521            .expect("Failed to convert temp path to URL");
2522        let expected_prefix = expected_url.to_string();
2523        assert!(
2524            location.starts_with(&expected_prefix),
2525            "Location should start with '{}', got: {}",
2526            expected_prefix,
2527            location
2528        );
2529        assert!(
2530            location.contains("test_ns") && location.contains("test_table"),
2531            "Location should contain namespace and table name: {}",
2532            location
2533        );
2534        assert_eq!(
2535            response.id,
2536            Some(vec!["test_ns".to_string(), "test_table".to_string()])
2537        );
2538
2539        // Verify table no longer exists
2540        let mut exists_req = TableExistsRequest::new();
2541        exists_req.id = Some(vec!["test_ns".to_string(), "test_table".to_string()]);
2542        assert!(namespace.table_exists(exists_req).await.is_err());
2543    }
2544
2545    #[tokio::test]
2546    async fn test_register_deregister_without_manifest_fails() {
2547        use lance_namespace::models::{DeregisterTableRequest, RegisterTableRequest};
2548
2549        let temp_dir = TempStdDir::default();
2550        let temp_path = temp_dir.to_str().unwrap();
2551
2552        // Create namespace without manifest
2553        let namespace = DirectoryNamespaceBuilder::new(temp_path)
2554            .manifest_enabled(false)
2555            .build()
2556            .await
2557            .unwrap();
2558
2559        // Try to register - should fail
2560        let mut register_req = RegisterTableRequest::new("test_table.lance".to_string());
2561        register_req.id = Some(vec!["test_table".to_string()]);
2562        let result = namespace.register_table(register_req).await;
2563        assert!(result.is_err());
2564        assert!(result
2565            .unwrap_err()
2566            .to_string()
2567            .contains("manifest mode is enabled"));
2568
2569        // Try to deregister - should fail
2570        let mut deregister_req = DeregisterTableRequest::new();
2571        deregister_req.id = Some(vec!["test_table".to_string()]);
2572        let result = namespace.deregister_table(deregister_req).await;
2573        assert!(result.is_err());
2574        assert!(result
2575            .unwrap_err()
2576            .to_string()
2577            .contains("manifest mode is enabled"));
2578    }
2579
2580    #[tokio::test]
2581    async fn test_register_table_rejects_absolute_uri() {
2582        use lance_namespace::models::RegisterTableRequest;
2583
2584        let temp_dir = TempStdDir::default();
2585        let temp_path = temp_dir.to_str().unwrap();
2586
2587        let namespace = DirectoryNamespaceBuilder::new(temp_path)
2588            .build()
2589            .await
2590            .unwrap();
2591
2592        // Try to register with absolute URI - should fail
2593        let mut register_req = RegisterTableRequest::new("s3://bucket/table.lance".to_string());
2594        register_req.id = Some(vec!["test_table".to_string()]);
2595        let result = namespace.register_table(register_req).await;
2596        assert!(result.is_err());
2597        let err_msg = result.unwrap_err().to_string();
2598        assert!(err_msg.contains("Absolute URIs are not allowed"));
2599    }
2600
2601    #[tokio::test]
2602    async fn test_register_table_rejects_absolute_path() {
2603        use lance_namespace::models::RegisterTableRequest;
2604
2605        let temp_dir = TempStdDir::default();
2606        let temp_path = temp_dir.to_str().unwrap();
2607
2608        let namespace = DirectoryNamespaceBuilder::new(temp_path)
2609            .build()
2610            .await
2611            .unwrap();
2612
2613        // Try to register with absolute path - should fail
2614        let mut register_req = RegisterTableRequest::new("/tmp/table.lance".to_string());
2615        register_req.id = Some(vec!["test_table".to_string()]);
2616        let result = namespace.register_table(register_req).await;
2617        assert!(result.is_err());
2618        let err_msg = result.unwrap_err().to_string();
2619        assert!(err_msg.contains("Absolute paths are not allowed"));
2620    }
2621
2622    #[tokio::test]
2623    async fn test_register_table_rejects_path_traversal() {
2624        use lance_namespace::models::RegisterTableRequest;
2625
2626        let temp_dir = TempStdDir::default();
2627        let temp_path = temp_dir.to_str().unwrap();
2628
2629        let namespace = DirectoryNamespaceBuilder::new(temp_path)
2630            .build()
2631            .await
2632            .unwrap();
2633
2634        // Try to register with path traversal - should fail
2635        let mut register_req = RegisterTableRequest::new("../outside/table.lance".to_string());
2636        register_req.id = Some(vec!["test_table".to_string()]);
2637        let result = namespace.register_table(register_req).await;
2638        assert!(result.is_err());
2639        let err_msg = result.unwrap_err().to_string();
2640        assert!(err_msg.contains("Path traversal is not allowed"));
2641    }
2642
2643    #[tokio::test]
2644    async fn test_namespace_write() {
2645        use arrow::array::Int32Array;
2646        use arrow::datatypes::{DataType, Field as ArrowField, Schema as ArrowSchema};
2647        use arrow::record_batch::{RecordBatch, RecordBatchIterator};
2648        use lance::dataset::{Dataset, WriteMode, WriteParams};
2649        use lance_namespace::LanceNamespace;
2650
2651        let (namespace, _temp_dir) = create_test_namespace().await;
2652        let namespace = Arc::new(namespace) as Arc<dyn LanceNamespace>;
2653
2654        // Use child namespace instead of root
2655        let table_id = vec!["test_ns".to_string(), "test_table".to_string()];
2656        let schema = Arc::new(ArrowSchema::new(vec![
2657            ArrowField::new("a", DataType::Int32, false),
2658            ArrowField::new("b", DataType::Int32, false),
2659        ]));
2660
2661        // Test 1: CREATE mode
2662        let data1 = RecordBatch::try_new(
2663            schema.clone(),
2664            vec![
2665                Arc::new(Int32Array::from(vec![1, 2, 3])),
2666                Arc::new(Int32Array::from(vec![10, 20, 30])),
2667            ],
2668        )
2669        .unwrap();
2670
2671        let reader1 = RecordBatchIterator::new(vec![data1].into_iter().map(Ok), schema.clone());
2672        let dataset = Dataset::write_into_namespace(
2673            reader1,
2674            namespace.clone(),
2675            table_id.clone(),
2676            None,
2677            false,
2678        )
2679        .await
2680        .unwrap();
2681
2682        assert_eq!(dataset.count_rows(None).await.unwrap(), 3);
2683        assert_eq!(dataset.version().version, 1);
2684
2685        // Test 2: APPEND mode
2686        let data2 = RecordBatch::try_new(
2687            schema.clone(),
2688            vec![
2689                Arc::new(Int32Array::from(vec![4, 5])),
2690                Arc::new(Int32Array::from(vec![40, 50])),
2691            ],
2692        )
2693        .unwrap();
2694
2695        let params_append = WriteParams {
2696            mode: WriteMode::Append,
2697            ..Default::default()
2698        };
2699
2700        let reader2 = RecordBatchIterator::new(vec![data2].into_iter().map(Ok), schema.clone());
2701        let dataset = Dataset::write_into_namespace(
2702            reader2,
2703            namespace.clone(),
2704            table_id.clone(),
2705            Some(params_append),
2706            false,
2707        )
2708        .await
2709        .unwrap();
2710
2711        assert_eq!(dataset.count_rows(None).await.unwrap(), 5);
2712        assert_eq!(dataset.version().version, 2);
2713
2714        // Test 3: OVERWRITE mode
2715        let data3 = RecordBatch::try_new(
2716            schema.clone(),
2717            vec![
2718                Arc::new(Int32Array::from(vec![100, 200])),
2719                Arc::new(Int32Array::from(vec![1000, 2000])),
2720            ],
2721        )
2722        .unwrap();
2723
2724        let params_overwrite = WriteParams {
2725            mode: WriteMode::Overwrite,
2726            ..Default::default()
2727        };
2728
2729        let reader3 = RecordBatchIterator::new(vec![data3].into_iter().map(Ok), schema.clone());
2730        let dataset = Dataset::write_into_namespace(
2731            reader3,
2732            namespace.clone(),
2733            table_id.clone(),
2734            Some(params_overwrite),
2735            false,
2736        )
2737        .await
2738        .unwrap();
2739
2740        assert_eq!(dataset.count_rows(None).await.unwrap(), 2);
2741        assert_eq!(dataset.version().version, 3);
2742
2743        // Verify old data was replaced
2744        let result = dataset.scan().try_into_batch().await.unwrap();
2745        let a_col = result
2746            .column_by_name("a")
2747            .unwrap()
2748            .as_any()
2749            .downcast_ref::<Int32Array>()
2750            .unwrap();
2751        assert_eq!(a_col.values(), &[100, 200]);
2752    }
2753}