Skip to main content

lance_namespace_impls/dir/
manifest.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Manifest-based namespace implementation
5//!
6//! This module provides a namespace implementation that uses a manifest table
7//! to track tables and nested namespaces.
8
9use arrow::array::builder::{ListBuilder, StringBuilder};
10use arrow::array::{Array, RecordBatch, RecordBatchIterator, StringArray};
11use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
12use arrow_ipc::reader::StreamReader;
13use async_trait::async_trait;
14use bytes::Bytes;
15use futures::{FutureExt, stream::StreamExt};
16use lance::dataset::optimize::{CompactionOptions, compact_files};
17use lance::dataset::{
18    DeleteBuilder, MergeInsertBuilder, ReadParams, WhenMatched, WhenNotMatched, WriteParams,
19    builder::DatasetBuilder,
20};
21use lance::session::Session;
22use lance::{Dataset, dataset::scanner::Scanner};
23use lance_core::Error as LanceError;
24use lance_core::datatypes::LANCE_UNENFORCED_PRIMARY_KEY_POSITION;
25use lance_core::{Error, Result, box_error};
26use lance_index::IndexType;
27use lance_index::optimize::OptimizeOptions;
28use lance_index::scalar::{BuiltinIndexType, ScalarIndexParams};
29use lance_index::traits::DatasetIndexExt;
30use lance_io::object_store::{ObjectStore, ObjectStoreParams};
31use lance_namespace::LanceNamespace;
32use lance_namespace::error::NamespaceError;
33use lance_namespace::models::{
34    CreateNamespaceRequest, CreateNamespaceResponse, CreateTableRequest, CreateTableResponse,
35    DeclareTableRequest, DeclareTableResponse, DeregisterTableRequest, DeregisterTableResponse,
36    DescribeNamespaceRequest, DescribeNamespaceResponse, DescribeTableRequest,
37    DescribeTableResponse, DescribeTableVersionResponse, DropNamespaceRequest,
38    DropNamespaceResponse, DropTableRequest, DropTableResponse, ListNamespacesRequest,
39    ListNamespacesResponse, ListTableVersionsResponse, ListTablesRequest, ListTablesResponse,
40    NamespaceExistsRequest, RegisterTableRequest, RegisterTableResponse, TableExistsRequest,
41    TableVersion,
42};
43use lance_namespace::schema::arrow_schema_to_json;
44use object_store::path::Path;
45use std::io::Cursor;
46use std::{
47    collections::HashMap,
48    hash::{DefaultHasher, Hash, Hasher},
49    ops::{Deref, DerefMut},
50    sync::Arc,
51};
52use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
53
/// Name of the Lance table that records namespaces, tables and table versions.
const MANIFEST_TABLE_NAME: &str = "__manifest";
/// Separator placed between the components of an object id (e.g. `ns1$ns2$table`).
const DELIMITER: &str = "$";

// ---- Index names used on the __manifest table ----

