Skip to main content

lance_namespace_impls/dir/
manifest.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Manifest-based namespace implementation
5//!
6//! This module provides a namespace implementation that uses a manifest table
7//! to track tables and nested namespaces.
8
9use arrow::array::{Array, RecordBatch, RecordBatchIterator, StringArray};
10use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
11use arrow_ipc::reader::StreamReader;
12use async_trait::async_trait;
13use bytes::Bytes;
14use futures::{FutureExt, stream::StreamExt};
15use lance::dataset::optimize::{CompactionOptions, compact_files};
16use lance::dataset::{ReadParams, WriteParams, builder::DatasetBuilder};
17use lance::session::Session;
18use lance::{Dataset, dataset::scanner::Scanner};
19use lance_core::Error as LanceError;
20use lance_core::datatypes::LANCE_UNENFORCED_PRIMARY_KEY_POSITION;
21use lance_core::{Error, Result, box_error};
22use lance_index::IndexType;
23use lance_index::optimize::OptimizeOptions;
24use lance_index::scalar::{BuiltinIndexType, ScalarIndexParams};
25use lance_index::traits::DatasetIndexExt;
26use lance_io::object_store::{ObjectStore, ObjectStoreParams};
27use lance_namespace::LanceNamespace;
28use lance_namespace::error::NamespaceError;
29use lance_namespace::models::{
30    CreateNamespaceRequest, CreateNamespaceResponse, CreateTableRequest, CreateTableResponse,
31    DeclareTableRequest, DeclareTableResponse, DeregisterTableRequest, DeregisterTableResponse,
32    DescribeNamespaceRequest, DescribeNamespaceResponse, DescribeTableRequest,
33    DescribeTableResponse, DropNamespaceRequest, DropNamespaceResponse, DropTableRequest,
34    DropTableResponse, ListNamespacesRequest, ListNamespacesResponse, ListTablesRequest,
35    ListTablesResponse, NamespaceExistsRequest, RegisterTableRequest, RegisterTableResponse,
36    TableExistsRequest,
37};
38use lance_namespace::schema::arrow_schema_to_json;
39use object_store::path::Path;
40use std::io::Cursor;
41use std::{
42    collections::HashMap,
43    hash::{DefaultHasher, Hash, Hasher},
44    ops::{Deref, DerefMut},
45    sync::Arc,
46};
47use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
48
/// Name of the Lance table that holds the manifest.
const MANIFEST_TABLE_NAME: &str = "__manifest";
/// Separator placed between namespace levels and the object name when an
/// object ID is flattened into a single string (e.g. `ns1$ns2$table`).
const DELIMITER: &str = "$";

// Index names for the __manifest table
/// BTREE index on the object_id column for fast lookups
const OBJECT_ID_INDEX_NAME: &str = "object_id_btree";
/// Bitmap index on the object_type column for filtering by type
const OBJECT_TYPE_INDEX_NAME: &str = "object_type_bitmap";
/// LabelList index on the base_objects column for view dependencies
const BASE_OBJECTS_INDEX_NAME: &str = "base_objects_label_list";
59
/// Object types that can be stored in the manifest
///
/// The lowercase string form of a variant is what gets written to the
/// manifest's `object_type` column.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ObjectType {
    /// A (possibly nested) namespace entry; serialized as `"namespace"`.
    Namespace,
    /// A table entry; serialized as `"table"`.
    Table,
}
66
67impl ObjectType {
68    pub fn as_str(&self) -> &str {
69        match self {
70            Self::Namespace => "namespace",
71            Self::Table => "table",
72        }
73    }
74
75    pub fn parse(s: &str) -> Result<Self> {
76        match s {
77            "namespace" => Ok(Self::Namespace),
78            "table" => Ok(Self::Table),
79            _ => Err(Error::io(format!("Invalid object type: {}", s))),
80        }
81    }
82}
83
/// Information about a table stored in the manifest
#[derive(Debug, Clone)]
pub struct TableInfo {
    /// Namespace path of the table: the object ID components before the last one.
    pub namespace: Vec<String>,
    /// Table name: the last component of the object ID.
    pub name: String,
    /// Value of the manifest's `location` column for this table.
    pub location: String,
}
91
/// Information about a namespace stored in the manifest
#[derive(Debug, Clone)]
pub struct NamespaceInfo {
    /// Parent namespace path.
    /// NOTE(review): presumably the object ID components before the last one,
    /// as for `TableInfo` — confirm against the code that builds this struct.
    pub namespace: Vec<String>,
    /// Name of this namespace (presumably the last object ID component — confirm).
    pub name: String,
    /// Optional key/value properties attached to the namespace.
    pub metadata: Option<HashMap<String, String>>,
}
99
/// A wrapper around a Dataset that provides concurrent access.
///
/// This can be cloned cheaply. It supports concurrent reads or exclusive writes.
/// The manifest dataset is always kept strongly consistent by reloading on each read.
///
/// The single field is the dataset behind an `Arc<RwLock<_>>`, so every clone
/// of the wrapper points at the same underlying dataset handle.
#[derive(Debug, Clone)]
pub struct DatasetConsistencyWrapper(Arc<RwLock<Dataset>>);
106
107impl DatasetConsistencyWrapper {
108    /// Create a new wrapper with the given dataset.
109    pub fn new(dataset: Dataset) -> Self {
110        Self(Arc::new(RwLock::new(dataset)))
111    }
112
113    /// Get an immutable reference to the dataset.
114    /// Always reloads to ensure strong consistency.
115    pub async fn get(&self) -> Result<DatasetReadGuard<'_>> {
116        self.reload().await?;
117        Ok(DatasetReadGuard {
118            guard: self.0.read().await,
119        })
120    }
121
122    /// Get a mutable reference to the dataset.
123    /// Always reloads to ensure strong consistency.
124    pub async fn get_mut(&self) -> Result<DatasetWriteGuard<'_>> {
125        self.reload().await?;
126        Ok(DatasetWriteGuard {
127            guard: self.0.write().await,
128        })
129    }
130
131    /// Provide a known latest version of the dataset.
132    ///
133    /// This is usually done after some write operation, which inherently will
134    /// have the latest version.
135    pub async fn set_latest(&self, dataset: Dataset) {
136        let mut write_guard = self.0.write().await;
137        if dataset.manifest().version > write_guard.manifest().version {
138            *write_guard = dataset;
139        }
140    }
141
142    /// Reload the dataset to the latest version.
143    async fn reload(&self) -> Result<()> {
144        // First check if we need to reload (with read lock)
145        let read_guard = self.0.read().await;
146        let dataset_uri = read_guard.uri().to_string();
147        let current_version = read_guard.version().version;
148        log::debug!(
149            "Reload starting for uri={}, current_version={}",
150            dataset_uri,
151            current_version
152        );
153        let latest_version = read_guard.latest_version_id().await.map_err(|e| {
154            Error::io_source(box_error(std::io::Error::other(format!(
155                "Failed to get latest version: {}",
156                e
157            ))))
158        })?;
159        log::debug!(
160            "Reload got latest_version={} for uri={}, current_version={}",
161            latest_version,
162            dataset_uri,
163            current_version
164        );
165        drop(read_guard);
166
167        // If already up-to-date, return early
168        if latest_version == current_version {
169            log::debug!("Already up-to-date for uri={}", dataset_uri);
170            return Ok(());
171        }
172
173        // Need to reload, acquire write lock
174        let mut write_guard = self.0.write().await;
175
176        // Double-check after acquiring write lock (someone else might have reloaded)
177        let latest_version = write_guard.latest_version_id().await.map_err(|e| {
178            Error::io_source(box_error(std::io::Error::other(format!(
179                "Failed to get latest version: {}",
180                e
181            ))))
182        })?;
183
184        if latest_version != write_guard.version().version {
185            write_guard.checkout_latest().await.map_err(|e| {
186                Error::io_source(box_error(std::io::Error::other(format!(
187                    "Failed to checkout latest: {}",
188                    e
189                ))))
190            })?;
191        }
192
193        Ok(())
194    }
195}
196
/// Shared read access to the wrapped [`Dataset`], handed out by
/// `DatasetConsistencyWrapper::get`.
///
/// Holds the read side of the wrapper's `RwLock` for as long as it is alive.
pub struct DatasetReadGuard<'a> {
    guard: RwLockReadGuard<'a, Dataset>,
}

/// Lets the guard be used wherever a `&Dataset` is expected.
impl Deref for DatasetReadGuard<'_> {
    type Target = Dataset;

    fn deref(&self) -> &Self::Target {
        &self.guard
    }
}
208
/// Exclusive access to the wrapped [`Dataset`], handed out by
/// `DatasetConsistencyWrapper::get_mut`.
///
/// Holds the write side of the wrapper's `RwLock` for as long as it is alive.
pub struct DatasetWriteGuard<'a> {
    guard: RwLockWriteGuard<'a, Dataset>,
}

/// Lets the guard be used wherever a `&Dataset` is expected.
impl Deref for DatasetWriteGuard<'_> {
    type Target = Dataset;

    fn deref(&self) -> &Self::Target {
        &self.guard
    }
}