/// Name of the BTREE index built over the `object_id` column (fast point lookups).
const OBJECT_ID_INDEX_NAME: &str = "object_id_btree";
/// Name of the Bitmap index built over the `object_type` column (filtering by kind).
const OBJECT_TYPE_INDEX_NAME: &str = "object_type_bitmap";
/// Name of the LabelList index built over the `base_objects` column (view dependencies).
const BASE_OBJECTS_INDEX_NAME: &str = "base_objects_label_list";
64
/// Kind of object described by a row of the manifest table.
///
/// Its textual form (see `ObjectType::as_str` / `ObjectType::parse`) is what the
/// manifest's `object_type` column holds.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ObjectType {
    /// A (possibly nested) namespace.
    Namespace,
    /// A table registered in some namespace.
    Table,
    /// A single recorded version of a table.
    TableVersion,
}
72
73impl ObjectType {
74    pub fn as_str(&self) -> &str {
75        match self {
76            Self::Namespace => "namespace",
77            Self::Table => "table",
78            Self::TableVersion => "table_version",
79        }
80    }
81
82    pub fn parse(s: &str) -> Result<Self> {
83        match s {
84            "namespace" => Ok(Self::Namespace),
85            "table" => Ok(Self::Table),
86            "table_version" => Ok(Self::TableVersion),
87            _ => Err(Error::io(format!("Invalid object type: {}", s))),
88        }
89    }
90}
91
/// A table entry as read back from the manifest.
#[derive(Clone, Debug)]
pub struct TableInfo {
    /// Namespace path containing the table (empty when the object id has no delimiter).
    pub namespace: Vec<String>,
    /// Table name: the last component of the object id.
    pub name: String,
    /// Value of the manifest's `location` column for this table.
    pub location: String,
}
99
100/// An entry to be inserted into the manifest table.
101///
102/// This struct makes the meaning of each field explicit, replacing the
103/// previous tuple-based API `(String, ObjectType, Option<String>, Option<String>)`.
104#[derive(Debug, Clone)]
105pub struct ManifestEntry {
106    /// The unique object identifier (e.g., table name or version object_id)
107    pub object_id: String,
108    /// The type of the object (Namespace, Table, or TableVersion)
109    pub object_type: ObjectType,
110    /// The storage location (e.g., directory name for tables)
111    pub location: Option<String>,
112    /// Additional metadata serialized as JSON
113    pub metadata: Option<String>,
114}
115
/// A namespace entry as read back from the manifest.
#[derive(Clone, Debug)]
pub struct NamespaceInfo {
    /// Path of the enclosing namespace.
    pub namespace: Vec<String>,
    /// Name of this namespace.
    pub name: String,
    /// Optional key/value properties attached to the namespace.
    pub metadata: Option<HashMap<String, String>>,
}
123
124/// A wrapper around a Dataset that provides concurrent access.
125///
126/// This can be cloned cheaply. It supports concurrent reads or exclusive writes.
127/// The manifest dataset is always kept strongly consistent by reloading on each read.
128#[derive(Debug, Clone)]
129pub struct DatasetConsistencyWrapper(Arc<RwLock<Dataset>>);
130
131impl DatasetConsistencyWrapper {
132    /// Create a new wrapper with the given dataset.
133    pub fn new(dataset: Dataset) -> Self {
134        Self(Arc::new(RwLock::new(dataset)))
135    }
136
137    /// Get an immutable reference to the dataset.
138    /// Always reloads to ensure strong consistency.
139    pub async fn get(&self) -> Result<DatasetReadGuard<'_>> {
140        self.reload().await?;
141        Ok(DatasetReadGuard {
142            guard: self.0.read().await,
143        })
144    }
145
146    /// Get a mutable reference to the dataset.
147    /// Always reloads to ensure strong consistency.
148    pub async fn get_mut(&self) -> Result<DatasetWriteGuard<'_>> {
149        self.reload().await?;
150        Ok(DatasetWriteGuard {
151            guard: self.0.write().await,
152        })
153    }
154
155    /// Provide a known latest version of the dataset.
156    ///
157    /// This is usually done after some write operation, which inherently will
158    /// have the latest version.
159    pub async fn set_latest(&self, dataset: Dataset) {
160        let mut write_guard = self.0.write().await;
161        if dataset.manifest().version > write_guard.manifest().version {
162            *write_guard = dataset;
163        }
164    }
165
166    /// Reload the dataset to the latest version.
167    async fn reload(&self) -> Result<()> {
168        // First check if we need to reload (with read lock)
169        let read_guard = self.0.read().await;
170        let dataset_uri = read_guard.uri().to_string();
171        let current_version = read_guard.version().version;
172        log::debug!(
173            "Reload starting for uri={}, current_version={}",
174            dataset_uri,
175            current_version
176        );
177        let latest_version = read_guard.latest_version_id().await.map_err(|e| {
178            Error::io_source(box_error(std::io::Error::other(format!(
179                "Failed to get latest version: {}",
180                e
181            ))))
182        })?;
183        log::debug!(
184            "Reload got latest_version={} for uri={}, current_version={}",
185            latest_version,
186            dataset_uri,
187            current_version
188        );
189        drop(read_guard);
190
191        // If already up-to-date, return early
192        if latest_version == current_version {
193            log::debug!("Already up-to-date for uri={}", dataset_uri);
194            return Ok(());
195        }
196
197        // Need to reload, acquire write lock
198        let mut write_guard = self.0.write().await;
199
200        // Double-check after acquiring write lock (someone else might have reloaded)
201        let latest_version = write_guard.latest_version_id().await.map_err(|e| {
202            Error::io_source(box_error(std::io::Error::other(format!(
203                "Failed to get latest version: {}",
204                e
205            ))))
206        })?;
207
208        if latest_version != write_guard.version().version {
209            write_guard.checkout_latest().await.map_err(|e| {
210                Error::io_source(box_error(std::io::Error::other(format!(
211                    "Failed to checkout latest: {}",
212                    e
213                ))))
214            })?;
215        }
216
217        Ok(())
218    }
219}
220
221pub struct DatasetReadGuard<'a> {
222    guard: RwLockReadGuard<'a, Dataset>,
223}
224
225impl Deref for DatasetReadGuard<'_> {
226    type Target = Dataset;
227
228    fn deref(&self) -> &Self::Target {
229        &self.guard
230    }
231}
232
233pub struct DatasetWriteGuard<'a> {
234    guard: RwLockWriteGuard<'a, Dataset>,
235}
236
237impl Deref for DatasetWriteGuard<'_> {
238    type Target = Dataset;
239
240    fn deref(&self) -> &Self::Target {
241        &self.guard
242    }
243}
244
245impl DerefMut for DatasetWriteGuard<'_> {
246    fn deref_mut(&mut self) -> &mut Self::Target {
247        &mut self.guard
248    }
249}
250
251/// Manifest-based namespace implementation
252///
253/// Uses a special `__manifest` Lance table to track tables and nested namespaces.
254pub struct ManifestNamespace {
255    root: String,
256    storage_options: Option<HashMap<String, String>>,
257    #[allow(dead_code)]
258    session: Option<Arc<Session>>,
259    #[allow(dead_code)]
260    object_store: Arc<ObjectStore>,
261    #[allow(dead_code)]
262    base_path: Path,
263    manifest_dataset: DatasetConsistencyWrapper,
264    /// Whether directory listing is enabled in dual mode
265    /// If true, root namespace tables use {table_name}.lance naming
266    /// If false, they use namespace-prefixed names
267    dir_listing_enabled: bool,
268    /// Whether to perform inline optimization (compaction and indexing) on the __manifest table
269    /// after every write. Defaults to true.
270    inline_optimization_enabled: bool,
271    /// Number of retries for commit operations on the manifest table.
272    /// If None, defaults to [`lance_table::io::commit::CommitConfig`] default (20).
273    commit_retries: Option<u32>,
274}
275
276impl std::fmt::Debug for ManifestNamespace {
277    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
278        f.debug_struct("ManifestNamespace")
279            .field("root", &self.root)
280            .field("storage_options", &self.storage_options)
281            .field("dir_listing_enabled", &self.dir_listing_enabled)
282            .field(
283                "inline_optimization_enabled",
284                &self.inline_optimization_enabled,
285            )
286            .finish()
287    }
288}
289
290/// Convert a Lance commit error to an appropriate namespace error.
291///
292/// Maps lance commit errors to namespace errors:
293/// - `CommitConflict`: version collision retries exhausted -> Throttled (safe to retry)
294/// - `TooMuchWriteContention`: RetryableCommitConflict (semantic conflict) retries exhausted -> ConcurrentModification
295/// - `IncompatibleTransaction`: incompatible concurrent change -> ConcurrentModification
296/// - Errors containing "matched/duplicate/already exists": ConcurrentModification (from WhenMatched::Fail)
297/// - Other errors: IO error with the operation description
298fn convert_lance_commit_error(e: &LanceError, operation: &str, object_id: Option<&str>) -> Error {
299    match e {
300        // CommitConflict: version collision retries exhausted -> Throttled (safe to retry)
301        LanceError::CommitConflict { .. } => NamespaceError::Throttled {
302            message: format!("Too many concurrent writes, please retry later: {:?}", e),
303        }
304        .into(),
305        // TooMuchWriteContention: RetryableCommitConflict (semantic conflict) retries exhausted -> ConcurrentModification
306        // IncompatibleTransaction: incompatible concurrent change -> ConcurrentModification
307        LanceError::TooMuchWriteContention { .. } | LanceError::IncompatibleTransaction { .. } => {
308            let message = if let Some(id) = object_id {
309                format!(
310                    "Object '{}' was concurrently modified by another operation: {:?}",
311                    id, e
312                )
313            } else {
314                format!(
315                    "Object was concurrently modified by another operation: {:?}",
316                    e
317                )
318            };
319            NamespaceError::ConcurrentModification { message }.into()
320        }
321        // Other errors: check message for semantic conflicts (matched/duplicate from WhenMatched::Fail)
322        _ => {
323            let error_msg = e.to_string();
324            if error_msg.contains("matched")
325                || error_msg.contains("duplicate")
326                || error_msg.contains("already exists")
327            {
328                let message = if let Some(id) = object_id {
329                    format!(
330                        "Object '{}' was concurrently created by another operation: {:?}",
331                        id, e
332                    )
333                } else {
334                    format!(
335                        "Object was concurrently created by another operation: {:?}",
336                        e
337                    )
338                };
339                return NamespaceError::ConcurrentModification { message }.into();
340            }
341            Error::io_source(box_error(std::io::Error::other(format!(
342                "{}: {:?}",
343                operation, e
344            ))))
345        }
346    }
347}
348
349impl ManifestNamespace {
350    /// Create a new ManifestNamespace from an existing DirectoryNamespace
351    #[allow(clippy::too_many_arguments)]
352    pub async fn from_directory(
353        root: String,
354        storage_options: Option<HashMap<String, String>>,
355        session: Option<Arc<Session>>,
356        object_store: Arc<ObjectStore>,
357        base_path: Path,
358        dir_listing_enabled: bool,
359        inline_optimization_enabled: bool,
360        commit_retries: Option<u32>,
361        table_version_storage_enabled: bool,
362    ) -> Result<Self> {
363        let manifest_dataset = Self::ensure_manifest_table_up_to_date(
364            &root,
365            &storage_options,
366            session.clone(),
367            table_version_storage_enabled,
368        )
369        .await?;
370
371        Ok(Self {
372            root,
373            storage_options,
374            session,
375            object_store,
376            base_path,
377            manifest_dataset,
378            dir_listing_enabled,
379            inline_optimization_enabled,
380            commit_retries,
381        })
382    }
383
384    /// Build object ID from namespace path and name
385    pub fn build_object_id(namespace: &[String], name: &str) -> String {
386        if namespace.is_empty() {
387            name.to_string()
388        } else {
389            let mut id = namespace.join(DELIMITER);
390            id.push_str(DELIMITER);
391            id.push_str(name);
392            id
393        }
394    }
395
396    /// Parse object ID into namespace path and name
397    pub fn parse_object_id(object_id: &str) -> (Vec<String>, String) {
398        let parts: Vec<&str> = object_id.split(DELIMITER).collect();
399        if parts.len() == 1 {
400            (Vec::new(), parts[0].to_string())
401        } else {
402            let namespace = parts[..parts.len() - 1]
403                .iter()
404                .map(|s| s.to_string())
405                .collect();
406            let name = parts[parts.len() - 1].to_string();
407            (namespace, name)
408        }
409    }
410
411    /// Split an object ID (vec of strings) into namespace and table name
412    pub fn split_object_id(object_id: &[String]) -> (Vec<String>, String) {
413        if object_id.len() == 1 {
414            (vec![], object_id[0].clone())
415        } else {
416            (
417                object_id[..object_id.len() - 1].to_vec(),
418                object_id[object_id.len() - 1].clone(),
419            )
420        }
421    }
422
423    /// Convert an ID (vec of strings) to an object_id string
424    pub fn str_object_id(object_id: &[String]) -> String {
425        object_id.join(DELIMITER)
426    }
427
428    /// Format a version number as a zero-padded lexicographically sortable string.
429    ///
430    /// Versions are stored as 20-digit zero-padded integers (e.g., `00000000000000000001`
431    /// for version 1) so that string-based range queries and sorting work correctly.
432    pub fn format_table_version(version: i64) -> String {
433        format!("{:020}", version)
434    }
435
436    /// Build the object_id for a table version entry.
437    ///
438    /// Format: `{table_object_id}${zero_padded_version}`
439    pub fn build_version_object_id(table_object_id: &str, version: i64) -> String {
440        format!(
441            "{}{}{}",
442            table_object_id,
443            DELIMITER,
444            Self::format_table_version(version)
445        )
446    }
447
448    /// Parse a version number from the version suffix of a table version object_id.
449    ///
450    /// The object_id is formatted as `{table_id}${zero_padded_version}`.
451    pub fn parse_version_from_object_id(object_id: &str) -> Option<i64> {
452        let (_namespace, name) = Self::parse_object_id(object_id);
453        name.parse::<i64>().ok()
454    }
455
456    /// Generate a new directory name in format: `<hash>_<object_id>`
457    /// The hash is used to (1) optimize object store throughput,
458    /// (2) have high enough entropy in a short period of time to prevent issues like
459    /// failed table creation, delete and create new table of the same name, etc.
460    /// The object_id is added after the hash to ensure
461    /// dir name uniqueness and make debugging easier.
462    pub fn generate_dir_name(object_id: &str) -> String {
463        // Generate a random number for uniqueness
464        let random_num: u64 = rand::random();
465
466        // Create hash from random number + object_id
467        let mut hasher = DefaultHasher::new();
468        random_num.hash(&mut hasher);
469        object_id.hash(&mut hasher);
470        let hash = hasher.finish();
471
472        // Format as lowercase hex (8 characters - sufficient entropy for uniqueness)
473        format!("{:08x}_{}", (hash & 0xFFFFFFFF) as u32, object_id)
474    }
475
476    /// Construct a full URI from root and relative location
477    pub(crate) fn construct_full_uri(root: &str, relative_location: &str) -> Result<String> {
478        let mut base_url = lance_io::object_store::uri_to_url(root)?;
479
480        // Ensure the base URL has a trailing slash so that URL.join() appends
481        // rather than replaces the last path segment.
482        // Without this fix, "s3://bucket/path/subdir".join("table.lance")
483        // would incorrectly produce "s3://bucket/path/table.lance" (missing subdir).
484        if !base_url.path().ends_with('/') {
485            base_url.set_path(&format!("{}/", base_url.path()));
486        }
487
488        let full_url = base_url.join(relative_location).map_err(|e| {
489            Error::invalid_input_source(
490                format!(
491                    "Failed to join URI '{}' with '{}': {:?}",
492                    root, relative_location, e
493                )
494                .into(),
495            )
496        })?;
497
498        Ok(full_url.to_string())
499    }
500
501    /// Perform inline optimization on the __manifest table.
502    ///
503    /// This method:
504    /// 1. Creates three indexes on the manifest table:
505    ///    - BTREE index on object_id for fast lookups
506    ///    - Bitmap index on object_type for filtering by type
507    ///    - LabelList index on base_objects for view dependencies
508    /// 2. Runs file compaction to merge small files
509    /// 3. Optimizes existing indices
510    ///
511    /// This is called automatically after writes when inline_optimization_enabled is true.
512    async fn run_inline_optimization(&self) -> Result<()> {
513        if !self.inline_optimization_enabled {
514            return Ok(());
515        }
516
517        // Get a mutable reference to the dataset to perform optimization
518        let mut dataset_guard = self.manifest_dataset.get_mut().await?;
519        let dataset: &mut Dataset = &mut dataset_guard;
520
521        // Step 1: Create indexes if they don't already exist
522        let indices = dataset.load_indices().await?;
523
524        // Check which indexes already exist
525        let has_object_id_index = indices.iter().any(|idx| idx.name == OBJECT_ID_INDEX_NAME);
526        let has_object_type_index = indices.iter().any(|idx| idx.name == OBJECT_TYPE_INDEX_NAME);
527        let has_base_objects_index = indices
528            .iter()
529            .any(|idx| idx.name == BASE_OBJECTS_INDEX_NAME);
530
531        // Create BTREE index on object_id
532        if !has_object_id_index {
533            log::debug!(
534                "Creating BTREE index '{}' on object_id for __manifest table",
535                OBJECT_ID_INDEX_NAME
536            );
537            let params = ScalarIndexParams::for_builtin(BuiltinIndexType::BTree);
538            if let Err(e) = dataset
539                .create_index(
540                    &["object_id"],
541                    IndexType::BTree,
542                    Some(OBJECT_ID_INDEX_NAME.to_string()),
543                    &params,
544                    true,
545                )
546                .await
547            {
548                log::warn!(
549                    "Failed to create BTREE index on object_id for __manifest table: {:?}. Query performance may be impacted.",
550                    e
551                );
552            } else {
553                log::info!(
554                    "Created BTREE index '{}' on object_id for __manifest table",
555                    OBJECT_ID_INDEX_NAME
556                );
557            }
558        }
559
560        // Create Bitmap index on object_type
561        if !has_object_type_index {
562            log::debug!(
563                "Creating Bitmap index '{}' on object_type for __manifest table",
564                OBJECT_TYPE_INDEX_NAME
565            );
566            let params = ScalarIndexParams::default();
567            if let Err(e) = dataset
568                .create_index(
569                    &["object_type"],
570                    IndexType::Bitmap,
571                    Some(OBJECT_TYPE_INDEX_NAME.to_string()),
572                    &params,
573                    true,
574                )
575                .await
576            {
577                log::warn!(
578                    "Failed to create Bitmap index on object_type for __manifest table: {:?}. Query performance may be impacted.",
579                    e
580                );
581            } else {
582                log::info!(
583                    "Created Bitmap index '{}' on object_type for __manifest table",
584                    OBJECT_TYPE_INDEX_NAME
585                );
586            }
587        }
588
589        // Create LabelList index on base_objects
590        if !has_base_objects_index {
591            log::debug!(
592                "Creating LabelList index '{}' on base_objects for __manifest table",
593                BASE_OBJECTS_INDEX_NAME
594            );
595            let params = ScalarIndexParams::default();
596            if let Err(e) = dataset
597                .create_index(
598                    &["base_objects"],
599                    IndexType::LabelList,
600                    Some(BASE_OBJECTS_INDEX_NAME.to_string()),
601                    &params,
602                    true,
603                )
604                .await
605            {
606                log::warn!(
607                    "Failed to create LabelList index on base_objects for __manifest table: {:?}. Query performance may be impacted.",
608                    e
609                );
610            } else {
611                log::info!(
612                    "Created LabelList index '{}' on base_objects for __manifest table",
613                    BASE_OBJECTS_INDEX_NAME
614                );
615            }
616        }
617
618        // Step 2: Run file compaction
619        log::debug!("Running file compaction on __manifest table");
620        match compact_files(dataset, CompactionOptions::default(), None).await {
621            Ok(compaction_metrics) => {
622                if compaction_metrics.fragments_removed > 0 {
623                    log::info!(
624                        "Compacted __manifest table: removed {} fragments, added {} fragments",
625                        compaction_metrics.fragments_removed,
626                        compaction_metrics.fragments_added
627                    );
628                }
629            }
630            Err(e) => {
631                log::warn!(
632                    "Failed to compact files for __manifest table: {:?}. Continuing with optimization.",
633                    e
634                );
635            }
636        }
637
638        // Step 3: Optimize indices
639        log::debug!("Optimizing indices on __manifest table");
640        match dataset.optimize_indices(&OptimizeOptions::default()).await {
641            Ok(_) => {
642                log::info!("Successfully optimized indices on __manifest table");
643            }
644            Err(e) => {
645                log::warn!(
646                    "Failed to optimize indices on __manifest table: {:?}. Continuing anyway.",
647                    e
648                );
649            }
650        }
651
652        Ok(())
653    }
654
655    /// Get the manifest schema
656    fn manifest_schema() -> Arc<ArrowSchema> {
657        Arc::new(ArrowSchema::new(vec![
658            // Set unenforced primary key on object_id for bloom filter conflict detection
659            Field::new("object_id", DataType::Utf8, false).with_metadata(
660                [(
661                    LANCE_UNENFORCED_PRIMARY_KEY_POSITION.to_string(),
662                    "0".to_string(),
663                )]
664                .into_iter()
665                .collect(),
666            ),
667            Field::new("object_type", DataType::Utf8, false),
668            Field::new("location", DataType::Utf8, true),
669            Field::new("metadata", DataType::Utf8, true),
670            Field::new(
671                "base_objects",
672                DataType::List(Arc::new(Field::new("object_id", DataType::Utf8, true))),
673                true,
674            ),
675        ]))
676    }
677
678    /// Get a scanner for the manifest dataset
679    async fn manifest_scanner(&self) -> Result<Scanner> {
680        let dataset_guard = self.manifest_dataset.get().await?;
681        Ok(dataset_guard.scan())
682    }
683
684    /// Helper to execute a scanner and collect results into a Vec
685    async fn execute_scanner(scanner: Scanner) -> Result<Vec<RecordBatch>> {
686        let mut stream = scanner.try_into_stream().await.map_err(|e| {
687            Error::io_source(box_error(std::io::Error::other(format!(
688                "Failed to create stream: {}",
689                e
690            ))))
691        })?;
692
693        let mut batches = Vec::new();
694        while let Some(batch) = stream.next().await {
695            batches.push(batch.map_err(|e| {
696                Error::io_source(box_error(std::io::Error::other(format!(
697                    "Failed to read batch: {}",
698                    e
699                ))))
700            })?);
701        }
702
703        Ok(batches)
704    }
705
706    /// Helper to get a string column from a record batch
707    fn get_string_column<'a>(batch: &'a RecordBatch, column_name: &str) -> Result<&'a StringArray> {
708        let column = batch
709            .column_by_name(column_name)
710            .ok_or_else(|| Error::io(format!("Column '{}' not found", column_name)))?;
711        column
712            .as_any()
713            .downcast_ref::<StringArray>()
714            .ok_or_else(|| Error::io(format!("Column '{}' is not a string array", column_name)))
715    }
716
717    /// Check if the manifest contains an object with the given ID
718    async fn manifest_contains_object(&self, object_id: &str) -> Result<bool> {
719        let escaped_id = object_id.replace('\'', "''");
720        let filter = format!("object_id = '{}'", escaped_id);
721
722        let dataset_guard = self.manifest_dataset.get().await?;
723        let mut scanner = dataset_guard.scan();
724
725        scanner.filter(&filter).map_err(|e| {
726            Error::io_source(box_error(std::io::Error::other(format!(
727                "Failed to filter: {}",
728                e
729            ))))
730        })?;
731
732        // Project no columns and enable row IDs for count_rows to work
733        scanner.project::<&str>(&[]).map_err(|e| {
734            Error::io_source(box_error(std::io::Error::other(format!(
735                "Failed to project: {}",
736                e
737            ))))
738        })?;
739
740        scanner.with_row_id();
741
742        let count = scanner.count_rows().await.map_err(|e| {
743            Error::io_source(box_error(std::io::Error::other(format!(
744                "Failed to count rows: {}",
745                e
746            ))))
747        })?;
748
749        Ok(count > 0)
750    }
751
752    /// Query the manifest for a table with the given object ID
753    async fn query_manifest_for_table(&self, object_id: &str) -> Result<Option<TableInfo>> {
754        let escaped_id = object_id.replace('\'', "''");
755        let filter = format!("object_id = '{}' AND object_type = 'table'", escaped_id);
756        let mut scanner = self.manifest_scanner().await?;
757        scanner.filter(&filter).map_err(|e| {
758            Error::io_source(box_error(std::io::Error::other(format!(
759                "Failed to filter: {}",
760                e
761            ))))
762        })?;
763        scanner.project(&["object_id", "location"]).map_err(|e| {
764            Error::io_source(box_error(std::io::Error::other(format!(
765                "Failed to project: {}",
766                e
767            ))))
768        })?;
769        let batches = Self::execute_scanner(scanner).await?;
770
771        let mut found_result: Option<TableInfo> = None;
772        let mut total_rows = 0;
773
774        for batch in batches {
775            if batch.num_rows() == 0 {
776                continue;
777            }
778
779            total_rows += batch.num_rows();
780            if total_rows > 1 {
781                return Err(Error::io(format!(
782                    "Expected exactly 1 table with id '{}', found {}",
783                    object_id, total_rows
784                )));
785            }
786
787            let object_id_array = Self::get_string_column(&batch, "object_id")?;
788            let location_array = Self::get_string_column(&batch, "location")?;
789            let location = location_array.value(0).to_string();
790            let (namespace, name) = Self::parse_object_id(object_id_array.value(0));
791            found_result = Some(TableInfo {
792                namespace,
793                name,
794                location,
795            });
796        }
797
798        Ok(found_result)
799    }
800
801    /// List all table locations in the manifest (for root namespace only)
802    /// Returns a set of table locations (e.g., "table_name.lance")
803    pub async fn list_manifest_table_locations(&self) -> Result<std::collections::HashSet<String>> {
804        let filter = "object_type = 'table' AND NOT contains(object_id, '$')";
805        let mut scanner = self.manifest_scanner().await?;
806        scanner.filter(filter).map_err(|e| {
807            Error::io_source(box_error(std::io::Error::other(format!(
808                "Failed to filter: {}",
809                e
810            ))))
811        })?;
812        scanner.project(&["location"]).map_err(|e| {
813            Error::io_source(box_error(std::io::Error::other(format!(
814                "Failed to project: {}",
815                e
816            ))))
817        })?;
818
819        let batches = Self::execute_scanner(scanner).await?;
820        let mut locations = std::collections::HashSet::new();
821
822        for batch in batches {
823            if batch.num_rows() == 0 {
824                continue;
825            }
826            let location_array = Self::get_string_column(&batch, "location")?;
827            for i in 0..location_array.len() {
828                locations.insert(location_array.value(i).to_string());
829            }
830        }
831
832        Ok(locations)
833    }
834
835    /// Insert an entry into the manifest table
836    async fn insert_into_manifest(
837        &self,
838        object_id: String,
839        object_type: ObjectType,
840        location: Option<String>,
841    ) -> Result<()> {
842        self.insert_into_manifest_with_metadata(
843            vec![ManifestEntry {
844                object_id,
845                object_type,
846                location,
847                metadata: None,
848            }],
849            None,
850        )
851        .await
852    }
853
854    /// Insert one or more entries into the manifest table with metadata and base_objects.
855    ///
856    /// This is the unified entry point for both single and batch inserts.
857    /// Uses a single MergeInsert operation to insert all entries at once.
858    /// If any entry already exists (matching object_id), the entire batch fails.
859    pub async fn insert_into_manifest_with_metadata(
860        &self,
861        entries: Vec<ManifestEntry>,
862        base_objects: Option<Vec<String>>,
863    ) -> Result<()> {
864        if entries.is_empty() {
865            return Ok(());
866        }
867
868        let schema = Self::manifest_schema();
869
870        let mut object_ids = Vec::with_capacity(entries.len());
871        let mut object_types = Vec::with_capacity(entries.len());
872        let mut locations: Vec<Option<String>> = Vec::with_capacity(entries.len());
873        let mut metadatas: Vec<Option<String>> = Vec::with_capacity(entries.len());
874
875        let string_builder = StringBuilder::new();
876        let mut list_builder = ListBuilder::new(string_builder).with_field(Arc::new(Field::new(
877            "object_id",
878            DataType::Utf8,
879            true,
880        )));
881
882        for (i, entry) in entries.iter().enumerate() {
883            object_ids.push(entry.object_id.as_str());
884            object_types.push(entry.object_type.as_str());
885            locations.push(entry.location.clone());
886            metadatas.push(entry.metadata.clone());
887
888            // Only the first entry gets the base_objects (for single-entry inserts
889            // with base_objects like view creation); batch entries use null.
890            if i == 0 {
891                match &base_objects {
892                    Some(objects) => {
893                        for obj in objects {
894                            list_builder.values().append_value(obj);
895                        }
896                        list_builder.append(true);
897                    }
898                    None => {
899                        list_builder.append_null();
900                    }
901                }
902            } else {
903                list_builder.append_null();
904            }
905        }
906
907        let base_objects_array = list_builder.finish();
908
909        let location_array: Arc<dyn Array> = Arc::new(StringArray::from(
910            locations.iter().map(|l| l.as_deref()).collect::<Vec<_>>(),
911        ));
912
913        let metadata_array: Arc<dyn Array> = Arc::new(StringArray::from(
914            metadatas.iter().map(|m| m.as_deref()).collect::<Vec<_>>(),
915        ));
916
917        let batch = RecordBatch::try_new(
918            schema.clone(),
919            vec![
920                Arc::new(StringArray::from(object_ids)),
921                Arc::new(StringArray::from(object_types.to_vec())),
922                location_array,
923                metadata_array,
924                Arc::new(base_objects_array),
925            ],
926        )
927        .map_err(|e| Error::io(format!("Failed to create manifest entries: {}", e)))?;
928
929        let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
930
931        // Use MergeInsert to ensure uniqueness on object_id
932        let dataset_guard = self.manifest_dataset.get().await?;
933        let dataset_arc = Arc::new(dataset_guard.clone());
934        drop(dataset_guard); // Drop read guard before merge insert
935
936        let mut merge_builder =
937            MergeInsertBuilder::try_new(dataset_arc, vec!["object_id".to_string()]).map_err(
938                |e| {
939                    Error::io_source(box_error(std::io::Error::other(format!(
940                        "Failed to create merge builder: {}",
941                        e
942                    ))))
943                },
944            )?;
945        merge_builder.when_matched(WhenMatched::Fail);
946        merge_builder.when_not_matched(WhenNotMatched::InsertAll);
947        // Use conflict_retries to handle cross-process races on manifest mutations.
948        // When two processes concurrently insert the same object_id, the second one
949        // hits a commit conflict. With conflict_retries > 0, the retry re-evaluates
950        // the full MergeInsert plan against the latest data, where the join detects
951        // the existing row and WhenMatched::Fail fires, producing a clear error.
952        merge_builder.conflict_retries(5);
953        // TODO: after BTREE index creation on object_id, has_scalar_index=true causes
954        // MergeInsert to use V1 path which lacks bloom filters for conflict detection. This
955        // results in (Some, None) filter mismatch when rebasing against V2 operations.
956        // Setting use_index=false ensures all operations consistently use V2 path.
957        merge_builder.use_index(false);
958        if let Some(retries) = self.commit_retries {
959            merge_builder.commit_retries(retries);
960        }
961
962        let (new_dataset_arc, _merge_stats) = merge_builder
963            .try_build()
964            .map_err(|e| {
965                Error::io_source(box_error(std::io::Error::other(format!(
966                    "Failed to build merge: {}",
967                    e
968                ))))
969            })?
970            .execute_reader(Box::new(reader))
971            .await
972            .map_err(|e| {
973                convert_lance_commit_error(&e, "Failed to execute merge insert into manifest", None)
974            })?;
975
976        let new_dataset = Arc::try_unwrap(new_dataset_arc).unwrap_or_else(|arc| (*arc).clone());
977        self.manifest_dataset.set_latest(new_dataset).await;
978
979        // Run inline optimization after write
980        if let Err(e) = self.run_inline_optimization().await {
981            log::warn!(
982                "Unexpected failure when running inline optimization: {:?}",
983                e
984            );
985        }
986
987        Ok(())
988    }
989
990    /// Delete an entry from the manifest table
991    pub async fn delete_from_manifest(&self, object_id: &str) -> Result<()> {
992        let predicate = format!("object_id = '{}'", object_id);
993
994        // Get dataset and use DeleteBuilder with configured retries
995        let dataset_guard = self.manifest_dataset.get().await?;
996        let dataset = Arc::new(dataset_guard.clone());
997        drop(dataset_guard); // Drop read guard before delete
998
999        let new_dataset = DeleteBuilder::new(dataset, &predicate)
1000            .execute()
1001            .await
1002            .map_err(|e| convert_lance_commit_error(&e, "Failed to delete", None))?;
1003
1004        // Update the wrapper with the new dataset
1005        self.manifest_dataset
1006            .set_latest(
1007                Arc::try_unwrap(new_dataset.new_dataset).unwrap_or_else(|arc| (*arc).clone()),
1008            )
1009            .await;
1010
1011        // Run inline optimization after delete
1012        if let Err(e) = self.run_inline_optimization().await {
1013            log::warn!(
1014                "Unexpected failure when running inline optimization: {:?}",
1015                e
1016            );
1017        }
1018
1019        Ok(())
1020    }
1021
1022    /// Query the manifest for all versions of a table, sorted by version.
1023    ///
1024    /// Returns a list of (version, metadata_json_string) tuples where metadata_json_string
1025    /// contains the full metadata JSON stored in the manifest (manifest_path, manifest_size,
1026    /// e_tag, naming_scheme).
1027    ///
1028    /// **Known limitation**: All matching rows are loaded into memory, sorted in Rust,
1029    /// and then truncated. For tables with a very large number of versions this may be
1030    /// expensive. Pushing sort/limit into the scan is not yet supported by Lance.
1031    pub async fn query_table_versions(
1032        &self,
1033        object_id: &str,
1034        descending: bool,
1035        limit: Option<i32>,
1036    ) -> Result<Vec<(i64, String)>> {
1037        let escaped_id = object_id.replace('\'', "''");
1038        // table_version object_ids are formatted as "{object_id}${zero_padded_version}"
1039        let filter = format!(
1040            "object_type = 'table_version' AND starts_with(object_id, '{}{}')",
1041            escaped_id, DELIMITER
1042        );
1043        let mut scanner = self.manifest_scanner().await?;
1044        scanner.filter(&filter).map_err(|e| {
1045            Error::io_source(box_error(std::io::Error::other(format!(
1046                "Failed to filter: {}",
1047                e
1048            ))))
1049        })?;
1050        scanner.project(&["object_id", "metadata"]).map_err(|e| {
1051            Error::io_source(box_error(std::io::Error::other(format!(
1052                "Failed to project: {}",
1053                e
1054            ))))
1055        })?;
1056        let batches = Self::execute_scanner(scanner).await?;
1057
1058        let mut versions: Vec<(i64, String)> = Vec::new();
1059        for batch in batches {
1060            if batch.num_rows() == 0 {
1061                continue;
1062            }
1063            let object_id_array = Self::get_string_column(&batch, "object_id")?;
1064            let metadata_array = Self::get_string_column(&batch, "metadata")?;
1065            for i in 0..batch.num_rows() {
1066                let oid = object_id_array.value(i);
1067                // Parse version from object_id
1068                if let Some(version) = Self::parse_version_from_object_id(oid) {
1069                    let metadata_str = metadata_array.value(i).to_string();
1070                    versions.push((version, metadata_str));
1071                }
1072            }
1073        }
1074
1075        if descending {
1076            versions.sort_by(|a, b| b.0.cmp(&a.0));
1077        } else {
1078            versions.sort_by(|a, b| a.0.cmp(&b.0));
1079        }
1080
1081        if let Some(limit) = limit {
1082            versions.truncate(limit as usize);
1083        }
1084
1085        Ok(versions)
1086    }
1087
1088    /// Query the manifest for a specific version of a table.
1089    ///
1090    /// Returns the full metadata JSON string if found, which contains
1091    /// manifest_path, manifest_size, e_tag, and naming_scheme.
1092    ///
1093    pub async fn query_table_version(
1094        &self,
1095        object_id: &str,
1096        version: i64,
1097    ) -> Result<Option<String>> {
1098        let version_object_id = Self::build_version_object_id(object_id, version);
1099        self.query_table_version_by_object_id(&version_object_id)
1100            .await
1101    }
1102
1103    /// Query a specific table version by its exact object_id.
1104    async fn query_table_version_by_object_id(
1105        &self,
1106        version_object_id: &str,
1107    ) -> Result<Option<String>> {
1108        let escaped_id = version_object_id.replace('\'', "''");
1109        let filter = format!(
1110            "object_id = '{}' AND object_type = 'table_version'",
1111            escaped_id
1112        );
1113        let mut scanner = self.manifest_scanner().await?;
1114        scanner.filter(&filter).map_err(|e| {
1115            Error::io_source(box_error(std::io::Error::other(format!(
1116                "Failed to filter: {}",
1117                e
1118            ))))
1119        })?;
1120        scanner.project(&["metadata"]).map_err(|e| {
1121            Error::io_source(box_error(std::io::Error::other(format!(
1122                "Failed to project: {}",
1123                e
1124            ))))
1125        })?;
1126        let batches = Self::execute_scanner(scanner).await?;
1127
1128        for batch in batches {
1129            if batch.num_rows() == 0 {
1130                continue;
1131            }
1132            let metadata_array = Self::get_string_column(&batch, "metadata")?;
1133            return Ok(Some(metadata_array.value(0).to_string()));
1134        }
1135
1136        Ok(None)
1137    }
1138
1139    /// Delete table version entries from the manifest for a given table and version ranges.
1140    ///
1141    /// Each range is (start_version, end_version) inclusive. Deletes all matching
1142    /// `object_type = 'table_version'` entries whose object_id matches
1143    /// `{object_id}${zero_padded_version}`.
1144    ///
1145    /// Builds a single filter expression covering all version ranges and executes
1146    /// one bulk delete operation instead of deleting versions one at a time.
1147    pub async fn delete_table_versions(
1148        &self,
1149        object_id: &str,
1150        ranges: &[(i64, i64)],
1151    ) -> Result<i64> {
1152        if ranges.is_empty() {
1153            return Ok(0);
1154        }
1155
1156        // Collect all object_ids to delete (both new zero-padded and legacy formats)
1157        let mut object_id_conditions: Vec<String> = Vec::new();
1158        for (start, end) in ranges {
1159            for version in *start..=*end {
1160                let oid = Self::build_version_object_id(object_id, version);
1161                let escaped = oid.replace('\'', "''");
1162                object_id_conditions.push(format!("'{}'", escaped));
1163            }
1164        }
1165
1166        if object_id_conditions.is_empty() {
1167            return Ok(0);
1168        }
1169
1170        // First, count how many entries exist so we can report the deleted count
1171        let in_list = object_id_conditions.join(", ");
1172        let filter = format!(
1173            "object_type = 'table_version' AND object_id IN ({})",
1174            in_list
1175        );
1176
1177        let mut scanner = self.manifest_scanner().await?;
1178        scanner.filter(&filter).map_err(|e| {
1179            Error::io_source(box_error(std::io::Error::other(format!(
1180                "Failed to filter: {}",
1181                e
1182            ))))
1183        })?;
1184        scanner.project(&["object_id"]).map_err(|e| {
1185            Error::io_source(box_error(std::io::Error::other(format!(
1186                "Failed to project: {}",
1187                e
1188            ))))
1189        })?;
1190        let batches = Self::execute_scanner(scanner).await?;
1191        let deleted_count: i64 = batches.iter().map(|b| b.num_rows() as i64).sum();
1192
1193        if deleted_count == 0 {
1194            return Ok(0);
1195        }
1196
1197        // Execute a single bulk delete with the combined filter
1198        let dataset_guard = self.manifest_dataset.get().await?;
1199        let dataset = Arc::new(dataset_guard.clone());
1200        drop(dataset_guard);
1201
1202        let new_dataset = DeleteBuilder::new(dataset, &filter)
1203            .execute()
1204            .await
1205            .map_err(|e| {
1206                convert_lance_commit_error(&e, "Failed to batch delete table versions", None)
1207            })?;
1208
1209        self.manifest_dataset
1210            .set_latest(
1211                Arc::try_unwrap(new_dataset.new_dataset).unwrap_or_else(|arc| (*arc).clone()),
1212            )
1213            .await;
1214
1215        if let Err(e) = self.run_inline_optimization().await {
1216            log::warn!(
1217                "Unexpected failure when running inline optimization: {:?}",
1218                e
1219            );
1220        }
1221
1222        Ok(deleted_count)
1223    }
1224
1225    /// Atomically delete table version entries from the manifest by their object_ids.
1226    ///
1227    /// This method supports multi-table transactional deletion: all specified
1228    /// object_ids (which may span multiple tables) are deleted in a single atomic
1229    /// `DeleteBuilder` operation. Either all entries are removed or none are.
1230    ///
1231    /// Object IDs are formatted as `{table_id}${version}`.
1232    pub async fn batch_delete_table_versions_by_object_ids(
1233        &self,
1234        object_ids: &[String],
1235    ) -> Result<i64> {
1236        if object_ids.is_empty() {
1237            return Ok(0);
1238        }
1239
1240        let in_list: String = object_ids
1241            .iter()
1242            .map(|oid| {
1243                let escaped = oid.replace('\'', "''");
1244                format!("'{}'", escaped)
1245            })
1246            .collect::<Vec<_>>()
1247            .join(", ");
1248
1249        let filter = format!(
1250            "object_type = 'table_version' AND object_id IN ({})",
1251            in_list
1252        );
1253
1254        // Count how many entries exist so we can report the deleted count
1255        let mut scanner = self.manifest_scanner().await?;
1256        scanner.filter(&filter).map_err(|e| {
1257            Error::io_source(box_error(std::io::Error::other(format!(
1258                "Failed to filter: {}",
1259                e
1260            ))))
1261        })?;
1262        scanner.project(&["object_id"]).map_err(|e| {
1263            Error::io_source(box_error(std::io::Error::other(format!(
1264                "Failed to project: {}",
1265                e
1266            ))))
1267        })?;
1268        let batches = Self::execute_scanner(scanner).await?;
1269        let deleted_count: i64 = batches.iter().map(|b| b.num_rows() as i64).sum();
1270
1271        if deleted_count == 0 {
1272            return Ok(0);
1273        }
1274
1275        // Execute a single atomic bulk delete covering all tables
1276        let dataset_guard = self.manifest_dataset.get().await?;
1277        let dataset = Arc::new(dataset_guard.clone());
1278        drop(dataset_guard);
1279
1280        let new_dataset = DeleteBuilder::new(dataset, &filter)
1281            .execute()
1282            .await
1283            .map_err(|e| {
1284                convert_lance_commit_error(
1285                    &e,
1286                    "Failed to batch delete table versions across multiple tables",
1287                    None,
1288                )
1289            })?;
1290
1291        self.manifest_dataset
1292            .set_latest(
1293                Arc::try_unwrap(new_dataset.new_dataset).unwrap_or_else(|arc| (*arc).clone()),
1294            )
1295            .await;
1296
1297        if let Err(e) = self.run_inline_optimization().await {
1298            log::warn!(
1299                "Unexpected failure when running inline optimization: {:?}",
1300                e
1301            );
1302        }
1303
1304        Ok(deleted_count)
1305    }
1306
1307    /// Set a property flag in the __manifest table's metadata key-value map.
1308    ///
1309    /// This uses `dataset.update_metadata()` to persist the flag in the
1310    /// __manifest dataset's table metadata, rather than inserting a row.
1311    /// If the property already exists with the same value, this is a no-op.
1312    pub async fn set_property(&self, name: &str, value: &str) -> Result<()> {
1313        let dataset_guard = self.manifest_dataset.get().await?;
1314        if dataset_guard.metadata().get(name) == Some(&value.to_string()) {
1315            return Ok(());
1316        }
1317        drop(dataset_guard);
1318
1319        let mut dataset_guard = self.manifest_dataset.get_mut().await?;
1320        dataset_guard
1321            .update_metadata([(name, value)])
1322            .await
1323            .map_err(|e| {
1324                Error::io_source(box_error(std::io::Error::other(format!(
1325                    "Failed to set property '{}' in __manifest metadata: {}",
1326                    name, e
1327                ))))
1328            })?;
1329        Ok(())
1330    }
1331
1332    /// Check if a property flag exists in the __manifest table's metadata key-value map.
1333    pub async fn has_property(&self, name: &str) -> Result<bool> {
1334        let dataset_guard = self.manifest_dataset.get().await?;
1335        Ok(dataset_guard.metadata().contains_key(name))
1336    }
1337
1338    /// Parse metadata JSON into a `TableVersion`.
1339    ///
1340    /// Returns `None` if metadata is invalid or missing required fields.
1341    fn parse_table_version(version: i64, metadata_str: &str) -> Option<TableVersion> {
1342        let meta: serde_json::Value = match serde_json::from_str(metadata_str) {
1343            Ok(v) => v,
1344            Err(e) => {
1345                log::warn!(
1346                    "Skipping version {} due to invalid metadata JSON: {}",
1347                    version,
1348                    e
1349                );
1350                return None;
1351            }
1352        };
1353        let manifest_path = match meta.get("manifest_path").and_then(|v| v.as_str()) {
1354            Some(p) => p.to_string(),
1355            None => {
1356                log::warn!(
1357                    "Skipping version {} due to missing 'manifest_path' in metadata — \
1358                     this may indicate data corruption",
1359                    version
1360                );
1361                return None;
1362            }
1363        };
1364        let manifest_size = meta.get("manifest_size").and_then(|v| v.as_i64());
1365        let e_tag = meta
1366            .get("e_tag")
1367            .and_then(|v| v.as_str())
1368            .map(|s| s.to_string());
1369        Some(TableVersion {
1370            version,
1371            manifest_path,
1372            manifest_size,
1373            e_tag,
1374            timestamp_millis: None,
1375            metadata: None,
1376        })
1377    }
1378
1379    /// List table versions from the __manifest table.
1380    ///
1381    /// Queries the manifest for all versions of the given table and returns
1382    /// them as a `ListTableVersionsResponse`.
1383    pub async fn list_table_versions(
1384        &self,
1385        table_id: &[String],
1386        descending: bool,
1387        limit: Option<i32>,
1388    ) -> Result<ListTableVersionsResponse> {
1389        let object_id = Self::str_object_id(table_id);
1390        let manifest_versions = self
1391            .query_table_versions(&object_id, descending, limit)
1392            .await?;
1393
1394        let table_versions: Vec<TableVersion> = manifest_versions
1395            .into_iter()
1396            .filter_map(|(version, metadata_str)| Self::parse_table_version(version, &metadata_str))
1397            .collect();
1398
1399        Ok(ListTableVersionsResponse {
1400            versions: table_versions,
1401            page_token: None,
1402        })
1403    }
1404
1405    /// Describe a specific table version from the __manifest table.
1406    ///
1407    /// Queries the manifest for a specific version and returns it as a
1408    /// `DescribeTableVersionResponse`. Returns an error if the version is not found.
1409    pub async fn describe_table_version(
1410        &self,
1411        table_id: &[String],
1412        version: i64,
1413    ) -> Result<DescribeTableVersionResponse> {
1414        let object_id = Self::str_object_id(table_id);
1415        if let Some(metadata_str) = self.query_table_version(&object_id, version).await?
1416            && let Some(tv) = Self::parse_table_version(version, &metadata_str)
1417        {
1418            return Ok(DescribeTableVersionResponse {
1419                version: Box::new(tv),
1420            });
1421        }
1422        Err(Error::namespace_source(
1423            format!(
1424                "Version {} not found in manifest for table {:?}",
1425                version, table_id
1426            )
1427            .into(),
1428        ))
1429    }
1430
1431    /// Register a table in the manifest without creating the physical table (internal helper for migration)
1432    pub async fn register_table(&self, name: &str, location: String) -> Result<()> {
1433        let object_id = Self::build_object_id(&[], name);
1434        if self.manifest_contains_object(&object_id).await? {
1435            return Err(Error::io(format!("Table '{}' already exists", name)));
1436        }
1437
1438        self.insert_into_manifest(object_id, ObjectType::Table, Some(location))
1439            .await
1440    }
1441
1442    /// Validate that all levels of a namespace path exist
1443    async fn validate_namespace_levels_exist(&self, namespace_path: &[String]) -> Result<()> {
1444        for i in 1..=namespace_path.len() {
1445            let partial_path = &namespace_path[..i];
1446            let object_id = partial_path.join(DELIMITER);
1447            if !self.manifest_contains_object(&object_id).await? {
1448                return Err(Error::namespace_source(
1449                    format!("Parent namespace '{}' does not exist", object_id).into(),
1450                ));
1451            }
1452        }
1453        Ok(())
1454    }
1455
1456    /// Query the manifest for a namespace with the given object ID
1457    async fn query_manifest_for_namespace(&self, object_id: &str) -> Result<Option<NamespaceInfo>> {
1458        let escaped_id = object_id.replace('\'', "''");
1459        let filter = format!("object_id = '{}' AND object_type = 'namespace'", escaped_id);
1460        let mut scanner = self.manifest_scanner().await?;
1461        scanner.filter(&filter).map_err(|e| {
1462            Error::io_source(box_error(std::io::Error::other(format!(
1463                "Failed to filter: {}",
1464                e
1465            ))))
1466        })?;
1467        scanner.project(&["object_id", "metadata"]).map_err(|e| {
1468            Error::io_source(box_error(std::io::Error::other(format!(
1469                "Failed to project: {}",
1470                e
1471            ))))
1472        })?;
1473        let batches = Self::execute_scanner(scanner).await?;
1474
1475        let mut found_result: Option<NamespaceInfo> = None;
1476        let mut total_rows = 0;
1477
1478        for batch in batches {
1479            if batch.num_rows() == 0 {
1480                continue;
1481            }
1482
1483            total_rows += batch.num_rows();
1484            if total_rows > 1 {
1485                return Err(Error::io(format!(
1486                    "Expected exactly 1 namespace with id '{}', found {}",
1487                    object_id, total_rows
1488                )));
1489            }
1490
1491            let object_id_array = Self::get_string_column(&batch, "object_id")?;
1492            let metadata_array = Self::get_string_column(&batch, "metadata")?;
1493
1494            let object_id_str = object_id_array.value(0);
1495            let metadata = if !metadata_array.is_null(0) {
1496                let metadata_str = metadata_array.value(0);
1497                match serde_json::from_str::<HashMap<String, String>>(metadata_str) {
1498                    Ok(map) => Some(map),
1499                    Err(e) => {
1500                        return Err(Error::io(format!(
1501                            "Failed to deserialize metadata for namespace '{}': {}",
1502                            object_id, e
1503                        )));
1504                    }
1505                }
1506            } else {
1507                None
1508            };
1509
1510            let (namespace, name) = Self::parse_object_id(object_id_str);
1511            found_result = Some(NamespaceInfo {
1512                namespace,
1513                name,
1514                metadata,
1515            });
1516        }
1517
1518        Ok(found_result)
1519    }
1520
1521    /// Create or load the manifest dataset, ensuring it has the latest schema setup.
1522    ///
1523    /// This function will:
1524    /// 1. Try to load an existing manifest table
1525    /// 2. If it exists, check and migrate the schema if needed (e.g., add primary key metadata)
1526    /// 3. If it doesn't exist, create a new manifest table with the current schema
1527    /// 4. Persist feature flags (e.g., table_version_storage_enabled) if requested
1528    async fn ensure_manifest_table_up_to_date(
1529        root: &str,
1530        storage_options: &Option<HashMap<String, String>>,
1531        session: Option<Arc<Session>>,
1532        table_version_storage_enabled: bool,
1533    ) -> Result<DatasetConsistencyWrapper> {
1534        let manifest_path = format!("{}/{}", root, MANIFEST_TABLE_NAME);
1535        log::debug!("Attempting to load manifest from {}", manifest_path);
1536        let store_options = ObjectStoreParams {
1537            storage_options_accessor: storage_options.as_ref().map(|opts| {
1538                Arc::new(
1539                    lance_io::object_store::StorageOptionsAccessor::with_static_options(
1540                        opts.clone(),
1541                    ),
1542                )
1543            }),
1544            ..Default::default()
1545        };
1546        let read_params = ReadParams {
1547            session: session.clone(),
1548            store_options: Some(store_options.clone()),
1549            ..Default::default()
1550        };
1551        let dataset_result = DatasetBuilder::from_uri(&manifest_path)
1552            .with_read_params(read_params)
1553            .load()
1554            .await;
1555        if let Ok(mut dataset) = dataset_result {
1556            // Check if the object_id field has primary key metadata, migrate if not
1557            let needs_pk_migration = dataset
1558                .schema()
1559                .field("object_id")
1560                .map(|f| {
1561                    !f.metadata
1562                        .contains_key(LANCE_UNENFORCED_PRIMARY_KEY_POSITION)
1563                })
1564                .unwrap_or(false);
1565
1566            if needs_pk_migration {
1567                log::info!("Migrating __manifest table to add primary key metadata on object_id");
1568                dataset
1569                    .update_field_metadata()
1570                    .update("object_id", [(LANCE_UNENFORCED_PRIMARY_KEY_POSITION, "0")])
1571                    .map_err(|e| {
1572                        Error::io_source(box_error(std::io::Error::other(format!(
1573                            "Failed to find object_id field for migration: {}",
1574                            e
1575                        ))))
1576                    })?
1577                    .await
1578                    .map_err(|e| {
1579                        Error::io_source(box_error(std::io::Error::other(format!(
1580                            "Failed to migrate primary key metadata: {}",
1581                            e
1582                        ))))
1583                    })?;
1584            }
1585
1586            // Persist table_version_storage_enabled flag in __manifest so that once
1587            // enabled, it becomes a permanent property of this namespace.
1588            if table_version_storage_enabled {
1589                let needs_flag = dataset
1590                    .metadata()
1591                    .get("table_version_storage_enabled")
1592                    .map(|v| v != "true")
1593                    .unwrap_or(true);
1594
1595                if needs_flag
1596                    && let Err(e) = dataset
1597                        .update_metadata([("table_version_storage_enabled", "true")])
1598                        .await
1599                {
1600                    log::warn!(
1601                        "Failed to persist table_version_storage_enabled flag in __manifest: {:?}",
1602                        e
1603                    );
1604                }
1605            }
1606
1607            Ok(DatasetConsistencyWrapper::new(dataset))
1608        } else {
1609            log::info!("Creating new manifest table at {}", manifest_path);
1610            let schema = Self::manifest_schema();
1611            let empty_batch = RecordBatch::new_empty(schema.clone());
1612            let reader = RecordBatchIterator::new(vec![Ok(empty_batch)], schema.clone());
1613
1614            let store_params = ObjectStoreParams {
1615                storage_options_accessor: storage_options.as_ref().map(|opts| {
1616                    Arc::new(
1617                        lance_io::object_store::StorageOptionsAccessor::with_static_options(
1618                            opts.clone(),
1619                        ),
1620                    )
1621                }),
1622                ..Default::default()
1623            };
1624            let write_params = WriteParams {
1625                session: session.clone(),
1626                store_params: Some(store_params),
1627                ..Default::default()
1628            };
1629
1630            let dataset =
1631                Dataset::write(Box::new(reader), &manifest_path, Some(write_params)).await;
1632
1633            // Handle race condition where another process created the manifest concurrently
1634            match dataset {
1635                Ok(dataset) => {
1636                    log::info!(
1637                        "Successfully created manifest table at {}, version={}, uri={}",
1638                        manifest_path,
1639                        dataset.version().version,
1640                        dataset.uri()
1641                    );
1642                    Ok(DatasetConsistencyWrapper::new(dataset))
1643                }
1644                Err(ref e)
1645                    if matches!(
1646                        e,
1647                        LanceError::DatasetAlreadyExists { .. }
1648                            | LanceError::CommitConflict { .. }
1649                            | LanceError::IncompatibleTransaction { .. }
1650                            | LanceError::RetryableCommitConflict { .. }
1651                    ) =>
1652                {
1653                    // Another process created the manifest concurrently, try to load it
1654                    log::info!(
1655                        "Manifest table was created by another process, loading it: {}",
1656                        manifest_path
1657                    );
1658                    let recovery_store_options = ObjectStoreParams {
1659                        storage_options_accessor: storage_options.as_ref().map(|opts| {
1660                            Arc::new(
1661                                lance_io::object_store::StorageOptionsAccessor::with_static_options(
1662                                    opts.clone(),
1663                                ),
1664                            )
1665                        }),
1666                        ..Default::default()
1667                    };
1668                    let recovery_read_params = ReadParams {
1669                        session,
1670                        store_options: Some(recovery_store_options),
1671                        ..Default::default()
1672                    };
1673                    let dataset = DatasetBuilder::from_uri(&manifest_path)
1674                        .with_read_params(recovery_read_params)
1675                        .load()
1676                        .await
1677                        .map_err(|e| {
1678                            Error::io_source(box_error(std::io::Error::other(format!(
1679                                "Failed to load manifest dataset after creation conflict: {}",
1680                                e
1681                            ))))
1682                        })?;
1683                    Ok(DatasetConsistencyWrapper::new(dataset))
1684                }
1685                Err(e) => Err(Error::io_source(box_error(std::io::Error::other(format!(
1686                    "Failed to create manifest dataset: {}",
1687                    e
1688                ))))),
1689            }
1690        }
1691    }
1692}
1693
1694#[async_trait]
1695impl LanceNamespace for ManifestNamespace {
1696    fn namespace_id(&self) -> String {
1697        self.root.clone()
1698    }
1699
1700    async fn list_tables(&self, request: ListTablesRequest) -> Result<ListTablesResponse> {
1701        let namespace_id = request
1702            .id
1703            .as_ref()
1704            .ok_or_else(|| Error::invalid_input_source("Namespace ID is required".into()))?;
1705
1706        // Build filter to find tables in this namespace
1707        let filter = if namespace_id.is_empty() {
1708            // Root namespace: find tables without a namespace prefix
1709            "object_type = 'table' AND NOT contains(object_id, '$')".to_string()
1710        } else {
1711            // Namespaced: find tables that start with namespace$ but have no additional $
1712            let prefix = namespace_id.join(DELIMITER);
1713            format!(
1714                "object_type = 'table' AND starts_with(object_id, '{}{}') AND NOT contains(substring(object_id, {}), '$')",
1715                prefix,
1716                DELIMITER,
1717                prefix.len() + 2
1718            )
1719        };
1720
1721        let mut scanner = self.manifest_scanner().await?;
1722        scanner.filter(&filter).map_err(|e| {
1723            Error::io_source(box_error(std::io::Error::other(format!(
1724                "Failed to filter: {}",
1725                e
1726            ))))
1727        })?;
1728        scanner.project(&["object_id"]).map_err(|e| {
1729            Error::io_source(box_error(std::io::Error::other(format!(
1730                "Failed to project: {}",
1731                e
1732            ))))
1733        })?;
1734
1735        let batches = Self::execute_scanner(scanner).await?;
1736
1737        let mut tables = Vec::new();
1738        for batch in batches {
1739            if batch.num_rows() == 0 {
1740                continue;
1741            }
1742
1743            let object_id_array = Self::get_string_column(&batch, "object_id")?;
1744            for i in 0..batch.num_rows() {
1745                let object_id = object_id_array.value(i);
1746                let (_namespace, name) = Self::parse_object_id(object_id);
1747                tables.push(name);
1748            }
1749        }
1750
1751        Ok(ListTablesResponse::new(tables))
1752    }
1753
1754    async fn describe_table(&self, request: DescribeTableRequest) -> Result<DescribeTableResponse> {
1755        let table_id = request
1756            .id
1757            .as_ref()
1758            .ok_or_else(|| Error::invalid_input_source("Table ID is required".into()))?;
1759
1760        if table_id.is_empty() {
1761            return Err(Error::invalid_input_source(
1762                "Table ID cannot be empty".into(),
1763            ));
1764        }
1765
1766        let object_id = Self::str_object_id(table_id);
1767        let table_info = self.query_manifest_for_table(&object_id).boxed().await?;
1768
1769        // Extract table name and namespace from table_id
1770        let table_name = table_id.last().cloned().unwrap_or_default();
1771        let namespace_id: Vec<String> = if table_id.len() > 1 {
1772            table_id[..table_id.len() - 1].to_vec()
1773        } else {
1774            vec![]
1775        };
1776
1777        let load_detailed_metadata = request.load_detailed_metadata.unwrap_or(false);
1778        // For backwards compatibility, only skip vending credentials when explicitly set to false
1779        let vend_credentials = request.vend_credentials.unwrap_or(true);
1780
1781        match table_info {
1782            Some(info) => {
1783                // Construct full URI from relative location
1784                let table_uri = Self::construct_full_uri(&self.root, &info.location)?;
1785
1786                let storage_options = if vend_credentials {
1787                    self.storage_options.clone()
1788                } else {
1789                    None
1790                };
1791
1792                // If not loading detailed metadata, return minimal response with just location
1793                if !load_detailed_metadata {
1794                    return Ok(DescribeTableResponse {
1795                        table: Some(table_name),
1796                        namespace: Some(namespace_id),
1797                        location: Some(table_uri.clone()),
1798                        table_uri: Some(table_uri),
1799                        storage_options,
1800                        ..Default::default()
1801                    });
1802                }
1803
1804                // Try to open the dataset to get version and schema
1805                match Dataset::open(&table_uri).await {
1806                    Ok(mut dataset) => {
1807                        // If a specific version is requested, checkout that version
1808                        if let Some(requested_version) = request.version {
1809                            dataset = dataset.checkout_version(requested_version as u64).await?;
1810                        }
1811
1812                        let version = dataset.version().version;
1813                        let lance_schema = dataset.schema();
1814                        let arrow_schema: arrow_schema::Schema = lance_schema.into();
1815                        let json_schema = arrow_schema_to_json(&arrow_schema)?;
1816
1817                        Ok(DescribeTableResponse {
1818                            table: Some(table_name.clone()),
1819                            namespace: Some(namespace_id.clone()),
1820                            version: Some(version as i64),
1821                            location: Some(table_uri.clone()),
1822                            table_uri: Some(table_uri),
1823                            schema: Some(Box::new(json_schema)),
1824                            storage_options,
1825                            ..Default::default()
1826                        })
1827                    }
1828                    Err(_) => {
1829                        // If dataset can't be opened (e.g., empty table), return minimal info
1830                        Ok(DescribeTableResponse {
1831                            table: Some(table_name),
1832                            namespace: Some(namespace_id),
1833                            location: Some(table_uri.clone()),
1834                            table_uri: Some(table_uri),
1835                            storage_options,
1836                            ..Default::default()
1837                        })
1838                    }
1839                }
1840            }
1841            None => Err(Error::namespace_source(
1842                format!("Table '{}' not found", object_id).into(),
1843            )),
1844        }
1845    }
1846
1847    async fn table_exists(&self, request: TableExistsRequest) -> Result<()> {
1848        let table_id = request
1849            .id
1850            .as_ref()
1851            .ok_or_else(|| Error::invalid_input_source("Table ID is required".into()))?;
1852
1853        if table_id.is_empty() {
1854            return Err(Error::invalid_input_source(
1855                "Table ID cannot be empty".into(),
1856            ));
1857        }
1858
1859        let (namespace, table_name) = Self::split_object_id(table_id);
1860        let object_id = Self::build_object_id(&namespace, &table_name);
1861        let exists = self.manifest_contains_object(&object_id).await?;
1862        if exists {
1863            Ok(())
1864        } else {
1865            Err(Error::namespace_source(
1866                format!("Table '{}' not found", table_name).into(),
1867            ))
1868        }
1869    }
1870
1871    async fn create_table(
1872        &self,
1873        request: CreateTableRequest,
1874        data: Bytes,
1875    ) -> Result<CreateTableResponse> {
1876        let table_id = request
1877            .id
1878            .as_ref()
1879            .ok_or_else(|| Error::invalid_input_source("Table ID is required".into()))?;
1880
1881        if table_id.is_empty() {
1882            return Err(Error::invalid_input_source(
1883                "Table ID cannot be empty".into(),
1884            ));
1885        }
1886
1887        let (namespace, table_name) = Self::split_object_id(table_id);
1888        let object_id = Self::build_object_id(&namespace, &table_name);
1889
1890        // Check if table already exists in manifest
1891        if self.manifest_contains_object(&object_id).await? {
1892            return Err(Error::io(format!("Table '{}' already exists", table_name)));
1893        }
1894
1895        // Create the physical table location with hash-based naming
1896        // When dir_listing_enabled is true and it's a root table, use directory-style naming: {table_name}.lance
1897        // Otherwise, use hash-based naming: {hash}_{object_id}
1898        let dir_name = if namespace.is_empty() && self.dir_listing_enabled {
1899            // Root table with directory listing enabled: use {table_name}.lance
1900            format!("{}.lance", table_name)
1901        } else {
1902            // Child namespace table or dir listing disabled: use hash-based naming
1903            Self::generate_dir_name(&object_id)
1904        };
1905        let table_uri = Self::construct_full_uri(&self.root, &dir_name)?;
1906
1907        // Validate that request_data is provided
1908        if data.is_empty() {
1909            return Err(Error::namespace_source(
1910                "Request data (Arrow IPC stream) is required for create_table".into(),
1911            ));
1912        }
1913
1914        // Write the data using Lance Dataset
1915        let cursor = Cursor::new(data.to_vec());
1916        let stream_reader = StreamReader::try_new(cursor, None)
1917            .map_err(|e| Error::io(format!("Failed to read IPC stream: {}", e)))?;
1918
1919        let batches: Vec<RecordBatch> =
1920            stream_reader
1921                .collect::<std::result::Result<Vec<_>, _>>()
1922                .map_err(|e| Error::io(format!("Failed to collect batches: {}", e)))?;
1923
1924        if batches.is_empty() {
1925            return Err(Error::io("No data provided for table creation"));
1926        }
1927
1928        let schema = batches[0].schema();
1929        let batch_results: Vec<std::result::Result<RecordBatch, arrow_schema::ArrowError>> =
1930            batches.into_iter().map(Ok).collect();
1931        let reader = RecordBatchIterator::new(batch_results, schema);
1932
1933        let store_params = ObjectStoreParams {
1934            storage_options_accessor: self.storage_options.as_ref().map(|opts| {
1935                Arc::new(
1936                    lance_io::object_store::StorageOptionsAccessor::with_static_options(
1937                        opts.clone(),
1938                    ),
1939                )
1940            }),
1941            ..Default::default()
1942        };
1943        let write_params = WriteParams {
1944            session: self.session.clone(),
1945            store_params: Some(store_params),
1946            ..Default::default()
1947        };
1948        let _dataset = Dataset::write(Box::new(reader), &table_uri, Some(write_params))
1949            .await
1950            .map_err(|e| {
1951                Error::io_source(box_error(std::io::Error::other(format!(
1952                    "Failed to write dataset: {}",
1953                    e
1954                ))))
1955            })?;
1956
1957        // Register in manifest (store dir_name, not full URI)
1958        self.insert_into_manifest(object_id, ObjectType::Table, Some(dir_name))
1959            .await?;
1960
1961        Ok(CreateTableResponse {
1962            version: Some(1),
1963            location: Some(table_uri),
1964            storage_options: self.storage_options.clone(),
1965            ..Default::default()
1966        })
1967    }
1968
1969    async fn drop_table(&self, request: DropTableRequest) -> Result<DropTableResponse> {
1970        let table_id = request
1971            .id
1972            .as_ref()
1973            .ok_or_else(|| Error::invalid_input_source("Table ID is required".into()))?;
1974
1975        if table_id.is_empty() {
1976            return Err(Error::invalid_input_source(
1977                "Table ID cannot be empty".into(),
1978            ));
1979        }
1980
1981        let (namespace, table_name) = Self::split_object_id(table_id);
1982        let object_id = Self::build_object_id(&namespace, &table_name);
1983
1984        // Query manifest for table location
1985        let table_info = self.query_manifest_for_table(&object_id).boxed().await?;
1986
1987        match table_info {
1988            Some(info) => {
1989                // Delete from manifest first
1990                self.delete_from_manifest(&object_id).boxed().await?;
1991
1992                // Delete physical data directory using the dir_name from manifest
1993                let table_path = self.base_path.child(info.location.as_str());
1994                let table_uri = Self::construct_full_uri(&self.root, &info.location)?;
1995
1996                // Remove the table directory
1997                self.object_store
1998                    .remove_dir_all(table_path)
1999                    .boxed()
2000                    .await
2001                    .map_err(|e| {
2002                        Error::namespace_source(
2003                            format!("Failed to delete table directory: {}", e).into(),
2004                        )
2005                    })?;
2006
2007                Ok(DropTableResponse {
2008                    id: request.id.clone(),
2009                    location: Some(table_uri),
2010                    ..Default::default()
2011                })
2012            }
2013            None => Err(Error::namespace_source(
2014                format!("Table '{}' not found", table_name).into(),
2015            )),
2016        }
2017    }
2018
2019    async fn list_namespaces(
2020        &self,
2021        request: ListNamespacesRequest,
2022    ) -> Result<ListNamespacesResponse> {
2023        let parent_namespace = request
2024            .id
2025            .as_ref()
2026            .ok_or_else(|| Error::invalid_input_source("Namespace ID is required".into()))?;
2027
2028        // Build filter to find direct child namespaces
2029        let filter = if parent_namespace.is_empty() {
2030            // Root namespace: find all namespaces without a parent
2031            "object_type = 'namespace' AND NOT contains(object_id, '$')".to_string()
2032        } else {
2033            // Non-root: find namespaces that start with parent$ but have no additional $
2034            let prefix = parent_namespace.join(DELIMITER);
2035            format!(
2036                "object_type = 'namespace' AND starts_with(object_id, '{}{}') AND NOT contains(substring(object_id, {}), '$')",
2037                prefix,
2038                DELIMITER,
2039                prefix.len() + 2
2040            )
2041        };
2042
2043        let mut scanner = self.manifest_scanner().await?;
2044        scanner.filter(&filter).map_err(|e| {
2045            Error::io_source(box_error(std::io::Error::other(format!(
2046                "Failed to filter: {}",
2047                e
2048            ))))
2049        })?;
2050        scanner.project(&["object_id"]).map_err(|e| {
2051            Error::io_source(box_error(std::io::Error::other(format!(
2052                "Failed to project: {}",
2053                e
2054            ))))
2055        })?;
2056
2057        let batches = Self::execute_scanner(scanner).await?;
2058        let mut namespaces = Vec::new();
2059
2060        for batch in batches {
2061            if batch.num_rows() == 0 {
2062                continue;
2063            }
2064
2065            let object_id_array = Self::get_string_column(&batch, "object_id")?;
2066            for i in 0..batch.num_rows() {
2067                let object_id = object_id_array.value(i);
2068                let (_namespace, name) = Self::parse_object_id(object_id);
2069                namespaces.push(name);
2070            }
2071        }
2072
2073        Ok(ListNamespacesResponse::new(namespaces))
2074    }
2075
2076    async fn describe_namespace(
2077        &self,
2078        request: DescribeNamespaceRequest,
2079    ) -> Result<DescribeNamespaceResponse> {
2080        let namespace_id = request
2081            .id
2082            .as_ref()
2083            .ok_or_else(|| Error::invalid_input_source("Namespace ID is required".into()))?;
2084
2085        // Root namespace always exists
2086        if namespace_id.is_empty() {
2087            #[allow(clippy::needless_update)]
2088            return Ok(DescribeNamespaceResponse {
2089                properties: Some(HashMap::new()),
2090                ..Default::default()
2091            });
2092        }
2093
2094        // Check if namespace exists in manifest
2095        let object_id = namespace_id.join(DELIMITER);
2096        let namespace_info = self.query_manifest_for_namespace(&object_id).await?;
2097
2098        match namespace_info {
2099            #[allow(clippy::needless_update)]
2100            Some(info) => Ok(DescribeNamespaceResponse {
2101                properties: info.metadata,
2102                ..Default::default()
2103            }),
2104            None => Err(Error::namespace_source(
2105                format!("Namespace '{}' not found", object_id).into(),
2106            )),
2107        }
2108    }
2109
2110    async fn create_namespace(
2111        &self,
2112        request: CreateNamespaceRequest,
2113    ) -> Result<CreateNamespaceResponse> {
2114        let namespace_id = request
2115            .id
2116            .as_ref()
2117            .ok_or_else(|| Error::invalid_input_source("Namespace ID is required".into()))?;
2118
2119        // Root namespace always exists and cannot be created
2120        if namespace_id.is_empty() {
2121            return Err(Error::namespace_source(
2122                "Root namespace already exists and cannot be created".into(),
2123            ));
2124        }
2125
2126        // Validate parent namespaces exist (but not the namespace being created)
2127        if namespace_id.len() > 1 {
2128            self.validate_namespace_levels_exist(&namespace_id[..namespace_id.len() - 1])
2129                .await?;
2130        }
2131
2132        let object_id = namespace_id.join(DELIMITER);
2133        if self.manifest_contains_object(&object_id).await? {
2134            return Err(Error::namespace_source(
2135                format!("Namespace '{}' already exists", object_id).into(),
2136            ));
2137        }
2138
2139        // Serialize properties if provided
2140        let metadata = request.properties.as_ref().and_then(|props| {
2141            if props.is_empty() {
2142                None
2143            } else {
2144                Some(serde_json::to_string(props).ok()?)
2145            }
2146        });
2147
2148        self.insert_into_manifest_with_metadata(
2149            vec![ManifestEntry {
2150                object_id,
2151                object_type: ObjectType::Namespace,
2152                location: None,
2153                metadata,
2154            }],
2155            None,
2156        )
2157        .await?;
2158
2159        Ok(CreateNamespaceResponse {
2160            properties: request.properties,
2161            ..Default::default()
2162        })
2163    }
2164
2165    async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<DropNamespaceResponse> {
2166        let namespace_id = request
2167            .id
2168            .as_ref()
2169            .ok_or_else(|| Error::invalid_input_source("Namespace ID is required".into()))?;
2170
2171        // Root namespace always exists and cannot be dropped
2172        if namespace_id.is_empty() {
2173            return Err(Error::namespace_source(
2174                "Root namespace cannot be dropped".into(),
2175            ));
2176        }
2177
2178        let object_id = namespace_id.join(DELIMITER);
2179
2180        // Check if namespace exists
2181        if !self.manifest_contains_object(&object_id).boxed().await? {
2182            return Err(Error::namespace_source(
2183                format!("Namespace '{}' not found", object_id).into(),
2184            ));
2185        }
2186
2187        // Check for child namespaces
2188        let escaped_id = object_id.replace('\'', "''");
2189        let prefix = format!("{}{}", escaped_id, DELIMITER);
2190        let filter = format!("starts_with(object_id, '{}')", prefix);
2191        let mut scanner = self.manifest_scanner().boxed().await?;
2192        scanner.filter(&filter).map_err(|e| {
2193            Error::io_source(box_error(std::io::Error::other(format!(
2194                "Failed to filter: {}",
2195                e
2196            ))))
2197        })?;
2198        scanner.project::<&str>(&[]).map_err(|e| {
2199            Error::io_source(box_error(std::io::Error::other(format!(
2200                "Failed to project: {}",
2201                e
2202            ))))
2203        })?;
2204        scanner.with_row_id();
2205        let count = scanner.count_rows().boxed().await.map_err(|e| {
2206            Error::io_source(box_error(std::io::Error::other(format!(
2207                "Failed to count rows: {}",
2208                e
2209            ))))
2210        })?;
2211
2212        if count > 0 {
2213            return Err(Error::namespace_source(
2214                format!(
2215                    "Namespace '{}' is not empty (contains {} child objects)",
2216                    object_id, count
2217                )
2218                .into(),
2219            ));
2220        }
2221
2222        self.delete_from_manifest(&object_id).boxed().await?;
2223
2224        Ok(DropNamespaceResponse::default())
2225    }
2226
2227    async fn namespace_exists(&self, request: NamespaceExistsRequest) -> Result<()> {
2228        let namespace_id = request
2229            .id
2230            .as_ref()
2231            .ok_or_else(|| Error::invalid_input_source("Namespace ID is required".into()))?;
2232
2233        // Root namespace always exists
2234        if namespace_id.is_empty() {
2235            return Ok(());
2236        }
2237
2238        let object_id = namespace_id.join(DELIMITER);
2239        if self.manifest_contains_object(&object_id).await? {
2240            Ok(())
2241        } else {
2242            Err(Error::namespace_source(
2243                format!("Namespace '{}' not found", object_id).into(),
2244            ))
2245        }
2246    }
2247
2248    async fn declare_table(&self, request: DeclareTableRequest) -> Result<DeclareTableResponse> {
2249        let table_id = request
2250            .id
2251            .as_ref()
2252            .ok_or_else(|| Error::invalid_input_source("Table ID is required".into()))?;
2253
2254        if table_id.is_empty() {
2255            return Err(Error::invalid_input_source(
2256                "Table ID cannot be empty".into(),
2257            ));
2258        }
2259
2260        let (namespace, table_name) = Self::split_object_id(table_id);
2261        let object_id = Self::build_object_id(&namespace, &table_name);
2262
2263        // Check if table already exists in manifest
2264        let existing = self.query_manifest_for_table(&object_id).await?;
2265        if existing.is_some() {
2266            return Err(Error::namespace_source(
2267                format!("Table '{}' already exists", table_name).into(),
2268            ));
2269        }
2270
2271        // Create table location path with hash-based naming
2272        // When dir_listing_enabled is true and it's a root table, use directory-style naming: {table_name}.lance
2273        // Otherwise, use hash-based naming: {hash}_{object_id}
2274        let dir_name = if namespace.is_empty() && self.dir_listing_enabled {
2275            // Root table with directory listing enabled: use {table_name}.lance
2276            format!("{}.lance", table_name)
2277        } else {
2278            // Child namespace table or dir listing disabled: use hash-based naming
2279            Self::generate_dir_name(&object_id)
2280        };
2281        let table_path = self.base_path.child(dir_name.as_str());
2282        let table_uri = Self::construct_full_uri(&self.root, &dir_name)?;
2283
2284        // Validate location if provided
2285        if let Some(req_location) = &request.location {
2286            let req_location = req_location.trim_end_matches('/');
2287            if req_location != table_uri {
2288                return Err(Error::namespace_source(
2289                    format!(
2290                        "Cannot declare table {} at location {}, must be at location {}",
2291                        table_name, req_location, table_uri
2292                    )
2293                    .into(),
2294                ));
2295            }
2296        }
2297
2298        // Create the .lance-reserved file to mark the table as existing
2299        let reserved_file_path = table_path.child(".lance-reserved");
2300
2301        self.object_store
2302            .create(&reserved_file_path)
2303            .await
2304            .map_err(|e| {
2305                Error::namespace_source(
2306                    format!(
2307                        "Failed to create .lance-reserved file for table {}: {}",
2308                        table_name, e
2309                    )
2310                    .into(),
2311                )
2312            })?
2313            .shutdown()
2314            .await
2315            .map_err(|e| {
2316                Error::namespace_source(
2317                    format!(
2318                        "Failed to finalize .lance-reserved file for table {}: {}",
2319                        table_name, e
2320                    )
2321                    .into(),
2322                )
2323            })?;
2324
2325        // Add entry to manifest marking this as a declared table (store dir_name, not full path)
2326        self.insert_into_manifest(object_id, ObjectType::Table, Some(dir_name))
2327            .await?;
2328
2329        log::info!(
2330            "Declared table '{}' in manifest at {}",
2331            table_name,
2332            table_uri
2333        );
2334
2335        // For backwards compatibility, only skip vending credentials when explicitly set to false
2336        let vend_credentials = request.vend_credentials.unwrap_or(true);
2337        let storage_options = if vend_credentials {
2338            self.storage_options.clone()
2339        } else {
2340            None
2341        };
2342
2343        Ok(DeclareTableResponse {
2344            location: Some(table_uri),
2345            storage_options,
2346            ..Default::default()
2347        })
2348    }
2349
2350    async fn register_table(&self, request: RegisterTableRequest) -> Result<RegisterTableResponse> {
2351        let table_id = request
2352            .id
2353            .as_ref()
2354            .ok_or_else(|| Error::invalid_input_source("Table ID is required".into()))?;
2355
2356        if table_id.is_empty() {
2357            return Err(Error::invalid_input_source(
2358                "Table ID cannot be empty".into(),
2359            ));
2360        }
2361
2362        let location = request.location.clone();
2363
2364        // Validate that location is a relative path within the root directory
2365        // We don't allow absolute URIs or paths that escape the root
2366        if location.contains("://") {
2367            return Err(Error::invalid_input_source(format!(
2368                "Absolute URIs are not allowed for register_table. Location must be a relative path within the root directory: {}",
2369                location
2370            ).into()));
2371        }
2372
2373        if location.starts_with('/') {
2374            return Err(Error::invalid_input_source(format!(
2375                "Absolute paths are not allowed for register_table. Location must be a relative path within the root directory: {}",
2376                location
2377            ).into()));
2378        }
2379
2380        // Check for path traversal attempts
2381        if location.contains("..") {
2382            return Err(Error::invalid_input_source(format!(
2383                "Path traversal is not allowed. Location must be a relative path within the root directory: {}",
2384                location
2385            ).into()));
2386        }
2387
2388        let (namespace, table_name) = Self::split_object_id(table_id);
2389        let object_id = Self::build_object_id(&namespace, &table_name);
2390
2391        // Validate that parent namespaces exist (if not root)
2392        if !namespace.is_empty() {
2393            self.validate_namespace_levels_exist(&namespace).await?;
2394        }
2395
2396        // Check if table already exists
2397        if self.manifest_contains_object(&object_id).await? {
2398            return Err(Error::namespace_source(
2399                format!("Table '{}' already exists", object_id).into(),
2400            ));
2401        }
2402
2403        // Register the table with its location in the manifest
2404        self.insert_into_manifest(object_id, ObjectType::Table, Some(location.clone()))
2405            .await?;
2406
2407        Ok(RegisterTableResponse {
2408            location: Some(location),
2409            ..Default::default()
2410        })
2411    }
2412
2413    async fn deregister_table(
2414        &self,
2415        request: DeregisterTableRequest,
2416    ) -> Result<DeregisterTableResponse> {
2417        let table_id = request
2418            .id
2419            .as_ref()
2420            .ok_or_else(|| Error::invalid_input_source("Table ID is required".into()))?;
2421
2422        if table_id.is_empty() {
2423            return Err(Error::invalid_input_source(
2424                "Table ID cannot be empty".into(),
2425            ));
2426        }
2427
2428        let (namespace, table_name) = Self::split_object_id(table_id);
2429        let object_id = Self::build_object_id(&namespace, &table_name);
2430
2431        // Get table info before deleting
2432        let table_info = self.query_manifest_for_table(&object_id).await?;
2433
2434        let table_uri = match table_info {
2435            Some(info) => {
2436                // Delete from manifest only (leave physical data intact)
2437                self.delete_from_manifest(&object_id).boxed().await?;
2438                Self::construct_full_uri(&self.root, &info.location)?
2439            }
2440            None => {
2441                return Err(Error::namespace_source(
2442                    format!("Table '{}' not found", object_id).into(),
2443                ));
2444            }
2445        };
2446
2447        Ok(DeregisterTableResponse {
2448            id: request.id.clone(),
2449            location: Some(table_uri),
2450            ..Default::default()
2451        })
2452    }
2453}
2454
2455#[cfg(test)]
2456mod tests {
2457    use crate::{DirectoryNamespaceBuilder, ManifestNamespace};
2458    use bytes::Bytes;
2459    use lance_core::utils::tempfile::TempStdDir;
2460    use lance_namespace::LanceNamespace;
2461    use lance_namespace::models::{
2462        CreateNamespaceRequest, CreateTableRequest, DescribeTableRequest, DropTableRequest,
2463        ListTablesRequest, TableExistsRequest,
2464    };
2465    use rstest::rstest;
2466
2467    fn create_test_ipc_data() -> Vec<u8> {
2468        use arrow::array::{Int32Array, StringArray};
2469        use arrow::datatypes::{DataType, Field, Schema};
2470        use arrow::ipc::writer::StreamWriter;
2471        use arrow::record_batch::RecordBatch;
2472        use std::sync::Arc;
2473
2474        let schema = Arc::new(Schema::new(vec![
2475            Field::new("id", DataType::Int32, false),
2476            Field::new("name", DataType::Utf8, false),
2477        ]));
2478
2479        let batch = RecordBatch::try_new(
2480            schema.clone(),
2481            vec![
2482                Arc::new(Int32Array::from(vec![1, 2, 3])),
2483                Arc::new(StringArray::from(vec!["a", "b", "c"])),
2484            ],
2485        )
2486        .unwrap();
2487
2488        let mut buffer = Vec::new();
2489        {
2490            let mut writer = StreamWriter::try_new(&mut buffer, &schema).unwrap();
2491            writer.write(&batch).unwrap();
2492            writer.finish().unwrap();
2493        }
2494        buffer
2495    }
2496
    /// `list_tables` on the root is empty at first, and it lists exactly
    /// `test_table` once that table has been created.
    /// Runs with inline optimization both enabled and disabled.
    #[rstest]
    #[case::with_optimization(true)]
    #[case::without_optimization(false)]
    #[tokio::test]
    async fn test_manifest_namespace_basic_create_and_list(#[case] inline_optimization: bool) {
        let temp_dir = TempStdDir::default();
        let temp_path = temp_dir.to_str().unwrap();

        // Create a DirectoryNamespace with manifest enabled (default)
        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
            .inline_optimization_enabled(inline_optimization)
            .build()
            .await
            .unwrap();

        // Verify we can list tables (should be empty)
        let mut request = ListTablesRequest::new();
        request.id = Some(vec![]);
        let response = dir_namespace.list_tables(request).await.unwrap();
        assert_eq!(response.tables.len(), 0);

        // Create a test table
        let buffer = create_test_ipc_data();
        let mut create_request = CreateTableRequest::new();
        create_request.id = Some(vec!["test_table".to_string()]);

        let _response = dir_namespace
            .create_table(create_request, Bytes::from(buffer))
            .await
            .unwrap();

        // List tables again - should see our new table
        let mut request = ListTablesRequest::new();
        request.id = Some(vec![]);
        let response = dir_namespace.list_tables(request).await.unwrap();
        assert_eq!(response.tables.len(), 1);
        assert_eq!(response.tables[0], "test_table");
    }
2535
    /// `table_exists` returns an error for an unknown table and succeeds once
    /// the table has been created.
    #[rstest]
    #[case::with_optimization(true)]
    #[case::without_optimization(false)]
    #[tokio::test]
    async fn test_manifest_namespace_table_exists(#[case] inline_optimization: bool) {
        let temp_dir = TempStdDir::default();
        let temp_path = temp_dir.to_str().unwrap();

        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
            .inline_optimization_enabled(inline_optimization)
            .build()
            .await
            .unwrap();

        // Check non-existent table
        let mut request = TableExistsRequest::new();
        request.id = Some(vec!["nonexistent".to_string()]);
        let result = dir_namespace.table_exists(request).await;
        assert!(result.is_err());

        // Create table
        let buffer = create_test_ipc_data();
        let mut create_request = CreateTableRequest::new();
        create_request.id = Some(vec!["test_table".to_string()]);
        dir_namespace
            .create_table(create_request, Bytes::from(buffer))
            .await
            .unwrap();

        // Check existing table
        let mut request = TableExistsRequest::new();
        request.id = Some(vec!["test_table".to_string()]);
        let result = dir_namespace.table_exists(request).await;
        assert!(result.is_ok());
    }
2571
    /// `describe_table` errors for an unknown table. For a created table it
    /// returns a location whose text contains the table name.
    #[rstest]
    #[case::with_optimization(true)]
    #[case::without_optimization(false)]
    #[tokio::test]
    async fn test_manifest_namespace_describe_table(#[case] inline_optimization: bool) {
        let temp_dir = TempStdDir::default();
        let temp_path = temp_dir.to_str().unwrap();

        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
            .inline_optimization_enabled(inline_optimization)
            .build()
            .await
            .unwrap();

        // Describe non-existent table
        let mut request = DescribeTableRequest::new();
        request.id = Some(vec!["nonexistent".to_string()]);
        let result = dir_namespace.describe_table(request).await;
        assert!(result.is_err());

        // Create table
        let buffer = create_test_ipc_data();
        let mut create_request = CreateTableRequest::new();
        create_request.id = Some(vec!["test_table".to_string()]);
        dir_namespace
            .create_table(create_request, Bytes::from(buffer))
            .await
            .unwrap();

        // Describe existing table
        let mut request = DescribeTableRequest::new();
        request.id = Some(vec!["test_table".to_string()]);
        let response = dir_namespace.describe_table(request).await.unwrap();
        assert!(response.location.is_some());
        assert!(response.location.unwrap().contains("test_table"));
    }
2608
    /// Dropping a created table removes it from the root's `list_tables` output.
    #[rstest]
    #[case::with_optimization(true)]
    #[case::without_optimization(false)]
    #[tokio::test]
    async fn test_manifest_namespace_drop_table(#[case] inline_optimization: bool) {
        let temp_dir = TempStdDir::default();
        let temp_path = temp_dir.to_str().unwrap();

        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
            .inline_optimization_enabled(inline_optimization)
            .build()
            .await
            .unwrap();

        // Create table
        let buffer = create_test_ipc_data();
        let mut create_request = CreateTableRequest::new();
        create_request.id = Some(vec!["test_table".to_string()]);
        dir_namespace
            .create_table(create_request, Bytes::from(buffer))
            .await
            .unwrap();

        // Verify table exists
        let mut request = ListTablesRequest::new();
        request.id = Some(vec![]);
        let response = dir_namespace.list_tables(request).await.unwrap();
        assert_eq!(response.tables.len(), 1);

        // Drop table
        let mut drop_request = DropTableRequest::new();
        drop_request.id = Some(vec!["test_table".to_string()]);
        let _response = dir_namespace.drop_table(drop_request).await.unwrap();

        // Verify table is gone
        let mut request = ListTablesRequest::new();
        request.id = Some(vec![]);
        let response = dir_namespace.list_tables(request).await.unwrap();
        assert_eq!(response.tables.len(), 0);
    }
2649
    /// Three tables (`table1`..`table3`) created in the root namespace are all
    /// returned by `list_tables`. The order of the listing is not asserted.
    #[rstest]
    #[case::with_optimization(true)]
    #[case::without_optimization(false)]
    #[tokio::test]
    async fn test_manifest_namespace_multiple_tables(#[case] inline_optimization: bool) {
        let temp_dir = TempStdDir::default();
        let temp_path = temp_dir.to_str().unwrap();

        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
            .inline_optimization_enabled(inline_optimization)
            .build()
            .await
            .unwrap();

        // Create multiple tables
        let buffer = create_test_ipc_data();
        for i in 1..=3 {
            let mut create_request = CreateTableRequest::new();
            create_request.id = Some(vec![format!("table{}", i)]);
            dir_namespace
                .create_table(create_request, Bytes::from(buffer.clone()))
                .await
                .unwrap();
        }

        // List all tables
        let mut request = ListTablesRequest::new();
        request.id = Some(vec![]);
        let response = dir_namespace.list_tables(request).await.unwrap();
        assert_eq!(response.tables.len(), 3);
        assert!(response.tables.contains(&"table1".to_string()));
        assert!(response.tables.contains(&"table2".to_string()));
        assert!(response.tables.contains(&"table3".to_string()));
    }
2684
    /// With the manifest disabled (`manifest_enabled(false)`), creating a table
    /// and listing it still work: the listing goes from empty to one table.
    #[rstest]
    #[case::with_optimization(true)]
    #[case::without_optimization(false)]
    #[tokio::test]
    async fn test_directory_only_mode(#[case] inline_optimization: bool) {
        let temp_dir = TempStdDir::default();
        let temp_path = temp_dir.to_str().unwrap();

        // Create a DirectoryNamespace with manifest disabled
        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
            .manifest_enabled(false)
            .inline_optimization_enabled(inline_optimization)
            .build()
            .await
            .unwrap();

        // Verify we can list tables (should be empty)
        let mut request = ListTablesRequest::new();
        request.id = Some(vec![]);
        let response = dir_namespace.list_tables(request).await.unwrap();
        assert_eq!(response.tables.len(), 0);

        // Create a test table
        let buffer = create_test_ipc_data();
        let mut create_request = CreateTableRequest::new();
        create_request.id = Some(vec!["test_table".to_string()]);

        // Create table - this should use directory-only mode
        let _response = dir_namespace
            .create_table(create_request, Bytes::from(buffer))
            .await
            .unwrap();

        // List tables - should see our new table
        let mut request = ListTablesRequest::new();
        request.id = Some(vec![]);
        let response = dir_namespace.list_tables(request).await.unwrap();
        assert_eq!(response.tables.len(), 1);
        assert_eq!(response.tables[0], "test_table");
    }
2725
    /// With both the manifest and directory listing enabled, a table created
    /// once is listed exactly once. The `len() == 1` assertion is what catches
    /// a duplicate entry if the two sources were not merged.
    #[rstest]
    #[case::with_optimization(true)]
    #[case::without_optimization(false)]
    #[tokio::test]
    async fn test_dual_mode_merge(#[case] inline_optimization: bool) {
        let temp_dir = TempStdDir::default();
        let temp_path = temp_dir.to_str().unwrap();

        // Create a DirectoryNamespace with both manifest and directory enabled
        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
            .manifest_enabled(true)
            .dir_listing_enabled(true)
            .inline_optimization_enabled(inline_optimization)
            .build()
            .await
            .unwrap();

        // Create tables through manifest
        let buffer = create_test_ipc_data();
        let mut create_request = CreateTableRequest::new();
        create_request.id = Some(vec!["table1".to_string()]);
        dir_namespace
            .create_table(create_request, Bytes::from(buffer))
            .await
            .unwrap();

        // List tables - should see table from both manifest and directory
        let mut request = ListTablesRequest::new();
        request.id = Some(vec![]);
        let response = dir_namespace.list_tables(request).await.unwrap();
        assert_eq!(response.tables.len(), 1);
        assert_eq!(response.tables[0], "table1");
    }
2759
    /// With the manifest enabled and directory listing disabled, a created
    /// table is still returned by `list_tables`.
    #[rstest]
    #[case::with_optimization(true)]
    #[case::without_optimization(false)]
    #[tokio::test]
    async fn test_manifest_only_mode(#[case] inline_optimization: bool) {
        let temp_dir = TempStdDir::default();
        let temp_path = temp_dir.to_str().unwrap();

        // Create a DirectoryNamespace with only manifest enabled
        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
            .manifest_enabled(true)
            .dir_listing_enabled(false)
            .inline_optimization_enabled(inline_optimization)
            .build()
            .await
            .unwrap();

        // Create table
        let buffer = create_test_ipc_data();
        let mut create_request = CreateTableRequest::new();
        create_request.id = Some(vec!["test_table".to_string()]);
        dir_namespace
            .create_table(create_request, Bytes::from(buffer))
            .await
            .unwrap();

        // List tables - should only use manifest
        let mut request = ListTablesRequest::new();
        request.id = Some(vec![]);
        let response = dir_namespace.list_tables(request).await.unwrap();
        assert_eq!(response.tables.len(), 1);
        assert_eq!(response.tables[0], "test_table");
    }
2793
    /// Dropping a table that was never created returns an error rather than
    /// silently succeeding.
    #[rstest]
    #[case::with_optimization(true)]
    #[case::without_optimization(false)]
    #[tokio::test]
    async fn test_drop_nonexistent_table(#[case] inline_optimization: bool) {
        let temp_dir = TempStdDir::default();
        let temp_path = temp_dir.to_str().unwrap();

        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
            .inline_optimization_enabled(inline_optimization)
            .build()
            .await
            .unwrap();

        // Try to drop non-existent table
        let mut drop_request = DropTableRequest::new();
        drop_request.id = Some(vec!["nonexistent".to_string()]);
        let result = dir_namespace.drop_table(drop_request).await;
        assert!(result.is_err());
    }
2814
    /// Creating a second table with an id that is already in use returns an error.
    #[rstest]
    #[case::with_optimization(true)]
    #[case::without_optimization(false)]
    #[tokio::test]
    async fn test_create_duplicate_table_fails(#[case] inline_optimization: bool) {
        let temp_dir = TempStdDir::default();
        let temp_path = temp_dir.to_str().unwrap();

        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
            .inline_optimization_enabled(inline_optimization)
            .build()
            .await
            .unwrap();

        // Create table
        let buffer = create_test_ipc_data();
        let mut create_request = CreateTableRequest::new();
        create_request.id = Some(vec!["test_table".to_string()]);
        dir_namespace
            .create_table(create_request, Bytes::from(buffer.clone()))
            .await
            .unwrap();

        // Try to create table with same name - should fail
        let mut create_request = CreateTableRequest::new();
        create_request.id = Some(vec!["test_table".to_string()]);
        let result = dir_namespace
            .create_table(create_request, Bytes::from(buffer))
            .await;
        assert!(result.is_err());
    }
2846
    /// A namespace created directly under the root:
    /// - is reported by `namespace_exists`;
    /// - is the only entry returned by `list_namespaces` on the root.
    #[rstest]
    #[case::with_optimization(true)]
    #[case::without_optimization(false)]
    #[tokio::test]
    async fn test_create_child_namespace(#[case] inline_optimization: bool) {
        use lance_namespace::models::{
            CreateNamespaceRequest, ListNamespacesRequest, NamespaceExistsRequest,
        };

        let temp_dir = TempStdDir::default();
        let temp_path = temp_dir.to_str().unwrap();

        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
            .inline_optimization_enabled(inline_optimization)
            .build()
            .await
            .unwrap();

        // Create a child namespace
        let mut create_req = CreateNamespaceRequest::new();
        create_req.id = Some(vec!["ns1".to_string()]);
        let result = dir_namespace.create_namespace(create_req).await;
        assert!(
            result.is_ok(),
            "Failed to create child namespace: {:?}",
            result.err()
        );

        // Verify namespace exists
        let exists_req = NamespaceExistsRequest {
            id: Some(vec!["ns1".to_string()]),
            ..Default::default()
        };
        let result = dir_namespace.namespace_exists(exists_req).await;
        assert!(result.is_ok(), "Namespace should exist");

        // List child namespaces of root
        let list_req = ListNamespacesRequest {
            id: Some(vec![]),
            page_token: None,
            limit: None,
            ..Default::default()
        };
        let result = dir_namespace.list_namespaces(list_req).await;
        assert!(result.is_ok());
        let namespaces = result.unwrap();
        assert_eq!(namespaces.namespaces.len(), 1);
        assert_eq!(namespaces.namespaces[0], "ns1");
    }
2896
    /// A two-level namespace (`parent` then `parent/child`):
    /// - can be created;
    /// - is reported by `namespace_exists`;
    /// - shows `child` as the only child namespace of `parent`.
    /// The listing returns just the last id component (`"child"`).
    #[rstest]
    #[case::with_optimization(true)]
    #[case::without_optimization(false)]
    #[tokio::test]
    async fn test_create_nested_namespace(#[case] inline_optimization: bool) {
        use lance_namespace::models::{
            CreateNamespaceRequest, ListNamespacesRequest, NamespaceExistsRequest,
        };

        let temp_dir = TempStdDir::default();
        let temp_path = temp_dir.to_str().unwrap();

        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
            .inline_optimization_enabled(inline_optimization)
            .build()
            .await
            .unwrap();

        // Create parent namespace
        let mut create_req = CreateNamespaceRequest::new();
        create_req.id = Some(vec!["parent".to_string()]);
        dir_namespace.create_namespace(create_req).await.unwrap();

        // Create nested child namespace
        let mut create_req = CreateNamespaceRequest::new();
        create_req.id = Some(vec!["parent".to_string(), "child".to_string()]);
        let result = dir_namespace.create_namespace(create_req).await;
        assert!(
            result.is_ok(),
            "Failed to create nested namespace: {:?}",
            result.err()
        );

        // Verify nested namespace exists
        let exists_req = NamespaceExistsRequest {
            id: Some(vec!["parent".to_string(), "child".to_string()]),
            ..Default::default()
        };
        let result = dir_namespace.namespace_exists(exists_req).await;
        assert!(result.is_ok(), "Nested namespace should exist");

        // List child namespaces of parent
        let list_req = ListNamespacesRequest {
            id: Some(vec!["parent".to_string()]),
            page_token: None,
            limit: None,
            ..Default::default()
        };
        let result = dir_namespace.list_namespaces(list_req).await;
        assert!(result.is_ok());
        let namespaces = result.unwrap();
        assert_eq!(namespaces.namespaces.len(), 1);
        assert_eq!(namespaces.namespaces[0], "child");
    }
2951
    /// Creating `nonexistent_parent/child` fails because the parent namespace
    /// was never created.
    #[rstest]
    #[case::with_optimization(true)]
    #[case::without_optimization(false)]
    #[tokio::test]
    async fn test_create_namespace_without_parent_fails(#[case] inline_optimization: bool) {
        use lance_namespace::models::CreateNamespaceRequest;

        let temp_dir = TempStdDir::default();
        let temp_path = temp_dir.to_str().unwrap();

        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
            .inline_optimization_enabled(inline_optimization)
            .build()
            .await
            .unwrap();

        // Try to create nested namespace without parent
        let mut create_req = CreateNamespaceRequest::new();
        create_req.id = Some(vec!["nonexistent_parent".to_string(), "child".to_string()]);
        let result = dir_namespace.create_namespace(create_req).await;
        assert!(result.is_err(), "Should fail when parent doesn't exist");
    }
2974
    /// Dropping an empty child namespace succeeds, and afterwards
    /// `namespace_exists` reports an error for it.
    #[rstest]
    #[case::with_optimization(true)]
    #[case::without_optimization(false)]
    #[tokio::test]
    async fn test_drop_child_namespace(#[case] inline_optimization: bool) {
        use lance_namespace::models::{
            CreateNamespaceRequest, DropNamespaceRequest, NamespaceExistsRequest,
        };

        let temp_dir = TempStdDir::default();
        let temp_path = temp_dir.to_str().unwrap();

        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
            .inline_optimization_enabled(inline_optimization)
            .build()
            .await
            .unwrap();

        // Create a child namespace
        let mut create_req = CreateNamespaceRequest::new();
        create_req.id = Some(vec!["ns1".to_string()]);
        dir_namespace.create_namespace(create_req).await.unwrap();

        // Drop the namespace
        let mut drop_req = DropNamespaceRequest::new();
        drop_req.id = Some(vec!["ns1".to_string()]);
        let result = dir_namespace.drop_namespace(drop_req).await;
        assert!(
            result.is_ok(),
            "Failed to drop namespace: {:?}",
            result.err()
        );

        // Verify namespace no longer exists
        let exists_req = NamespaceExistsRequest {
            id: Some(vec!["ns1".to_string()]),
            ..Default::default()
        };
        let result = dir_namespace.namespace_exists(exists_req).await;
        assert!(result.is_err(), "Namespace should not exist after drop");
    }
3016
    /// Dropping a namespace that still contains a child namespace returns an
    /// error. The test uses the request's default drop options.
    #[rstest]
    #[case::with_optimization(true)]
    #[case::without_optimization(false)]
    #[tokio::test]
    async fn test_drop_namespace_with_children_fails(#[case] inline_optimization: bool) {
        use lance_namespace::models::{CreateNamespaceRequest, DropNamespaceRequest};

        let temp_dir = TempStdDir::default();
        let temp_path = temp_dir.to_str().unwrap();

        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
            .inline_optimization_enabled(inline_optimization)
            .build()
            .await
            .unwrap();

        // Create parent and child namespaces
        let mut create_req = CreateNamespaceRequest::new();
        create_req.id = Some(vec!["parent".to_string()]);
        dir_namespace.create_namespace(create_req).await.unwrap();

        let mut create_req = CreateNamespaceRequest::new();
        create_req.id = Some(vec!["parent".to_string(), "child".to_string()]);
        dir_namespace.create_namespace(create_req).await.unwrap();

        // Try to drop parent namespace - should fail because it has children
        let mut drop_req = DropNamespaceRequest::new();
        drop_req.id = Some(vec!["parent".to_string()]);
        let result = dir_namespace.drop_namespace(drop_req).await;
        assert!(result.is_err(), "Should fail when namespace has children");
    }
3048
    /// A table created with id `["ns1", "table1"]` is listed, by its bare
    /// name, in `list_tables` for namespace `ns1`.
    #[rstest]
    #[case::with_optimization(true)]
    #[case::without_optimization(false)]
    #[tokio::test]
    async fn test_create_table_in_child_namespace(#[case] inline_optimization: bool) {
        use lance_namespace::models::{
            CreateNamespaceRequest, CreateTableRequest, ListTablesRequest,
        };

        let temp_dir = TempStdDir::default();
        let temp_path = temp_dir.to_str().unwrap();

        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
            .inline_optimization_enabled(inline_optimization)
            .build()
            .await
            .unwrap();

        // Create a child namespace
        let mut create_ns_req = CreateNamespaceRequest::new();
        create_ns_req.id = Some(vec!["ns1".to_string()]);
        dir_namespace.create_namespace(create_ns_req).await.unwrap();

        // Create a table in the child namespace
        let buffer = create_test_ipc_data();
        let mut create_table_req = CreateTableRequest::new();
        create_table_req.id = Some(vec!["ns1".to_string(), "table1".to_string()]);
        let result = dir_namespace
            .create_table(create_table_req, Bytes::from(buffer))
            .await;
        assert!(
            result.is_ok(),
            "Failed to create table in child namespace: {:?}",
            result.err()
        );

        // List tables in the namespace
        let list_req = ListTablesRequest {
            id: Some(vec!["ns1".to_string()]),
            page_token: None,
            limit: None,
            ..Default::default()
        };
        let result = dir_namespace.list_tables(list_req).await;
        assert!(result.is_ok());
        let tables = result.unwrap();
        assert_eq!(tables.tables.len(), 1);
        assert_eq!(tables.tables[0], "table1");
    }
3098
    /// Properties supplied when a namespace is created (`key1 = value1`) are
    /// returned by `describe_namespace` for that namespace.
    #[rstest]
    #[case::with_optimization(true)]
    #[case::without_optimization(false)]
    #[tokio::test]
    async fn test_describe_child_namespace(#[case] inline_optimization: bool) {
        use lance_namespace::models::{CreateNamespaceRequest, DescribeNamespaceRequest};

        let temp_dir = TempStdDir::default();
        let temp_path = temp_dir.to_str().unwrap();

        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
            .inline_optimization_enabled(inline_optimization)
            .build()
            .await
            .unwrap();

        // Create a child namespace with properties
        let mut properties = std::collections::HashMap::new();
        properties.insert("key1".to_string(), "value1".to_string());

        let mut create_req = CreateNamespaceRequest::new();
        create_req.id = Some(vec!["ns1".to_string()]);
        create_req.properties = Some(properties.clone());
        dir_namespace.create_namespace(create_req).await.unwrap();

        // Describe the namespace
        let describe_req = DescribeNamespaceRequest {
            id: Some(vec!["ns1".to_string()]),
            ..Default::default()
        };
        let result = dir_namespace.describe_namespace(describe_req).await;
        assert!(
            result.is_ok(),
            "Failed to describe namespace: {:?}",
            result.err()
        );
        let response = result.unwrap();
        assert!(response.properties.is_some());
        assert_eq!(
            response.properties.unwrap().get("key1"),
            Some(&"value1".to_string())
        );
    }
3142
3143    #[rstest]
3144    #[case::with_optimization(true)]
3145    #[case::without_optimization(false)]
3146    #[tokio::test]
3147    async fn test_concurrent_create_and_drop_single_instance(#[case] inline_optimization: bool) {
3148        use futures::future::join_all;
3149        use std::sync::Arc;
3150
3151        let temp_dir = TempStdDir::default();
3152        let temp_path = temp_dir.to_str().unwrap();
3153
3154        let dir_namespace = Arc::new(
3155            DirectoryNamespaceBuilder::new(temp_path)
3156                .inline_optimization_enabled(inline_optimization)
3157                .build()
3158                .await
3159                .unwrap(),
3160        );
3161
3162        // Initialize namespace first - create parent namespace to ensure __manifest table
3163        // is created before concurrent operations
3164        let mut create_ns_request = CreateNamespaceRequest::new();
3165        create_ns_request.id = Some(vec!["test_ns".to_string()]);
3166        dir_namespace
3167            .create_namespace(create_ns_request)
3168            .await
3169            .unwrap();
3170
3171        let num_tables = 10;
3172        let mut handles = Vec::new();
3173
3174        for i in 0..num_tables {
3175            let ns = dir_namespace.clone();
3176            let handle = async move {
3177                let table_name = format!("concurrent_table_{}", i);
3178                let table_id = vec!["test_ns".to_string(), table_name.clone()];
3179                let buffer = create_test_ipc_data();
3180
3181                // Create table
3182                let mut create_request = CreateTableRequest::new();
3183                create_request.id = Some(table_id.clone());
3184                ns.create_table(create_request, Bytes::from(buffer))
3185                    .await
3186                    .unwrap_or_else(|e| panic!("Failed to create table {}: {}", table_name, e));
3187
3188                // Drop table
3189                let mut drop_request = DropTableRequest::new();
3190                drop_request.id = Some(table_id);
3191                ns.drop_table(drop_request)
3192                    .await
3193                    .unwrap_or_else(|e| panic!("Failed to drop table {}: {}", table_name, e));
3194
3195                Ok::<_, lance_core::Error>(())
3196            };
3197            handles.push(handle);
3198        }
3199
3200        let results = join_all(handles).await;
3201        for result in results {
3202            assert!(result.is_ok(), "All concurrent operations should succeed");
3203        }
3204
3205        // Verify all tables are dropped
3206        let mut request = ListTablesRequest::new();
3207        request.id = Some(vec!["test_ns".to_string()]);
3208        let response = dir_namespace.list_tables(request).await.unwrap();
3209        assert_eq!(response.tables.len(), 0, "All tables should be dropped");
3210    }
3211
3212    #[rstest]
3213    #[case::with_optimization(true)]
3214    #[case::without_optimization(false)]
3215    #[tokio::test]
3216    async fn test_concurrent_create_and_drop_multiple_instances(#[case] inline_optimization: bool) {
3217        use futures::future::join_all;
3218
3219        let temp_dir = TempStdDir::default();
3220        let temp_path = temp_dir.to_str().unwrap().to_string();
3221
3222        // Initialize namespace first with a single instance to ensure __manifest
3223        // table is created and parent namespace exists before concurrent operations
3224        let init_ns = DirectoryNamespaceBuilder::new(&temp_path)
3225            .inline_optimization_enabled(inline_optimization)
3226            .build()
3227            .await
3228            .unwrap();
3229        let mut create_ns_request = CreateNamespaceRequest::new();
3230        create_ns_request.id = Some(vec!["test_ns".to_string()]);
3231        init_ns.create_namespace(create_ns_request).await.unwrap();
3232
3233        let num_tables = 10;
3234        let mut handles = Vec::new();
3235
3236        for i in 0..num_tables {
3237            let path = temp_path.clone();
3238            let handle = async move {
3239                // Each task creates its own namespace instance
3240                let ns = DirectoryNamespaceBuilder::new(&path)
3241                    .inline_optimization_enabled(inline_optimization)
3242                    .build()
3243                    .await
3244                    .unwrap();
3245
3246                let table_name = format!("multi_ns_table_{}", i);
3247                let table_id = vec!["test_ns".to_string(), table_name.clone()];
3248                let buffer = create_test_ipc_data();
3249
3250                // Create table
3251                let mut create_request = CreateTableRequest::new();
3252                create_request.id = Some(table_id.clone());
3253                ns.create_table(create_request, Bytes::from(buffer))
3254                    .await
3255                    .unwrap_or_else(|e| panic!("Failed to create table {}: {}", table_name, e));
3256
3257                // Drop table
3258                let mut drop_request = DropTableRequest::new();
3259                drop_request.id = Some(table_id);
3260                ns.drop_table(drop_request)
3261                    .await
3262                    .unwrap_or_else(|e| panic!("Failed to drop table {}: {}", table_name, e));
3263
3264                Ok::<_, lance_core::Error>(())
3265            };
3266            handles.push(handle);
3267        }
3268
3269        let results = join_all(handles).await;
3270        for result in results {
3271            assert!(result.is_ok(), "All concurrent operations should succeed");
3272        }
3273
3274        // Verify with a fresh namespace instance
3275        let verify_ns = DirectoryNamespaceBuilder::new(&temp_path)
3276            .inline_optimization_enabled(inline_optimization)
3277            .build()
3278            .await
3279            .unwrap();
3280
3281        let mut request = ListTablesRequest::new();
3282        request.id = Some(vec!["test_ns".to_string()]);
3283        let response = verify_ns.list_tables(request).await.unwrap();
3284        assert_eq!(response.tables.len(), 0, "All tables should be dropped");
3285    }
3286
3287    #[rstest]
3288    #[case::with_optimization(true)]
3289    #[case::without_optimization(false)]
3290    #[tokio::test]
3291    async fn test_concurrent_create_then_drop_from_different_instance(
3292        #[case] inline_optimization: bool,
3293    ) {
3294        use futures::future::join_all;
3295
3296        let temp_dir = TempStdDir::default();
3297        let temp_path = temp_dir.to_str().unwrap().to_string();
3298
3299        // Initialize namespace first with a single instance to ensure __manifest
3300        // table is created and parent namespace exists before concurrent operations
3301        let init_ns = DirectoryNamespaceBuilder::new(&temp_path)
3302            .inline_optimization_enabled(inline_optimization)
3303            .build()
3304            .await
3305            .unwrap();
3306        let mut create_ns_request = CreateNamespaceRequest::new();
3307        create_ns_request.id = Some(vec!["test_ns".to_string()]);
3308        init_ns.create_namespace(create_ns_request).await.unwrap();
3309
3310        let num_tables = 10;
3311
3312        // Phase 1: Create all tables concurrently using separate namespace instances
3313        let mut create_handles = Vec::new();
3314        for i in 0..num_tables {
3315            let path = temp_path.clone();
3316            let handle = async move {
3317                let ns = DirectoryNamespaceBuilder::new(&path)
3318                    .inline_optimization_enabled(inline_optimization)
3319                    .build()
3320                    .await
3321                    .unwrap();
3322
3323                let table_name = format!("cross_instance_table_{}", i);
3324                let table_id = vec!["test_ns".to_string(), table_name.clone()];
3325                let buffer = create_test_ipc_data();
3326
3327                let mut create_request = CreateTableRequest::new();
3328                create_request.id = Some(table_id);
3329                ns.create_table(create_request, Bytes::from(buffer))
3330                    .await
3331                    .unwrap_or_else(|e| panic!("Failed to create table {}: {}", table_name, e));
3332
3333                Ok::<_, lance_core::Error>(())
3334            };
3335            create_handles.push(handle);
3336        }
3337
3338        let create_results = join_all(create_handles).await;
3339        for result in create_results {
3340            assert!(result.is_ok(), "All create operations should succeed");
3341        }
3342
3343        // Phase 2: Drop all tables concurrently using NEW namespace instances
3344        let mut drop_handles = Vec::new();
3345        for i in 0..num_tables {
3346            let path = temp_path.clone();
3347            let handle = async move {
3348                let ns = DirectoryNamespaceBuilder::new(&path)
3349                    .inline_optimization_enabled(inline_optimization)
3350                    .build()
3351                    .await
3352                    .unwrap();
3353
3354                let table_name = format!("cross_instance_table_{}", i);
3355                let table_id = vec!["test_ns".to_string(), table_name.clone()];
3356
3357                let mut drop_request = DropTableRequest::new();
3358                drop_request.id = Some(table_id);
3359                ns.drop_table(drop_request)
3360                    .await
3361                    .unwrap_or_else(|e| panic!("Failed to drop table {}: {}", table_name, e));
3362
3363                Ok::<_, lance_core::Error>(())
3364            };
3365            drop_handles.push(handle);
3366        }
3367
3368        let drop_results = join_all(drop_handles).await;
3369        for result in drop_results {
3370            assert!(result.is_ok(), "All drop operations should succeed");
3371        }
3372
3373        // Verify all tables are dropped
3374        let verify_ns = DirectoryNamespaceBuilder::new(&temp_path)
3375            .inline_optimization_enabled(inline_optimization)
3376            .build()
3377            .await
3378            .unwrap();
3379
3380        let mut request = ListTablesRequest::new();
3381        request.id = Some(vec!["test_ns".to_string()]);
3382        let response = verify_ns.list_tables(request).await.unwrap();
3383        assert_eq!(response.tables.len(), 0, "All tables should be dropped");
3384    }
3385
3386    #[test]
3387    fn test_construct_full_uri_with_cloud_urls() {
3388        // Test S3-style URL with nested path (no trailing slash)
3389        let s3_result =
3390            ManifestNamespace::construct_full_uri("s3://bucket/path/subdir", "table.lance")
3391                .unwrap();
3392        assert_eq!(
3393            s3_result, "s3://bucket/path/subdir/table.lance",
3394            "S3 URL should correctly append table name to nested path"
3395        );
3396
3397        // Test Azure-style URL with nested path (no trailing slash)
3398        let az_result =
3399            ManifestNamespace::construct_full_uri("az://container/path/subdir", "table.lance")
3400                .unwrap();
3401        assert_eq!(
3402            az_result, "az://container/path/subdir/table.lance",
3403            "Azure URL should correctly append table name to nested path"
3404        );
3405
3406        // Test GCS-style URL with nested path (no trailing slash)
3407        let gs_result =
3408            ManifestNamespace::construct_full_uri("gs://bucket/path/subdir", "table.lance")
3409                .unwrap();
3410        assert_eq!(
3411            gs_result, "gs://bucket/path/subdir/table.lance",
3412            "GCS URL should correctly append table name to nested path"
3413        );
3414
3415        // Test with deeper nesting
3416        let deep_result =
3417            ManifestNamespace::construct_full_uri("s3://bucket/a/b/c/d", "my_table.lance").unwrap();
3418        assert_eq!(
3419            deep_result, "s3://bucket/a/b/c/d/my_table.lance",
3420            "Deeply nested path should work correctly"
3421        );
3422
3423        // Test with root-level path (single segment after bucket)
3424        let shallow_result =
3425            ManifestNamespace::construct_full_uri("s3://bucket", "table.lance").unwrap();
3426        assert_eq!(
3427            shallow_result, "s3://bucket/table.lance",
3428            "Single-level nested path should work correctly"
3429        );
3430
3431        // Test that URLs with trailing slash already work (no regression)
3432        let trailing_slash_result =
3433            ManifestNamespace::construct_full_uri("s3://bucket/path/subdir/", "table.lance")
3434                .unwrap();
3435        assert_eq!(
3436            trailing_slash_result, "s3://bucket/path/subdir/table.lance",
3437            "URL with existing trailing slash should still work"
3438        );
3439    }
3440
3441    /// Test that concurrent create_table calls for the same table name don't
3442    /// create duplicate entries in the manifest. Uses two independent
3443    /// ManifestNamespace instances pointing at the same directory to simulate
3444    /// two separate OS processes racing on table creation. The conflict_retries
3445    /// setting on the MergeInsert ensures the second operation properly detects
3446    /// the duplicate via WhenMatched::Fail after retrying against the latest data.
3447    #[tokio::test]
3448    async fn test_concurrent_create_table_no_duplicates() {
3449        let temp_dir = TempStdDir::default();
3450        let temp_path = temp_dir.to_str().unwrap();
3451
3452        // Two independent namespace instances = two separate "processes"
3453        // sharing the same underlying filesystem directory.
3454        let ns1 = DirectoryNamespaceBuilder::new(temp_path)
3455            .inline_optimization_enabled(false)
3456            .build()
3457            .await
3458            .unwrap();
3459        let ns2 = DirectoryNamespaceBuilder::new(temp_path)
3460            .inline_optimization_enabled(false)
3461            .build()
3462            .await
3463            .unwrap();
3464
3465        let buffer = create_test_ipc_data();
3466
3467        let mut req1 = CreateTableRequest::new();
3468        req1.id = Some(vec!["race_table".to_string()]);
3469        let mut req2 = CreateTableRequest::new();
3470        req2.id = Some(vec!["race_table".to_string()]);
3471
3472        // Launch both create_table calls concurrently
3473        let (result1, result2) = tokio::join!(
3474            ns1.create_table(req1, Bytes::from(buffer.clone())),
3475            ns2.create_table(req2, Bytes::from(buffer.clone())),
3476        );
3477
3478        // Exactly one should succeed and one should fail
3479        let success_count = [&result1, &result2].iter().filter(|r| r.is_ok()).count();
3480        let failure_count = [&result1, &result2].iter().filter(|r| r.is_err()).count();
3481        assert_eq!(
3482            success_count, 1,
3483            "Exactly one create should succeed, got: result1={:?}, result2={:?}",
3484            result1, result2
3485        );
3486        assert_eq!(
3487            failure_count, 1,
3488            "Exactly one create should fail, got: result1={:?}, result2={:?}",
3489            result1, result2
3490        );
3491
3492        // Verify only one table entry exists in the manifest
3493        let ns_check = DirectoryNamespaceBuilder::new(temp_path)
3494            .inline_optimization_enabled(false)
3495            .build()
3496            .await
3497            .unwrap();
3498        let mut list_request = ListTablesRequest::new();
3499        list_request.id = Some(vec![]);
3500        let response = ns_check.list_tables(list_request).await.unwrap();
3501        assert_eq!(
3502            response.tables.len(),
3503            1,
3504            "Should have exactly 1 table, found: {:?}",
3505            response.tables
3506        );
3507        assert_eq!(response.tables[0], "race_table");
3508
3509        // Also verify describe_table works (no "found 2" error)
3510        let mut describe_request = DescribeTableRequest::new();
3511        describe_request.id = Some(vec!["race_table".to_string()]);
3512        let describe_result = ns_check.describe_table(describe_request).await;
3513        assert!(
3514            describe_result.is_ok(),
3515            "describe_table should not fail with duplicate entries: {:?}",
3516            describe_result
3517        );
3518    }
3519}