/// Lets the guard be used wherever a `&mut Dataset` is expected.
impl DerefMut for DatasetWriteGuard<'_> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.guard
    }
}
226
/// Manifest-based namespace implementation
///
/// Uses a special `__manifest` Lance table to track tables and nested namespaces.
pub struct ManifestNamespace {
    /// Root URI of the namespace; handed to the manifest-table setup in `from_directory`.
    root: String,
    /// Optional storage options, passed along with `root` when the manifest table is set up.
    storage_options: Option<HashMap<String, String>>,
    /// Optional Lance session supplied by the caller of `from_directory`.
    #[allow(dead_code)]
    session: Option<Arc<Session>>,
    /// Object store supplied by the caller of `from_directory`.
    #[allow(dead_code)]
    object_store: Arc<ObjectStore>,
    /// Base path supplied by the caller of `from_directory`.
    /// NOTE(review): presumably the path of `root` inside `object_store` — confirm.
    #[allow(dead_code)]
    base_path: Path,
    /// Handle to the `__manifest` dataset; it is reloaded to the latest version on access.
    manifest_dataset: DatasetConsistencyWrapper,
    /// Whether directory listing is enabled in dual mode
    /// If true, root namespace tables use {table_name}.lance naming
    /// If false, they use namespace-prefixed names
    dir_listing_enabled: bool,
    /// Whether to perform inline optimization (compaction and indexing) on the __manifest table
    /// after every write. Defaults to true.
    inline_optimization_enabled: bool,
    /// Number of retries for commit operations on the manifest table.
    /// If None, defaults to [`lance_table::io::commit::CommitConfig`] default (20).
    commit_retries: Option<u32>,
}
251
252impl std::fmt::Debug for ManifestNamespace {
253    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
254        f.debug_struct("ManifestNamespace")
255            .field("root", &self.root)
256            .field("storage_options", &self.storage_options)
257            .field("dir_listing_enabled", &self.dir_listing_enabled)
258            .field(
259                "inline_optimization_enabled",
260                &self.inline_optimization_enabled,
261            )
262            .finish()
263    }
264}
265
266/// Convert a Lance commit error to an appropriate namespace error.
267///
268/// Maps lance commit errors to namespace errors:
269/// - `CommitConflict`: version collision retries exhausted -> Throttled (safe to retry)
270/// - `TooMuchWriteContention`: RetryableCommitConflict (semantic conflict) retries exhausted -> ConcurrentModification
271/// - `IncompatibleTransaction`: incompatible concurrent change -> ConcurrentModification
272/// - Errors containing "matched/duplicate/already exists": ConcurrentModification (from WhenMatched::Fail)
273/// - Other errors: IO error with the operation description
274fn convert_lance_commit_error(e: &LanceError, operation: &str, object_id: Option<&str>) -> Error {
275    match e {
276        // CommitConflict: version collision retries exhausted -> Throttled (safe to retry)
277        LanceError::CommitConflict { .. } => NamespaceError::Throttled {
278            message: format!("Too many concurrent writes, please retry later: {:?}", e),
279        }
280        .into(),
281        // TooMuchWriteContention: RetryableCommitConflict (semantic conflict) retries exhausted -> ConcurrentModification
282        // IncompatibleTransaction: incompatible concurrent change -> ConcurrentModification
283        LanceError::TooMuchWriteContention { .. } | LanceError::IncompatibleTransaction { .. } => {
284            let message = if let Some(id) = object_id {
285                format!(
286                    "Object '{}' was concurrently modified by another operation: {:?}",
287                    id, e
288                )
289            } else {
290                format!(
291                    "Object was concurrently modified by another operation: {:?}",
292                    e
293                )
294            };
295            NamespaceError::ConcurrentModification { message }.into()
296        }
297        // Other errors: check message for semantic conflicts (matched/duplicate from WhenMatched::Fail)
298        _ => {
299            let error_msg = e.to_string();
300            if error_msg.contains("matched")
301                || error_msg.contains("duplicate")
302                || error_msg.contains("already exists")
303            {
304                let message = if let Some(id) = object_id {
305                    format!(
306                        "Object '{}' was concurrently created by another operation: {:?}",
307                        id, e
308                    )
309                } else {
310                    format!(
311                        "Object was concurrently created by another operation: {:?}",
312                        e
313                    )
314                };
315                return NamespaceError::ConcurrentModification { message }.into();
316            }
317            Error::io_source(box_error(std::io::Error::other(format!(
318                "{}: {:?}",
319                operation, e
320            ))))
321        }
322    }
323}
324
325impl ManifestNamespace {
326    /// Create a new ManifestNamespace from an existing DirectoryNamespace
327    #[allow(clippy::too_many_arguments)]
328    pub async fn from_directory(
329        root: String,
330        storage_options: Option<HashMap<String, String>>,
331        session: Option<Arc<Session>>,
332        object_store: Arc<ObjectStore>,
333        base_path: Path,
334        dir_listing_enabled: bool,
335        inline_optimization_enabled: bool,
336        commit_retries: Option<u32>,
337    ) -> Result<Self> {
338        let manifest_dataset =
339            Self::ensure_manifest_table_up_to_date(&root, &storage_options, session.clone())
340                .await?;
341
342        Ok(Self {
343            root,
344            storage_options,
345            session,
346            object_store,
347            base_path,
348            manifest_dataset,
349            dir_listing_enabled,
350            inline_optimization_enabled,
351            commit_retries,
352        })
353    }
354
355    /// Build object ID from namespace path and name
356    pub fn build_object_id(namespace: &[String], name: &str) -> String {
357        if namespace.is_empty() {
358            name.to_string()
359        } else {
360            let mut id = namespace.join(DELIMITER);
361            id.push_str(DELIMITER);
362            id.push_str(name);
363            id
364        }
365    }
366
367    /// Parse object ID into namespace path and name
368    pub fn parse_object_id(object_id: &str) -> (Vec<String>, String) {
369        let parts: Vec<&str> = object_id.split(DELIMITER).collect();
370        if parts.len() == 1 {
371            (Vec::new(), parts[0].to_string())
372        } else {
373            let namespace = parts[..parts.len() - 1]
374                .iter()
375                .map(|s| s.to_string())
376                .collect();
377            let name = parts[parts.len() - 1].to_string();
378            (namespace, name)
379        }
380    }
381
382    /// Split an object ID (table_id as vec of strings) into namespace and table name
383    fn split_object_id(table_id: &[String]) -> (Vec<String>, String) {
384        if table_id.len() == 1 {
385            (vec![], table_id[0].clone())
386        } else {
387            (
388                table_id[..table_id.len() - 1].to_vec(),
389                table_id[table_id.len() - 1].clone(),
390            )
391        }
392    }
393
    /// Convert a table ID (vec of strings) to an object_id string
    ///
    /// The components are joined with `DELIMITER`; an empty slice gives an
    /// empty string and a single component is returned unchanged.
    fn str_object_id(table_id: &[String]) -> String {
        table_id.join(DELIMITER)
    }
398
399    /// Generate a new directory name in format: <hash>_<object_id>
400    /// The hash is used to (1) optimize object store throughput,
401    /// (2) have high enough entropy in a short period of time to prevent issues like
402    /// failed table creation, delete and create new table of the same name, etc.
403    /// The object_id is added after the hash to ensure
404    /// dir name uniqueness and make debugging easier.
405    fn generate_dir_name(object_id: &str) -> String {
406        // Generate a random number for uniqueness
407        let random_num: u64 = rand::random();
408
409        // Create hash from random number + object_id
410        let mut hasher = DefaultHasher::new();
411        random_num.hash(&mut hasher);
412        object_id.hash(&mut hasher);
413        let hash = hasher.finish();
414
415        // Format as lowercase hex (8 characters - sufficient entropy for uniqueness)
416        format!("{:08x}_{}", (hash & 0xFFFFFFFF) as u32, object_id)
417    }
418
419    /// Construct a full URI from root and relative location
420    pub(crate) fn construct_full_uri(root: &str, relative_location: &str) -> Result<String> {
421        let mut base_url = lance_io::object_store::uri_to_url(root)?;
422
423        // Ensure the base URL has a trailing slash so that URL.join() appends
424        // rather than replaces the last path segment.
425        // Without this fix, "s3://bucket/path/subdir".join("table.lance")
426        // would incorrectly produce "s3://bucket/path/table.lance" (missing subdir).
427        if !base_url.path().ends_with('/') {
428            base_url.set_path(&format!("{}/", base_url.path()));
429        }
430
431        let full_url = base_url.join(relative_location).map_err(|e| {
432            Error::invalid_input_source(
433                format!(
434                    "Failed to join URI '{}' with '{}': {:?}",
435                    root, relative_location, e
436                )
437                .into(),
438            )
439        })?;
440
441        Ok(full_url.to_string())
442    }
443
444    /// Perform inline optimization on the __manifest table.
445    ///
446    /// This method:
447    /// 1. Creates three indexes on the manifest table:
448    ///    - BTREE index on object_id for fast lookups
449    ///    - Bitmap index on object_type for filtering by type
450    ///    - LabelList index on base_objects for view dependencies
451    /// 2. Runs file compaction to merge small files
452    /// 3. Optimizes existing indices
453    ///
454    /// This is called automatically after writes when inline_optimization_enabled is true.
455    async fn run_inline_optimization(&self) -> Result<()> {
456        if !self.inline_optimization_enabled {
457            return Ok(());
458        }
459
460        // Get a mutable reference to the dataset to perform optimization
461        let mut dataset_guard = self.manifest_dataset.get_mut().await?;
462        let dataset: &mut Dataset = &mut dataset_guard;
463
464        // Step 1: Create indexes if they don't already exist
465        let indices = dataset.load_indices().await?;
466
467        // Check which indexes already exist
468        let has_object_id_index = indices.iter().any(|idx| idx.name == OBJECT_ID_INDEX_NAME);
469        let has_object_type_index = indices.iter().any(|idx| idx.name == OBJECT_TYPE_INDEX_NAME);
470        let has_base_objects_index = indices
471            .iter()
472            .any(|idx| idx.name == BASE_OBJECTS_INDEX_NAME);
473
474        // Create BTREE index on object_id
475        if !has_object_id_index {
476            log::debug!(
477                "Creating BTREE index '{}' on object_id for __manifest table",
478                OBJECT_ID_INDEX_NAME
479            );
480            let params = ScalarIndexParams::for_builtin(BuiltinIndexType::BTree);
481            if let Err(e) = dataset
482                .create_index(
483                    &["object_id"],
484                    IndexType::BTree,
485                    Some(OBJECT_ID_INDEX_NAME.to_string()),
486                    &params,
487                    true,
488                )
489                .await
490            {
491                log::warn!(
492                    "Failed to create BTREE index on object_id for __manifest table: {:?}. Query performance may be impacted.",
493                    e
494                );
495            } else {
496                log::info!(
497                    "Created BTREE index '{}' on object_id for __manifest table",
498                    OBJECT_ID_INDEX_NAME
499                );
500            }
501        }
502
503        // Create Bitmap index on object_type
504        if !has_object_type_index {
505            log::debug!(
506                "Creating Bitmap index '{}' on object_type for __manifest table",
507                OBJECT_TYPE_INDEX_NAME
508            );
509            let params = ScalarIndexParams::default();
510            if let Err(e) = dataset
511                .create_index(
512                    &["object_type"],
513                    IndexType::Bitmap,
514                    Some(OBJECT_TYPE_INDEX_NAME.to_string()),
515                    &params,
516                    true,
517                )
518                .await
519            {
520                log::warn!(
521                    "Failed to create Bitmap index on object_type for __manifest table: {:?}. Query performance may be impacted.",
522                    e
523                );
524            } else {
525                log::info!(
526                    "Created Bitmap index '{}' on object_type for __manifest table",
527                    OBJECT_TYPE_INDEX_NAME
528                );
529            }
530        }
531
532        // Create LabelList index on base_objects
533        if !has_base_objects_index {
534            log::debug!(
535                "Creating LabelList index '{}' on base_objects for __manifest table",
536                BASE_OBJECTS_INDEX_NAME
537            );
538            let params = ScalarIndexParams::default();
539            if let Err(e) = dataset
540                .create_index(
541                    &["base_objects"],
542                    IndexType::LabelList,
543                    Some(BASE_OBJECTS_INDEX_NAME.to_string()),
544                    &params,
545                    true,
546                )
547                .await
548            {
549                log::warn!(
550                    "Failed to create LabelList index on base_objects for __manifest table: {:?}. Query performance may be impacted.",
551                    e
552                );
553            } else {
554                log::info!(
555                    "Created LabelList index '{}' on base_objects for __manifest table",
556                    BASE_OBJECTS_INDEX_NAME
557                );
558            }
559        }
560
561        // Step 2: Run file compaction
562        log::debug!("Running file compaction on __manifest table");
563        match compact_files(dataset, CompactionOptions::default(), None).await {
564            Ok(compaction_metrics) => {
565                if compaction_metrics.fragments_removed > 0 {
566                    log::info!(
567                        "Compacted __manifest table: removed {} fragments, added {} fragments",
568                        compaction_metrics.fragments_removed,
569                        compaction_metrics.fragments_added
570                    );
571                }
572            }
573            Err(e) => {
574                log::warn!(
575                    "Failed to compact files for __manifest table: {:?}. Continuing with optimization.",
576                    e
577                );
578            }
579        }
580
581        // Step 3: Optimize indices
582        log::debug!("Optimizing indices on __manifest table");
583        match dataset.optimize_indices(&OptimizeOptions::default()).await {
584            Ok(_) => {
585                log::info!("Successfully optimized indices on __manifest table");
586            }
587            Err(e) => {
588                log::warn!(
589                    "Failed to optimize indices on __manifest table: {:?}. Continuing anyway.",
590                    e
591                );
592            }
593        }
594
595        Ok(())
596    }
597
598    /// Get the manifest schema
599    fn manifest_schema() -> Arc<ArrowSchema> {
600        Arc::new(ArrowSchema::new(vec![
601            // Set unenforced primary key on object_id for bloom filter conflict detection
602            Field::new("object_id", DataType::Utf8, false).with_metadata(
603                [(
604                    LANCE_UNENFORCED_PRIMARY_KEY_POSITION.to_string(),
605                    "0".to_string(),
606                )]
607                .into_iter()
608                .collect(),
609            ),
610            Field::new("object_type", DataType::Utf8, false),
611            Field::new("location", DataType::Utf8, true),
612            Field::new("metadata", DataType::Utf8, true),
613            Field::new(
614                "base_objects",
615                DataType::List(Arc::new(Field::new("object_id", DataType::Utf8, true))),
616                true,
617            ),
618        ]))
619    }
620
621    /// Get a scanner for the manifest dataset
622    async fn manifest_scanner(&self) -> Result<Scanner> {
623        let dataset_guard = self.manifest_dataset.get().await?;
624        Ok(dataset_guard.scan())
625    }
626
627    /// Helper to execute a scanner and collect results into a Vec
628    async fn execute_scanner(scanner: Scanner) -> Result<Vec<RecordBatch>> {
629        let mut stream = scanner.try_into_stream().await.map_err(|e| {
630            Error::io_source(box_error(std::io::Error::other(format!(
631                "Failed to create stream: {}",
632                e
633            ))))
634        })?;
635
636        let mut batches = Vec::new();
637        while let Some(batch) = stream.next().await {
638            batches.push(batch.map_err(|e| {
639                Error::io_source(box_error(std::io::Error::other(format!(
640                    "Failed to read batch: {}",
641                    e
642                ))))
643            })?);
644        }
645
646        Ok(batches)
647    }
648
649    /// Helper to get a string column from a record batch
650    fn get_string_column<'a>(batch: &'a RecordBatch, column_name: &str) -> Result<&'a StringArray> {
651        let column = batch
652            .column_by_name(column_name)
653            .ok_or_else(|| Error::io(format!("Column '{}' not found", column_name)))?;
654        column
655            .as_any()
656            .downcast_ref::<StringArray>()
657            .ok_or_else(|| Error::io(format!("Column '{}' is not a string array", column_name)))
658    }
659
660    /// Check if the manifest contains an object with the given ID
661    async fn manifest_contains_object(&self, object_id: &str) -> Result<bool> {
662        let filter = format!("object_id = '{}'", object_id);
663
664        let dataset_guard = self.manifest_dataset.get().await?;
665        let mut scanner = dataset_guard.scan();
666
667        scanner.filter(&filter).map_err(|e| {
668            Error::io_source(box_error(std::io::Error::other(format!(
669                "Failed to filter: {}",
670                e
671            ))))
672        })?;
673
674        // Project no columns and enable row IDs for count_rows to work
675        scanner.project::<&str>(&[]).map_err(|e| {
676            Error::io_source(box_error(std::io::Error::other(format!(
677                "Failed to project: {}",
678                e
679            ))))
680        })?;
681
682        scanner.with_row_id();
683
684        let count = scanner.count_rows().await.map_err(|e| {
685            Error::io_source(box_error(std::io::Error::other(format!(
686                "Failed to count rows: {}",
687                e
688            ))))
689        })?;
690
691        Ok(count > 0)
692    }
693
694    /// Query the manifest for a table with the given object ID
695    async fn query_manifest_for_table(&self, object_id: &str) -> Result<Option<TableInfo>> {
696        let filter = format!("object_id = '{}' AND object_type = 'table'", object_id);
697        let mut scanner = self.manifest_scanner().await?;
698        scanner.filter(&filter).map_err(|e| {
699            Error::io_source(box_error(std::io::Error::other(format!(
700                "Failed to filter: {}",
701                e
702            ))))
703        })?;
704        scanner.project(&["object_id", "location"]).map_err(|e| {
705            Error::io_source(box_error(std::io::Error::other(format!(
706                "Failed to project: {}",
707                e
708            ))))
709        })?;
710        let batches = Self::execute_scanner(scanner).await?;
711
712        let mut found_result: Option<TableInfo> = None;
713        let mut total_rows = 0;
714
715        for batch in batches {
716            if batch.num_rows() == 0 {
717                continue;
718            }
719
720            total_rows += batch.num_rows();
721            if total_rows > 1 {
722                return Err(Error::io(format!(
723                    "Expected exactly 1 table with id '{}', found {}",
724                    object_id, total_rows
725                )));
726            }
727
728            let object_id_array = Self::get_string_column(&batch, "object_id")?;
729            let location_array = Self::get_string_column(&batch, "location")?;
730            let location = location_array.value(0).to_string();
731            let (namespace, name) = Self::parse_object_id(object_id_array.value(0));
732            found_result = Some(TableInfo {
733                namespace,
734                name,
735                location,
736            });
737        }
738
739        Ok(found_result)
740    }
741
742    /// List all table locations in the manifest (for root namespace only)
743    /// Returns a set of table locations (e.g., "table_name.lance")
744    pub async fn list_manifest_table_locations(&self) -> Result<std::collections::HashSet<String>> {
745        let filter = "object_type = 'table' AND NOT contains(object_id, '$')";
746        let mut scanner = self.manifest_scanner().await?;
747        scanner.filter(filter).map_err(|e| {
748            Error::io_source(box_error(std::io::Error::other(format!(
749                "Failed to filter: {}",
750                e
751            ))))
752        })?;
753        scanner.project(&["location"]).map_err(|e| {
754            Error::io_source(box_error(std::io::Error::other(format!(
755                "Failed to project: {}",
756                e
757            ))))
758        })?;
759
760        let batches = Self::execute_scanner(scanner).await?;
761        let mut locations = std::collections::HashSet::new();
762
763        for batch in batches {
764            if batch.num_rows() == 0 {
765                continue;
766            }
767            let location_array = Self::get_string_column(&batch, "location")?;
768            for i in 0..location_array.len() {
769                locations.insert(location_array.value(i).to_string());
770            }
771        }
772
773        Ok(locations)
774    }
775
776    /// Insert an entry into the manifest table
777    async fn insert_into_manifest(
778        &self,
779        object_id: String,
780        object_type: ObjectType,
781        location: Option<String>,
782    ) -> Result<()> {
783        self.insert_into_manifest_with_metadata(object_id, object_type, location, None, None)
784            .await
785    }
786
787    /// Insert an entry into the manifest table with metadata and base_objects
788    async fn insert_into_manifest_with_metadata(
789        &self,
790        object_id: String,
791        object_type: ObjectType,
792        location: Option<String>,
793        metadata: Option<String>,
794        base_objects: Option<Vec<String>>,
795    ) -> Result<()> {
796        use arrow::array::builder::{ListBuilder, StringBuilder};
797
798        let schema = Self::manifest_schema();
799
800        // Create base_objects array from the provided list
801        let string_builder = StringBuilder::new();
802        let mut list_builder = ListBuilder::new(string_builder).with_field(Arc::new(Field::new(
803            "object_id",
804            DataType::Utf8,
805            true,
806        )));
807
808        match base_objects {
809            Some(objects) => {
810                for obj in objects {
811                    list_builder.values().append_value(obj);
812                }
813                list_builder.append(true);
814            }
815            None => {
816                list_builder.append_null();
817            }
818        }
819
820        let base_objects_array = list_builder.finish();
821
822        // Create arrays with optional values
823        let location_array = match location {
824            Some(loc) => Arc::new(StringArray::from(vec![Some(loc)])),
825            None => Arc::new(StringArray::from(vec![None::<String>])),
826        };
827
828        let metadata_array = match metadata {
829            Some(meta) => Arc::new(StringArray::from(vec![Some(meta)])),
830            None => Arc::new(StringArray::from(vec![None::<String>])),
831        };
832
833        let batch = RecordBatch::try_new(
834            schema.clone(),
835            vec![
836                Arc::new(StringArray::from(vec![object_id.as_str()])),
837                Arc::new(StringArray::from(vec![object_type.as_str()])),
838                location_array,
839                metadata_array,
840                Arc::new(base_objects_array),
841            ],
842        )
843        .map_err(|e| Error::io(format!("Failed to create manifest entry: {}", e)))?;
844
845        let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
846
847        // Use MergeInsert to ensure uniqueness on object_id
848        let dataset_guard = self.manifest_dataset.get().await?;
849        let dataset_arc = Arc::new(dataset_guard.clone());
850        drop(dataset_guard); // Drop read guard before merge insert
851
852        let mut merge_builder =
853            lance::dataset::MergeInsertBuilder::try_new(dataset_arc, vec!["object_id".to_string()])
854                .map_err(|e| {
855                    Error::io_source(box_error(std::io::Error::other(format!(
856                        "Failed to create merge builder: {}",
857                        e
858                    ))))
859                })?;
860
861        merge_builder.when_matched(lance::dataset::WhenMatched::Fail);
862        merge_builder.when_not_matched(lance::dataset::WhenNotMatched::InsertAll);
863        // Use conflict_retries to handle cross-process races on manifest mutations.
864        // When two processes concurrently insert the same object_id, the second one
865        // hits a commit conflict. With conflict_retries > 0, the retry re-evaluates
866        // the full MergeInsert plan against the latest data, where the join detects
867        // the existing row and WhenMatched::Fail fires, producing a clear error.
868        merge_builder.conflict_retries(5);
869        // TODO: after BTREE index creation on object_id, has_scalar_index=true causes
870        // MergeInsert to use V1 path which lacks bloom filters for conflict detection. This
871        // results in (Some, None) filter mismatch when rebasing against V2 operations.
872        // Setting use_index=false ensures all operations consistently use V2 path.
873        merge_builder.use_index(false);
874        if let Some(retries) = self.commit_retries {
875            merge_builder.commit_retries(retries);
876        }
877
878        let (new_dataset_arc, _merge_stats) = merge_builder
879            .try_build()
880            .map_err(|e| {
881                Error::io_source(box_error(std::io::Error::other(format!(
882                    "Failed to build merge: {}",
883                    e
884                ))))
885            })?
886            .execute_reader(Box::new(reader))
887            .await
888            .map_err(|e| {
889                convert_lance_commit_error(&e, "Failed to execute merge", Some(&object_id))
890            })?;
891
892        let new_dataset = Arc::try_unwrap(new_dataset_arc).unwrap_or_else(|arc| (*arc).clone());
893        self.manifest_dataset.set_latest(new_dataset).await;
894
895        // Run inline optimization after write
896        if let Err(e) = self.run_inline_optimization().await {
897            log::warn!(
898                "Unexpected failure when running inline optimization: {:?}",
899                e
900            );
901        }
902
903        Ok(())
904    }
905
906    /// Delete an entry from the manifest table
907    pub async fn delete_from_manifest(&self, object_id: &str) -> Result<()> {
908        let predicate = format!("object_id = '{}'", object_id);
909
910        // Get dataset and use DeleteBuilder with configured retries
911        let dataset_guard = self.manifest_dataset.get().await?;
912        let dataset = Arc::new(dataset_guard.clone());
913        drop(dataset_guard); // Drop read guard before delete
914
915        let new_dataset = lance::dataset::DeleteBuilder::new(dataset, &predicate)
916            .execute()
917            .await
918            .map_err(|e| convert_lance_commit_error(&e, "Failed to delete", None))?;
919
920        // Update the wrapper with the new dataset
921        self.manifest_dataset
922            .set_latest(
923                Arc::try_unwrap(new_dataset.new_dataset).unwrap_or_else(|arc| (*arc).clone()),
924            )
925            .await;
926
927        // Run inline optimization after delete
928        if let Err(e) = self.run_inline_optimization().await {
929            log::warn!(
930                "Unexpected failure when running inline optimization: {:?}",
931                e
932            );
933        }
934
935        Ok(())
936    }
937
938    /// Register a table in the manifest without creating the physical table (internal helper for migration)
939    pub async fn register_table(&self, name: &str, location: String) -> Result<()> {
940        let object_id = Self::build_object_id(&[], name);
941        if self.manifest_contains_object(&object_id).await? {
942            return Err(Error::io(format!("Table '{}' already exists", name)));
943        }
944
945        self.insert_into_manifest(object_id, ObjectType::Table, Some(location))
946            .await
947    }
948
949    /// Validate that all levels of a namespace path exist
950    async fn validate_namespace_levels_exist(&self, namespace_path: &[String]) -> Result<()> {
951        for i in 1..=namespace_path.len() {
952            let partial_path = &namespace_path[..i];
953            let object_id = partial_path.join(DELIMITER);
954            if !self.manifest_contains_object(&object_id).await? {
955                return Err(Error::namespace_source(
956                    format!("Parent namespace '{}' does not exist", object_id).into(),
957                ));
958            }
959        }
960        Ok(())
961    }
962
963    /// Query the manifest for a namespace with the given object ID
964    async fn query_manifest_for_namespace(&self, object_id: &str) -> Result<Option<NamespaceInfo>> {
965        let filter = format!("object_id = '{}' AND object_type = 'namespace'", object_id);
966        let mut scanner = self.manifest_scanner().await?;
967        scanner.filter(&filter).map_err(|e| {
968            Error::io_source(box_error(std::io::Error::other(format!(
969                "Failed to filter: {}",
970                e
971            ))))
972        })?;
973        scanner.project(&["object_id", "metadata"]).map_err(|e| {
974            Error::io_source(box_error(std::io::Error::other(format!(
975                "Failed to project: {}",
976                e
977            ))))
978        })?;
979        let batches = Self::execute_scanner(scanner).await?;
980
981        let mut found_result: Option<NamespaceInfo> = None;
982        let mut total_rows = 0;
983
984        for batch in batches {
985            if batch.num_rows() == 0 {
986                continue;
987            }
988
989            total_rows += batch.num_rows();
990            if total_rows > 1 {
991                return Err(Error::io(format!(
992                    "Expected exactly 1 namespace with id '{}', found {}",
993                    object_id, total_rows
994                )));
995            }
996
997            let object_id_array = Self::get_string_column(&batch, "object_id")?;
998            let metadata_array = Self::get_string_column(&batch, "metadata")?;
999
1000            let object_id_str = object_id_array.value(0);
1001            let metadata = if !metadata_array.is_null(0) {
1002                let metadata_str = metadata_array.value(0);
1003                match serde_json::from_str::<HashMap<String, String>>(metadata_str) {
1004                    Ok(map) => Some(map),
1005                    Err(e) => {
1006                        return Err(Error::io(format!(
1007                            "Failed to deserialize metadata for namespace '{}': {}",
1008                            object_id, e
1009                        )));
1010                    }
1011                }
1012            } else {
1013                None
1014            };
1015
1016            let (namespace, name) = Self::parse_object_id(object_id_str);
1017            found_result = Some(NamespaceInfo {
1018                namespace,
1019                name,
1020                metadata,
1021            });
1022        }
1023
1024        Ok(found_result)
1025    }
1026
1027    /// Create or load the manifest dataset, ensuring it has the latest schema setup.
1028    ///
1029    /// This function will:
1030    /// 1. Try to load an existing manifest table
1031    /// 2. If it exists, check and migrate the schema if needed (e.g., add primary key metadata)
1032    /// 3. If it doesn't exist, create a new manifest table with the current schema
1033    async fn ensure_manifest_table_up_to_date(
1034        root: &str,
1035        storage_options: &Option<HashMap<String, String>>,
1036        session: Option<Arc<Session>>,
1037    ) -> Result<DatasetConsistencyWrapper> {
1038        let manifest_path = format!("{}/{}", root, MANIFEST_TABLE_NAME);
1039        log::debug!("Attempting to load manifest from {}", manifest_path);
1040        let store_options = ObjectStoreParams {
1041            storage_options_accessor: storage_options.as_ref().map(|opts| {
1042                Arc::new(
1043                    lance_io::object_store::StorageOptionsAccessor::with_static_options(
1044                        opts.clone(),
1045                    ),
1046                )
1047            }),
1048            ..Default::default()
1049        };
1050        let read_params = ReadParams {
1051            session: session.clone(),
1052            store_options: Some(store_options.clone()),
1053            ..Default::default()
1054        };
1055        let dataset_result = DatasetBuilder::from_uri(&manifest_path)
1056            .with_read_params(read_params)
1057            .load()
1058            .await;
1059        if let Ok(mut dataset) = dataset_result {
1060            // Check if the object_id field has primary key metadata, migrate if not
1061            let needs_pk_migration = dataset
1062                .schema()
1063                .field("object_id")
1064                .map(|f| {
1065                    !f.metadata
1066                        .contains_key(LANCE_UNENFORCED_PRIMARY_KEY_POSITION)
1067                })
1068                .unwrap_or(false);
1069
1070            if needs_pk_migration {
1071                log::info!("Migrating __manifest table to add primary key metadata on object_id");
1072                dataset
1073                    .update_field_metadata()
1074                    .update("object_id", [(LANCE_UNENFORCED_PRIMARY_KEY_POSITION, "0")])
1075                    .map_err(|e| {
1076                        Error::io_source(box_error(std::io::Error::other(format!(
1077                            "Failed to find object_id field for migration: {}",
1078                            e
1079                        ))))
1080                    })?
1081                    .await
1082                    .map_err(|e| {
1083                        Error::io_source(box_error(std::io::Error::other(format!(
1084                            "Failed to migrate primary key metadata: {}",
1085                            e
1086                        ))))
1087                    })?;
1088            }
1089
1090            Ok(DatasetConsistencyWrapper::new(dataset))
1091        } else {
1092            log::info!("Creating new manifest table at {}", manifest_path);
1093            let schema = Self::manifest_schema();
1094            let empty_batch = RecordBatch::new_empty(schema.clone());
1095            let reader = RecordBatchIterator::new(vec![Ok(empty_batch)], schema.clone());
1096
1097            let store_params = ObjectStoreParams {
1098                storage_options_accessor: storage_options.as_ref().map(|opts| {
1099                    Arc::new(
1100                        lance_io::object_store::StorageOptionsAccessor::with_static_options(
1101                            opts.clone(),
1102                        ),
1103                    )
1104                }),
1105                ..Default::default()
1106            };
1107            let write_params = WriteParams {
1108                session: session.clone(),
1109                store_params: Some(store_params),
1110                ..Default::default()
1111            };
1112
1113            let dataset =
1114                Dataset::write(Box::new(reader), &manifest_path, Some(write_params)).await;
1115
1116            // Handle race condition where another process created the manifest concurrently
1117            match dataset {
1118                Ok(dataset) => {
1119                    log::info!(
1120                        "Successfully created manifest table at {}, version={}, uri={}",
1121                        manifest_path,
1122                        dataset.version().version,
1123                        dataset.uri()
1124                    );
1125                    Ok(DatasetConsistencyWrapper::new(dataset))
1126                }
1127                Err(ref e)
1128                    if matches!(
1129                        e,
1130                        LanceError::DatasetAlreadyExists { .. }
1131                            | LanceError::CommitConflict { .. }
1132                            | LanceError::IncompatibleTransaction { .. }
1133                            | LanceError::RetryableCommitConflict { .. }
1134                    ) =>
1135                {
1136                    // Another process created the manifest concurrently, try to load it
1137                    log::info!(
1138                        "Manifest table was created by another process, loading it: {}",
1139                        manifest_path
1140                    );
1141                    let recovery_store_options = ObjectStoreParams {
1142                        storage_options_accessor: storage_options.as_ref().map(|opts| {
1143                            Arc::new(
1144                                lance_io::object_store::StorageOptionsAccessor::with_static_options(
1145                                    opts.clone(),
1146                                ),
1147                            )
1148                        }),
1149                        ..Default::default()
1150                    };
1151                    let recovery_read_params = ReadParams {
1152                        session,
1153                        store_options: Some(recovery_store_options),
1154                        ..Default::default()
1155                    };
1156                    let dataset = DatasetBuilder::from_uri(&manifest_path)
1157                        .with_read_params(recovery_read_params)
1158                        .load()
1159                        .await
1160                        .map_err(|e| {
1161                            Error::io_source(box_error(std::io::Error::other(format!(
1162                                "Failed to load manifest dataset after creation conflict: {}",
1163                                e
1164                            ))))
1165                        })?;
1166                    Ok(DatasetConsistencyWrapper::new(dataset))
1167                }
1168                Err(e) => Err(Error::io_source(box_error(std::io::Error::other(format!(
1169                    "Failed to create manifest dataset: {}",
1170                    e
1171                ))))),
1172            }
1173        }
1174    }
1175}
1176
1177#[async_trait]
1178impl LanceNamespace for ManifestNamespace {
1179    fn namespace_id(&self) -> String {
1180        self.root.clone()
1181    }
1182
1183    async fn list_tables(&self, request: ListTablesRequest) -> Result<ListTablesResponse> {
1184        let namespace_id = request
1185            .id
1186            .as_ref()
1187            .ok_or_else(|| Error::invalid_input_source("Namespace ID is required".into()))?;
1188
1189        // Build filter to find tables in this namespace
1190        let filter = if namespace_id.is_empty() {
1191            // Root namespace: find tables without a namespace prefix
1192            "object_type = 'table' AND NOT contains(object_id, '$')".to_string()
1193        } else {
1194            // Namespaced: find tables that start with namespace$ but have no additional $
1195            let prefix = namespace_id.join(DELIMITER);
1196            format!(
1197                "object_type = 'table' AND starts_with(object_id, '{}{}') AND NOT contains(substring(object_id, {}), '$')",
1198                prefix,
1199                DELIMITER,
1200                prefix.len() + 2
1201            )
1202        };
1203
1204        let mut scanner = self.manifest_scanner().await?;
1205        scanner.filter(&filter).map_err(|e| {
1206            Error::io_source(box_error(std::io::Error::other(format!(
1207                "Failed to filter: {}",
1208                e
1209            ))))
1210        })?;
1211        scanner.project(&["object_id"]).map_err(|e| {
1212            Error::io_source(box_error(std::io::Error::other(format!(
1213                "Failed to project: {}",
1214                e
1215            ))))
1216        })?;
1217
1218        let batches = Self::execute_scanner(scanner).await?;
1219
1220        let mut tables = Vec::new();
1221        for batch in batches {
1222            if batch.num_rows() == 0 {
1223                continue;
1224            }
1225
1226            let object_id_array = Self::get_string_column(&batch, "object_id")?;
1227            for i in 0..batch.num_rows() {
1228                let object_id = object_id_array.value(i);
1229                let (_namespace, name) = Self::parse_object_id(object_id);
1230                tables.push(name);
1231            }
1232        }
1233
1234        Ok(ListTablesResponse::new(tables))
1235    }
1236
1237    async fn describe_table(&self, request: DescribeTableRequest) -> Result<DescribeTableResponse> {
1238        let table_id = request
1239            .id
1240            .as_ref()
1241            .ok_or_else(|| Error::invalid_input_source("Table ID is required".into()))?;
1242
1243        if table_id.is_empty() {
1244            return Err(Error::invalid_input_source(
1245                "Table ID cannot be empty".into(),
1246            ));
1247        }
1248
1249        let object_id = Self::str_object_id(table_id);
1250        let table_info = self.query_manifest_for_table(&object_id).boxed().await?;
1251
1252        // Extract table name and namespace from table_id
1253        let table_name = table_id.last().cloned().unwrap_or_default();
1254        let namespace_id: Vec<String> = if table_id.len() > 1 {
1255            table_id[..table_id.len() - 1].to_vec()
1256        } else {
1257            vec![]
1258        };
1259
1260        let load_detailed_metadata = request.load_detailed_metadata.unwrap_or(false);
1261        // For backwards compatibility, only skip vending credentials when explicitly set to false
1262        let vend_credentials = request.vend_credentials.unwrap_or(true);
1263
1264        match table_info {
1265            Some(info) => {
1266                // Construct full URI from relative location
1267                let table_uri = Self::construct_full_uri(&self.root, &info.location)?;
1268
1269                let storage_options = if vend_credentials {
1270                    self.storage_options.clone()
1271                } else {
1272                    None
1273                };
1274
1275                // If not loading detailed metadata, return minimal response with just location
1276                if !load_detailed_metadata {
1277                    return Ok(DescribeTableResponse {
1278                        table: Some(table_name),
1279                        namespace: Some(namespace_id),
1280                        location: Some(table_uri.clone()),
1281                        table_uri: Some(table_uri),
1282                        storage_options,
1283                        ..Default::default()
1284                    });
1285                }
1286
1287                // Try to open the dataset to get version and schema
1288                match Dataset::open(&table_uri).await {
1289                    Ok(mut dataset) => {
1290                        // If a specific version is requested, checkout that version
1291                        if let Some(requested_version) = request.version {
1292                            dataset = dataset.checkout_version(requested_version as u64).await?;
1293                        }
1294
1295                        let version = dataset.version().version;
1296                        let lance_schema = dataset.schema();
1297                        let arrow_schema: arrow_schema::Schema = lance_schema.into();
1298                        let json_schema = arrow_schema_to_json(&arrow_schema)?;
1299
1300                        Ok(DescribeTableResponse {
1301                            table: Some(table_name.clone()),
1302                            namespace: Some(namespace_id.clone()),
1303                            version: Some(version as i64),
1304                            location: Some(table_uri.clone()),
1305                            table_uri: Some(table_uri),
1306                            schema: Some(Box::new(json_schema)),
1307                            storage_options,
1308                            ..Default::default()
1309                        })
1310                    }
1311                    Err(_) => {
1312                        // If dataset can't be opened (e.g., empty table), return minimal info
1313                        Ok(DescribeTableResponse {
1314                            table: Some(table_name),
1315                            namespace: Some(namespace_id),
1316                            location: Some(table_uri.clone()),
1317                            table_uri: Some(table_uri),
1318                            storage_options,
1319                            ..Default::default()
1320                        })
1321                    }
1322                }
1323            }
1324            None => Err(Error::namespace_source(
1325                format!("Table '{}' not found", object_id).into(),
1326            )),
1327        }
1328    }
1329
1330    async fn table_exists(&self, request: TableExistsRequest) -> Result<()> {
1331        let table_id = request
1332            .id
1333            .as_ref()
1334            .ok_or_else(|| Error::invalid_input_source("Table ID is required".into()))?;
1335
1336        if table_id.is_empty() {
1337            return Err(Error::invalid_input_source(
1338                "Table ID cannot be empty".into(),
1339            ));
1340        }
1341
1342        let (namespace, table_name) = Self::split_object_id(table_id);
1343        let object_id = Self::build_object_id(&namespace, &table_name);
1344        let exists = self.manifest_contains_object(&object_id).await?;
1345        if exists {
1346            Ok(())
1347        } else {
1348            Err(Error::namespace_source(
1349                format!("Table '{}' not found", table_name).into(),
1350            ))
1351        }
1352    }
1353
1354    async fn create_table(
1355        &self,
1356        request: CreateTableRequest,
1357        data: Bytes,
1358    ) -> Result<CreateTableResponse> {
1359        let table_id = request
1360            .id
1361            .as_ref()
1362            .ok_or_else(|| Error::invalid_input_source("Table ID is required".into()))?;
1363
1364        if table_id.is_empty() {
1365            return Err(Error::invalid_input_source(
1366                "Table ID cannot be empty".into(),
1367            ));
1368        }
1369
1370        let (namespace, table_name) = Self::split_object_id(table_id);
1371        let object_id = Self::build_object_id(&namespace, &table_name);
1372
1373        // Check if table already exists in manifest
1374        if self.manifest_contains_object(&object_id).await? {
1375            return Err(Error::io(format!("Table '{}' already exists", table_name)));
1376        }
1377
1378        // Create the physical table location with hash-based naming
1379        // When dir_listing_enabled is true and it's a root table, use directory-style naming: {table_name}.lance
1380        // Otherwise, use hash-based naming: {hash}_{object_id}
1381        let dir_name = if namespace.is_empty() && self.dir_listing_enabled {
1382            // Root table with directory listing enabled: use {table_name}.lance
1383            format!("{}.lance", table_name)
1384        } else {
1385            // Child namespace table or dir listing disabled: use hash-based naming
1386            Self::generate_dir_name(&object_id)
1387        };
1388        let table_uri = Self::construct_full_uri(&self.root, &dir_name)?;
1389
1390        // Validate that request_data is provided
1391        if data.is_empty() {
1392            return Err(Error::namespace_source(
1393                "Request data (Arrow IPC stream) is required for create_table".into(),
1394            ));
1395        }
1396
1397        // Write the data using Lance Dataset
1398        let cursor = Cursor::new(data.to_vec());
1399        let stream_reader = StreamReader::try_new(cursor, None)
1400            .map_err(|e| Error::io(format!("Failed to read IPC stream: {}", e)))?;
1401
1402        let batches: Vec<RecordBatch> =
1403            stream_reader
1404                .collect::<std::result::Result<Vec<_>, _>>()
1405                .map_err(|e| Error::io(format!("Failed to collect batches: {}", e)))?;
1406
1407        if batches.is_empty() {
1408            return Err(Error::io("No data provided for table creation"));
1409        }
1410
1411        let schema = batches[0].schema();
1412        let batch_results: Vec<std::result::Result<RecordBatch, arrow_schema::ArrowError>> =
1413            batches.into_iter().map(Ok).collect();
1414        let reader = RecordBatchIterator::new(batch_results, schema);
1415
1416        let store_params = ObjectStoreParams {
1417            storage_options_accessor: self.storage_options.as_ref().map(|opts| {
1418                Arc::new(
1419                    lance_io::object_store::StorageOptionsAccessor::with_static_options(
1420                        opts.clone(),
1421                    ),
1422                )
1423            }),
1424            ..Default::default()
1425        };
1426        let write_params = WriteParams {
1427            session: self.session.clone(),
1428            store_params: Some(store_params),
1429            ..Default::default()
1430        };
1431        let _dataset = Dataset::write(Box::new(reader), &table_uri, Some(write_params))
1432            .await
1433            .map_err(|e| {
1434                Error::io_source(box_error(std::io::Error::other(format!(
1435                    "Failed to write dataset: {}",
1436                    e
1437                ))))
1438            })?;
1439
1440        // Register in manifest (store dir_name, not full URI)
1441        self.insert_into_manifest(object_id, ObjectType::Table, Some(dir_name))
1442            .await?;
1443
1444        Ok(CreateTableResponse {
1445            version: Some(1),
1446            location: Some(table_uri),
1447            storage_options: self.storage_options.clone(),
1448            ..Default::default()
1449        })
1450    }
1451
1452    async fn drop_table(&self, request: DropTableRequest) -> Result<DropTableResponse> {
1453        let table_id = request
1454            .id
1455            .as_ref()
1456            .ok_or_else(|| Error::invalid_input_source("Table ID is required".into()))?;
1457
1458        if table_id.is_empty() {
1459            return Err(Error::invalid_input_source(
1460                "Table ID cannot be empty".into(),
1461            ));
1462        }
1463
1464        let (namespace, table_name) = Self::split_object_id(table_id);
1465        let object_id = Self::build_object_id(&namespace, &table_name);
1466
1467        // Query manifest for table location
1468        let table_info = self.query_manifest_for_table(&object_id).boxed().await?;
1469
1470        match table_info {
1471            Some(info) => {
1472                // Delete from manifest first
1473                self.delete_from_manifest(&object_id).boxed().await?;
1474
1475                // Delete physical data directory using the dir_name from manifest
1476                let table_path = self.base_path.child(info.location.as_str());
1477                let table_uri = Self::construct_full_uri(&self.root, &info.location)?;
1478
1479                // Remove the table directory
1480                self.object_store
1481                    .remove_dir_all(table_path)
1482                    .boxed()
1483                    .await
1484                    .map_err(|e| {
1485                        Error::namespace_source(
1486                            format!("Failed to delete table directory: {}", e).into(),
1487                        )
1488                    })?;
1489
1490                Ok(DropTableResponse {
1491                    id: request.id.clone(),
1492                    location: Some(table_uri),
1493                    ..Default::default()
1494                })
1495            }
1496            None => Err(Error::namespace_source(
1497                format!("Table '{}' not found", table_name).into(),
1498            )),
1499        }
1500    }
1501
1502    async fn list_namespaces(
1503        &self,
1504        request: ListNamespacesRequest,
1505    ) -> Result<ListNamespacesResponse> {
1506        let parent_namespace = request
1507            .id
1508            .as_ref()
1509            .ok_or_else(|| Error::invalid_input_source("Namespace ID is required".into()))?;
1510
1511        // Build filter to find direct child namespaces
1512        let filter = if parent_namespace.is_empty() {
1513            // Root namespace: find all namespaces without a parent
1514            "object_type = 'namespace' AND NOT contains(object_id, '$')".to_string()
1515        } else {
1516            // Non-root: find namespaces that start with parent$ but have no additional $
1517            let prefix = parent_namespace.join(DELIMITER);
1518            format!(
1519                "object_type = 'namespace' AND starts_with(object_id, '{}{}') AND NOT contains(substring(object_id, {}), '$')",
1520                prefix,
1521                DELIMITER,
1522                prefix.len() + 2
1523            )
1524        };
1525
1526        let mut scanner = self.manifest_scanner().await?;
1527        scanner.filter(&filter).map_err(|e| {
1528            Error::io_source(box_error(std::io::Error::other(format!(
1529                "Failed to filter: {}",
1530                e
1531            ))))
1532        })?;
1533        scanner.project(&["object_id"]).map_err(|e| {
1534            Error::io_source(box_error(std::io::Error::other(format!(
1535                "Failed to project: {}",
1536                e
1537            ))))
1538        })?;
1539
1540        let batches = Self::execute_scanner(scanner).await?;
1541        let mut namespaces = Vec::new();
1542
1543        for batch in batches {
1544            if batch.num_rows() == 0 {
1545                continue;
1546            }
1547
1548            let object_id_array = Self::get_string_column(&batch, "object_id")?;
1549            for i in 0..batch.num_rows() {
1550                let object_id = object_id_array.value(i);
1551                let (_namespace, name) = Self::parse_object_id(object_id);
1552                namespaces.push(name);
1553            }
1554        }
1555
1556        Ok(ListNamespacesResponse::new(namespaces))
1557    }
1558
1559    async fn describe_namespace(
1560        &self,
1561        request: DescribeNamespaceRequest,
1562    ) -> Result<DescribeNamespaceResponse> {
1563        let namespace_id = request
1564            .id
1565            .as_ref()
1566            .ok_or_else(|| Error::invalid_input_source("Namespace ID is required".into()))?;
1567
1568        // Root namespace always exists
1569        if namespace_id.is_empty() {
1570            #[allow(clippy::needless_update)]
1571            return Ok(DescribeNamespaceResponse {
1572                properties: Some(HashMap::new()),
1573                ..Default::default()
1574            });
1575        }
1576
1577        // Check if namespace exists in manifest
1578        let object_id = namespace_id.join(DELIMITER);
1579        let namespace_info = self.query_manifest_for_namespace(&object_id).await?;
1580
1581        match namespace_info {
1582            #[allow(clippy::needless_update)]
1583            Some(info) => Ok(DescribeNamespaceResponse {
1584                properties: info.metadata,
1585                ..Default::default()
1586            }),
1587            None => Err(Error::namespace_source(
1588                format!("Namespace '{}' not found", object_id).into(),
1589            )),
1590        }
1591    }
1592
1593    async fn create_namespace(
1594        &self,
1595        request: CreateNamespaceRequest,
1596    ) -> Result<CreateNamespaceResponse> {
1597        let namespace_id = request
1598            .id
1599            .as_ref()
1600            .ok_or_else(|| Error::invalid_input_source("Namespace ID is required".into()))?;
1601
1602        // Root namespace always exists and cannot be created
1603        if namespace_id.is_empty() {
1604            return Err(Error::namespace_source(
1605                "Root namespace already exists and cannot be created".into(),
1606            ));
1607        }
1608
1609        // Validate parent namespaces exist (but not the namespace being created)
1610        if namespace_id.len() > 1 {
1611            self.validate_namespace_levels_exist(&namespace_id[..namespace_id.len() - 1])
1612                .await?;
1613        }
1614
1615        let object_id = namespace_id.join(DELIMITER);
1616        if self.manifest_contains_object(&object_id).await? {
1617            return Err(Error::namespace_source(
1618                format!("Namespace '{}' already exists", object_id).into(),
1619            ));
1620        }
1621
1622        // Serialize properties if provided
1623        let metadata = request.properties.as_ref().and_then(|props| {
1624            if props.is_empty() {
1625                None
1626            } else {
1627                Some(serde_json::to_string(props).ok()?)
1628            }
1629        });
1630
1631        self.insert_into_manifest_with_metadata(
1632            object_id,
1633            ObjectType::Namespace,
1634            None,
1635            metadata,
1636            None,
1637        )
1638        .await?;
1639
1640        Ok(CreateNamespaceResponse {
1641            properties: request.properties,
1642            ..Default::default()
1643        })
1644    }
1645
1646    async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<DropNamespaceResponse> {
1647        let namespace_id = request
1648            .id
1649            .as_ref()
1650            .ok_or_else(|| Error::invalid_input_source("Namespace ID is required".into()))?;
1651
1652        // Root namespace always exists and cannot be dropped
1653        if namespace_id.is_empty() {
1654            return Err(Error::namespace_source(
1655                "Root namespace cannot be dropped".into(),
1656            ));
1657        }
1658
1659        let object_id = namespace_id.join(DELIMITER);
1660
1661        // Check if namespace exists
1662        if !self.manifest_contains_object(&object_id).boxed().await? {
1663            return Err(Error::namespace_source(
1664                format!("Namespace '{}' not found", object_id).into(),
1665            ));
1666        }
1667
1668        // Check for child namespaces
1669        let prefix = format!("{}{}", object_id, DELIMITER);
1670        let filter = format!("starts_with(object_id, '{}')", prefix);
1671        let mut scanner = self.manifest_scanner().boxed().await?;
1672        scanner.filter(&filter).map_err(|e| {
1673            Error::io_source(box_error(std::io::Error::other(format!(
1674                "Failed to filter: {}",
1675                e
1676            ))))
1677        })?;
1678        scanner.project::<&str>(&[]).map_err(|e| {
1679            Error::io_source(box_error(std::io::Error::other(format!(
1680                "Failed to project: {}",
1681                e
1682            ))))
1683        })?;
1684        scanner.with_row_id();
1685        let count = scanner.count_rows().boxed().await.map_err(|e| {
1686            Error::io_source(box_error(std::io::Error::other(format!(
1687                "Failed to count rows: {}",
1688                e
1689            ))))
1690        })?;
1691
1692        if count > 0 {
1693            return Err(Error::namespace_source(
1694                format!(
1695                    "Namespace '{}' is not empty (contains {} child objects)",
1696                    object_id, count
1697                )
1698                .into(),
1699            ));
1700        }
1701
1702        self.delete_from_manifest(&object_id).boxed().await?;
1703
1704        Ok(DropNamespaceResponse::default())
1705    }
1706
1707    async fn namespace_exists(&self, request: NamespaceExistsRequest) -> Result<()> {
1708        let namespace_id = request
1709            .id
1710            .as_ref()
1711            .ok_or_else(|| Error::invalid_input_source("Namespace ID is required".into()))?;
1712
1713        // Root namespace always exists
1714        if namespace_id.is_empty() {
1715            return Ok(());
1716        }
1717
1718        let object_id = namespace_id.join(DELIMITER);
1719        if self.manifest_contains_object(&object_id).await? {
1720            Ok(())
1721        } else {
1722            Err(Error::namespace_source(
1723                format!("Namespace '{}' not found", object_id).into(),
1724            ))
1725        }
1726    }
1727
1728    async fn declare_table(&self, request: DeclareTableRequest) -> Result<DeclareTableResponse> {
1729        let table_id = request
1730            .id
1731            .as_ref()
1732            .ok_or_else(|| Error::invalid_input_source("Table ID is required".into()))?;
1733
1734        if table_id.is_empty() {
1735            return Err(Error::invalid_input_source(
1736                "Table ID cannot be empty".into(),
1737            ));
1738        }
1739
1740        let (namespace, table_name) = Self::split_object_id(table_id);
1741        let object_id = Self::build_object_id(&namespace, &table_name);
1742
1743        // Check if table already exists in manifest
1744        let existing = self.query_manifest_for_table(&object_id).await?;
1745        if existing.is_some() {
1746            return Err(Error::namespace_source(
1747                format!("Table '{}' already exists", table_name).into(),
1748            ));
1749        }
1750
1751        // Create table location path with hash-based naming
1752        // When dir_listing_enabled is true and it's a root table, use directory-style naming: {table_name}.lance
1753        // Otherwise, use hash-based naming: {hash}_{object_id}
1754        let dir_name = if namespace.is_empty() && self.dir_listing_enabled {
1755            // Root table with directory listing enabled: use {table_name}.lance
1756            format!("{}.lance", table_name)
1757        } else {
1758            // Child namespace table or dir listing disabled: use hash-based naming
1759            Self::generate_dir_name(&object_id)
1760        };
1761        let table_path = self.base_path.child(dir_name.as_str());
1762        let table_uri = Self::construct_full_uri(&self.root, &dir_name)?;
1763
1764        // Validate location if provided
1765        if let Some(req_location) = &request.location {
1766            let req_location = req_location.trim_end_matches('/');
1767            if req_location != table_uri {
1768                return Err(Error::namespace_source(
1769                    format!(
1770                        "Cannot declare table {} at location {}, must be at location {}",
1771                        table_name, req_location, table_uri
1772                    )
1773                    .into(),
1774                ));
1775            }
1776        }
1777
1778        // Create the .lance-reserved file to mark the table as existing
1779        let reserved_file_path = table_path.child(".lance-reserved");
1780
1781        self.object_store
1782            .create(&reserved_file_path)
1783            .await
1784            .map_err(|e| {
1785                Error::namespace_source(
1786                    format!(
1787                        "Failed to create .lance-reserved file for table {}: {}",
1788                        table_name, e
1789                    )
1790                    .into(),
1791                )
1792            })?
1793            .shutdown()
1794            .await
1795            .map_err(|e| {
1796                Error::namespace_source(
1797                    format!(
1798                        "Failed to finalize .lance-reserved file for table {}: {}",
1799                        table_name, e
1800                    )
1801                    .into(),
1802                )
1803            })?;
1804
1805        // Add entry to manifest marking this as a declared table (store dir_name, not full path)
1806        self.insert_into_manifest(object_id, ObjectType::Table, Some(dir_name))
1807            .await?;
1808
1809        log::info!(
1810            "Declared table '{}' in manifest at {}",
1811            table_name,
1812            table_uri
1813        );
1814
1815        // For backwards compatibility, only skip vending credentials when explicitly set to false
1816        let vend_credentials = request.vend_credentials.unwrap_or(true);
1817        let storage_options = if vend_credentials {
1818            self.storage_options.clone()
1819        } else {
1820            None
1821        };
1822
1823        Ok(DeclareTableResponse {
1824            location: Some(table_uri),
1825            storage_options,
1826            ..Default::default()
1827        })
1828    }
1829
1830    async fn register_table(&self, request: RegisterTableRequest) -> Result<RegisterTableResponse> {
1831        let table_id = request
1832            .id
1833            .as_ref()
1834            .ok_or_else(|| Error::invalid_input_source("Table ID is required".into()))?;
1835
1836        if table_id.is_empty() {
1837            return Err(Error::invalid_input_source(
1838                "Table ID cannot be empty".into(),
1839            ));
1840        }
1841
1842        let location = request.location.clone();
1843
1844        // Validate that location is a relative path within the root directory
1845        // We don't allow absolute URIs or paths that escape the root
1846        if location.contains("://") {
1847            return Err(Error::invalid_input_source(format!(
1848                "Absolute URIs are not allowed for register_table. Location must be a relative path within the root directory: {}",
1849                location
1850            ).into()));
1851        }
1852
1853        if location.starts_with('/') {
1854            return Err(Error::invalid_input_source(format!(
1855                "Absolute paths are not allowed for register_table. Location must be a relative path within the root directory: {}",
1856                location
1857            ).into()));
1858        }
1859
1860        // Check for path traversal attempts
1861        if location.contains("..") {
1862            return Err(Error::invalid_input_source(format!(
1863                "Path traversal is not allowed. Location must be a relative path within the root directory: {}",
1864                location
1865            ).into()));
1866        }
1867
1868        let (namespace, table_name) = Self::split_object_id(table_id);
1869        let object_id = Self::build_object_id(&namespace, &table_name);
1870
1871        // Validate that parent namespaces exist (if not root)
1872        if !namespace.is_empty() {
1873            self.validate_namespace_levels_exist(&namespace).await?;
1874        }
1875
1876        // Check if table already exists
1877        if self.manifest_contains_object(&object_id).await? {
1878            return Err(Error::namespace_source(
1879                format!("Table '{}' already exists", object_id).into(),
1880            ));
1881        }
1882
1883        // Register the table with its location in the manifest
1884        self.insert_into_manifest(object_id, ObjectType::Table, Some(location.clone()))
1885            .await?;
1886
1887        Ok(RegisterTableResponse {
1888            location: Some(location),
1889            ..Default::default()
1890        })
1891    }
1892
1893    async fn deregister_table(
1894        &self,
1895        request: DeregisterTableRequest,
1896    ) -> Result<DeregisterTableResponse> {
1897        let table_id = request
1898            .id
1899            .as_ref()
1900            .ok_or_else(|| Error::invalid_input_source("Table ID is required".into()))?;
1901
1902        if table_id.is_empty() {
1903            return Err(Error::invalid_input_source(
1904                "Table ID cannot be empty".into(),
1905            ));
1906        }
1907
1908        let (namespace, table_name) = Self::split_object_id(table_id);
1909        let object_id = Self::build_object_id(&namespace, &table_name);
1910
1911        // Get table info before deleting
1912        let table_info = self.query_manifest_for_table(&object_id).await?;
1913
1914        let table_uri = match table_info {
1915            Some(info) => {
1916                // Delete from manifest only (leave physical data intact)
1917                self.delete_from_manifest(&object_id).boxed().await?;
1918                Self::construct_full_uri(&self.root, &info.location)?
1919            }
1920            None => {
1921                return Err(Error::namespace_source(
1922                    format!("Table '{}' not found", object_id).into(),
1923                ));
1924            }
1925        };
1926
1927        Ok(DeregisterTableResponse {
1928            id: request.id.clone(),
1929            location: Some(table_uri),
1930            ..Default::default()
1931        })
1932    }
1933}
1934
1935#[cfg(test)]
1936mod tests {
1937    use crate::{DirectoryNamespaceBuilder, ManifestNamespace};
1938    use bytes::Bytes;
1939    use lance_core::utils::tempfile::TempStdDir;
1940    use lance_namespace::LanceNamespace;
1941    use lance_namespace::models::{
1942        CreateNamespaceRequest, CreateTableRequest, DescribeTableRequest, DropTableRequest,
1943        ListTablesRequest, TableExistsRequest,
1944    };
1945    use rstest::rstest;
1946
1947    fn create_test_ipc_data() -> Vec<u8> {
1948        use arrow::array::{Int32Array, StringArray};
1949        use arrow::datatypes::{DataType, Field, Schema};
1950        use arrow::ipc::writer::StreamWriter;
1951        use arrow::record_batch::RecordBatch;
1952        use std::sync::Arc;
1953
1954        let schema = Arc::new(Schema::new(vec![
1955            Field::new("id", DataType::Int32, false),
1956            Field::new("name", DataType::Utf8, false),
1957        ]));
1958
1959        let batch = RecordBatch::try_new(
1960            schema.clone(),
1961            vec![
1962                Arc::new(Int32Array::from(vec![1, 2, 3])),
1963                Arc::new(StringArray::from(vec!["a", "b", "c"])),
1964            ],
1965        )
1966        .unwrap();
1967
1968        let mut buffer = Vec::new();
1969        {
1970            let mut writer = StreamWriter::try_new(&mut buffer, &schema).unwrap();
1971            writer.write(&batch).unwrap();
1972            writer.finish().unwrap();
1973        }
1974        buffer
1975    }
1976
1977    #[rstest]
1978    #[case::with_optimization(true)]
1979    #[case::without_optimization(false)]
1980    #[tokio::test]
1981    async fn test_manifest_namespace_basic_create_and_list(#[case] inline_optimization: bool) {
1982        let temp_dir = TempStdDir::default();
1983        let temp_path = temp_dir.to_str().unwrap();
1984
1985        // Create a DirectoryNamespace with manifest enabled (default)
1986        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
1987            .inline_optimization_enabled(inline_optimization)
1988            .build()
1989            .await
1990            .unwrap();
1991
1992        // Verify we can list tables (should be empty)
1993        let mut request = ListTablesRequest::new();
1994        request.id = Some(vec![]);
1995        let response = dir_namespace.list_tables(request).await.unwrap();
1996        assert_eq!(response.tables.len(), 0);
1997
1998        // Create a test table
1999        let buffer = create_test_ipc_data();
2000        let mut create_request = CreateTableRequest::new();
2001        create_request.id = Some(vec!["test_table".to_string()]);
2002
2003        let _response = dir_namespace
2004            .create_table(create_request, Bytes::from(buffer))
2005            .await
2006            .unwrap();
2007
2008        // List tables again - should see our new table
2009        let mut request = ListTablesRequest::new();
2010        request.id = Some(vec![]);
2011        let response = dir_namespace.list_tables(request).await.unwrap();
2012        assert_eq!(response.tables.len(), 1);
2013        assert_eq!(response.tables[0], "test_table");
2014    }
2015
2016    #[rstest]
2017    #[case::with_optimization(true)]
2018    #[case::without_optimization(false)]
2019    #[tokio::test]
2020    async fn test_manifest_namespace_table_exists(#[case] inline_optimization: bool) {
2021        let temp_dir = TempStdDir::default();
2022        let temp_path = temp_dir.to_str().unwrap();
2023
2024        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2025            .inline_optimization_enabled(inline_optimization)
2026            .build()
2027            .await
2028            .unwrap();
2029
2030        // Check non-existent table
2031        let mut request = TableExistsRequest::new();
2032        request.id = Some(vec!["nonexistent".to_string()]);
2033        let result = dir_namespace.table_exists(request).await;
2034        assert!(result.is_err());
2035
2036        // Create table
2037        let buffer = create_test_ipc_data();
2038        let mut create_request = CreateTableRequest::new();
2039        create_request.id = Some(vec!["test_table".to_string()]);
2040        dir_namespace
2041            .create_table(create_request, Bytes::from(buffer))
2042            .await
2043            .unwrap();
2044
2045        // Check existing table
2046        let mut request = TableExistsRequest::new();
2047        request.id = Some(vec!["test_table".to_string()]);
2048        let result = dir_namespace.table_exists(request).await;
2049        assert!(result.is_ok());
2050    }
2051
2052    #[rstest]
2053    #[case::with_optimization(true)]
2054    #[case::without_optimization(false)]
2055    #[tokio::test]
2056    async fn test_manifest_namespace_describe_table(#[case] inline_optimization: bool) {
2057        let temp_dir = TempStdDir::default();
2058        let temp_path = temp_dir.to_str().unwrap();
2059
2060        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2061            .inline_optimization_enabled(inline_optimization)
2062            .build()
2063            .await
2064            .unwrap();
2065
2066        // Describe non-existent table
2067        let mut request = DescribeTableRequest::new();
2068        request.id = Some(vec!["nonexistent".to_string()]);
2069        let result = dir_namespace.describe_table(request).await;
2070        assert!(result.is_err());
2071
2072        // Create table
2073        let buffer = create_test_ipc_data();
2074        let mut create_request = CreateTableRequest::new();
2075        create_request.id = Some(vec!["test_table".to_string()]);
2076        dir_namespace
2077            .create_table(create_request, Bytes::from(buffer))
2078            .await
2079            .unwrap();
2080
2081        // Describe existing table
2082        let mut request = DescribeTableRequest::new();
2083        request.id = Some(vec!["test_table".to_string()]);
2084        let response = dir_namespace.describe_table(request).await.unwrap();
2085        assert!(response.location.is_some());
2086        assert!(response.location.unwrap().contains("test_table"));
2087    }
2088
2089    #[rstest]
2090    #[case::with_optimization(true)]
2091    #[case::without_optimization(false)]
2092    #[tokio::test]
2093    async fn test_manifest_namespace_drop_table(#[case] inline_optimization: bool) {
2094        let temp_dir = TempStdDir::default();
2095        let temp_path = temp_dir.to_str().unwrap();
2096
2097        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2098            .inline_optimization_enabled(inline_optimization)
2099            .build()
2100            .await
2101            .unwrap();
2102
2103        // Create table
2104        let buffer = create_test_ipc_data();
2105        let mut create_request = CreateTableRequest::new();
2106        create_request.id = Some(vec!["test_table".to_string()]);
2107        dir_namespace
2108            .create_table(create_request, Bytes::from(buffer))
2109            .await
2110            .unwrap();
2111
2112        // Verify table exists
2113        let mut request = ListTablesRequest::new();
2114        request.id = Some(vec![]);
2115        let response = dir_namespace.list_tables(request).await.unwrap();
2116        assert_eq!(response.tables.len(), 1);
2117
2118        // Drop table
2119        let mut drop_request = DropTableRequest::new();
2120        drop_request.id = Some(vec!["test_table".to_string()]);
2121        let _response = dir_namespace.drop_table(drop_request).await.unwrap();
2122
2123        // Verify table is gone
2124        let mut request = ListTablesRequest::new();
2125        request.id = Some(vec![]);
2126        let response = dir_namespace.list_tables(request).await.unwrap();
2127        assert_eq!(response.tables.len(), 0);
2128    }
2129
2130    #[rstest]
2131    #[case::with_optimization(true)]
2132    #[case::without_optimization(false)]
2133    #[tokio::test]
2134    async fn test_manifest_namespace_multiple_tables(#[case] inline_optimization: bool) {
2135        let temp_dir = TempStdDir::default();
2136        let temp_path = temp_dir.to_str().unwrap();
2137
2138        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2139            .inline_optimization_enabled(inline_optimization)
2140            .build()
2141            .await
2142            .unwrap();
2143
2144        // Create multiple tables
2145        let buffer = create_test_ipc_data();
2146        for i in 1..=3 {
2147            let mut create_request = CreateTableRequest::new();
2148            create_request.id = Some(vec![format!("table{}", i)]);
2149            dir_namespace
2150                .create_table(create_request, Bytes::from(buffer.clone()))
2151                .await
2152                .unwrap();
2153        }
2154
2155        // List all tables
2156        let mut request = ListTablesRequest::new();
2157        request.id = Some(vec![]);
2158        let response = dir_namespace.list_tables(request).await.unwrap();
2159        assert_eq!(response.tables.len(), 3);
2160        assert!(response.tables.contains(&"table1".to_string()));
2161        assert!(response.tables.contains(&"table2".to_string()));
2162        assert!(response.tables.contains(&"table3".to_string()));
2163    }
2164
2165    #[rstest]
2166    #[case::with_optimization(true)]
2167    #[case::without_optimization(false)]
2168    #[tokio::test]
2169    async fn test_directory_only_mode(#[case] inline_optimization: bool) {
2170        let temp_dir = TempStdDir::default();
2171        let temp_path = temp_dir.to_str().unwrap();
2172
2173        // Create a DirectoryNamespace with manifest disabled
2174        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2175            .manifest_enabled(false)
2176            .inline_optimization_enabled(inline_optimization)
2177            .build()
2178            .await
2179            .unwrap();
2180
2181        // Verify we can list tables (should be empty)
2182        let mut request = ListTablesRequest::new();
2183        request.id = Some(vec![]);
2184        let response = dir_namespace.list_tables(request).await.unwrap();
2185        assert_eq!(response.tables.len(), 0);
2186
2187        // Create a test table
2188        let buffer = create_test_ipc_data();
2189        let mut create_request = CreateTableRequest::new();
2190        create_request.id = Some(vec!["test_table".to_string()]);
2191
2192        // Create table - this should use directory-only mode
2193        let _response = dir_namespace
2194            .create_table(create_request, Bytes::from(buffer))
2195            .await
2196            .unwrap();
2197
2198        // List tables - should see our new table
2199        let mut request = ListTablesRequest::new();
2200        request.id = Some(vec![]);
2201        let response = dir_namespace.list_tables(request).await.unwrap();
2202        assert_eq!(response.tables.len(), 1);
2203        assert_eq!(response.tables[0], "test_table");
2204    }
2205
2206    #[rstest]
2207    #[case::with_optimization(true)]
2208    #[case::without_optimization(false)]
2209    #[tokio::test]
2210    async fn test_dual_mode_merge(#[case] inline_optimization: bool) {
2211        let temp_dir = TempStdDir::default();
2212        let temp_path = temp_dir.to_str().unwrap();
2213
2214        // Create a DirectoryNamespace with both manifest and directory enabled
2215        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2216            .manifest_enabled(true)
2217            .dir_listing_enabled(true)
2218            .inline_optimization_enabled(inline_optimization)
2219            .build()
2220            .await
2221            .unwrap();
2222
2223        // Create tables through manifest
2224        let buffer = create_test_ipc_data();
2225        let mut create_request = CreateTableRequest::new();
2226        create_request.id = Some(vec!["table1".to_string()]);
2227        dir_namespace
2228            .create_table(create_request, Bytes::from(buffer))
2229            .await
2230            .unwrap();
2231
2232        // List tables - should see table from both manifest and directory
2233        let mut request = ListTablesRequest::new();
2234        request.id = Some(vec![]);
2235        let response = dir_namespace.list_tables(request).await.unwrap();
2236        assert_eq!(response.tables.len(), 1);
2237        assert_eq!(response.tables[0], "table1");
2238    }
2239
2240    #[rstest]
2241    #[case::with_optimization(true)]
2242    #[case::without_optimization(false)]
2243    #[tokio::test]
2244    async fn test_manifest_only_mode(#[case] inline_optimization: bool) {
2245        let temp_dir = TempStdDir::default();
2246        let temp_path = temp_dir.to_str().unwrap();
2247
2248        // Create a DirectoryNamespace with only manifest enabled
2249        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2250            .manifest_enabled(true)
2251            .dir_listing_enabled(false)
2252            .inline_optimization_enabled(inline_optimization)
2253            .build()
2254            .await
2255            .unwrap();
2256
2257        // Create table
2258        let buffer = create_test_ipc_data();
2259        let mut create_request = CreateTableRequest::new();
2260        create_request.id = Some(vec!["test_table".to_string()]);
2261        dir_namespace
2262            .create_table(create_request, Bytes::from(buffer))
2263            .await
2264            .unwrap();
2265
2266        // List tables - should only use manifest
2267        let mut request = ListTablesRequest::new();
2268        request.id = Some(vec![]);
2269        let response = dir_namespace.list_tables(request).await.unwrap();
2270        assert_eq!(response.tables.len(), 1);
2271        assert_eq!(response.tables[0], "test_table");
2272    }
2273
2274    #[rstest]
2275    #[case::with_optimization(true)]
2276    #[case::without_optimization(false)]
2277    #[tokio::test]
2278    async fn test_drop_nonexistent_table(#[case] inline_optimization: bool) {
2279        let temp_dir = TempStdDir::default();
2280        let temp_path = temp_dir.to_str().unwrap();
2281
2282        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2283            .inline_optimization_enabled(inline_optimization)
2284            .build()
2285            .await
2286            .unwrap();
2287
2288        // Try to drop non-existent table
2289        let mut drop_request = DropTableRequest::new();
2290        drop_request.id = Some(vec!["nonexistent".to_string()]);
2291        let result = dir_namespace.drop_table(drop_request).await;
2292        assert!(result.is_err());
2293    }
2294
2295    #[rstest]
2296    #[case::with_optimization(true)]
2297    #[case::without_optimization(false)]
2298    #[tokio::test]
2299    async fn test_create_duplicate_table_fails(#[case] inline_optimization: bool) {
2300        let temp_dir = TempStdDir::default();
2301        let temp_path = temp_dir.to_str().unwrap();
2302
2303        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2304            .inline_optimization_enabled(inline_optimization)
2305            .build()
2306            .await
2307            .unwrap();
2308
2309        // Create table
2310        let buffer = create_test_ipc_data();
2311        let mut create_request = CreateTableRequest::new();
2312        create_request.id = Some(vec!["test_table".to_string()]);
2313        dir_namespace
2314            .create_table(create_request, Bytes::from(buffer.clone()))
2315            .await
2316            .unwrap();
2317
2318        // Try to create table with same name - should fail
2319        let mut create_request = CreateTableRequest::new();
2320        create_request.id = Some(vec!["test_table".to_string()]);
2321        let result = dir_namespace
2322            .create_table(create_request, Bytes::from(buffer))
2323            .await;
2324        assert!(result.is_err());
2325    }
2326
2327    #[rstest]
2328    #[case::with_optimization(true)]
2329    #[case::without_optimization(false)]
2330    #[tokio::test]
2331    async fn test_create_child_namespace(#[case] inline_optimization: bool) {
2332        use lance_namespace::models::{
2333            CreateNamespaceRequest, ListNamespacesRequest, NamespaceExistsRequest,
2334        };
2335
2336        let temp_dir = TempStdDir::default();
2337        let temp_path = temp_dir.to_str().unwrap();
2338
2339        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2340            .inline_optimization_enabled(inline_optimization)
2341            .build()
2342            .await
2343            .unwrap();
2344
2345        // Create a child namespace
2346        let mut create_req = CreateNamespaceRequest::new();
2347        create_req.id = Some(vec!["ns1".to_string()]);
2348        let result = dir_namespace.create_namespace(create_req).await;
2349        assert!(
2350            result.is_ok(),
2351            "Failed to create child namespace: {:?}",
2352            result.err()
2353        );
2354
2355        // Verify namespace exists
2356        let exists_req = NamespaceExistsRequest {
2357            id: Some(vec!["ns1".to_string()]),
2358            ..Default::default()
2359        };
2360        let result = dir_namespace.namespace_exists(exists_req).await;
2361        assert!(result.is_ok(), "Namespace should exist");
2362
2363        // List child namespaces of root
2364        let list_req = ListNamespacesRequest {
2365            id: Some(vec![]),
2366            page_token: None,
2367            limit: None,
2368            ..Default::default()
2369        };
2370        let result = dir_namespace.list_namespaces(list_req).await;
2371        assert!(result.is_ok());
2372        let namespaces = result.unwrap();
2373        assert_eq!(namespaces.namespaces.len(), 1);
2374        assert_eq!(namespaces.namespaces[0], "ns1");
2375    }
2376
2377    #[rstest]
2378    #[case::with_optimization(true)]
2379    #[case::without_optimization(false)]
2380    #[tokio::test]
2381    async fn test_create_nested_namespace(#[case] inline_optimization: bool) {
2382        use lance_namespace::models::{
2383            CreateNamespaceRequest, ListNamespacesRequest, NamespaceExistsRequest,
2384        };
2385
2386        let temp_dir = TempStdDir::default();
2387        let temp_path = temp_dir.to_str().unwrap();
2388
2389        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2390            .inline_optimization_enabled(inline_optimization)
2391            .build()
2392            .await
2393            .unwrap();
2394
2395        // Create parent namespace
2396        let mut create_req = CreateNamespaceRequest::new();
2397        create_req.id = Some(vec!["parent".to_string()]);
2398        dir_namespace.create_namespace(create_req).await.unwrap();
2399
2400        // Create nested child namespace
2401        let mut create_req = CreateNamespaceRequest::new();
2402        create_req.id = Some(vec!["parent".to_string(), "child".to_string()]);
2403        let result = dir_namespace.create_namespace(create_req).await;
2404        assert!(
2405            result.is_ok(),
2406            "Failed to create nested namespace: {:?}",
2407            result.err()
2408        );
2409
2410        // Verify nested namespace exists
2411        let exists_req = NamespaceExistsRequest {
2412            id: Some(vec!["parent".to_string(), "child".to_string()]),
2413            ..Default::default()
2414        };
2415        let result = dir_namespace.namespace_exists(exists_req).await;
2416        assert!(result.is_ok(), "Nested namespace should exist");
2417
2418        // List child namespaces of parent
2419        let list_req = ListNamespacesRequest {
2420            id: Some(vec!["parent".to_string()]),
2421            page_token: None,
2422            limit: None,
2423            ..Default::default()
2424        };
2425        let result = dir_namespace.list_namespaces(list_req).await;
2426        assert!(result.is_ok());
2427        let namespaces = result.unwrap();
2428        assert_eq!(namespaces.namespaces.len(), 1);
2429        assert_eq!(namespaces.namespaces[0], "child");
2430    }
2431
2432    #[rstest]
2433    #[case::with_optimization(true)]
2434    #[case::without_optimization(false)]
2435    #[tokio::test]
2436    async fn test_create_namespace_without_parent_fails(#[case] inline_optimization: bool) {
2437        use lance_namespace::models::CreateNamespaceRequest;
2438
2439        let temp_dir = TempStdDir::default();
2440        let temp_path = temp_dir.to_str().unwrap();
2441
2442        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2443            .inline_optimization_enabled(inline_optimization)
2444            .build()
2445            .await
2446            .unwrap();
2447
2448        // Try to create nested namespace without parent
2449        let mut create_req = CreateNamespaceRequest::new();
2450        create_req.id = Some(vec!["nonexistent_parent".to_string(), "child".to_string()]);
2451        let result = dir_namespace.create_namespace(create_req).await;
2452        assert!(result.is_err(), "Should fail when parent doesn't exist");
2453    }
2454
2455    #[rstest]
2456    #[case::with_optimization(true)]
2457    #[case::without_optimization(false)]
2458    #[tokio::test]
2459    async fn test_drop_child_namespace(#[case] inline_optimization: bool) {
2460        use lance_namespace::models::{
2461            CreateNamespaceRequest, DropNamespaceRequest, NamespaceExistsRequest,
2462        };
2463
2464        let temp_dir = TempStdDir::default();
2465        let temp_path = temp_dir.to_str().unwrap();
2466
2467        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2468            .inline_optimization_enabled(inline_optimization)
2469            .build()
2470            .await
2471            .unwrap();
2472
2473        // Create a child namespace
2474        let mut create_req = CreateNamespaceRequest::new();
2475        create_req.id = Some(vec!["ns1".to_string()]);
2476        dir_namespace.create_namespace(create_req).await.unwrap();
2477
2478        // Drop the namespace
2479        let mut drop_req = DropNamespaceRequest::new();
2480        drop_req.id = Some(vec!["ns1".to_string()]);
2481        let result = dir_namespace.drop_namespace(drop_req).await;
2482        assert!(
2483            result.is_ok(),
2484            "Failed to drop namespace: {:?}",
2485            result.err()
2486        );
2487
2488        // Verify namespace no longer exists
2489        let exists_req = NamespaceExistsRequest {
2490            id: Some(vec!["ns1".to_string()]),
2491            ..Default::default()
2492        };
2493        let result = dir_namespace.namespace_exists(exists_req).await;
2494        assert!(result.is_err(), "Namespace should not exist after drop");
2495    }
2496
2497    #[rstest]
2498    #[case::with_optimization(true)]
2499    #[case::without_optimization(false)]
2500    #[tokio::test]
2501    async fn test_drop_namespace_with_children_fails(#[case] inline_optimization: bool) {
2502        use lance_namespace::models::{CreateNamespaceRequest, DropNamespaceRequest};
2503
2504        let temp_dir = TempStdDir::default();
2505        let temp_path = temp_dir.to_str().unwrap();
2506
2507        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2508            .inline_optimization_enabled(inline_optimization)
2509            .build()
2510            .await
2511            .unwrap();
2512
2513        // Create parent and child namespaces
2514        let mut create_req = CreateNamespaceRequest::new();
2515        create_req.id = Some(vec!["parent".to_string()]);
2516        dir_namespace.create_namespace(create_req).await.unwrap();
2517
2518        let mut create_req = CreateNamespaceRequest::new();
2519        create_req.id = Some(vec!["parent".to_string(), "child".to_string()]);
2520        dir_namespace.create_namespace(create_req).await.unwrap();
2521
2522        // Try to drop parent namespace - should fail because it has children
2523        let mut drop_req = DropNamespaceRequest::new();
2524        drop_req.id = Some(vec!["parent".to_string()]);
2525        let result = dir_namespace.drop_namespace(drop_req).await;
2526        assert!(result.is_err(), "Should fail when namespace has children");
2527    }
2528
2529    #[rstest]
2530    #[case::with_optimization(true)]
2531    #[case::without_optimization(false)]
2532    #[tokio::test]
2533    async fn test_create_table_in_child_namespace(#[case] inline_optimization: bool) {
2534        use lance_namespace::models::{
2535            CreateNamespaceRequest, CreateTableRequest, ListTablesRequest,
2536        };
2537
2538        let temp_dir = TempStdDir::default();
2539        let temp_path = temp_dir.to_str().unwrap();
2540
2541        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2542            .inline_optimization_enabled(inline_optimization)
2543            .build()
2544            .await
2545            .unwrap();
2546
2547        // Create a child namespace
2548        let mut create_ns_req = CreateNamespaceRequest::new();
2549        create_ns_req.id = Some(vec!["ns1".to_string()]);
2550        dir_namespace.create_namespace(create_ns_req).await.unwrap();
2551
2552        // Create a table in the child namespace
2553        let buffer = create_test_ipc_data();
2554        let mut create_table_req = CreateTableRequest::new();
2555        create_table_req.id = Some(vec!["ns1".to_string(), "table1".to_string()]);
2556        let result = dir_namespace
2557            .create_table(create_table_req, Bytes::from(buffer))
2558            .await;
2559        assert!(
2560            result.is_ok(),
2561            "Failed to create table in child namespace: {:?}",
2562            result.err()
2563        );
2564
2565        // List tables in the namespace
2566        let list_req = ListTablesRequest {
2567            id: Some(vec!["ns1".to_string()]),
2568            page_token: None,
2569            limit: None,
2570            ..Default::default()
2571        };
2572        let result = dir_namespace.list_tables(list_req).await;
2573        assert!(result.is_ok());
2574        let tables = result.unwrap();
2575        assert_eq!(tables.tables.len(), 1);
2576        assert_eq!(tables.tables[0], "table1");
2577    }
2578
2579    #[rstest]
2580    #[case::with_optimization(true)]
2581    #[case::without_optimization(false)]
2582    #[tokio::test]
2583    async fn test_describe_child_namespace(#[case] inline_optimization: bool) {
2584        use lance_namespace::models::{CreateNamespaceRequest, DescribeNamespaceRequest};
2585
2586        let temp_dir = TempStdDir::default();
2587        let temp_path = temp_dir.to_str().unwrap();
2588
2589        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2590            .inline_optimization_enabled(inline_optimization)
2591            .build()
2592            .await
2593            .unwrap();
2594
2595        // Create a child namespace with properties
2596        let mut properties = std::collections::HashMap::new();
2597        properties.insert("key1".to_string(), "value1".to_string());
2598
2599        let mut create_req = CreateNamespaceRequest::new();
2600        create_req.id = Some(vec!["ns1".to_string()]);
2601        create_req.properties = Some(properties.clone());
2602        dir_namespace.create_namespace(create_req).await.unwrap();
2603
2604        // Describe the namespace
2605        let describe_req = DescribeNamespaceRequest {
2606            id: Some(vec!["ns1".to_string()]),
2607            ..Default::default()
2608        };
2609        let result = dir_namespace.describe_namespace(describe_req).await;
2610        assert!(
2611            result.is_ok(),
2612            "Failed to describe namespace: {:?}",
2613            result.err()
2614        );
2615        let response = result.unwrap();
2616        assert!(response.properties.is_some());
2617        assert_eq!(
2618            response.properties.unwrap().get("key1"),
2619            Some(&"value1".to_string())
2620        );
2621    }
2622
2623    #[rstest]
2624    #[case::with_optimization(true)]
2625    #[case::without_optimization(false)]
2626    #[tokio::test]
2627    async fn test_concurrent_create_and_drop_single_instance(#[case] inline_optimization: bool) {
2628        use futures::future::join_all;
2629        use std::sync::Arc;
2630
2631        let temp_dir = TempStdDir::default();
2632        let temp_path = temp_dir.to_str().unwrap();
2633
2634        let dir_namespace = Arc::new(
2635            DirectoryNamespaceBuilder::new(temp_path)
2636                .inline_optimization_enabled(inline_optimization)
2637                .build()
2638                .await
2639                .unwrap(),
2640        );
2641
2642        // Initialize namespace first - create parent namespace to ensure __manifest table
2643        // is created before concurrent operations
2644        let mut create_ns_request = CreateNamespaceRequest::new();
2645        create_ns_request.id = Some(vec!["test_ns".to_string()]);
2646        dir_namespace
2647            .create_namespace(create_ns_request)
2648            .await
2649            .unwrap();
2650
2651        let num_tables = 10;
2652        let mut handles = Vec::new();
2653
2654        for i in 0..num_tables {
2655            let ns = dir_namespace.clone();
2656            let handle = async move {
2657                let table_name = format!("concurrent_table_{}", i);
2658                let table_id = vec!["test_ns".to_string(), table_name.clone()];
2659                let buffer = create_test_ipc_data();
2660
2661                // Create table
2662                let mut create_request = CreateTableRequest::new();
2663                create_request.id = Some(table_id.clone());
2664                ns.create_table(create_request, Bytes::from(buffer))
2665                    .await
2666                    .unwrap_or_else(|e| panic!("Failed to create table {}: {}", table_name, e));
2667
2668                // Drop table
2669                let mut drop_request = DropTableRequest::new();
2670                drop_request.id = Some(table_id);
2671                ns.drop_table(drop_request)
2672                    .await
2673                    .unwrap_or_else(|e| panic!("Failed to drop table {}: {}", table_name, e));
2674
2675                Ok::<_, lance_core::Error>(())
2676            };
2677            handles.push(handle);
2678        }
2679
2680        let results = join_all(handles).await;
2681        for result in results {
2682            assert!(result.is_ok(), "All concurrent operations should succeed");
2683        }
2684
2685        // Verify all tables are dropped
2686        let mut request = ListTablesRequest::new();
2687        request.id = Some(vec!["test_ns".to_string()]);
2688        let response = dir_namespace.list_tables(request).await.unwrap();
2689        assert_eq!(response.tables.len(), 0, "All tables should be dropped");
2690    }
2691
2692    #[rstest]
2693    #[case::with_optimization(true)]
2694    #[case::without_optimization(false)]
2695    #[tokio::test]
2696    async fn test_concurrent_create_and_drop_multiple_instances(#[case] inline_optimization: bool) {
2697        use futures::future::join_all;
2698
2699        let temp_dir = TempStdDir::default();
2700        let temp_path = temp_dir.to_str().unwrap().to_string();
2701
2702        // Initialize namespace first with a single instance to ensure __manifest
2703        // table is created and parent namespace exists before concurrent operations
2704        let init_ns = DirectoryNamespaceBuilder::new(&temp_path)
2705            .inline_optimization_enabled(inline_optimization)
2706            .build()
2707            .await
2708            .unwrap();
2709        let mut create_ns_request = CreateNamespaceRequest::new();
2710        create_ns_request.id = Some(vec!["test_ns".to_string()]);
2711        init_ns.create_namespace(create_ns_request).await.unwrap();
2712
2713        let num_tables = 10;
2714        let mut handles = Vec::new();
2715
2716        for i in 0..num_tables {
2717            let path = temp_path.clone();
2718            let handle = async move {
2719                // Each task creates its own namespace instance
2720                let ns = DirectoryNamespaceBuilder::new(&path)
2721                    .inline_optimization_enabled(inline_optimization)
2722                    .build()
2723                    .await
2724                    .unwrap();
2725
2726                let table_name = format!("multi_ns_table_{}", i);
2727                let table_id = vec!["test_ns".to_string(), table_name.clone()];
2728                let buffer = create_test_ipc_data();
2729
2730                // Create table
2731                let mut create_request = CreateTableRequest::new();
2732                create_request.id = Some(table_id.clone());
2733                ns.create_table(create_request, Bytes::from(buffer))
2734                    .await
2735                    .unwrap_or_else(|e| panic!("Failed to create table {}: {}", table_name, e));
2736
2737                // Drop table
2738                let mut drop_request = DropTableRequest::new();
2739                drop_request.id = Some(table_id);
2740                ns.drop_table(drop_request)
2741                    .await
2742                    .unwrap_or_else(|e| panic!("Failed to drop table {}: {}", table_name, e));
2743
2744                Ok::<_, lance_core::Error>(())
2745            };
2746            handles.push(handle);
2747        }
2748
2749        let results = join_all(handles).await;
2750        for result in results {
2751            assert!(result.is_ok(), "All concurrent operations should succeed");
2752        }
2753
2754        // Verify with a fresh namespace instance
2755        let verify_ns = DirectoryNamespaceBuilder::new(&temp_path)
2756            .inline_optimization_enabled(inline_optimization)
2757            .build()
2758            .await
2759            .unwrap();
2760
2761        let mut request = ListTablesRequest::new();
2762        request.id = Some(vec!["test_ns".to_string()]);
2763        let response = verify_ns.list_tables(request).await.unwrap();
2764        assert_eq!(response.tables.len(), 0, "All tables should be dropped");
2765    }
2766
2767    #[rstest]
2768    #[case::with_optimization(true)]
2769    #[case::without_optimization(false)]
2770    #[tokio::test]
2771    async fn test_concurrent_create_then_drop_from_different_instance(
2772        #[case] inline_optimization: bool,
2773    ) {
2774        use futures::future::join_all;
2775
2776        let temp_dir = TempStdDir::default();
2777        let temp_path = temp_dir.to_str().unwrap().to_string();
2778
2779        // Initialize namespace first with a single instance to ensure __manifest
2780        // table is created and parent namespace exists before concurrent operations
2781        let init_ns = DirectoryNamespaceBuilder::new(&temp_path)
2782            .inline_optimization_enabled(inline_optimization)
2783            .build()
2784            .await
2785            .unwrap();
2786        let mut create_ns_request = CreateNamespaceRequest::new();
2787        create_ns_request.id = Some(vec!["test_ns".to_string()]);
2788        init_ns.create_namespace(create_ns_request).await.unwrap();
2789
2790        let num_tables = 10;
2791
2792        // Phase 1: Create all tables concurrently using separate namespace instances
2793        let mut create_handles = Vec::new();
2794        for i in 0..num_tables {
2795            let path = temp_path.clone();
2796            let handle = async move {
2797                let ns = DirectoryNamespaceBuilder::new(&path)
2798                    .inline_optimization_enabled(inline_optimization)
2799                    .build()
2800                    .await
2801                    .unwrap();
2802
2803                let table_name = format!("cross_instance_table_{}", i);
2804                let table_id = vec!["test_ns".to_string(), table_name.clone()];
2805                let buffer = create_test_ipc_data();
2806
2807                let mut create_request = CreateTableRequest::new();
2808                create_request.id = Some(table_id);
2809                ns.create_table(create_request, Bytes::from(buffer))
2810                    .await
2811                    .unwrap_or_else(|e| panic!("Failed to create table {}: {}", table_name, e));
2812
2813                Ok::<_, lance_core::Error>(())
2814            };
2815            create_handles.push(handle);
2816        }
2817
2818        let create_results = join_all(create_handles).await;
2819        for result in create_results {
2820            assert!(result.is_ok(), "All create operations should succeed");
2821        }
2822
2823        // Phase 2: Drop all tables concurrently using NEW namespace instances
2824        let mut drop_handles = Vec::new();
2825        for i in 0..num_tables {
2826            let path = temp_path.clone();
2827            let handle = async move {
2828                let ns = DirectoryNamespaceBuilder::new(&path)
2829                    .inline_optimization_enabled(inline_optimization)
2830                    .build()
2831                    .await
2832                    .unwrap();
2833
2834                let table_name = format!("cross_instance_table_{}", i);
2835                let table_id = vec!["test_ns".to_string(), table_name.clone()];
2836
2837                let mut drop_request = DropTableRequest::new();
2838                drop_request.id = Some(table_id);
2839                ns.drop_table(drop_request)
2840                    .await
2841                    .unwrap_or_else(|e| panic!("Failed to drop table {}: {}", table_name, e));
2842
2843                Ok::<_, lance_core::Error>(())
2844            };
2845            drop_handles.push(handle);
2846        }
2847
2848        let drop_results = join_all(drop_handles).await;
2849        for result in drop_results {
2850            assert!(result.is_ok(), "All drop operations should succeed");
2851        }
2852
2853        // Verify all tables are dropped
2854        let verify_ns = DirectoryNamespaceBuilder::new(&temp_path)
2855            .inline_optimization_enabled(inline_optimization)
2856            .build()
2857            .await
2858            .unwrap();
2859
2860        let mut request = ListTablesRequest::new();
2861        request.id = Some(vec!["test_ns".to_string()]);
2862        let response = verify_ns.list_tables(request).await.unwrap();
2863        assert_eq!(response.tables.len(), 0, "All tables should be dropped");
2864    }
2865
2866    #[test]
2867    fn test_construct_full_uri_with_cloud_urls() {
2868        // Test S3-style URL with nested path (no trailing slash)
2869        let s3_result =
2870            ManifestNamespace::construct_full_uri("s3://bucket/path/subdir", "table.lance")
2871                .unwrap();
2872        assert_eq!(
2873            s3_result, "s3://bucket/path/subdir/table.lance",
2874            "S3 URL should correctly append table name to nested path"
2875        );
2876
2877        // Test Azure-style URL with nested path (no trailing slash)
2878        let az_result =
2879            ManifestNamespace::construct_full_uri("az://container/path/subdir", "table.lance")
2880                .unwrap();
2881        assert_eq!(
2882            az_result, "az://container/path/subdir/table.lance",
2883            "Azure URL should correctly append table name to nested path"
2884        );
2885
2886        // Test GCS-style URL with nested path (no trailing slash)
2887        let gs_result =
2888            ManifestNamespace::construct_full_uri("gs://bucket/path/subdir", "table.lance")
2889                .unwrap();
2890        assert_eq!(
2891            gs_result, "gs://bucket/path/subdir/table.lance",
2892            "GCS URL should correctly append table name to nested path"
2893        );
2894
2895        // Test with deeper nesting
2896        let deep_result =
2897            ManifestNamespace::construct_full_uri("s3://bucket/a/b/c/d", "my_table.lance").unwrap();
2898        assert_eq!(
2899            deep_result, "s3://bucket/a/b/c/d/my_table.lance",
2900            "Deeply nested path should work correctly"
2901        );
2902
2903        // Test with root-level path (single segment after bucket)
2904        let shallow_result =
2905            ManifestNamespace::construct_full_uri("s3://bucket", "table.lance").unwrap();
2906        assert_eq!(
2907            shallow_result, "s3://bucket/table.lance",
2908            "Single-level nested path should work correctly"
2909        );
2910
2911        // Test that URLs with trailing slash already work (no regression)
2912        let trailing_slash_result =
2913            ManifestNamespace::construct_full_uri("s3://bucket/path/subdir/", "table.lance")
2914                .unwrap();
2915        assert_eq!(
2916            trailing_slash_result, "s3://bucket/path/subdir/table.lance",
2917            "URL with existing trailing slash should still work"
2918        );
2919    }
2920
2921    /// Test that concurrent create_table calls for the same table name don't
2922    /// create duplicate entries in the manifest. Uses two independent
2923    /// ManifestNamespace instances pointing at the same directory to simulate
2924    /// two separate OS processes racing on table creation. The conflict_retries
2925    /// setting on the MergeInsert ensures the second operation properly detects
2926    /// the duplicate via WhenMatched::Fail after retrying against the latest data.
2927    #[tokio::test]
2928    async fn test_concurrent_create_table_no_duplicates() {
2929        let temp_dir = TempStdDir::default();
2930        let temp_path = temp_dir.to_str().unwrap();
2931
2932        // Two independent namespace instances = two separate "processes"
2933        // sharing the same underlying filesystem directory.
2934        let ns1 = DirectoryNamespaceBuilder::new(temp_path)
2935            .inline_optimization_enabled(false)
2936            .build()
2937            .await
2938            .unwrap();
2939        let ns2 = DirectoryNamespaceBuilder::new(temp_path)
2940            .inline_optimization_enabled(false)
2941            .build()
2942            .await
2943            .unwrap();
2944
2945        let buffer = create_test_ipc_data();
2946
2947        let mut req1 = CreateTableRequest::new();
2948        req1.id = Some(vec!["race_table".to_string()]);
2949        let mut req2 = CreateTableRequest::new();
2950        req2.id = Some(vec!["race_table".to_string()]);
2951
2952        // Launch both create_table calls concurrently
2953        let (result1, result2) = tokio::join!(
2954            ns1.create_table(req1, Bytes::from(buffer.clone())),
2955            ns2.create_table(req2, Bytes::from(buffer.clone())),
2956        );
2957
2958        // Exactly one should succeed and one should fail
2959        let success_count = [&result1, &result2].iter().filter(|r| r.is_ok()).count();
2960        let failure_count = [&result1, &result2].iter().filter(|r| r.is_err()).count();
2961        assert_eq!(
2962            success_count, 1,
2963            "Exactly one create should succeed, got: result1={:?}, result2={:?}",
2964            result1, result2
2965        );
2966        assert_eq!(
2967            failure_count, 1,
2968            "Exactly one create should fail, got: result1={:?}, result2={:?}",
2969            result1, result2
2970        );
2971
2972        // Verify only one table entry exists in the manifest
2973        let ns_check = DirectoryNamespaceBuilder::new(temp_path)
2974            .inline_optimization_enabled(false)
2975            .build()
2976            .await
2977            .unwrap();
2978        let mut list_request = ListTablesRequest::new();
2979        list_request.id = Some(vec![]);
2980        let response = ns_check.list_tables(list_request).await.unwrap();
2981        assert_eq!(
2982            response.tables.len(),
2983            1,
2984            "Should have exactly 1 table, found: {:?}",
2985            response.tables
2986        );
2987        assert_eq!(response.tables[0], "race_table");
2988
2989        // Also verify describe_table works (no "found 2" error)
2990        let mut describe_request = DescribeTableRequest::new();
2991        describe_request.id = Some(vec!["race_table".to_string()]);
2992        let describe_result = ns_check.describe_table(describe_request).await;
2993        assert!(
2994            describe_result.is_ok(),
2995            "describe_table should not fail with duplicate entries: {:?}",
2996            describe_result
2997        );
2998    }
2999}