Skip to main content

lance_namespace_impls/dir/
manifest.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Manifest-based namespace implementation
5//!
6//! This module provides a namespace implementation that uses a manifest table
7//! to track tables and nested namespaces.
8
9use arrow::array::builder::{ListBuilder, StringBuilder};
10use arrow::array::{Array, RecordBatch, RecordBatchIterator, StringArray};
11use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
12use arrow_ipc::reader::StreamReader;
13use async_trait::async_trait;
14use bytes::Bytes;
15use futures::{FutureExt, TryStreamExt, stream::StreamExt};
16use lance::dataset::optimize::{CompactionOptions, compact_files};
17use lance::dataset::{
18    DeleteBuilder, MergeInsertBuilder, ReadParams, WhenMatched, WhenNotMatched, WriteMode,
19    WriteParams, builder::DatasetBuilder,
20};
21use lance::index::DatasetIndexExt;
22use lance::session::Session;
23use lance::{Dataset, dataset::scanner::Scanner};
24use lance_core::Error as LanceError;
25use lance_core::datatypes::LANCE_UNENFORCED_PRIMARY_KEY_POSITION;
26use lance_core::{Error, Result};
27use lance_index::IndexType;
28use lance_index::optimize::OptimizeOptions;
29use lance_index::scalar::{BuiltinIndexType, ScalarIndexParams};
30use lance_io::object_store::{ObjectStore, ObjectStoreParams};
31use lance_namespace::LanceNamespace;
32use lance_namespace::error::NamespaceError;
33use lance_namespace::models::{
34    CreateNamespaceRequest, CreateNamespaceResponse, CreateTableRequest, CreateTableResponse,
35    DeclareTableRequest, DeclareTableResponse, DeregisterTableRequest, DeregisterTableResponse,
36    DescribeNamespaceRequest, DescribeNamespaceResponse, DescribeTableRequest,
37    DescribeTableResponse, DescribeTableVersionResponse, DropNamespaceRequest,
38    DropNamespaceResponse, DropTableRequest, DropTableResponse, ListNamespacesRequest,
39    ListNamespacesResponse, ListTableVersionsResponse, ListTablesRequest, ListTablesResponse,
40    NamespaceExistsRequest, RegisterTableRequest, RegisterTableResponse, TableExistsRequest,
41    TableVersion,
42};
43use lance_namespace::schema::arrow_schema_to_json;
44use object_store::{Error as ObjectStoreError, path::Path};
45use std::io::Cursor;
46use std::{
47    collections::HashMap,
48    hash::{DefaultHasher, Hash, Hasher},
49    ops::{Deref, DerefMut},
50    sync::Arc,
51};
52use tokio::sync::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
53
/// Name of the special Lance table that stores the manifest.
const MANIFEST_TABLE_NAME: &str = "__manifest";
/// Separator placed between namespace levels and the object name when an
/// object ID is flattened into a single string (e.g. `ns1$ns2$table`).
const DELIMITER: &str = "$";
/// Bounded concurrency for per-table `_versions/` probes when filtering declared tables.
/// Higher values reduce latency but increase burst load against the object store.
pub(crate) const DECLARED_FILTER_CONCURRENCY: usize = 16;

// Index names for the __manifest table
/// BTREE index on the object_id column for fast lookups
const OBJECT_ID_INDEX_NAME: &str = "object_id_btree";
/// Bitmap index on the object_type column for filtering by type
const OBJECT_TYPE_INDEX_NAME: &str = "object_type_bitmap";
/// LabelList index on the base_objects column for view dependencies
const BASE_OBJECTS_INDEX_NAME: &str = "base_objects_label_list";
/// Inline maintenance on the manifest table is expensive relative to a single-row mutation.
/// Wait until enough fragments accumulate before compacting files or merging indices.
const MANIFEST_INLINE_OPTIMIZATION_FRAGMENT_THRESHOLD: usize = 8;
70
/// Object types that can be stored in the manifest.
///
/// Each variant has a string form that is produced by `as_str` and accepted
/// by `parse`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ObjectType {
    /// A namespace entry; string form is `"namespace"`.
    Namespace,
    /// A table entry; string form is `"table"`.
    Table,
    /// A table version entry; string form is `"table_version"`.
    TableVersion,
}
78
79impl ObjectType {
80    pub fn as_str(&self) -> &str {
81        match self {
82            Self::Namespace => "namespace",
83            Self::Table => "table",
84            Self::TableVersion => "table_version",
85        }
86    }
87
88    pub fn parse(s: &str) -> Result<Self> {
89        match s {
90            "namespace" => Ok(Self::Namespace),
91            "table" => Ok(Self::Table),
92            "table_version" => Ok(Self::TableVersion),
93            _ => Err(NamespaceError::Internal {
94                message: format!("Invalid object type: {}", s),
95            }
96            .into()),
97        }
98    }
99}
100
/// Mode requested for a create-table operation.
///
/// Parsed from an optional mode string by `CreateTableMode::parse` and mapped
/// to a Lance `WriteMode` by `CreateTableMode::write_mode`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CreateTableMode {
    /// Selected when no mode is given or the mode is "create"; maps to `WriteMode::Create`.
    Create,
    /// Selected for "existok" / "exist_ok"; also maps to `WriteMode::Create`.
    ExistOk,
    /// Selected for "overwrite"; maps to `WriteMode::Overwrite`.
    Overwrite,
}
107
108impl CreateTableMode {
109    fn parse(mode: Option<&str>) -> Result<Self> {
110        match mode {
111            None => Ok(Self::Create),
112            Some(mode) if mode.eq_ignore_ascii_case("create") => Ok(Self::Create),
113            Some(mode)
114                if mode.eq_ignore_ascii_case("existok")
115                    || mode.eq_ignore_ascii_case("exist_ok") =>
116            {
117                Ok(Self::ExistOk)
118            }
119            Some(mode) if mode.eq_ignore_ascii_case("overwrite") => Ok(Self::Overwrite),
120            Some(mode) => Err(NamespaceError::InvalidInput {
121                message: format!(
122                    "Unsupported create_table mode '{}'. Supported modes are: 'Create', 'ExistOk', 'Overwrite'",
123                    mode
124                ),
125            }
126            .into()),
127        }
128    }
129
130    fn write_mode(self) -> WriteMode {
131        match self {
132            Self::Overwrite => WriteMode::Overwrite,
133            Self::Create | Self::ExistOk => WriteMode::Create,
134        }
135    }
136}
137
/// Information about a table stored in the manifest
#[derive(Debug, Clone)]
pub struct TableInfo {
    /// Namespace path that contains the table (empty for the root namespace).
    pub namespace: Vec<String>,
    /// Table name, i.e. the last component of the object ID.
    pub name: String,
    /// Location string recorded for the table.
    // NOTE(review): presumably relative to the namespace root — confirm against the code that fills it.
    pub location: String,
    /// Optional key/value metadata associated with the table.
    pub metadata: Option<HashMap<String, String>>,
}
146
/// An entry to be inserted into the manifest table.
///
/// This struct makes the meaning of each field explicit, replacing the
/// previous tuple-based API `(String, ObjectType, Option<String>, Option<String>)`.
/// The fields are listed in the same order as that tuple.
#[derive(Debug, Clone)]
pub struct ManifestEntry {
    /// The unique object identifier (e.g., table name or version object_id)
    pub object_id: String,
    /// The type of the object (Namespace, Table, or TableVersion)
    pub object_type: ObjectType,
    /// The storage location (e.g., directory name for tables); `None` when the
    /// entry has no location.
    pub location: Option<String>,
    /// Additional metadata serialized as JSON; `None` when there is none.
    pub metadata: Option<String>,
}
162
/// Information about a namespace stored in the manifest
#[derive(Debug, Clone)]
pub struct NamespaceInfo {
    /// Path of the parent namespace (empty when the parent is the root).
    pub namespace: Vec<String>,
    /// Name of this namespace, i.e. the last component of its object ID.
    pub name: String,
    /// Optional key/value metadata associated with the namespace.
    pub metadata: Option<HashMap<String, String>>,
}
170
/// A wrapper around a Dataset that provides concurrent access.
///
/// This can be cloned cheaply. It supports concurrent reads or exclusive writes.
/// The manifest dataset is always kept strongly consistent by reloading on each read.
///
/// Internally this is an `Arc<RwLock<Dataset>>`, so every clone shares the
/// same dataset and the same lock.
#[derive(Debug, Clone)]
pub struct DatasetConsistencyWrapper(Arc<RwLock<Dataset>>);
177
impl DatasetConsistencyWrapper {
    /// Create a new wrapper with the given dataset.
    pub fn new(dataset: Dataset) -> Self {
        Self(Arc::new(RwLock::new(dataset)))
    }

    /// Get an immutable reference to the dataset.
    /// Always reloads to ensure strong consistency.
    ///
    /// The reload completes (and releases its own locks) before the read lock
    /// for the returned guard is acquired. Returns an error if the reload fails.
    pub async fn get(&self) -> Result<DatasetReadGuard<'_>> {
        self.reload().await?;
        Ok(DatasetReadGuard {
            guard: self.0.read().await,
        })
    }

    /// Get a mutable reference to the dataset.
    /// Always reloads to ensure strong consistency.
    ///
    /// The returned guard holds the write lock, so it excludes all other
    /// readers and writers of this wrapper until it is dropped. Returns an
    /// error if the reload fails.
    pub async fn get_mut(&self) -> Result<DatasetWriteGuard<'_>> {
        self.reload().await?;
        Ok(DatasetWriteGuard {
            guard: self.0.write().await,
        })
    }

    /// Provide a known latest version of the dataset.
    ///
    /// This is usually done after some write operation, which inherently will
    /// have the latest version.
    ///
    /// The stored dataset is replaced only when `dataset` has a strictly
    /// greater manifest version; otherwise the call is a no-op.
    pub async fn set_latest(&self, dataset: Dataset) {
        let mut write_guard = self.0.write().await;
        if dataset.manifest().version > write_guard.manifest().version {
            *write_guard = dataset;
        }
    }

    /// Reload the dataset to the latest version.
    ///
    /// First compares versions under the read lock; only when they differ is
    /// the write lock taken, the comparison repeated, and the latest version
    /// checked out. Failures are reported as `NamespaceError::Internal`.
    async fn reload(&self) -> Result<()> {
        // First check if we need to reload (with read lock)
        let read_guard = self.0.read().await;
        let dataset_uri = read_guard.uri().to_string();
        let current_version = read_guard.version().version;
        log::debug!(
            "Reload starting for uri={}, current_version={}",
            dataset_uri,
            current_version
        );
        let latest_version = read_guard.latest_version_id().await.map_err(|e| {
            lance_core::Error::from(NamespaceError::Internal {
                message: format!("Failed to get latest version: {:?}", e),
            })
        })?;
        log::debug!(
            "Reload got latest_version={} for uri={}, current_version={}",
            latest_version,
            dataset_uri,
            current_version
        );
        // Release the read lock before the write lock is requested below.
        drop(read_guard);

        // If already up-to-date, return early
        if latest_version == current_version {
            log::debug!("Already up-to-date for uri={}", dataset_uri);
            return Ok(());
        }

        // Need to reload, acquire write lock
        let mut write_guard = self.0.write().await;

        // Double-check after acquiring write lock (someone else might have reloaded)
        let latest_version = write_guard.latest_version_id().await.map_err(|e| {
            lance_core::Error::from(NamespaceError::Internal {
                message: format!("Failed to get latest version: {:?}", e),
            })
        })?;

        if latest_version != write_guard.version().version {
            write_guard.checkout_latest().await.map_err(|e| {
                lance_core::Error::from(NamespaceError::Internal {
                    message: format!("Failed to checkout latest: {:?}", e),
                })
            })?;
        }

        Ok(())
    }
}
264
/// Shared (read) access to the wrapped dataset.
///
/// Holds the read lock for as long as the guard is alive and dereferences to
/// the `Dataset`.
pub struct DatasetReadGuard<'a> {
    guard: RwLockReadGuard<'a, Dataset>,
}
268
269impl Deref for DatasetReadGuard<'_> {
270    type Target = Dataset;
271
272    fn deref(&self) -> &Self::Target {
273        &self.guard
274    }
275}
276
/// Exclusive (write) access to the wrapped dataset.
///
/// Holds the write lock for as long as the guard is alive and dereferences
/// (mutably or immutably) to the `Dataset`.
pub struct DatasetWriteGuard<'a> {
    guard: RwLockWriteGuard<'a, Dataset>,
}
280
281impl Deref for DatasetWriteGuard<'_> {
282    type Target = Dataset;
283
284    fn deref(&self) -> &Self::Target {
285        &self.guard
286    }
287}
288
289impl DerefMut for DatasetWriteGuard<'_> {
290    fn deref_mut(&mut self) -> &mut Self::Target {
291        &mut self.guard
292    }
293}
294
/// Manifest-based namespace implementation
///
/// Uses a special `__manifest` Lance table to track tables and nested namespaces.
pub struct ManifestNamespace {
    /// Root URI of the namespace as supplied at construction.
    // NOTE(review): presumably the base that relative table locations are resolved against — confirm.
    root: String,
    /// Optional key/value storage options supplied at construction.
    storage_options: Option<HashMap<String, String>>,
    /// Optional shared Lance session supplied at construction.
    session: Option<Arc<Session>>,
    /// Object store handle supplied at construction.
    // NOTE(review): its use is not visible in this part of the file — confirm before documenting further.
    object_store: Arc<ObjectStore>,
    /// Base path supplied at construction.
    // NOTE(review): presumably the path of `root` inside `object_store` — confirm.
    base_path: Path,
    /// Handle to the `__manifest` dataset; it is reloaded on every access.
    manifest_dataset: DatasetConsistencyWrapper,
    /// Whether directory listing is enabled in dual mode
    /// If true, root namespace tables use {table_name}.lance naming
    /// If false, they use namespace-prefixed names
    dir_listing_enabled: bool,
    /// Whether to perform inline optimization (compaction and indexing) on the __manifest table
    /// after every write. Defaults to true.
    inline_optimization_enabled: bool,
    /// Number of retries for commit operations on the manifest table.
    /// If None, defaults to [`lance_table::io::commit::CommitConfig`] default (20).
    commit_retries: Option<u32>,
    /// Serialize manifest mutations within a single namespace instance so concurrent
    /// create/drop calls do not compete with each other on the same in-memory snapshot.
    manifest_mutation_lock: Arc<Mutex<()>>,
}
319
320impl std::fmt::Debug for ManifestNamespace {
321    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
322        f.debug_struct("ManifestNamespace")
323            .field("root", &self.root)
324            .field("storage_options", &self.storage_options)
325            .field("dir_listing_enabled", &self.dir_listing_enabled)
326            .field(
327                "inline_optimization_enabled",
328                &self.inline_optimization_enabled,
329            )
330            .finish()
331    }
332}
333
334/// Convert a Lance commit error to an appropriate namespace error.
335///
336/// Maps lance commit errors to namespace errors:
337/// - `CommitConflict`: version collision retries exhausted -> Throttling (safe to retry)
338/// - `TooMuchWriteContention`: RetryableCommitConflict (semantic conflict) retries exhausted -> ConcurrentModification
339/// - `IncompatibleTransaction`: incompatible concurrent change -> ConcurrentModification
340/// - Errors containing "matched/duplicate/already exists": ConcurrentModification (from WhenMatched::Fail)
341/// - Other errors: IO error with the operation description
342fn convert_lance_commit_error(e: &LanceError, operation: &str, object_id: Option<&str>) -> Error {
343    match e {
344        // CommitConflict: version collision retries exhausted -> Throttling (safe to retry)
345        LanceError::CommitConflict { .. } => NamespaceError::Throttling {
346            message: format!("Too many concurrent writes, please retry later: {:?}", e),
347        }
348        .into(),
349        // TooMuchWriteContention: RetryableCommitConflict (semantic conflict) retries exhausted -> ConcurrentModification
350        // IncompatibleTransaction: incompatible concurrent change -> ConcurrentModification
351        LanceError::TooMuchWriteContention { .. } | LanceError::IncompatibleTransaction { .. } => {
352            let message = if let Some(id) = object_id {
353                format!(
354                    "Object '{}' was concurrently modified by another operation: {:?}",
355                    id, e
356                )
357            } else {
358                format!(
359                    "Object was concurrently modified by another operation: {:?}",
360                    e
361                )
362            };
363            NamespaceError::ConcurrentModification { message }.into()
364        }
365        // Other errors: check message for semantic conflicts (matched/duplicate from WhenMatched::Fail)
366        _ => {
367            let error_msg = e.to_string();
368            if error_msg.contains("matched")
369                || error_msg.contains("duplicate")
370                || error_msg.contains("already exists")
371            {
372                let message = if let Some(id) = object_id {
373                    format!(
374                        "Object '{}' was concurrently created by another operation: {:?}",
375                        id, e
376                    )
377                } else {
378                    format!(
379                        "Object was concurrently created by another operation: {:?}",
380                        e
381                    )
382                };
383                return NamespaceError::ConcurrentModification { message }.into();
384            }
385            lance_core::Error::from(NamespaceError::Internal {
386                message: format!("{}: {:?}", operation, e),
387            })
388        }
389    }
390}
391
392impl ManifestNamespace {
393    /// Create a new ManifestNamespace from an existing DirectoryNamespace
394    #[allow(clippy::too_many_arguments)]
395    pub async fn from_directory(
396        root: String,
397        storage_options: Option<HashMap<String, String>>,
398        session: Option<Arc<Session>>,
399        object_store: Arc<ObjectStore>,
400        base_path: Path,
401        dir_listing_enabled: bool,
402        inline_optimization_enabled: bool,
403        commit_retries: Option<u32>,
404        table_version_storage_enabled: bool,
405    ) -> Result<Self> {
406        let manifest_dataset = Self::ensure_manifest_table_up_to_date(
407            &root,
408            &storage_options,
409            session.clone(),
410            table_version_storage_enabled,
411        )
412        .await?;
413
414        Ok(Self {
415            root,
416            storage_options,
417            session,
418            object_store,
419            base_path,
420            manifest_dataset,
421            dir_listing_enabled,
422            inline_optimization_enabled,
423            commit_retries,
424            manifest_mutation_lock: Arc::new(Mutex::new(())),
425        })
426    }
427
428    /// Build object ID from namespace path and name
429    pub fn build_object_id(namespace: &[String], name: &str) -> String {
430        if namespace.is_empty() {
431            name.to_string()
432        } else {
433            let mut id = namespace.join(DELIMITER);
434            id.push_str(DELIMITER);
435            id.push_str(name);
436            id
437        }
438    }
439
440    /// Parse object ID into namespace path and name
441    pub fn parse_object_id(object_id: &str) -> (Vec<String>, String) {
442        let parts: Vec<&str> = object_id.split(DELIMITER).collect();
443        if parts.len() == 1 {
444            (Vec::new(), parts[0].to_string())
445        } else {
446            let namespace = parts[..parts.len() - 1]
447                .iter()
448                .map(|s| s.to_string())
449                .collect();
450            let name = parts[parts.len() - 1].to_string();
451            (namespace, name)
452        }
453    }
454
455    /// Split an object ID (vec of strings) into namespace and table name
456    pub fn split_object_id(object_id: &[String]) -> (Vec<String>, String) {
457        if object_id.len() == 1 {
458            (vec![], object_id[0].clone())
459        } else {
460            (
461                object_id[..object_id.len() - 1].to_vec(),
462                object_id[object_id.len() - 1].clone(),
463            )
464        }
465    }
466
467    /// Convert an ID (vec of strings) to an object_id string
468    pub fn str_object_id(object_id: &[String]) -> String {
469        object_id.join(DELIMITER)
470    }
471
472    fn format_table_id(table_id: &[String]) -> String {
473        format!("table id '{}'", Self::str_object_id(table_id))
474    }
475
476    /// Format a version number as a zero-padded lexicographically sortable string.
477    ///
478    /// Versions are stored as 20-digit zero-padded integers (e.g., `00000000000000000001`
479    /// for version 1) so that string-based range queries and sorting work correctly.
480    pub fn format_table_version(version: i64) -> String {
481        format!("{:020}", version)
482    }
483
484    /// Build the object_id for a table version entry.
485    ///
486    /// Format: `{table_object_id}${zero_padded_version}`
487    pub fn build_version_object_id(table_object_id: &str, version: i64) -> String {
488        format!(
489            "{}{}{}",
490            table_object_id,
491            DELIMITER,
492            Self::format_table_version(version)
493        )
494    }
495
496    /// Parse a version number from the version suffix of a table version object_id.
497    ///
498    /// The object_id is formatted as `{table_id}${zero_padded_version}`.
499    pub fn parse_version_from_object_id(object_id: &str) -> Option<i64> {
500        let (_namespace, name) = Self::parse_object_id(object_id);
501        name.parse::<i64>().ok()
502    }
503
504    /// Generate a new directory name in format: `<hash>_<object_id>`
505    /// The hash is used to (1) optimize object store throughput,
506    /// (2) have high enough entropy in a short period of time to prevent issues like
507    /// failed table creation, delete and create new table of the same name, etc.
508    /// The object_id is added after the hash to ensure
509    /// dir name uniqueness and make debugging easier.
510    pub fn generate_dir_name(object_id: &str) -> String {
511        // Generate a random number for uniqueness
512        let random_num: u64 = rand::random();
513
514        // Create hash from random number + object_id
515        let mut hasher = DefaultHasher::new();
516        random_num.hash(&mut hasher);
517        object_id.hash(&mut hasher);
518        let hash = hasher.finish();
519
520        // Format as lowercase hex (8 characters - sufficient entropy for uniqueness)
521        format!("{:08x}_{}", (hash & 0xFFFFFFFF) as u32, object_id)
522    }
523
524    /// Construct a full URI from root and relative location
525    pub(crate) fn construct_full_uri(root: &str, relative_location: &str) -> Result<String> {
526        let mut base_url = lance_io::object_store::uri_to_url(root)?;
527
528        // Ensure the base URL has a trailing slash so that path segment mutation
529        // appends rather than replaces the last path segment.
530        // Without this fix, appending "table.lance" to "s3://bucket/path/subdir"
531        // would incorrectly produce "s3://bucket/path/table.lance" (missing subdir).
532        if !base_url.path().ends_with('/') {
533            base_url.set_path(&format!("{}/", base_url.path()));
534        }
535
536        let mut full_url = base_url.clone();
537        full_url
538            .path_segments_mut()
539            .map_err(|_| {
540                lance_core::Error::from(NamespaceError::InvalidInput {
541                    message: format!("Cannot modify path segments for URI '{}'", root),
542                })
543            })?
544            .pop_if_empty()
545            .extend(
546                relative_location
547                    .split('/')
548                    .filter(|segment| !segment.is_empty()),
549            );
550
551        // Clear any query string to avoid trailing "?" in the URL.
552        // Use set_query(None) instead of set_query("") because the latter
553        // would still add a trailing '?' to the URL when serialized.
554        full_url.set_query(None);
555
556        Ok(full_url.to_string())
557    }
558
559    /// Perform inline optimization on the __manifest table.
560    ///
561    /// This method:
562    /// 1. Creates three indexes on the manifest table:
563    ///    - BTREE index on object_id for fast lookups
564    ///    - Bitmap index on object_type for filtering by type
565    ///    - LabelList index on base_objects for view dependencies
566    /// 2. Runs file compaction to merge small files
567    /// 3. Optimizes existing indices
568    ///
569    /// This is called automatically after writes when inline_optimization_enabled is true.
570    async fn run_inline_optimization(&self) -> Result<()> {
571        if !self.inline_optimization_enabled {
572            return Ok(());
573        }
574
575        // Get a mutable reference to the dataset to perform optimization
576        let mut dataset_guard = self.manifest_dataset.get_mut().await?;
577        let dataset: &mut Dataset = &mut dataset_guard;
578
579        // Step 1: Create indexes if they don't already exist
580        let indices = dataset.load_indices().await?;
581
582        // Check which indexes already exist
583        let has_object_id_index = indices.iter().any(|idx| idx.name == OBJECT_ID_INDEX_NAME);
584        let has_object_type_index = indices.iter().any(|idx| idx.name == OBJECT_TYPE_INDEX_NAME);
585        let has_base_objects_index = indices
586            .iter()
587            .any(|idx| idx.name == BASE_OBJECTS_INDEX_NAME);
588
589        // Create BTREE index on object_id
590        if !has_object_id_index {
591            log::debug!(
592                "Creating BTREE index '{}' on object_id for __manifest table",
593                OBJECT_ID_INDEX_NAME
594            );
595            let params = ScalarIndexParams::for_builtin(BuiltinIndexType::BTree);
596            if let Err(e) = dataset
597                .create_index(
598                    &["object_id"],
599                    IndexType::BTree,
600                    Some(OBJECT_ID_INDEX_NAME.to_string()),
601                    &params,
602                    true,
603                )
604                .await
605            {
606                log::warn!(
607                    "Failed to create BTREE index on object_id for __manifest table: {:?}. Query performance may be impacted.",
608                    e
609                );
610            } else {
611                log::info!(
612                    "Created BTREE index '{}' on object_id for __manifest table",
613                    OBJECT_ID_INDEX_NAME
614                );
615            }
616        }
617
618        // Create Bitmap index on object_type
619        if !has_object_type_index {
620            log::debug!(
621                "Creating Bitmap index '{}' on object_type for __manifest table",
622                OBJECT_TYPE_INDEX_NAME
623            );
624            let params = ScalarIndexParams::default();
625            if let Err(e) = dataset
626                .create_index(
627                    &["object_type"],
628                    IndexType::Bitmap,
629                    Some(OBJECT_TYPE_INDEX_NAME.to_string()),
630                    &params,
631                    true,
632                )
633                .await
634            {
635                log::warn!(
636                    "Failed to create Bitmap index on object_type for __manifest table: {:?}. Query performance may be impacted.",
637                    e
638                );
639            } else {
640                log::info!(
641                    "Created Bitmap index '{}' on object_type for __manifest table",
642                    OBJECT_TYPE_INDEX_NAME
643                );
644            }
645        }
646
647        // Create LabelList index on base_objects
648        if !has_base_objects_index {
649            log::debug!(
650                "Creating LabelList index '{}' on base_objects for __manifest table",
651                BASE_OBJECTS_INDEX_NAME
652            );
653            let params = ScalarIndexParams::default();
654            if let Err(e) = dataset
655                .create_index(
656                    &["base_objects"],
657                    IndexType::LabelList,
658                    Some(BASE_OBJECTS_INDEX_NAME.to_string()),
659                    &params,
660                    true,
661                )
662                .await
663            {
664                log::warn!(
665                    "Failed to create LabelList index on base_objects for __manifest table: {:?}. Query performance may be impacted.",
666                    e
667                );
668            } else {
669                log::info!(
670                    "Created LabelList index '{}' on base_objects for __manifest table",
671                    BASE_OBJECTS_INDEX_NAME
672                );
673            }
674        }
675
676        let should_compact_and_optimize =
677            dataset.count_fragments() >= MANIFEST_INLINE_OPTIMIZATION_FRAGMENT_THRESHOLD;
678
679        if !should_compact_and_optimize {
680            return Ok(());
681        }
682
683        // Step 2: Run file compaction
684        log::debug!("Running file compaction on __manifest table");
685        match compact_files(dataset, CompactionOptions::default(), None).await {
686            Ok(compaction_metrics) => {
687                if compaction_metrics.fragments_removed > 0 {
688                    log::info!(
689                        "Compacted __manifest table: removed {} fragments, added {} fragments",
690                        compaction_metrics.fragments_removed,
691                        compaction_metrics.fragments_added
692                    );
693                }
694            }
695            Err(e) => {
696                log::warn!(
697                    "Failed to compact files for __manifest table: {:?}. Continuing with optimization.",
698                    e
699                );
700            }
701        }
702
703        // Step 3: Optimize indices
704        log::debug!("Optimizing indices on __manifest table");
705        match dataset.optimize_indices(&OptimizeOptions::default()).await {
706            Ok(_) => {
707                log::info!("Successfully optimized indices on __manifest table");
708            }
709            Err(e) => {
710                log::warn!(
711                    "Failed to optimize indices on __manifest table: {:?}. Continuing anyway.",
712                    e
713                );
714            }
715        }
716
717        Ok(())
718    }
719
720    /// Get the manifest schema
721    fn manifest_schema() -> Arc<ArrowSchema> {
722        Arc::new(ArrowSchema::new(vec![
723            // Set unenforced primary key on object_id for bloom filter conflict detection
724            Field::new("object_id", DataType::Utf8, false).with_metadata(
725                [(
726                    LANCE_UNENFORCED_PRIMARY_KEY_POSITION.to_string(),
727                    "0".to_string(),
728                )]
729                .into_iter()
730                .collect(),
731            ),
732            Field::new("object_type", DataType::Utf8, false),
733            Field::new("location", DataType::Utf8, true),
734            Field::new("metadata", DataType::Utf8, true),
735            Field::new(
736                "base_objects",
737                DataType::List(Arc::new(Field::new("object_id", DataType::Utf8, true))),
738                true,
739            ),
740        ]))
741    }
742
743    /// Get a scanner for the manifest dataset
744    async fn manifest_scanner(&self) -> Result<Scanner> {
745        let dataset_guard = self.manifest_dataset.get().await?;
746        Ok(dataset_guard.scan())
747    }
748
749    /// Helper to execute a scanner and collect results into a Vec
750    async fn execute_scanner(scanner: Scanner) -> Result<Vec<RecordBatch>> {
751        let mut stream = scanner.try_into_stream().await.map_err(|e| {
752            lance_core::Error::from(NamespaceError::Internal {
753                message: format!("Failed to create stream: {:?}", e),
754            })
755        })?;
756
757        let mut batches = Vec::new();
758        while let Some(batch) = stream.next().await {
759            batches.push(batch.map_err(|e| {
760                lance_core::Error::from(NamespaceError::Internal {
761                    message: format!("Failed to read batch: {:?}", e),
762                })
763            })?);
764        }
765
766        Ok(batches)
767    }
768
769    /// Helper to get a string column from a record batch
770    fn get_string_column<'a>(batch: &'a RecordBatch, column_name: &str) -> Result<&'a StringArray> {
771        let column = batch.column_by_name(column_name).ok_or_else(|| {
772            lance_core::Error::from(NamespaceError::Internal {
773                message: format!("Column '{}' not found", column_name),
774            })
775        })?;
776        column
777            .as_any()
778            .downcast_ref::<StringArray>()
779            .ok_or_else(|| {
780                lance_core::Error::from(NamespaceError::Internal {
781                    message: format!("Column '{}' is not a string array", column_name),
782                })
783            })
784    }
785
786    /// Check if the manifest contains an object with the given ID
787    async fn manifest_contains_object(&self, object_id: &str) -> Result<bool> {
788        let escaped_id = object_id.replace('\'', "''");
789        let filter = format!("object_id = '{}'", escaped_id);
790
791        let dataset_guard = self.manifest_dataset.get().await?;
792        let mut scanner = dataset_guard.scan();
793
794        scanner.filter(&filter).map_err(|e| {
795            lance_core::Error::from(NamespaceError::Internal {
796                message: format!("Failed to filter: {:?}", e),
797            })
798        })?;
799
800        // Project no columns and enable row IDs for count_rows to work
801        scanner.project::<&str>(&[]).map_err(|e| {
802            lance_core::Error::from(NamespaceError::Internal {
803                message: format!("Failed to project: {:?}", e),
804            })
805        })?;
806
807        scanner.with_row_id();
808
809        let count = scanner.count_rows().await.map_err(|e| {
810            lance_core::Error::from(NamespaceError::Internal {
811                message: format!("Failed to count rows: {:?}", e),
812            })
813        })?;
814
815        Ok(count > 0)
816    }
817
818    /// Query the manifest for a table with the given object ID
819    async fn query_manifest_for_table(&self, object_id: &str) -> Result<Option<TableInfo>> {
820        let escaped_id = object_id.replace('\'', "''");
821        let filter = format!("object_id = '{}' AND object_type = 'table'", escaped_id);
822        let mut scanner = self.manifest_scanner().await?;
823        scanner.filter(&filter).map_err(|e| {
824            lance_core::Error::from(NamespaceError::Internal {
825                message: format!("Failed to filter: {:?}", e),
826            })
827        })?;
828        scanner
829            .project(&["object_id", "location", "metadata"])
830            .map_err(|e| {
831                lance_core::Error::from(NamespaceError::Internal {
832                    message: format!("Failed to project: {:?}", e),
833                })
834            })?;
835        let batches = Self::execute_scanner(scanner).await?;
836
837        let mut found_result: Option<TableInfo> = None;
838        let mut total_rows = 0;
839
840        for batch in batches {
841            if batch.num_rows() == 0 {
842                continue;
843            }
844
845            total_rows += batch.num_rows();
846            if total_rows > 1 {
847                return Err(NamespaceError::Internal {
848                    message: format!(
849                        "Expected exactly 1 table with id '{}', found {}",
850                        object_id, total_rows
851                    ),
852                }
853                .into());
854            }
855
856            let object_id_array = Self::get_string_column(&batch, "object_id")?;
857            let location_array = Self::get_string_column(&batch, "location")?;
858            let metadata_array = Self::get_string_column(&batch, "metadata")?;
859            let location = location_array.value(0).to_string();
860            let metadata = if !metadata_array.is_null(0) {
861                let metadata_str = metadata_array.value(0);
862                match serde_json::from_str::<HashMap<String, String>>(metadata_str) {
863                    Ok(map) => Some(map),
864                    Err(e) => {
865                        return Err(NamespaceError::Internal {
866                            message: format!(
867                                "Failed to deserialize metadata for table '{}': {}",
868                                object_id, e
869                            ),
870                        }
871                        .into());
872                    }
873                }
874            } else {
875                None
876            };
877            let (namespace, name) = Self::parse_object_id(object_id_array.value(0));
878            found_result = Some(TableInfo {
879                namespace,
880                name,
881                location,
882                metadata,
883            });
884        }
885
886        Ok(found_result)
887    }
888
889    fn serialize_metadata(
890        properties: Option<&HashMap<String, String>>,
891        object_type: &str,
892        object_id: &str,
893    ) -> Result<Option<String>> {
894        match properties {
895            Some(properties) if !properties.is_empty() => {
896                serde_json::to_string(properties).map(Some).map_err(|e| {
897                    LanceError::from(NamespaceError::Internal {
898                        message: format!(
899                            "Failed to serialize {} metadata for '{}': {}",
900                            object_type, object_id, e
901                        ),
902                    })
903                })
904            }
905            _ => Ok(None),
906        }
907    }
908
909    pub(crate) async fn path_has_actual_manifests(
910        object_store: &ObjectStore,
911        table_path: &Path,
912    ) -> Result<bool> {
913        let versions_path = table_path
914            .clone()
915            .join(lance_table::io::commit::VERSIONS_DIR);
916        // `_versions/` should only contain manifest files, so probing the first entry is enough
917        // to distinguish declared-only tables (empty `_versions/`) from created tables.
918        Ok(object_store
919            .list(Some(versions_path))
920            .try_next()
921            .await?
922            .is_some())
923    }
924
925    async fn location_has_actual_manifests(&self, location: &str) -> Result<bool> {
926        Self::path_has_actual_manifests(&self.object_store, &self.base_path.clone().join(location))
927            .await
928    }
929
930    pub(crate) fn is_not_found_load_error(err: &LanceError) -> bool {
931        match err {
932            LanceError::NotFound { .. } => true,
933            LanceError::IO { source, .. } => source
934                .downcast_ref::<ObjectStoreError>()
935                .is_some_and(|source| matches!(source, ObjectStoreError::NotFound { .. })),
936            LanceError::DatasetNotFound { source, .. } => {
937                source
938                    .downcast_ref::<LanceError>()
939                    .is_some_and(|source| matches!(source, LanceError::NotFound { .. }))
940                    || source
941                        .downcast_ref::<ObjectStoreError>()
942                        .is_some_and(|source| matches!(source, ObjectStoreError::NotFound { .. }))
943            }
944            _ => false,
945        }
946    }
947
948    /// List all table locations in the manifest (for root namespace only)
949    /// Returns a set of table locations (e.g., "table_name.lance")
950    pub async fn list_manifest_table_locations(&self) -> Result<std::collections::HashSet<String>> {
951        let filter = "object_type = 'table' AND NOT contains(object_id, '$')";
952        let mut scanner = self.manifest_scanner().await?;
953        scanner.filter(filter).map_err(|e| {
954            lance_core::Error::from(NamespaceError::Internal {
955                message: format!("Failed to filter: {:?}", e),
956            })
957        })?;
958        scanner.project(&["location"]).map_err(|e| {
959            lance_core::Error::from(NamespaceError::Internal {
960                message: format!("Failed to project: {:?}", e),
961            })
962        })?;
963
964        let batches = Self::execute_scanner(scanner).await?;
965        let mut locations = std::collections::HashSet::new();
966
967        for batch in batches {
968            if batch.num_rows() == 0 {
969                continue;
970            }
971            let location_array = Self::get_string_column(&batch, "location")?;
972            for i in 0..location_array.len() {
973                locations.insert(location_array.value(i).to_string());
974            }
975        }
976
977        Ok(locations)
978    }
979
980    /// Insert an entry into the manifest table
981    async fn insert_into_manifest(
982        &self,
983        object_id: String,
984        object_type: ObjectType,
985        location: Option<String>,
986    ) -> Result<()> {
987        self.insert_into_manifest_with_metadata(
988            vec![ManifestEntry {
989                object_id,
990                object_type,
991                location,
992                metadata: None,
993            }],
994            None,
995        )
996        .await
997    }
998
999    /// Insert one or more entries into the manifest table with metadata and base_objects.
1000    ///
1001    /// This is the unified entry point for both single and batch inserts.
1002    /// Uses a single MergeInsert operation to insert all entries at once.
1003    /// If any entry already exists (matching object_id), the entire batch fails.
1004    pub async fn insert_into_manifest_with_metadata(
1005        &self,
1006        entries: Vec<ManifestEntry>,
1007        base_objects: Option<Vec<String>>,
1008    ) -> Result<()> {
1009        self.merge_into_manifest_with_metadata(entries, base_objects, WhenMatched::Fail)
1010            .await
1011    }
1012
1013    async fn upsert_into_manifest_with_metadata(
1014        &self,
1015        entries: Vec<ManifestEntry>,
1016        base_objects: Option<Vec<String>>,
1017    ) -> Result<()> {
1018        self.merge_into_manifest_with_metadata(entries, base_objects, WhenMatched::UpdateAll)
1019            .await
1020    }
1021
1022    async fn merge_into_manifest_with_metadata(
1023        &self,
1024        entries: Vec<ManifestEntry>,
1025        base_objects: Option<Vec<String>>,
1026        when_matched: WhenMatched,
1027    ) -> Result<()> {
1028        if entries.is_empty() {
1029            return Ok(());
1030        }
1031
1032        let schema = Self::manifest_schema();
1033
1034        let mut object_ids = Vec::with_capacity(entries.len());
1035        let mut object_types = Vec::with_capacity(entries.len());
1036        let mut locations: Vec<Option<String>> = Vec::with_capacity(entries.len());
1037        let mut metadatas: Vec<Option<String>> = Vec::with_capacity(entries.len());
1038
1039        let string_builder = StringBuilder::new();
1040        let mut list_builder = ListBuilder::new(string_builder).with_field(Arc::new(Field::new(
1041            "object_id",
1042            DataType::Utf8,
1043            true,
1044        )));
1045
1046        for (i, entry) in entries.iter().enumerate() {
1047            object_ids.push(entry.object_id.as_str());
1048            object_types.push(entry.object_type.as_str());
1049            locations.push(entry.location.clone());
1050            metadatas.push(entry.metadata.clone());
1051
1052            // Only the first entry gets the base_objects (for single-entry inserts
1053            // with base_objects like view creation); batch entries use null.
1054            if i == 0 {
1055                match &base_objects {
1056                    Some(objects) => {
1057                        for obj in objects {
1058                            list_builder.values().append_value(obj);
1059                        }
1060                        list_builder.append(true);
1061                    }
1062                    None => {
1063                        list_builder.append_null();
1064                    }
1065                }
1066            } else {
1067                list_builder.append_null();
1068            }
1069        }
1070
1071        let base_objects_array = list_builder.finish();
1072
1073        let location_array: Arc<dyn Array> = Arc::new(StringArray::from(
1074            locations.iter().map(|l| l.as_deref()).collect::<Vec<_>>(),
1075        ));
1076
1077        let metadata_array: Arc<dyn Array> = Arc::new(StringArray::from(
1078            metadatas.iter().map(|m| m.as_deref()).collect::<Vec<_>>(),
1079        ));
1080
1081        let batch = RecordBatch::try_new(
1082            schema.clone(),
1083            vec![
1084                Arc::new(StringArray::from(object_ids)),
1085                Arc::new(StringArray::from(object_types.to_vec())),
1086                location_array,
1087                metadata_array,
1088                Arc::new(base_objects_array),
1089            ],
1090        )
1091        .map_err(|e| {
1092            lance_core::Error::from(NamespaceError::Internal {
1093                message: format!("Failed to create manifest entries: {:?}", e),
1094            })
1095        })?;
1096
1097        let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
1098
1099        // Use MergeInsert so callers can choose fail-on-existing inserts or metadata upserts.
1100        let _mutation_guard = self.manifest_mutation_lock.lock().await;
1101        let dataset_guard = self.manifest_dataset.get().await?;
1102        let dataset_arc = Arc::new(dataset_guard.clone());
1103        drop(dataset_guard); // Drop read guard before merge insert
1104
1105        let mut merge_builder =
1106            MergeInsertBuilder::try_new(dataset_arc, vec!["object_id".to_string()]).map_err(
1107                |e| {
1108                    lance_core::Error::from(NamespaceError::Internal {
1109                        message: format!("Failed to create merge builder: {:?}", e),
1110                    })
1111                },
1112            )?;
1113        merge_builder.when_matched(when_matched);
1114        merge_builder.when_not_matched(WhenNotMatched::InsertAll);
1115        // Use conflict_retries to handle cross-process races on manifest mutations.
1116        merge_builder.conflict_retries(5);
1117        // TODO: after BTREE index creation on object_id, has_scalar_index=true causes
1118        // MergeInsert to use V1 path which lacks bloom filters for conflict detection. This
1119        // results in (Some, None) filter mismatch when rebasing against V2 operations.
1120        // Setting use_index=false ensures all operations consistently use V2 path.
1121        merge_builder.use_index(false);
1122        if let Some(retries) = self.commit_retries {
1123            merge_builder.commit_retries(retries);
1124        }
1125
1126        let (new_dataset_arc, _merge_stats) = merge_builder
1127            .try_build()
1128            .map_err(|e| {
1129                lance_core::Error::from(NamespaceError::Internal {
1130                    message: format!("Failed to build merge: {:?}", e),
1131                })
1132            })?
1133            .execute_reader(Box::new(reader))
1134            .await
1135            .map_err(|e| {
1136                convert_lance_commit_error(&e, "Failed to execute merge insert into manifest", None)
1137            })?;
1138
1139        let new_dataset = Arc::try_unwrap(new_dataset_arc).unwrap_or_else(|arc| (*arc).clone());
1140        self.manifest_dataset.set_latest(new_dataset).await;
1141
1142        // Run inline optimization after write
1143        if let Err(e) = self.run_inline_optimization().await {
1144            log::warn!(
1145                "Unexpected failure when running inline optimization: {:?}",
1146                e
1147            );
1148        }
1149
1150        Ok(())
1151    }
1152
1153    /// Delete an entry from the manifest table
1154    pub async fn delete_from_manifest(&self, object_id: &str) -> Result<()> {
1155        let predicate = format!("object_id = '{}'", object_id);
1156
1157        // Get dataset and use DeleteBuilder with configured retries
1158        let _mutation_guard = self.manifest_mutation_lock.lock().await;
1159        let dataset_guard = self.manifest_dataset.get().await?;
1160        let dataset = Arc::new(dataset_guard.clone());
1161        drop(dataset_guard); // Drop read guard before delete
1162
1163        let new_dataset = DeleteBuilder::new(dataset, &predicate)
1164            .execute()
1165            .await
1166            .map_err(|e| convert_lance_commit_error(&e, "Failed to delete", None))?;
1167
1168        // Update the wrapper with the new dataset
1169        self.manifest_dataset
1170            .set_latest(
1171                Arc::try_unwrap(new_dataset.new_dataset).unwrap_or_else(|arc| (*arc).clone()),
1172            )
1173            .await;
1174
1175        // Run inline optimization after delete
1176        if let Err(e) = self.run_inline_optimization().await {
1177            log::warn!(
1178                "Unexpected failure when running inline optimization: {:?}",
1179                e
1180            );
1181        }
1182
1183        Ok(())
1184    }
1185
1186    /// Query the manifest for all versions of a table, sorted by version.
1187    ///
1188    /// Returns a list of (version, metadata_json_string) tuples where metadata_json_string
1189    /// contains the full metadata JSON stored in the manifest (manifest_path, manifest_size,
1190    /// e_tag, naming_scheme).
1191    ///
1192    /// **Known limitation**: All matching rows are loaded into memory, sorted in Rust,
1193    /// and then truncated. For tables with a very large number of versions this may be
1194    /// expensive. Pushing sort/limit into the scan is not yet supported by Lance.
1195    pub async fn query_table_versions(
1196        &self,
1197        object_id: &str,
1198        descending: bool,
1199        limit: Option<i32>,
1200    ) -> Result<Vec<(i64, String)>> {
1201        let escaped_id = object_id.replace('\'', "''");
1202        // table_version object_ids are formatted as "{object_id}${zero_padded_version}"
1203        let filter = format!(
1204            "object_type = 'table_version' AND starts_with(object_id, '{}{}')",
1205            escaped_id, DELIMITER
1206        );
1207        let mut scanner = self.manifest_scanner().await?;
1208        scanner.filter(&filter).map_err(|e| {
1209            lance_core::Error::from(NamespaceError::Internal {
1210                message: format!("Failed to filter: {:?}", e),
1211            })
1212        })?;
1213        scanner.project(&["object_id", "metadata"]).map_err(|e| {
1214            lance_core::Error::from(NamespaceError::Internal {
1215                message: format!("Failed to project: {:?}", e),
1216            })
1217        })?;
1218        let batches = Self::execute_scanner(scanner).await?;
1219
1220        let mut versions: Vec<(i64, String)> = Vec::new();
1221        for batch in batches {
1222            if batch.num_rows() == 0 {
1223                continue;
1224            }
1225            let object_id_array = Self::get_string_column(&batch, "object_id")?;
1226            let metadata_array = Self::get_string_column(&batch, "metadata")?;
1227            for i in 0..batch.num_rows() {
1228                let oid = object_id_array.value(i);
1229                // Parse version from object_id
1230                if let Some(version) = Self::parse_version_from_object_id(oid) {
1231                    let metadata_str = metadata_array.value(i).to_string();
1232                    versions.push((version, metadata_str));
1233                }
1234            }
1235        }
1236
1237        if descending {
1238            versions.sort_by(|a, b| b.0.cmp(&a.0));
1239        } else {
1240            versions.sort_by(|a, b| a.0.cmp(&b.0));
1241        }
1242
1243        if let Some(limit) = limit {
1244            versions.truncate(limit as usize);
1245        }
1246
1247        Ok(versions)
1248    }
1249
1250    /// Query the manifest for a specific version of a table.
1251    ///
1252    /// Returns the full metadata JSON string if found, which contains
1253    /// manifest_path, manifest_size, e_tag, and naming_scheme.
1254    ///
1255    pub async fn query_table_version(
1256        &self,
1257        object_id: &str,
1258        version: i64,
1259    ) -> Result<Option<String>> {
1260        let version_object_id = Self::build_version_object_id(object_id, version);
1261        self.query_table_version_by_object_id(&version_object_id)
1262            .await
1263    }
1264
1265    /// Query a specific table version by its exact object_id.
1266    async fn query_table_version_by_object_id(
1267        &self,
1268        version_object_id: &str,
1269    ) -> Result<Option<String>> {
1270        let escaped_id = version_object_id.replace('\'', "''");
1271        let filter = format!(
1272            "object_id = '{}' AND object_type = 'table_version'",
1273            escaped_id
1274        );
1275        let mut scanner = self.manifest_scanner().await?;
1276        scanner.filter(&filter).map_err(|e| {
1277            lance_core::Error::from(NamespaceError::Internal {
1278                message: format!("Failed to filter: {:?}", e),
1279            })
1280        })?;
1281        scanner.project(&["metadata"]).map_err(|e| {
1282            lance_core::Error::from(NamespaceError::Internal {
1283                message: format!("Failed to project: {:?}", e),
1284            })
1285        })?;
1286        let batches = Self::execute_scanner(scanner).await?;
1287
1288        for batch in batches {
1289            if batch.num_rows() == 0 {
1290                continue;
1291            }
1292            let metadata_array = Self::get_string_column(&batch, "metadata")?;
1293            return Ok(Some(metadata_array.value(0).to_string()));
1294        }
1295
1296        Ok(None)
1297    }
1298
1299    /// Delete table version entries from the manifest for a given table and version ranges.
1300    ///
1301    /// Each range is (start_version, end_version) inclusive. Deletes all matching
1302    /// `object_type = 'table_version'` entries whose object_id matches
1303    /// `{object_id}${zero_padded_version}`.
1304    ///
1305    /// Builds a single filter expression covering all version ranges and executes
1306    /// one bulk delete operation instead of deleting versions one at a time.
1307    pub async fn delete_table_versions(
1308        &self,
1309        object_id: &str,
1310        ranges: &[(i64, i64)],
1311    ) -> Result<i64> {
1312        if ranges.is_empty() {
1313            return Ok(0);
1314        }
1315
1316        // Collect all object_ids to delete (both new zero-padded and legacy formats)
1317        let mut object_id_conditions: Vec<String> = Vec::new();
1318        for (start, end) in ranges {
1319            for version in *start..=*end {
1320                let oid = Self::build_version_object_id(object_id, version);
1321                let escaped = oid.replace('\'', "''");
1322                object_id_conditions.push(format!("'{}'", escaped));
1323            }
1324        }
1325
1326        if object_id_conditions.is_empty() {
1327            return Ok(0);
1328        }
1329
1330        // First, count how many entries exist so we can report the deleted count
1331        let in_list = object_id_conditions.join(", ");
1332        let filter = format!(
1333            "object_type = 'table_version' AND object_id IN ({})",
1334            in_list
1335        );
1336
1337        let mut scanner = self.manifest_scanner().await?;
1338        scanner.filter(&filter).map_err(|e| {
1339            lance_core::Error::from(NamespaceError::Internal {
1340                message: format!("Failed to filter: {:?}", e),
1341            })
1342        })?;
1343        scanner.project(&["object_id", "location"]).map_err(|e| {
1344            lance_core::Error::from(NamespaceError::Internal {
1345                message: format!("Failed to project: {:?}", e),
1346            })
1347        })?;
1348        let batches = Self::execute_scanner(scanner).await?;
1349        let deleted_count: i64 = batches.iter().map(|b| b.num_rows() as i64).sum();
1350
1351        if deleted_count == 0 {
1352            return Ok(0);
1353        }
1354
1355        // Execute a single bulk delete with the combined filter
1356        let _mutation_guard = self.manifest_mutation_lock.lock().await;
1357        let dataset_guard = self.manifest_dataset.get().await?;
1358        let dataset = Arc::new(dataset_guard.clone());
1359        drop(dataset_guard);
1360
1361        let new_dataset = DeleteBuilder::new(dataset, &filter)
1362            .execute()
1363            .await
1364            .map_err(|e| {
1365                convert_lance_commit_error(&e, "Failed to batch delete table versions", None)
1366            })?;
1367
1368        self.manifest_dataset
1369            .set_latest(
1370                Arc::try_unwrap(new_dataset.new_dataset).unwrap_or_else(|arc| (*arc).clone()),
1371            )
1372            .await;
1373
1374        if let Err(e) = self.run_inline_optimization().await {
1375            log::warn!(
1376                "Unexpected failure when running inline optimization: {:?}",
1377                e
1378            );
1379        }
1380
1381        Ok(deleted_count)
1382    }
1383
1384    /// Atomically delete table version entries from the manifest by their object_ids.
1385    ///
1386    /// This method supports multi-table transactional deletion: all specified
1387    /// object_ids (which may span multiple tables) are deleted in a single atomic
1388    /// `DeleteBuilder` operation. Either all entries are removed or none are.
1389    ///
1390    /// Object IDs are formatted as `{table_id}${version}`.
1391    pub async fn batch_delete_table_versions_by_object_ids(
1392        &self,
1393        object_ids: &[String],
1394    ) -> Result<i64> {
1395        if object_ids.is_empty() {
1396            return Ok(0);
1397        }
1398
1399        let in_list: String = object_ids
1400            .iter()
1401            .map(|oid| {
1402                let escaped = oid.replace('\'', "''");
1403                format!("'{}'", escaped)
1404            })
1405            .collect::<Vec<_>>()
1406            .join(", ");
1407
1408        let filter = format!(
1409            "object_type = 'table_version' AND object_id IN ({})",
1410            in_list
1411        );
1412
1413        // Count how many entries exist so we can report the deleted count
1414        let mut scanner = self.manifest_scanner().await?;
1415        scanner.filter(&filter).map_err(|e| {
1416            lance_core::Error::from(NamespaceError::Internal {
1417                message: format!("Failed to filter: {:?}", e),
1418            })
1419        })?;
1420        scanner.project(&["object_id", "location"]).map_err(|e| {
1421            lance_core::Error::from(NamespaceError::Internal {
1422                message: format!("Failed to project: {:?}", e),
1423            })
1424        })?;
1425        let batches = Self::execute_scanner(scanner).await?;
1426        let deleted_count: i64 = batches.iter().map(|b| b.num_rows() as i64).sum();
1427
1428        if deleted_count == 0 {
1429            return Ok(0);
1430        }
1431
1432        // Execute a single atomic bulk delete covering all tables
1433        let _mutation_guard = self.manifest_mutation_lock.lock().await;
1434        let dataset_guard = self.manifest_dataset.get().await?;
1435        let dataset = Arc::new(dataset_guard.clone());
1436        drop(dataset_guard);
1437
1438        let new_dataset = DeleteBuilder::new(dataset, &filter)
1439            .execute()
1440            .await
1441            .map_err(|e| {
1442                convert_lance_commit_error(
1443                    &e,
1444                    "Failed to batch delete table versions across multiple tables",
1445                    None,
1446                )
1447            })?;
1448
1449        self.manifest_dataset
1450            .set_latest(
1451                Arc::try_unwrap(new_dataset.new_dataset).unwrap_or_else(|arc| (*arc).clone()),
1452            )
1453            .await;
1454
1455        if let Err(e) = self.run_inline_optimization().await {
1456            log::warn!(
1457                "Unexpected failure when running inline optimization: {:?}",
1458                e
1459            );
1460        }
1461
1462        Ok(deleted_count)
1463    }
1464
1465    /// Set a property flag in the __manifest table's metadata key-value map.
1466    ///
1467    /// This uses `dataset.update_metadata()` to persist the flag in the
1468    /// __manifest dataset's table metadata, rather than inserting a row.
1469    /// If the property already exists with the same value, this is a no-op.
1470    pub async fn set_property(&self, name: &str, value: &str) -> Result<()> {
1471        let _mutation_guard = self.manifest_mutation_lock.lock().await;
1472        let dataset_guard = self.manifest_dataset.get().await?;
1473        if dataset_guard.metadata().get(name) == Some(&value.to_string()) {
1474            return Ok(());
1475        }
1476        drop(dataset_guard);
1477
1478        let mut dataset_guard = self.manifest_dataset.get_mut().await?;
1479        dataset_guard
1480            .update_metadata([(name, value)])
1481            .await
1482            .map_err(|e| {
1483                lance_core::Error::from(NamespaceError::Internal {
1484                    message: format!(
1485                        "Failed to set property '{}' in __manifest metadata: {}",
1486                        name, e
1487                    ),
1488                })
1489            })?;
1490        Ok(())
1491    }
1492
1493    /// Check if a property flag exists in the __manifest table's metadata key-value map.
1494    pub async fn has_property(&self, name: &str) -> Result<bool> {
1495        let dataset_guard = self.manifest_dataset.get().await?;
1496        Ok(dataset_guard.metadata().contains_key(name))
1497    }
1498
1499    /// Parse metadata JSON into a `TableVersion`.
1500    ///
1501    /// Returns `None` if metadata is invalid or missing required fields.
1502    fn parse_table_version(version: i64, metadata_str: &str) -> Option<TableVersion> {
1503        let meta: serde_json::Value = match serde_json::from_str(metadata_str) {
1504            Ok(v) => v,
1505            Err(e) => {
1506                log::warn!(
1507                    "Skipping version {} due to invalid metadata JSON: {}",
1508                    version,
1509                    e
1510                );
1511                return None;
1512            }
1513        };
1514        let manifest_path = match meta.get("manifest_path").and_then(|v| v.as_str()) {
1515            Some(p) => p.to_string(),
1516            None => {
1517                log::warn!(
1518                    "Skipping version {} due to missing 'manifest_path' in metadata — \
1519                     this may indicate data corruption",
1520                    version
1521                );
1522                return None;
1523            }
1524        };
1525        let manifest_size = meta.get("manifest_size").and_then(|v| v.as_i64());
1526        let e_tag = meta
1527            .get("e_tag")
1528            .and_then(|v| v.as_str())
1529            .map(|s| s.to_string());
1530        Some(TableVersion {
1531            version,
1532            manifest_path,
1533            manifest_size,
1534            e_tag,
1535            timestamp_millis: None,
1536            metadata: None,
1537        })
1538    }
1539
1540    /// List table versions from the __manifest table.
1541    ///
1542    /// Queries the manifest for all versions of the given table and returns
1543    /// them as a `ListTableVersionsResponse`.
1544    pub async fn list_table_versions(
1545        &self,
1546        table_id: &[String],
1547        descending: bool,
1548        limit: Option<i32>,
1549    ) -> Result<ListTableVersionsResponse> {
1550        let object_id = Self::str_object_id(table_id);
1551        let manifest_versions = self
1552            .query_table_versions(&object_id, descending, limit)
1553            .await?;
1554
1555        let table_versions: Vec<TableVersion> = manifest_versions
1556            .into_iter()
1557            .filter_map(|(version, metadata_str)| Self::parse_table_version(version, &metadata_str))
1558            .collect();
1559
1560        Ok(ListTableVersionsResponse {
1561            versions: table_versions,
1562            page_token: None,
1563        })
1564    }
1565
1566    /// Describe a specific table version from the __manifest table.
1567    ///
1568    /// Queries the manifest for a specific version and returns it as a
1569    /// `DescribeTableVersionResponse`. Returns an error if the version is not found.
1570    pub async fn describe_table_version(
1571        &self,
1572        table_id: &[String],
1573        version: i64,
1574    ) -> Result<DescribeTableVersionResponse> {
1575        let object_id = Self::str_object_id(table_id);
1576        if let Some(metadata_str) = self.query_table_version(&object_id, version).await?
1577            && let Some(tv) = Self::parse_table_version(version, &metadata_str)
1578        {
1579            return Ok(DescribeTableVersionResponse {
1580                version: Box::new(tv),
1581            });
1582        }
1583        Err(NamespaceError::TableVersionNotFound {
1584            message: format!("version {} for table {:?}", version, table_id),
1585        }
1586        .into())
1587    }
1588
1589    /// Register a table in the manifest without creating the physical table (internal helper for migration)
1590    pub async fn register_table(&self, name: &str, location: String) -> Result<()> {
1591        let object_id = Self::build_object_id(&[], name);
1592        if self.manifest_contains_object(&object_id).await? {
1593            return Err(NamespaceError::Internal {
1594                message: format!("Table '{}' already exists", name),
1595            }
1596            .into());
1597        }
1598
1599        self.insert_into_manifest(object_id, ObjectType::Table, Some(location))
1600            .await
1601    }
1602
1603    /// Validate that all levels of a namespace path exist
1604    async fn validate_namespace_levels_exist(&self, namespace_path: &[String]) -> Result<()> {
1605        for i in 1..=namespace_path.len() {
1606            let partial_path = &namespace_path[..i];
1607            let object_id = partial_path.join(DELIMITER);
1608            if !self.manifest_contains_object(&object_id).await? {
1609                return Err(NamespaceError::NamespaceNotFound {
1610                    message: format!("parent namespace '{}'", object_id),
1611                }
1612                .into());
1613            }
1614        }
1615        Ok(())
1616    }
1617
1618    /// Query the manifest for a namespace with the given object ID
1619    async fn query_manifest_for_namespace(&self, object_id: &str) -> Result<Option<NamespaceInfo>> {
1620        let escaped_id = object_id.replace('\'', "''");
1621        let filter = format!("object_id = '{}' AND object_type = 'namespace'", escaped_id);
1622        let mut scanner = self.manifest_scanner().await?;
1623        scanner.filter(&filter).map_err(|e| {
1624            lance_core::Error::from(NamespaceError::Internal {
1625                message: format!("Failed to filter: {:?}", e),
1626            })
1627        })?;
1628        scanner.project(&["object_id", "metadata"]).map_err(|e| {
1629            lance_core::Error::from(NamespaceError::Internal {
1630                message: format!("Failed to project: {:?}", e),
1631            })
1632        })?;
1633        let batches = Self::execute_scanner(scanner).await?;
1634
1635        let mut found_result: Option<NamespaceInfo> = None;
1636        let mut total_rows = 0;
1637
1638        for batch in batches {
1639            if batch.num_rows() == 0 {
1640                continue;
1641            }
1642
1643            total_rows += batch.num_rows();
1644            if total_rows > 1 {
1645                return Err(NamespaceError::Internal {
1646                    message: format!(
1647                        "Expected exactly 1 namespace with id '{}', found {}",
1648                        object_id, total_rows
1649                    ),
1650                }
1651                .into());
1652            }
1653
1654            let object_id_array = Self::get_string_column(&batch, "object_id")?;
1655            let metadata_array = Self::get_string_column(&batch, "metadata")?;
1656
1657            let object_id_str = object_id_array.value(0);
1658            let metadata = if !metadata_array.is_null(0) {
1659                let metadata_str = metadata_array.value(0);
1660                match serde_json::from_str::<HashMap<String, String>>(metadata_str) {
1661                    Ok(map) => Some(map),
1662                    Err(e) => {
1663                        return Err(NamespaceError::Internal {
1664                            message: format!(
1665                                "Failed to deserialize metadata for namespace '{}': {}",
1666                                object_id, e
1667                            ),
1668                        }
1669                        .into());
1670                    }
1671                }
1672            } else {
1673                None
1674            };
1675
1676            let (namespace, name) = Self::parse_object_id(object_id_str);
1677            found_result = Some(NamespaceInfo {
1678                namespace,
1679                name,
1680                metadata,
1681            });
1682        }
1683
1684        Ok(found_result)
1685    }
1686
1687    /// Create or load the manifest dataset, ensuring it has the latest schema setup.
1688    ///
1689    /// This function will:
1690    /// 1. Try to load an existing manifest table
1691    /// 2. If it exists, check and migrate the schema if needed (e.g., add primary key metadata)
1692    /// 3. If it doesn't exist, create a new manifest table with the current schema
1693    /// 4. Persist feature flags (e.g., table_version_storage_enabled) if requested
1694    async fn ensure_manifest_table_up_to_date(
1695        root: &str,
1696        storage_options: &Option<HashMap<String, String>>,
1697        session: Option<Arc<Session>>,
1698        table_version_storage_enabled: bool,
1699    ) -> Result<DatasetConsistencyWrapper> {
1700        let manifest_path = format!("{}/{}", root, MANIFEST_TABLE_NAME);
1701        log::debug!("Attempting to load manifest from {}", manifest_path);
1702        let store_options = ObjectStoreParams {
1703            storage_options_accessor: storage_options.as_ref().map(|opts| {
1704                Arc::new(
1705                    lance_io::object_store::StorageOptionsAccessor::with_static_options(
1706                        opts.clone(),
1707                    ),
1708                )
1709            }),
1710            ..Default::default()
1711        };
1712        let read_params = ReadParams {
1713            session: session.clone(),
1714            store_options: Some(store_options.clone()),
1715            ..Default::default()
1716        };
1717        let dataset_result = DatasetBuilder::from_uri(&manifest_path)
1718            .with_read_params(read_params)
1719            .load()
1720            .await;
1721        if let Ok(mut dataset) = dataset_result {
1722            // Check if the object_id field has primary key metadata, migrate if not
1723            let needs_pk_migration = dataset
1724                .schema()
1725                .field("object_id")
1726                .map(|f| {
1727                    !f.metadata
1728                        .contains_key(LANCE_UNENFORCED_PRIMARY_KEY_POSITION)
1729                })
1730                .unwrap_or(false);
1731
1732            if needs_pk_migration {
1733                log::info!("Migrating __manifest table to add primary key metadata on object_id");
1734                dataset
1735                    .update_field_metadata()
1736                    .update("object_id", [(LANCE_UNENFORCED_PRIMARY_KEY_POSITION, "0")])
1737                    .map_err(|e| {
1738                        lance_core::Error::from(NamespaceError::Internal {
1739                            message: format!(
1740                                "Failed to find object_id field for migration: {:?}",
1741                                e
1742                            ),
1743                        })
1744                    })?
1745                    .await
1746                    .map_err(|e| {
1747                        lance_core::Error::from(NamespaceError::Internal {
1748                            message: format!("Failed to migrate primary key metadata: {:?}", e),
1749                        })
1750                    })?;
1751            }
1752
1753            // Persist table_version_storage_enabled flag in __manifest so that once
1754            // enabled, it becomes a permanent property of this namespace.
1755            if table_version_storage_enabled {
1756                let needs_flag = dataset
1757                    .metadata()
1758                    .get("table_version_storage_enabled")
1759                    .map(|v| v != "true")
1760                    .unwrap_or(true);
1761
1762                if needs_flag
1763                    && let Err(e) = dataset
1764                        .update_metadata([("table_version_storage_enabled", "true")])
1765                        .await
1766                {
1767                    log::warn!(
1768                        "Failed to persist table_version_storage_enabled flag in __manifest: {:?}",
1769                        e
1770                    );
1771                }
1772            }
1773
1774            Ok(DatasetConsistencyWrapper::new(dataset))
1775        } else {
1776            log::info!("Creating new manifest table at {}", manifest_path);
1777            let schema = Self::manifest_schema();
1778            let empty_batch = RecordBatch::new_empty(schema.clone());
1779            let reader = RecordBatchIterator::new(vec![Ok(empty_batch)], schema.clone());
1780
1781            let store_params = ObjectStoreParams {
1782                storage_options_accessor: storage_options.as_ref().map(|opts| {
1783                    Arc::new(
1784                        lance_io::object_store::StorageOptionsAccessor::with_static_options(
1785                            opts.clone(),
1786                        ),
1787                    )
1788                }),
1789                ..Default::default()
1790            };
1791            let write_params = WriteParams {
1792                session: session.clone(),
1793                store_params: Some(store_params),
1794                ..Default::default()
1795            };
1796
1797            let dataset =
1798                Dataset::write(Box::new(reader), &manifest_path, Some(write_params)).await;
1799
1800            // Handle race condition where another process created the manifest concurrently
1801            match dataset {
1802                Ok(dataset) => {
1803                    log::info!(
1804                        "Successfully created manifest table at {}, version={}, uri={}",
1805                        manifest_path,
1806                        dataset.version().version,
1807                        dataset.uri()
1808                    );
1809                    Ok(DatasetConsistencyWrapper::new(dataset))
1810                }
1811                Err(ref e)
1812                    if matches!(
1813                        e,
1814                        LanceError::DatasetAlreadyExists { .. }
1815                            | LanceError::CommitConflict { .. }
1816                            | LanceError::IncompatibleTransaction { .. }
1817                            | LanceError::RetryableCommitConflict { .. }
1818                    ) =>
1819                {
1820                    // Another process created the manifest concurrently, try to load it
1821                    log::info!(
1822                        "Manifest table was created by another process, loading it: {}",
1823                        manifest_path
1824                    );
1825                    let recovery_store_options = ObjectStoreParams {
1826                        storage_options_accessor: storage_options.as_ref().map(|opts| {
1827                            Arc::new(
1828                                lance_io::object_store::StorageOptionsAccessor::with_static_options(
1829                                    opts.clone(),
1830                                ),
1831                            )
1832                        }),
1833                        ..Default::default()
1834                    };
1835                    let recovery_read_params = ReadParams {
1836                        session,
1837                        store_options: Some(recovery_store_options),
1838                        ..Default::default()
1839                    };
1840                    let dataset = DatasetBuilder::from_uri(&manifest_path)
1841                        .with_read_params(recovery_read_params)
1842                        .load()
1843                        .await
1844                        .map_err(|e| {
1845                            lance_core::Error::from(NamespaceError::Internal {
1846                                message: format!(
1847                                    "Failed to load manifest dataset after creation conflict: {}",
1848                                    e
1849                                ),
1850                            })
1851                        })?;
1852                    Ok(DatasetConsistencyWrapper::new(dataset))
1853                }
1854                Err(e) => Err(lance_core::Error::from(NamespaceError::Internal {
1855                    message: format!("Failed to create manifest dataset: {:?}", e),
1856                })),
1857            }
1858        }
1859    }
1860
1861    /// Sorts names alphabetically and applies pagination using page_token (start_after) and limit.
1862    ///
1863    /// Returns the next page token (last item in this page) if more results exist beyond the limit,
1864    /// or `None` if this is the last page.
1865    fn apply_pagination(
1866        names: &mut Vec<String>,
1867        page_token: Option<String>,
1868        limit: Option<i32>,
1869    ) -> Option<String> {
1870        names.sort();
1871
1872        if let Some(start_after) = page_token {
1873            if let Some(index) = names
1874                .iter()
1875                .position(|name| name.as_str() > start_after.as_str())
1876            {
1877                names.drain(0..index);
1878            } else {
1879                names.clear();
1880            }
1881        }
1882
1883        if let Some(limit) = limit
1884            && limit >= 0
1885        {
1886            let limit = limit as usize;
1887            if names.len() > limit {
1888                let next_page_token = if limit > 0 {
1889                    Some(names[limit - 1].clone())
1890                } else {
1891                    None
1892                };
1893                names.truncate(limit);
1894                return next_page_token;
1895            }
1896        }
1897
1898        None
1899    }
1900}
1901
1902#[async_trait]
1903impl LanceNamespace for ManifestNamespace {
1904    fn namespace_id(&self) -> String {
1905        self.root.clone()
1906    }
1907
1908    async fn list_tables(&self, request: ListTablesRequest) -> Result<ListTablesResponse> {
1909        let namespace_id = request.id.as_ref().ok_or_else(|| {
1910            lance_core::Error::from(NamespaceError::InvalidInput {
1911                message: "Namespace ID is required".to_string(),
1912            })
1913        })?;
1914
1915        // Build filter to find tables in this namespace
1916        let filter = if namespace_id.is_empty() {
1917            // Root namespace: find tables without a namespace prefix
1918            "object_type = 'table' AND NOT contains(object_id, '$')".to_string()
1919        } else {
1920            // Namespaced: find tables that start with namespace$ but have no additional $
1921            let prefix = namespace_id.join(DELIMITER);
1922            format!(
1923                "object_type = 'table' AND starts_with(object_id, '{}{}') AND NOT contains(substring(object_id, {}), '$')",
1924                prefix,
1925                DELIMITER,
1926                prefix.len() + 2
1927            )
1928        };
1929
1930        let mut scanner = self.manifest_scanner().await?;
1931        scanner.filter(&filter).map_err(|e| {
1932            lance_core::Error::from(NamespaceError::Internal {
1933                message: format!("Failed to filter: {:?}", e),
1934            })
1935        })?;
1936        scanner.project(&["object_id", "location"]).map_err(|e| {
1937            lance_core::Error::from(NamespaceError::Internal {
1938                message: format!("Failed to project: {:?}", e),
1939            })
1940        })?;
1941
1942        let batches = Self::execute_scanner(scanner).await?;
1943
1944        let mut table_entries = Vec::new();
1945        for batch in batches {
1946            if batch.num_rows() == 0 {
1947                continue;
1948            }
1949
1950            let object_id_array = Self::get_string_column(&batch, "object_id")?;
1951            let location_array = Self::get_string_column(&batch, "location")?;
1952            for i in 0..batch.num_rows() {
1953                let object_id = object_id_array.value(i);
1954                let location = location_array.value(i);
1955                let (_namespace, name) = Self::parse_object_id(object_id);
1956                table_entries.push((name, location.to_string()));
1957            }
1958        }
1959
1960        let mut tables: Vec<String> = if request.include_declared.unwrap_or(true) {
1961            table_entries.into_iter().map(|(name, _)| name).collect()
1962        } else {
1963            let mut stream = futures::stream::iter(table_entries.into_iter().map(
1964                |(name, location)| async move {
1965                    // `include_declared=false` is an explicit opt-in. We still pay one
1966                    // `_versions/` probe per table so declared-state is derived from actual
1967                    // manifests. This is linear in the total number of listed tables, and we do
1968                    // the probes with bounded concurrency before pagination.
1969                    if self.location_has_actual_manifests(&location).await? {
1970                        Ok::<Option<String>, Error>(Some(name))
1971                    } else {
1972                        Ok::<Option<String>, Error>(None)
1973                    }
1974                },
1975            ))
1976            .buffered(DECLARED_FILTER_CONCURRENCY);
1977
1978            let mut filtered = Vec::new();
1979            while let Some(result) = stream.next().await {
1980                if let Some(name) = result? {
1981                    filtered.push(name);
1982                }
1983            }
1984            filtered
1985        };
1986
1987        let next_page_token =
1988            Self::apply_pagination(&mut tables, request.page_token, request.limit);
1989        let mut response = ListTablesResponse::new(tables);
1990        response.page_token = next_page_token;
1991        Ok(response)
1992    }
1993
1994    async fn describe_table(&self, request: DescribeTableRequest) -> Result<DescribeTableResponse> {
1995        let table_id = request.id.as_ref().ok_or_else(|| {
1996            lance_core::Error::from(NamespaceError::InvalidInput {
1997                message: "Table ID is required".to_string(),
1998            })
1999        })?;
2000
2001        if table_id.is_empty() {
2002            return Err(NamespaceError::InvalidInput {
2003                message: "Table ID cannot be empty".to_string(),
2004            }
2005            .into());
2006        }
2007
2008        let object_id = Self::str_object_id(table_id);
2009        let table_info = self.query_manifest_for_table(&object_id).boxed().await?;
2010
2011        // Extract table name and namespace from table_id
2012        let table_name = table_id.last().cloned().unwrap_or_default();
2013        let namespace_id: Vec<String> = if table_id.len() > 1 {
2014            table_id[..table_id.len() - 1].to_vec()
2015        } else {
2016            vec![]
2017        };
2018
2019        let load_detailed_metadata = request.load_detailed_metadata.unwrap_or(false);
2020        let should_check_declared =
2021            load_detailed_metadata || request.check_declared.unwrap_or(false);
2022        // For backwards compatibility, only skip vending credentials when explicitly set to false
2023        let vend_credentials = request.vend_credentials.unwrap_or(true);
2024
2025        match table_info {
2026            Some(info) => {
2027                // Construct full URI from relative location
2028                let table_uri = Self::construct_full_uri(&self.root, &info.location)?;
2029
2030                let storage_options = if vend_credentials {
2031                    self.storage_options.clone()
2032                } else {
2033                    None
2034                };
2035                let is_only_declared = if should_check_declared {
2036                    Some(!self.location_has_actual_manifests(&info.location).await?)
2037                } else {
2038                    None
2039                };
2040
2041                if !load_detailed_metadata {
2042                    return Ok(DescribeTableResponse {
2043                        table: Some(table_name),
2044                        namespace: Some(namespace_id),
2045                        location: Some(table_uri.clone()),
2046                        table_uri: Some(table_uri),
2047                        storage_options,
2048                        properties: info.metadata,
2049                        is_only_declared,
2050                        ..Default::default()
2051                    });
2052                }
2053
2054                if is_only_declared == Some(true) {
2055                    return Ok(DescribeTableResponse {
2056                        table: Some(table_name),
2057                        namespace: Some(namespace_id),
2058                        location: Some(table_uri.clone()),
2059                        table_uri: Some(table_uri),
2060                        storage_options,
2061                        properties: info.metadata,
2062                        is_only_declared,
2063                        ..Default::default()
2064                    });
2065                }
2066
2067                let mut builder = DatasetBuilder::from_uri(&table_uri);
2068                if let Some(opts) = &self.storage_options {
2069                    builder = builder.with_storage_options(opts.clone());
2070                }
2071                if let Some(session) = &self.session {
2072                    builder = builder.with_session(session.clone());
2073                }
2074
2075                match builder.load().await {
2076                    Ok(mut dataset) => {
2077                        // If a specific version is requested, checkout that version
2078                        if let Some(requested_version) = request.version {
2079                            dataset = dataset.checkout_version(requested_version as u64).await?;
2080                        }
2081
2082                        let version = dataset.version().version;
2083                        let lance_schema = dataset.schema();
2084                        let arrow_schema: arrow_schema::Schema = lance_schema.into();
2085                        let json_schema = arrow_schema_to_json(&arrow_schema)?;
2086
2087                        Ok(DescribeTableResponse {
2088                            table: Some(table_name.clone()),
2089                            namespace: Some(namespace_id.clone()),
2090                            version: Some(version as i64),
2091                            location: Some(table_uri.clone()),
2092                            table_uri: Some(table_uri),
2093                            schema: Some(Box::new(json_schema)),
2094                            storage_options,
2095                            properties: info.metadata.clone(),
2096                            is_only_declared,
2097                            ..Default::default()
2098                        })
2099                    }
2100                    Err(err) => Err(NamespaceError::Internal {
2101                        message: format!(
2102                            "Table exists in manifest but failed to load dataset '{}': {}",
2103                            object_id, err
2104                        ),
2105                    }
2106                    .into()),
2107                }
2108            }
2109            None => Err(NamespaceError::TableNotFound {
2110                message: Self::format_table_id(table_id),
2111            }
2112            .into()),
2113        }
2114    }
2115
2116    async fn table_exists(&self, request: TableExistsRequest) -> Result<()> {
2117        let table_id = request.id.as_ref().ok_or_else(|| {
2118            lance_core::Error::from(NamespaceError::InvalidInput {
2119                message: "Table ID is required".to_string(),
2120            })
2121        })?;
2122
2123        if table_id.is_empty() {
2124            return Err(NamespaceError::InvalidInput {
2125                message: "Table ID cannot be empty".to_string(),
2126            }
2127            .into());
2128        }
2129
2130        let object_id = Self::str_object_id(table_id);
2131        let exists = self.manifest_contains_object(&object_id).await?;
2132        if exists {
2133            Ok(())
2134        } else {
2135            Err(NamespaceError::TableNotFound {
2136                message: Self::format_table_id(table_id),
2137            }
2138            .into())
2139        }
2140    }
2141
2142    async fn create_table(
2143        &self,
2144        request: CreateTableRequest,
2145        data: Bytes,
2146    ) -> Result<CreateTableResponse> {
2147        let table_id = request.id.as_ref().ok_or_else(|| {
2148            lance_core::Error::from(NamespaceError::InvalidInput {
2149                message: "Table ID is required".to_string(),
2150            })
2151        })?;
2152
2153        if table_id.is_empty() {
2154            return Err(NamespaceError::InvalidInput {
2155                message: "Table ID cannot be empty".to_string(),
2156            }
2157            .into());
2158        }
2159
2160        let (namespace, table_name) = Self::split_object_id(table_id);
2161        let object_id = Self::build_object_id(&namespace, &table_name);
2162
2163        let existing_table = self.query_manifest_for_table(&object_id).await?;
2164        let existing_has_manifests = if let Some(existing_table) = &existing_table {
2165            Some(
2166                self.location_has_actual_manifests(&existing_table.location)
2167                    .await?,
2168            )
2169        } else {
2170            None
2171        };
2172
2173        if existing_has_manifests == Some(false)
2174            && request
2175                .properties
2176                .as_ref()
2177                .is_some_and(|properties| !properties.is_empty())
2178        {
2179            return Err(NamespaceError::InvalidInput {
2180                message: format!(
2181                    "create_table cannot set properties for already declared table '{}'",
2182                    object_id
2183                ),
2184            }
2185            .into());
2186        }
2187
2188        let create_mode = if existing_has_manifests == Some(false) {
2189            CreateTableMode::Create
2190        } else {
2191            CreateTableMode::parse(request.mode.as_deref())?
2192        };
2193        let dir_name = if let Some(existing_table) = &existing_table {
2194            existing_table.location.clone()
2195        } else if namespace.is_empty() && self.dir_listing_enabled {
2196            format!("{}.lance", table_name)
2197        } else {
2198            Self::generate_dir_name(&object_id)
2199        };
2200        let table_uri = Self::construct_full_uri(&self.root, &dir_name)?;
2201        let overwriting_existing_table =
2202            existing_has_manifests == Some(true) && create_mode == CreateTableMode::Overwrite;
2203
2204        if existing_has_manifests == Some(true) {
2205            match create_mode {
2206                CreateTableMode::Create => {
2207                    return Err(NamespaceError::TableAlreadyExists {
2208                        message: table_name.clone(),
2209                    }
2210                    .into());
2211                }
2212                CreateTableMode::ExistOk => {
2213                    let properties = existing_table
2214                        .as_ref()
2215                        .and_then(|table| table.metadata.clone());
2216                    return Ok(CreateTableResponse {
2217                        location: Some(table_uri),
2218                        storage_options: self.storage_options.clone(),
2219                        properties,
2220                        ..Default::default()
2221                    });
2222                }
2223                CreateTableMode::Overwrite => {}
2224            }
2225        }
2226
2227        // Validate that request_data is provided
2228        if data.is_empty() {
2229            return Err(NamespaceError::InvalidInput {
2230                message: "Request data (Arrow IPC stream) is required for create_table".to_string(),
2231            }
2232            .into());
2233        }
2234
2235        // Write the data using Lance Dataset
2236        let cursor = Cursor::new(data.to_vec());
2237        let stream_reader = StreamReader::try_new(cursor, None).map_err(|e| {
2238            lance_core::Error::from(NamespaceError::Internal {
2239                message: format!("Failed to read IPC stream: {:?}", e),
2240            })
2241        })?;
2242
2243        let batches: Vec<RecordBatch> = stream_reader
2244            .collect::<std::result::Result<Vec<_>, _>>()
2245            .map_err(|e| {
2246            lance_core::Error::from(NamespaceError::Internal {
2247                message: format!("Failed to collect batches: {:?}", e),
2248            })
2249        })?;
2250
2251        if batches.is_empty() {
2252            return Err(NamespaceError::Internal {
2253                message: "No data provided for table creation".to_string(),
2254            }
2255            .into());
2256        }
2257
2258        let schema = batches[0].schema();
2259        let batch_results: Vec<std::result::Result<RecordBatch, arrow_schema::ArrowError>> =
2260            batches.into_iter().map(Ok).collect();
2261        let reader = RecordBatchIterator::new(batch_results, schema);
2262
2263        let mut write_storage_options = self.storage_options.clone().unwrap_or_default();
2264        if let Some(request_storage_options) = request.storage_options.as_ref() {
2265            write_storage_options.extend(request_storage_options.clone());
2266        }
2267
2268        let store_params = ObjectStoreParams {
2269            storage_options_accessor: (!write_storage_options.is_empty()).then(|| {
2270                Arc::new(
2271                    lance_io::object_store::StorageOptionsAccessor::with_static_options(
2272                        write_storage_options,
2273                    ),
2274                )
2275            }),
2276            ..Default::default()
2277        };
2278        let write_params = WriteParams {
2279            mode: create_mode.write_mode(),
2280            session: self.session.clone(),
2281            store_params: Some(store_params),
2282            ..Default::default()
2283        };
2284        let dataset = Dataset::write(Box::new(reader), &table_uri, Some(write_params))
2285            .await
2286            .map_err(|e| {
2287                lance_core::Error::from(NamespaceError::Internal {
2288                    message: format!("Failed to write dataset: {:?}", e),
2289                })
2290            })?;
2291        let version = dataset.version().version as i64;
2292
2293        if overwriting_existing_table {
2294            let metadata =
2295                Self::serialize_metadata(request.properties.as_ref(), "table", &object_id)?;
2296            self.upsert_into_manifest_with_metadata(
2297                vec![ManifestEntry {
2298                    object_id,
2299                    object_type: ObjectType::Table,
2300                    location: Some(dir_name),
2301                    metadata,
2302                }],
2303                None,
2304            )
2305            .await?;
2306
2307            Ok(CreateTableResponse {
2308                version: Some(version),
2309                location: Some(table_uri),
2310                storage_options: self.storage_options.clone(),
2311                properties: request.properties,
2312                ..Default::default()
2313            })
2314        } else {
2315            match existing_table {
2316                Some(existing_table) => Ok(CreateTableResponse {
2317                    version: Some(version),
2318                    location: Some(table_uri),
2319                    storage_options: self.storage_options.clone(),
2320                    properties: existing_table.metadata,
2321                    ..Default::default()
2322                }),
2323                None => {
2324                    let metadata =
2325                        Self::serialize_metadata(request.properties.as_ref(), "table", &object_id)?;
2326                    // Register in manifest (store dir_name, not full URI)
2327                    self.insert_into_manifest_with_metadata(
2328                        vec![ManifestEntry {
2329                            object_id,
2330                            object_type: ObjectType::Table,
2331                            location: Some(dir_name.clone()),
2332                            metadata,
2333                        }],
2334                        None,
2335                    )
2336                    .await?;
2337
2338                    Ok(CreateTableResponse {
2339                        version: Some(version),
2340                        location: Some(table_uri),
2341                        storage_options: self.storage_options.clone(),
2342                        properties: request.properties,
2343                        ..Default::default()
2344                    })
2345                }
2346            }
2347        }
2348    }
2349
2350    async fn drop_table(&self, request: DropTableRequest) -> Result<DropTableResponse> {
2351        let table_id = request.id.as_ref().ok_or_else(|| {
2352            lance_core::Error::from(NamespaceError::InvalidInput {
2353                message: "Table ID is required".to_string(),
2354            })
2355        })?;
2356
2357        if table_id.is_empty() {
2358            return Err(NamespaceError::InvalidInput {
2359                message: "Table ID cannot be empty".to_string(),
2360            }
2361            .into());
2362        }
2363
2364        let (namespace, table_name) = Self::split_object_id(table_id);
2365        let object_id = Self::build_object_id(&namespace, &table_name);
2366
2367        // Query manifest for table location
2368        let table_info = self.query_manifest_for_table(&object_id).boxed().await?;
2369
2370        match table_info {
2371            Some(info) => {
2372                // Delete from manifest first
2373                self.delete_from_manifest(&object_id).boxed().await?;
2374
2375                // Delete physical data directory using the dir_name from manifest
2376                let table_path = self.base_path.clone().join(info.location.as_str());
2377                let table_uri = Self::construct_full_uri(&self.root, &info.location)?;
2378
2379                // Remove the table directory
2380                self.object_store
2381                    .remove_dir_all(table_path)
2382                    .boxed()
2383                    .await
2384                    .map_err(|e| {
2385                        lance_core::Error::from(NamespaceError::Internal {
2386                            message: format!("Failed to delete table directory: {:?}", e),
2387                        })
2388                    })?;
2389
2390                Ok(DropTableResponse {
2391                    id: request.id.clone(),
2392                    location: Some(table_uri),
2393                    ..Default::default()
2394                })
2395            }
2396            None => Err(NamespaceError::TableNotFound {
2397                message: table_name.to_string(),
2398            }
2399            .into()),
2400        }
2401    }
2402
2403    async fn list_namespaces(
2404        &self,
2405        request: ListNamespacesRequest,
2406    ) -> Result<ListNamespacesResponse> {
2407        let parent_namespace = request.id.as_ref().ok_or_else(|| {
2408            lance_core::Error::from(NamespaceError::InvalidInput {
2409                message: "Namespace ID is required".to_string(),
2410            })
2411        })?;
2412
2413        // Build filter to find direct child namespaces
2414        let filter = if parent_namespace.is_empty() {
2415            // Root namespace: find all namespaces without a parent
2416            "object_type = 'namespace' AND NOT contains(object_id, '$')".to_string()
2417        } else {
2418            // Non-root: find namespaces that start with parent$ but have no additional $
2419            let prefix = parent_namespace.join(DELIMITER);
2420            format!(
2421                "object_type = 'namespace' AND starts_with(object_id, '{}{}') AND NOT contains(substring(object_id, {}), '$')",
2422                prefix,
2423                DELIMITER,
2424                prefix.len() + 2
2425            )
2426        };
2427
2428        let mut scanner = self.manifest_scanner().await?;
2429        scanner.filter(&filter).map_err(|e| {
2430            lance_core::Error::from(NamespaceError::Internal {
2431                message: format!("Failed to filter: {:?}", e),
2432            })
2433        })?;
2434        scanner.project(&["object_id"]).map_err(|e| {
2435            lance_core::Error::from(NamespaceError::Internal {
2436                message: format!("Failed to project: {:?}", e),
2437            })
2438        })?;
2439
2440        let batches = Self::execute_scanner(scanner).await?;
2441        let mut namespaces = Vec::new();
2442
2443        for batch in batches {
2444            if batch.num_rows() == 0 {
2445                continue;
2446            }
2447
2448            let object_id_array = Self::get_string_column(&batch, "object_id")?;
2449            for i in 0..batch.num_rows() {
2450                let object_id = object_id_array.value(i);
2451                let (_namespace, name) = Self::parse_object_id(object_id);
2452                namespaces.push(name);
2453            }
2454        }
2455
2456        let next_page_token =
2457            Self::apply_pagination(&mut namespaces, request.page_token, request.limit);
2458        let mut response = ListNamespacesResponse::new(namespaces);
2459        response.page_token = next_page_token;
2460        Ok(response)
2461    }
2462
2463    async fn describe_namespace(
2464        &self,
2465        request: DescribeNamespaceRequest,
2466    ) -> Result<DescribeNamespaceResponse> {
2467        let namespace_id = request.id.as_ref().ok_or_else(|| {
2468            lance_core::Error::from(NamespaceError::InvalidInput {
2469                message: "Namespace ID is required".to_string(),
2470            })
2471        })?;
2472
2473        // Root namespace always exists
2474        if namespace_id.is_empty() {
2475            #[allow(clippy::needless_update)]
2476            return Ok(DescribeNamespaceResponse {
2477                properties: Some(HashMap::new()),
2478                ..Default::default()
2479            });
2480        }
2481
2482        // Check if namespace exists in manifest
2483        let object_id = namespace_id.join(DELIMITER);
2484        let namespace_info = self.query_manifest_for_namespace(&object_id).await?;
2485
2486        match namespace_info {
2487            #[allow(clippy::needless_update)]
2488            Some(info) => Ok(DescribeNamespaceResponse {
2489                properties: info.metadata,
2490                ..Default::default()
2491            }),
2492            None => Err(NamespaceError::NamespaceNotFound {
2493                message: object_id.to_string(),
2494            }
2495            .into()),
2496        }
2497    }
2498
2499    async fn create_namespace(
2500        &self,
2501        request: CreateNamespaceRequest,
2502    ) -> Result<CreateNamespaceResponse> {
2503        let namespace_id = request.id.as_ref().ok_or_else(|| {
2504            lance_core::Error::from(NamespaceError::InvalidInput {
2505                message: "Namespace ID is required".to_string(),
2506            })
2507        })?;
2508
2509        // Root namespace always exists and cannot be created
2510        if namespace_id.is_empty() {
2511            return Err(NamespaceError::NamespaceAlreadyExists {
2512                message: "root namespace".to_string(),
2513            }
2514            .into());
2515        }
2516
2517        // Validate parent namespaces exist (but not the namespace being created)
2518        if namespace_id.len() > 1 {
2519            self.validate_namespace_levels_exist(&namespace_id[..namespace_id.len() - 1])
2520                .await?;
2521        }
2522
2523        let object_id = namespace_id.join(DELIMITER);
2524        if self.manifest_contains_object(&object_id).await? {
2525            return Err(NamespaceError::NamespaceAlreadyExists {
2526                message: object_id.to_string(),
2527            }
2528            .into());
2529        }
2530
2531        let metadata =
2532            Self::serialize_metadata(request.properties.as_ref(), "namespace", &object_id)?;
2533
2534        self.insert_into_manifest_with_metadata(
2535            vec![ManifestEntry {
2536                object_id,
2537                object_type: ObjectType::Namespace,
2538                location: None,
2539                metadata,
2540            }],
2541            None,
2542        )
2543        .await?;
2544
2545        Ok(CreateNamespaceResponse {
2546            properties: request.properties,
2547            ..Default::default()
2548        })
2549    }
2550
2551    async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<DropNamespaceResponse> {
2552        let namespace_id = request.id.as_ref().ok_or_else(|| {
2553            lance_core::Error::from(NamespaceError::InvalidInput {
2554                message: "Namespace ID is required".to_string(),
2555            })
2556        })?;
2557
2558        // Root namespace always exists and cannot be dropped
2559        if namespace_id.is_empty() {
2560            return Err(NamespaceError::InvalidInput {
2561                message: "Root namespace cannot be dropped".to_string(),
2562            }
2563            .into());
2564        }
2565
2566        let object_id = namespace_id.join(DELIMITER);
2567
2568        // Check if namespace exists
2569        if !self.manifest_contains_object(&object_id).boxed().await? {
2570            return Err(NamespaceError::NamespaceNotFound {
2571                message: object_id.to_string(),
2572            }
2573            .into());
2574        }
2575
2576        // Check for child namespaces
2577        let escaped_id = object_id.replace('\'', "''");
2578        let prefix = format!("{}{}", escaped_id, DELIMITER);
2579        let filter = format!("starts_with(object_id, '{}')", prefix);
2580        let mut scanner = self.manifest_scanner().boxed().await?;
2581        scanner.filter(&filter).map_err(|e| {
2582            lance_core::Error::from(NamespaceError::Internal {
2583                message: format!("Failed to filter: {:?}", e),
2584            })
2585        })?;
2586        scanner.project::<&str>(&[]).map_err(|e| {
2587            lance_core::Error::from(NamespaceError::Internal {
2588                message: format!("Failed to project: {:?}", e),
2589            })
2590        })?;
2591        scanner.with_row_id();
2592        let count = scanner.count_rows().boxed().await.map_err(|e| {
2593            lance_core::Error::from(NamespaceError::Internal {
2594                message: format!("Failed to count rows: {:?}", e),
2595            })
2596        })?;
2597
2598        if count > 0 {
2599            return Err(NamespaceError::NamespaceNotEmpty {
2600                message: format!("'{}' (contains {} child objects)", object_id, count),
2601            }
2602            .into());
2603        }
2604
2605        self.delete_from_manifest(&object_id).boxed().await?;
2606
2607        Ok(DropNamespaceResponse::default())
2608    }
2609
2610    async fn namespace_exists(&self, request: NamespaceExistsRequest) -> Result<()> {
2611        let namespace_id = request.id.as_ref().ok_or_else(|| {
2612            lance_core::Error::from(NamespaceError::InvalidInput {
2613                message: "Namespace ID is required".to_string(),
2614            })
2615        })?;
2616
2617        // Root namespace always exists
2618        if namespace_id.is_empty() {
2619            return Ok(());
2620        }
2621
2622        let object_id = namespace_id.join(DELIMITER);
2623        if self.manifest_contains_object(&object_id).await? {
2624            Ok(())
2625        } else {
2626            Err(NamespaceError::NamespaceNotFound {
2627                message: object_id.to_string(),
2628            }
2629            .into())
2630        }
2631    }
2632
2633    async fn declare_table(&self, request: DeclareTableRequest) -> Result<DeclareTableResponse> {
2634        let table_id = request.id.as_ref().ok_or_else(|| {
2635            lance_core::Error::from(NamespaceError::InvalidInput {
2636                message: "Table ID is required".to_string(),
2637            })
2638        })?;
2639
2640        if table_id.is_empty() {
2641            return Err(NamespaceError::InvalidInput {
2642                message: "Table ID cannot be empty".to_string(),
2643            }
2644            .into());
2645        }
2646
2647        let (namespace, table_name) = Self::split_object_id(table_id);
2648        let object_id = Self::build_object_id(&namespace, &table_name);
2649
2650        // Check if table already exists in manifest
2651        let existing = self.query_manifest_for_table(&object_id).await?;
2652        if existing.is_some() {
2653            return Err(NamespaceError::TableAlreadyExists {
2654                message: table_name.to_string(),
2655            }
2656            .into());
2657        }
2658
2659        // Create table location path with hash-based naming
2660        // When dir_listing_enabled is true and it's a root table, use directory-style naming: {table_name}.lance
2661        // Otherwise, use hash-based naming: {hash}_{object_id}
2662        let dir_name = if namespace.is_empty() && self.dir_listing_enabled {
2663            // Root table with directory listing enabled: use {table_name}.lance
2664            format!("{}.lance", table_name)
2665        } else {
2666            // Child namespace table or dir listing disabled: use hash-based naming
2667            Self::generate_dir_name(&object_id)
2668        };
2669        let table_path = self.base_path.clone().join(dir_name.as_str());
2670        let table_uri = Self::construct_full_uri(&self.root, &dir_name)?;
2671
2672        // Validate location if provided
2673        if let Some(req_location) = &request.location {
2674            let req_location = req_location.trim_end_matches('/');
2675            if req_location != table_uri {
2676                return Err(NamespaceError::InvalidInput {
2677                    message: format!(
2678                        "Cannot declare table {} at location {}, must be at location {}",
2679                        table_name, req_location, table_uri
2680                    ),
2681                }
2682                .into());
2683            }
2684        }
2685
2686        // Create the .lance-reserved file to mark the table as existing
2687        let reserved_file_path = table_path.clone().join(".lance-reserved");
2688
2689        self.object_store
2690            .create(&reserved_file_path)
2691            .await
2692            .map_err(|e| {
2693                lance_core::Error::from(NamespaceError::Internal {
2694                    message: format!(
2695                        "Failed to create .lance-reserved file for table {}: {}",
2696                        table_name, e
2697                    ),
2698                })
2699            })?
2700            .shutdown()
2701            .await
2702            .map_err(|e| {
2703                lance_core::Error::from(NamespaceError::Internal {
2704                    message: format!(
2705                        "Failed to finalize .lance-reserved file for table {}: {}",
2706                        table_name, e
2707                    ),
2708                })
2709            })?;
2710
2711        let metadata = Self::serialize_metadata(request.properties.as_ref(), "table", &object_id)?;
2712
2713        // Add entry to manifest marking this as a declared table (store dir_name, not full path)
2714        self.insert_into_manifest_with_metadata(
2715            vec![ManifestEntry {
2716                object_id,
2717                object_type: ObjectType::Table,
2718                location: Some(dir_name),
2719                metadata,
2720            }],
2721            None,
2722        )
2723        .await?;
2724
2725        log::info!(
2726            "Declared table '{}' in manifest at {}",
2727            table_name,
2728            table_uri
2729        );
2730
2731        // For backwards compatibility, only skip vending credentials when explicitly set to false
2732        let vend_credentials = request.vend_credentials.unwrap_or(true);
2733        let storage_options = if vend_credentials {
2734            self.storage_options.clone()
2735        } else {
2736            None
2737        };
2738
2739        Ok(DeclareTableResponse {
2740            location: Some(table_uri),
2741            storage_options,
2742            properties: request.properties,
2743            ..Default::default()
2744        })
2745    }
2746
2747    async fn register_table(&self, request: RegisterTableRequest) -> Result<RegisterTableResponse> {
2748        let table_id = request.id.as_ref().ok_or_else(|| {
2749            lance_core::Error::from(NamespaceError::InvalidInput {
2750                message: "Table ID is required".to_string(),
2751            })
2752        })?;
2753
2754        if table_id.is_empty() {
2755            return Err(NamespaceError::InvalidInput {
2756                message: "Table ID cannot be empty".to_string(),
2757            }
2758            .into());
2759        }
2760
2761        let location = request.location.clone();
2762
2763        // Validate that location is a relative path within the root directory
2764        // We don't allow absolute URIs or paths that escape the root
2765        if location.contains("://") {
2766            return Err(NamespaceError::InvalidInput {
2767                message: format!(
2768                    "Absolute URIs are not allowed for register_table. Location must be a relative path within the root directory: {}",
2769                    location
2770                ),
2771            }
2772            .into());
2773        }
2774
2775        if location.starts_with('/') {
2776            return Err(NamespaceError::InvalidInput {
2777                message: format!(
2778                    "Absolute paths are not allowed for register_table. Location must be a relative path within the root directory: {}",
2779                    location
2780                ),
2781            }
2782            .into());
2783        }
2784
2785        // Check for path traversal attempts
2786        if location.contains("..") {
2787            return Err(NamespaceError::InvalidInput {
2788                message: format!(
2789                    "Path traversal is not allowed. Location must be a relative path within the root directory: {}",
2790                    location
2791                ),
2792            }
2793            .into());
2794        }
2795
2796        let (namespace, table_name) = Self::split_object_id(table_id);
2797        let object_id = Self::build_object_id(&namespace, &table_name);
2798
2799        // Validate that parent namespaces exist (if not root)
2800        if !namespace.is_empty() {
2801            self.validate_namespace_levels_exist(&namespace).await?;
2802        }
2803
2804        // Check if table already exists
2805        if self.manifest_contains_object(&object_id).await? {
2806            return Err(NamespaceError::TableAlreadyExists {
2807                message: object_id.to_string(),
2808            }
2809            .into());
2810        }
2811
2812        // Register the table with its location in the manifest
2813        self.insert_into_manifest(object_id, ObjectType::Table, Some(location.clone()))
2814            .await?;
2815
2816        Ok(RegisterTableResponse {
2817            location: Some(location),
2818            ..Default::default()
2819        })
2820    }
2821
2822    async fn deregister_table(
2823        &self,
2824        request: DeregisterTableRequest,
2825    ) -> Result<DeregisterTableResponse> {
2826        let table_id = request.id.as_ref().ok_or_else(|| {
2827            lance_core::Error::from(NamespaceError::InvalidInput {
2828                message: "Table ID is required".to_string(),
2829            })
2830        })?;
2831
2832        if table_id.is_empty() {
2833            return Err(NamespaceError::InvalidInput {
2834                message: "Table ID cannot be empty".to_string(),
2835            }
2836            .into());
2837        }
2838
2839        let (namespace, table_name) = Self::split_object_id(table_id);
2840        let object_id = Self::build_object_id(&namespace, &table_name);
2841
2842        // Get table info before deleting
2843        let table_info = self.query_manifest_for_table(&object_id).await?;
2844
2845        let table_uri = match table_info {
2846            Some(info) => {
2847                // Delete from manifest only (leave physical data intact)
2848                self.delete_from_manifest(&object_id).boxed().await?;
2849                Self::construct_full_uri(&self.root, &info.location)?
2850            }
2851            None => {
2852                return Err(NamespaceError::TableNotFound {
2853                    message: object_id.to_string(),
2854                }
2855                .into());
2856            }
2857        };
2858
2859        Ok(DeregisterTableResponse {
2860            id: request.id.clone(),
2861            location: Some(table_uri),
2862            ..Default::default()
2863        })
2864    }
2865}
2866
2867#[cfg(test)]
2868mod tests {
2869    use crate::{DirectoryNamespaceBuilder, ManifestNamespace};
2870    use bytes::Bytes;
2871    use lance_core::utils::tempfile::TempStdDir;
2872    use lance_namespace::LanceNamespace;
2873    use lance_namespace::models::{
2874        CreateNamespaceRequest, CreateTableRequest, DescribeTableRequest, DropTableRequest,
2875        ListTablesRequest, TableExistsRequest,
2876    };
2877    use rstest::rstest;
2878
2879    fn create_test_ipc_data() -> Vec<u8> {
2880        use arrow::array::{Int32Array, StringArray};
2881        use arrow::datatypes::{DataType, Field, Schema};
2882        use arrow::ipc::writer::StreamWriter;
2883        use arrow::record_batch::RecordBatch;
2884        use std::sync::Arc;
2885
2886        let schema = Arc::new(Schema::new(vec![
2887            Field::new("id", DataType::Int32, false),
2888            Field::new("name", DataType::Utf8, false),
2889        ]));
2890
2891        let batch = RecordBatch::try_new(
2892            schema.clone(),
2893            vec![
2894                Arc::new(Int32Array::from(vec![1, 2, 3])),
2895                Arc::new(StringArray::from(vec!["a", "b", "c"])),
2896            ],
2897        )
2898        .unwrap();
2899
2900        let mut buffer = Vec::new();
2901        {
2902            let mut writer = StreamWriter::try_new(&mut buffer, &schema).unwrap();
2903            writer.write(&batch).unwrap();
2904            writer.finish().unwrap();
2905        }
2906        buffer
2907    }
2908
2909    #[rstest]
2910    #[case::with_optimization(true)]
2911    #[case::without_optimization(false)]
2912    #[tokio::test]
2913    async fn test_manifest_namespace_basic_create_and_list(#[case] inline_optimization: bool) {
2914        let temp_dir = TempStdDir::default();
2915        let temp_path = temp_dir.to_str().unwrap();
2916
2917        // Create a DirectoryNamespace with manifest enabled (default)
2918        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2919            .inline_optimization_enabled(inline_optimization)
2920            .build()
2921            .await
2922            .unwrap();
2923
2924        // Verify we can list tables (should be empty)
2925        let mut request = ListTablesRequest::new();
2926        request.id = Some(vec![]);
2927        let response = dir_namespace.list_tables(request).await.unwrap();
2928        assert_eq!(response.tables.len(), 0);
2929
2930        // Create a test table
2931        let buffer = create_test_ipc_data();
2932        let mut create_request = CreateTableRequest::new();
2933        create_request.id = Some(vec!["test_table".to_string()]);
2934
2935        let _response = dir_namespace
2936            .create_table(create_request, Bytes::from(buffer))
2937            .await
2938            .unwrap();
2939
2940        // List tables again - should see our new table
2941        let mut request = ListTablesRequest::new();
2942        request.id = Some(vec![]);
2943        let response = dir_namespace.list_tables(request).await.unwrap();
2944        assert_eq!(response.tables.len(), 1);
2945        assert_eq!(response.tables[0], "test_table");
2946    }
2947
2948    #[rstest]
2949    #[case::with_optimization(true)]
2950    #[case::without_optimization(false)]
2951    #[tokio::test]
2952    async fn test_manifest_namespace_table_exists(#[case] inline_optimization: bool) {
2953        let temp_dir = TempStdDir::default();
2954        let temp_path = temp_dir.to_str().unwrap();
2955
2956        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2957            .inline_optimization_enabled(inline_optimization)
2958            .build()
2959            .await
2960            .unwrap();
2961
2962        // Check non-existent table
2963        let mut request = TableExistsRequest::new();
2964        request.id = Some(vec!["nonexistent".to_string()]);
2965        let result = dir_namespace.table_exists(request).await;
2966        assert!(result.is_err());
2967
2968        // Create table
2969        let buffer = create_test_ipc_data();
2970        let mut create_request = CreateTableRequest::new();
2971        create_request.id = Some(vec!["test_table".to_string()]);
2972        dir_namespace
2973            .create_table(create_request, Bytes::from(buffer))
2974            .await
2975            .unwrap();
2976
2977        // Check existing table
2978        let mut request = TableExistsRequest::new();
2979        request.id = Some(vec!["test_table".to_string()]);
2980        let result = dir_namespace.table_exists(request).await;
2981        assert!(result.is_ok());
2982    }
2983
2984    #[rstest]
2985    #[case::with_optimization(true)]
2986    #[case::without_optimization(false)]
2987    #[tokio::test]
2988    async fn test_manifest_namespace_describe_table(#[case] inline_optimization: bool) {
2989        let temp_dir = TempStdDir::default();
2990        let temp_path = temp_dir.to_str().unwrap();
2991
2992        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2993            .inline_optimization_enabled(inline_optimization)
2994            .build()
2995            .await
2996            .unwrap();
2997
2998        // Describe non-existent table
2999        let mut request = DescribeTableRequest::new();
3000        request.id = Some(vec!["nonexistent".to_string()]);
3001        let result = dir_namespace.describe_table(request).await;
3002        assert!(result.is_err());
3003
3004        // Create table
3005        let buffer = create_test_ipc_data();
3006        let mut create_request = CreateTableRequest::new();
3007        create_request.id = Some(vec!["test_table".to_string()]);
3008        dir_namespace
3009            .create_table(create_request, Bytes::from(buffer))
3010            .await
3011            .unwrap();
3012
3013        // Describe existing table
3014        let mut request = DescribeTableRequest::new();
3015        request.id = Some(vec!["test_table".to_string()]);
3016        let response = dir_namespace.describe_table(request).await.unwrap();
3017        assert!(response.location.is_some());
3018        assert!(response.location.unwrap().contains("test_table"));
3019    }
3020
3021    #[rstest]
3022    #[case::with_optimization(true)]
3023    #[case::without_optimization(false)]
3024    #[tokio::test]
3025    async fn test_manifest_namespace_drop_table(#[case] inline_optimization: bool) {
3026        let temp_dir = TempStdDir::default();
3027        let temp_path = temp_dir.to_str().unwrap();
3028
3029        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3030            .inline_optimization_enabled(inline_optimization)
3031            .build()
3032            .await
3033            .unwrap();
3034
3035        // Create table
3036        let buffer = create_test_ipc_data();
3037        let mut create_request = CreateTableRequest::new();
3038        create_request.id = Some(vec!["test_table".to_string()]);
3039        dir_namespace
3040            .create_table(create_request, Bytes::from(buffer))
3041            .await
3042            .unwrap();
3043
3044        // Verify table exists
3045        let mut request = ListTablesRequest::new();
3046        request.id = Some(vec![]);
3047        let response = dir_namespace.list_tables(request).await.unwrap();
3048        assert_eq!(response.tables.len(), 1);
3049
3050        // Drop table
3051        let mut drop_request = DropTableRequest::new();
3052        drop_request.id = Some(vec!["test_table".to_string()]);
3053        let _response = dir_namespace.drop_table(drop_request).await.unwrap();
3054
3055        // Verify table is gone
3056        let mut request = ListTablesRequest::new();
3057        request.id = Some(vec![]);
3058        let response = dir_namespace.list_tables(request).await.unwrap();
3059        assert_eq!(response.tables.len(), 0);
3060    }
3061
3062    #[tokio::test]
3063    async fn test_list_tables_pagination_limit_zero() {
3064        let temp_dir = TempStdDir::default();
3065        let temp_path = temp_dir.to_str().unwrap();
3066
3067        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3068            .build()
3069            .await
3070            .unwrap();
3071
3072        let buffer = create_test_ipc_data();
3073        let mut create_request = CreateTableRequest::new();
3074        create_request.id = Some(vec!["alpha".to_string()]);
3075        dir_namespace
3076            .create_table(create_request, Bytes::from(buffer))
3077            .await
3078            .unwrap();
3079
3080        let response = dir_namespace
3081            .list_tables(ListTablesRequest {
3082                id: Some(vec![]),
3083                limit: Some(0),
3084                ..Default::default()
3085            })
3086            .await
3087            .unwrap();
3088
3089        assert!(response.tables.is_empty());
3090        assert!(response.page_token.is_none());
3091    }
3092
3093    #[rstest]
3094    #[case::with_optimization(true)]
3095    #[case::without_optimization(false)]
3096    #[tokio::test]
3097    async fn test_manifest_namespace_multiple_tables(#[case] inline_optimization: bool) {
3098        let temp_dir = TempStdDir::default();
3099        let temp_path = temp_dir.to_str().unwrap();
3100
3101        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3102            .inline_optimization_enabled(inline_optimization)
3103            .build()
3104            .await
3105            .unwrap();
3106
3107        // Create multiple tables
3108        let buffer = create_test_ipc_data();
3109        for i in 1..=3 {
3110            let mut create_request = CreateTableRequest::new();
3111            create_request.id = Some(vec![format!("table{}", i)]);
3112            dir_namespace
3113                .create_table(create_request, Bytes::from(buffer.clone()))
3114                .await
3115                .unwrap();
3116        }
3117
3118        // List all tables
3119        let mut request = ListTablesRequest::new();
3120        request.id = Some(vec![]);
3121        let response = dir_namespace.list_tables(request).await.unwrap();
3122        assert_eq!(response.tables.len(), 3);
3123        assert!(response.tables.contains(&"table1".to_string()));
3124        assert!(response.tables.contains(&"table2".to_string()));
3125        assert!(response.tables.contains(&"table3".to_string()));
3126    }
3127
3128    #[rstest]
3129    #[case::with_optimization(true)]
3130    #[case::without_optimization(false)]
3131    #[tokio::test]
3132    async fn test_directory_only_mode(#[case] inline_optimization: bool) {
3133        let temp_dir = TempStdDir::default();
3134        let temp_path = temp_dir.to_str().unwrap();
3135
3136        // Create a DirectoryNamespace with manifest disabled
3137        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3138            .manifest_enabled(false)
3139            .inline_optimization_enabled(inline_optimization)
3140            .build()
3141            .await
3142            .unwrap();
3143
3144        // Verify we can list tables (should be empty)
3145        let mut request = ListTablesRequest::new();
3146        request.id = Some(vec![]);
3147        let response = dir_namespace.list_tables(request).await.unwrap();
3148        assert_eq!(response.tables.len(), 0);
3149
3150        // Create a test table
3151        let buffer = create_test_ipc_data();
3152        let mut create_request = CreateTableRequest::new();
3153        create_request.id = Some(vec!["test_table".to_string()]);
3154
3155        // Create table - this should use directory-only mode
3156        let _response = dir_namespace
3157            .create_table(create_request, Bytes::from(buffer))
3158            .await
3159            .unwrap();
3160
3161        // List tables - should see our new table
3162        let mut request = ListTablesRequest::new();
3163        request.id = Some(vec![]);
3164        let response = dir_namespace.list_tables(request).await.unwrap();
3165        assert_eq!(response.tables.len(), 1);
3166        assert_eq!(response.tables[0], "test_table");
3167    }
3168
3169    #[rstest]
3170    #[case::with_optimization(true)]
3171    #[case::without_optimization(false)]
3172    #[tokio::test]
3173    async fn test_dual_mode_merge(#[case] inline_optimization: bool) {
3174        let temp_dir = TempStdDir::default();
3175        let temp_path = temp_dir.to_str().unwrap();
3176
3177        // Create a DirectoryNamespace with both manifest and directory enabled
3178        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3179            .manifest_enabled(true)
3180            .dir_listing_enabled(true)
3181            .inline_optimization_enabled(inline_optimization)
3182            .build()
3183            .await
3184            .unwrap();
3185
3186        // Create tables through manifest
3187        let buffer = create_test_ipc_data();
3188        let mut create_request = CreateTableRequest::new();
3189        create_request.id = Some(vec!["table1".to_string()]);
3190        dir_namespace
3191            .create_table(create_request, Bytes::from(buffer))
3192            .await
3193            .unwrap();
3194
3195        // List tables - should see table from both manifest and directory
3196        let mut request = ListTablesRequest::new();
3197        request.id = Some(vec![]);
3198        let response = dir_namespace.list_tables(request).await.unwrap();
3199        assert_eq!(response.tables.len(), 1);
3200        assert_eq!(response.tables[0], "table1");
3201    }
3202
3203    #[rstest]
3204    #[case::with_optimization(true)]
3205    #[case::without_optimization(false)]
3206    #[tokio::test]
3207    async fn test_manifest_only_mode(#[case] inline_optimization: bool) {
3208        let temp_dir = TempStdDir::default();
3209        let temp_path = temp_dir.to_str().unwrap();
3210
3211        // Create a DirectoryNamespace with only manifest enabled
3212        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3213            .manifest_enabled(true)
3214            .dir_listing_enabled(false)
3215            .inline_optimization_enabled(inline_optimization)
3216            .build()
3217            .await
3218            .unwrap();
3219
3220        // Create table
3221        let buffer = create_test_ipc_data();
3222        let mut create_request = CreateTableRequest::new();
3223        create_request.id = Some(vec!["test_table".to_string()]);
3224        dir_namespace
3225            .create_table(create_request, Bytes::from(buffer))
3226            .await
3227            .unwrap();
3228
3229        // List tables - should only use manifest
3230        let mut request = ListTablesRequest::new();
3231        request.id = Some(vec![]);
3232        let response = dir_namespace.list_tables(request).await.unwrap();
3233        assert_eq!(response.tables.len(), 1);
3234        assert_eq!(response.tables[0], "test_table");
3235    }
3236
3237    #[rstest]
3238    #[case::with_optimization(true)]
3239    #[case::without_optimization(false)]
3240    #[tokio::test]
3241    async fn test_drop_nonexistent_table(#[case] inline_optimization: bool) {
3242        let temp_dir = TempStdDir::default();
3243        let temp_path = temp_dir.to_str().unwrap();
3244
3245        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3246            .inline_optimization_enabled(inline_optimization)
3247            .build()
3248            .await
3249            .unwrap();
3250
3251        // Try to drop non-existent table
3252        let mut drop_request = DropTableRequest::new();
3253        drop_request.id = Some(vec!["nonexistent".to_string()]);
3254        let result = dir_namespace.drop_table(drop_request).await;
3255        assert!(result.is_err());
3256    }
3257
3258    #[rstest]
3259    #[case::with_optimization(true)]
3260    #[case::without_optimization(false)]
3261    #[tokio::test]
3262    async fn test_create_duplicate_table_fails(#[case] inline_optimization: bool) {
3263        let temp_dir = TempStdDir::default();
3264        let temp_path = temp_dir.to_str().unwrap();
3265
3266        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3267            .inline_optimization_enabled(inline_optimization)
3268            .build()
3269            .await
3270            .unwrap();
3271
3272        // Create table
3273        let buffer = create_test_ipc_data();
3274        let mut create_request = CreateTableRequest::new();
3275        create_request.id = Some(vec!["test_table".to_string()]);
3276        dir_namespace
3277            .create_table(create_request, Bytes::from(buffer.clone()))
3278            .await
3279            .unwrap();
3280
3281        // Try to create table with same name - should fail
3282        let mut create_request = CreateTableRequest::new();
3283        create_request.id = Some(vec!["test_table".to_string()]);
3284        let result = dir_namespace
3285            .create_table(create_request, Bytes::from(buffer))
3286            .await;
3287        assert!(result.is_err());
3288    }
3289
3290    #[rstest]
3291    #[case::with_optimization(true)]
3292    #[case::without_optimization(false)]
3293    #[tokio::test]
3294    async fn test_create_child_namespace(#[case] inline_optimization: bool) {
3295        use lance_namespace::models::{
3296            CreateNamespaceRequest, ListNamespacesRequest, NamespaceExistsRequest,
3297        };
3298
3299        let temp_dir = TempStdDir::default();
3300        let temp_path = temp_dir.to_str().unwrap();
3301
3302        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3303            .inline_optimization_enabled(inline_optimization)
3304            .build()
3305            .await
3306            .unwrap();
3307
3308        // Create a child namespace
3309        let mut create_req = CreateNamespaceRequest::new();
3310        create_req.id = Some(vec!["ns1".to_string()]);
3311        let result = dir_namespace.create_namespace(create_req).await;
3312        assert!(
3313            result.is_ok(),
3314            "Failed to create child namespace: {:?}",
3315            result.err()
3316        );
3317
3318        // Verify namespace exists
3319        let exists_req = NamespaceExistsRequest {
3320            id: Some(vec!["ns1".to_string()]),
3321            ..Default::default()
3322        };
3323        let result = dir_namespace.namespace_exists(exists_req).await;
3324        assert!(result.is_ok(), "Namespace should exist");
3325
3326        // List child namespaces of root
3327        let list_req = ListNamespacesRequest {
3328            id: Some(vec![]),
3329            page_token: None,
3330            limit: None,
3331            ..Default::default()
3332        };
3333        let result = dir_namespace.list_namespaces(list_req).await;
3334        assert!(result.is_ok());
3335        let namespaces = result.unwrap();
3336        assert_eq!(namespaces.namespaces.len(), 1);
3337        assert_eq!(namespaces.namespaces[0], "ns1");
3338    }
3339
3340    #[rstest]
3341    #[case::with_optimization(true)]
3342    #[case::without_optimization(false)]
3343    #[tokio::test]
3344    async fn test_create_nested_namespace(#[case] inline_optimization: bool) {
3345        use lance_namespace::models::{
3346            CreateNamespaceRequest, ListNamespacesRequest, NamespaceExistsRequest,
3347        };
3348
3349        let temp_dir = TempStdDir::default();
3350        let temp_path = temp_dir.to_str().unwrap();
3351
3352        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3353            .inline_optimization_enabled(inline_optimization)
3354            .build()
3355            .await
3356            .unwrap();
3357
3358        // Create parent namespace
3359        let mut create_req = CreateNamespaceRequest::new();
3360        create_req.id = Some(vec!["parent".to_string()]);
3361        dir_namespace.create_namespace(create_req).await.unwrap();
3362
3363        // Create nested child namespace
3364        let mut create_req = CreateNamespaceRequest::new();
3365        create_req.id = Some(vec!["parent".to_string(), "child".to_string()]);
3366        let result = dir_namespace.create_namespace(create_req).await;
3367        assert!(
3368            result.is_ok(),
3369            "Failed to create nested namespace: {:?}",
3370            result.err()
3371        );
3372
3373        // Verify nested namespace exists
3374        let exists_req = NamespaceExistsRequest {
3375            id: Some(vec!["parent".to_string(), "child".to_string()]),
3376            ..Default::default()
3377        };
3378        let result = dir_namespace.namespace_exists(exists_req).await;
3379        assert!(result.is_ok(), "Nested namespace should exist");
3380
3381        // List child namespaces of parent
3382        let list_req = ListNamespacesRequest {
3383            id: Some(vec!["parent".to_string()]),
3384            page_token: None,
3385            limit: None,
3386            ..Default::default()
3387        };
3388        let result = dir_namespace.list_namespaces(list_req).await;
3389        assert!(result.is_ok());
3390        let namespaces = result.unwrap();
3391        assert_eq!(namespaces.namespaces.len(), 1);
3392        assert_eq!(namespaces.namespaces[0], "child");
3393    }
3394
3395    #[rstest]
3396    #[case::with_optimization(true)]
3397    #[case::without_optimization(false)]
3398    #[tokio::test]
3399    async fn test_create_namespace_without_parent_fails(#[case] inline_optimization: bool) {
3400        use lance_namespace::models::CreateNamespaceRequest;
3401
3402        let temp_dir = TempStdDir::default();
3403        let temp_path = temp_dir.to_str().unwrap();
3404
3405        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3406            .inline_optimization_enabled(inline_optimization)
3407            .build()
3408            .await
3409            .unwrap();
3410
3411        // Try to create nested namespace without parent
3412        let mut create_req = CreateNamespaceRequest::new();
3413        create_req.id = Some(vec!["nonexistent_parent".to_string(), "child".to_string()]);
3414        let result = dir_namespace.create_namespace(create_req).await;
3415        assert!(result.is_err(), "Should fail when parent doesn't exist");
3416    }
3417
3418    #[rstest]
3419    #[case::with_optimization(true)]
3420    #[case::without_optimization(false)]
3421    #[tokio::test]
3422    async fn test_drop_child_namespace(#[case] inline_optimization: bool) {
3423        use lance_namespace::models::{
3424            CreateNamespaceRequest, DropNamespaceRequest, NamespaceExistsRequest,
3425        };
3426
3427        let temp_dir = TempStdDir::default();
3428        let temp_path = temp_dir.to_str().unwrap();
3429
3430        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3431            .inline_optimization_enabled(inline_optimization)
3432            .build()
3433            .await
3434            .unwrap();
3435
3436        // Create a child namespace
3437        let mut create_req = CreateNamespaceRequest::new();
3438        create_req.id = Some(vec!["ns1".to_string()]);
3439        dir_namespace.create_namespace(create_req).await.unwrap();
3440
3441        // Drop the namespace
3442        let mut drop_req = DropNamespaceRequest::new();
3443        drop_req.id = Some(vec!["ns1".to_string()]);
3444        let result = dir_namespace.drop_namespace(drop_req).await;
3445        assert!(
3446            result.is_ok(),
3447            "Failed to drop namespace: {:?}",
3448            result.err()
3449        );
3450
3451        // Verify namespace no longer exists
3452        let exists_req = NamespaceExistsRequest {
3453            id: Some(vec!["ns1".to_string()]),
3454            ..Default::default()
3455        };
3456        let result = dir_namespace.namespace_exists(exists_req).await;
3457        assert!(result.is_err(), "Namespace should not exist after drop");
3458    }
3459
3460    #[rstest]
3461    #[case::with_optimization(true)]
3462    #[case::without_optimization(false)]
3463    #[tokio::test]
3464    async fn test_drop_namespace_with_children_fails(#[case] inline_optimization: bool) {
3465        use lance_namespace::models::{CreateNamespaceRequest, DropNamespaceRequest};
3466
3467        let temp_dir = TempStdDir::default();
3468        let temp_path = temp_dir.to_str().unwrap();
3469
3470        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3471            .inline_optimization_enabled(inline_optimization)
3472            .build()
3473            .await
3474            .unwrap();
3475
3476        // Create parent and child namespaces
3477        let mut create_req = CreateNamespaceRequest::new();
3478        create_req.id = Some(vec!["parent".to_string()]);
3479        dir_namespace.create_namespace(create_req).await.unwrap();
3480
3481        let mut create_req = CreateNamespaceRequest::new();
3482        create_req.id = Some(vec!["parent".to_string(), "child".to_string()]);
3483        dir_namespace.create_namespace(create_req).await.unwrap();
3484
3485        // Try to drop parent namespace - should fail because it has children
3486        let mut drop_req = DropNamespaceRequest::new();
3487        drop_req.id = Some(vec!["parent".to_string()]);
3488        let result = dir_namespace.drop_namespace(drop_req).await;
3489        assert!(result.is_err(), "Should fail when namespace has children");
3490    }
3491
3492    #[rstest]
3493    #[case::with_optimization(true)]
3494    #[case::without_optimization(false)]
3495    #[tokio::test]
3496    async fn test_create_table_in_child_namespace(#[case] inline_optimization: bool) {
3497        use lance_namespace::models::{
3498            CreateNamespaceRequest, CreateTableRequest, ListTablesRequest,
3499        };
3500
3501        let temp_dir = TempStdDir::default();
3502        let temp_path = temp_dir.to_str().unwrap();
3503
3504        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3505            .inline_optimization_enabled(inline_optimization)
3506            .build()
3507            .await
3508            .unwrap();
3509
3510        // Create a child namespace
3511        let mut create_ns_req = CreateNamespaceRequest::new();
3512        create_ns_req.id = Some(vec!["ns1".to_string()]);
3513        dir_namespace.create_namespace(create_ns_req).await.unwrap();
3514
3515        // Create a table in the child namespace
3516        let buffer = create_test_ipc_data();
3517        let mut create_table_req = CreateTableRequest::new();
3518        create_table_req.id = Some(vec!["ns1".to_string(), "table1".to_string()]);
3519        let result = dir_namespace
3520            .create_table(create_table_req, Bytes::from(buffer))
3521            .await;
3522        assert!(
3523            result.is_ok(),
3524            "Failed to create table in child namespace: {:?}",
3525            result.err()
3526        );
3527
3528        // List tables in the namespace
3529        let list_req = ListTablesRequest {
3530            id: Some(vec!["ns1".to_string()]),
3531            page_token: None,
3532            limit: None,
3533            ..Default::default()
3534        };
3535        let result = dir_namespace.list_tables(list_req).await;
3536        assert!(result.is_ok());
3537        let tables = result.unwrap();
3538        assert_eq!(tables.tables.len(), 1);
3539        assert_eq!(tables.tables[0], "table1");
3540    }
3541
3542    #[rstest]
3543    #[case::with_optimization(true)]
3544    #[case::without_optimization(false)]
3545    #[tokio::test]
3546    async fn test_describe_child_namespace(#[case] inline_optimization: bool) {
3547        use lance_namespace::models::{CreateNamespaceRequest, DescribeNamespaceRequest};
3548
3549        let temp_dir = TempStdDir::default();
3550        let temp_path = temp_dir.to_str().unwrap();
3551
3552        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3553            .inline_optimization_enabled(inline_optimization)
3554            .build()
3555            .await
3556            .unwrap();
3557
3558        // Create a child namespace with properties
3559        let mut properties = std::collections::HashMap::new();
3560        properties.insert("key1".to_string(), "value1".to_string());
3561
3562        let mut create_req = CreateNamespaceRequest::new();
3563        create_req.id = Some(vec!["ns1".to_string()]);
3564        create_req.properties = Some(properties.clone());
3565        dir_namespace.create_namespace(create_req).await.unwrap();
3566
3567        // Describe the namespace
3568        let describe_req = DescribeNamespaceRequest {
3569            id: Some(vec!["ns1".to_string()]),
3570            ..Default::default()
3571        };
3572        let result = dir_namespace.describe_namespace(describe_req).await;
3573        assert!(
3574            result.is_ok(),
3575            "Failed to describe namespace: {:?}",
3576            result.err()
3577        );
3578        let response = result.unwrap();
3579        assert!(response.properties.is_some());
3580        assert_eq!(
3581            response.properties.unwrap().get("key1"),
3582            Some(&"value1".to_string())
3583        );
3584    }
3585
3586    #[rstest]
3587    #[case::with_optimization(true)]
3588    #[case::without_optimization(false)]
3589    #[tokio::test]
3590    async fn test_concurrent_create_and_drop_single_instance(#[case] inline_optimization: bool) {
3591        use futures::future::join_all;
3592        use std::sync::Arc;
3593
3594        let temp_dir = TempStdDir::default();
3595        let temp_path = temp_dir.to_str().unwrap();
3596
3597        let dir_namespace = Arc::new(
3598            DirectoryNamespaceBuilder::new(temp_path)
3599                .inline_optimization_enabled(inline_optimization)
3600                .build()
3601                .await
3602                .unwrap(),
3603        );
3604
3605        // Initialize namespace first - create parent namespace to ensure __manifest table
3606        // is created before concurrent operations
3607        let mut create_ns_request = CreateNamespaceRequest::new();
3608        create_ns_request.id = Some(vec!["test_ns".to_string()]);
3609        dir_namespace
3610            .create_namespace(create_ns_request)
3611            .await
3612            .unwrap();
3613
3614        let num_tables = 10;
3615        let mut handles = Vec::new();
3616
3617        for i in 0..num_tables {
3618            let ns = dir_namespace.clone();
3619            let handle = async move {
3620                let table_name = format!("concurrent_table_{}", i);
3621                let table_id = vec!["test_ns".to_string(), table_name.clone()];
3622                let buffer = create_test_ipc_data();
3623
3624                // Create table
3625                let mut create_request = CreateTableRequest::new();
3626                create_request.id = Some(table_id.clone());
3627                ns.create_table(create_request, Bytes::from(buffer))
3628                    .await
3629                    .unwrap_or_else(|e| panic!("Failed to create table {}: {}", table_name, e));
3630
3631                // Drop table
3632                let mut drop_request = DropTableRequest::new();
3633                drop_request.id = Some(table_id);
3634                ns.drop_table(drop_request)
3635                    .await
3636                    .unwrap_or_else(|e| panic!("Failed to drop table {}: {}", table_name, e));
3637
3638                Ok::<_, lance_core::Error>(())
3639            };
3640            handles.push(handle);
3641        }
3642
3643        let results = join_all(handles).await;
3644        for result in results {
3645            assert!(result.is_ok(), "All concurrent operations should succeed");
3646        }
3647
3648        // Verify all tables are dropped
3649        let mut request = ListTablesRequest::new();
3650        request.id = Some(vec!["test_ns".to_string()]);
3651        let response = dir_namespace.list_tables(request).await.unwrap();
3652        assert_eq!(response.tables.len(), 0, "All tables should be dropped");
3653    }
3654
3655    #[rstest]
3656    #[case::with_optimization(true)]
3657    #[case::without_optimization(false)]
3658    #[tokio::test]
3659    async fn test_concurrent_create_and_drop_multiple_instances(#[case] inline_optimization: bool) {
3660        use futures::future::join_all;
3661
3662        let temp_dir = TempStdDir::default();
3663        let temp_path = temp_dir.to_str().unwrap().to_string();
3664
3665        // Initialize namespace first with a single instance to ensure __manifest
3666        // table is created and parent namespace exists before concurrent operations
3667        let init_ns = DirectoryNamespaceBuilder::new(&temp_path)
3668            .inline_optimization_enabled(inline_optimization)
3669            .build()
3670            .await
3671            .unwrap();
3672        let mut create_ns_request = CreateNamespaceRequest::new();
3673        create_ns_request.id = Some(vec!["test_ns".to_string()]);
3674        init_ns.create_namespace(create_ns_request).await.unwrap();
3675
3676        let num_tables = 10;
3677        let mut handles = Vec::new();
3678
3679        for i in 0..num_tables {
3680            let path = temp_path.clone();
3681            let handle = async move {
3682                // Each task creates its own namespace instance
3683                let ns = DirectoryNamespaceBuilder::new(&path)
3684                    .inline_optimization_enabled(inline_optimization)
3685                    .build()
3686                    .await
3687                    .unwrap();
3688
3689                let table_name = format!("multi_ns_table_{}", i);
3690                let table_id = vec!["test_ns".to_string(), table_name.clone()];
3691                let buffer = create_test_ipc_data();
3692
3693                // Create table
3694                let mut create_request = CreateTableRequest::new();
3695                create_request.id = Some(table_id.clone());
3696                ns.create_table(create_request, Bytes::from(buffer))
3697                    .await
3698                    .unwrap_or_else(|e| panic!("Failed to create table {}: {}", table_name, e));
3699
3700                // Drop table
3701                let mut drop_request = DropTableRequest::new();
3702                drop_request.id = Some(table_id);
3703                ns.drop_table(drop_request)
3704                    .await
3705                    .unwrap_or_else(|e| panic!("Failed to drop table {}: {}", table_name, e));
3706
3707                Ok::<_, lance_core::Error>(())
3708            };
3709            handles.push(handle);
3710        }
3711
3712        let results = join_all(handles).await;
3713        for result in results {
3714            assert!(result.is_ok(), "All concurrent operations should succeed");
3715        }
3716
3717        // Verify with a fresh namespace instance
3718        let verify_ns = DirectoryNamespaceBuilder::new(&temp_path)
3719            .inline_optimization_enabled(inline_optimization)
3720            .build()
3721            .await
3722            .unwrap();
3723
3724        let mut request = ListTablesRequest::new();
3725        request.id = Some(vec!["test_ns".to_string()]);
3726        let response = verify_ns.list_tables(request).await.unwrap();
3727        assert_eq!(response.tables.len(), 0, "All tables should be dropped");
3728    }
3729
3730    #[rstest]
3731    #[case::with_optimization(true)]
3732    #[case::without_optimization(false)]
3733    #[tokio::test]
3734    async fn test_concurrent_create_then_drop_from_different_instance(
3735        #[case] inline_optimization: bool,
3736    ) {
3737        use futures::future::join_all;
3738
3739        let temp_dir = TempStdDir::default();
3740        let temp_path = temp_dir.to_str().unwrap().to_string();
3741
3742        // Initialize namespace first with a single instance to ensure __manifest
3743        // table is created and parent namespace exists before concurrent operations
3744        let init_ns = DirectoryNamespaceBuilder::new(&temp_path)
3745            .inline_optimization_enabled(inline_optimization)
3746            .build()
3747            .await
3748            .unwrap();
3749        let mut create_ns_request = CreateNamespaceRequest::new();
3750        create_ns_request.id = Some(vec!["test_ns".to_string()]);
3751        init_ns.create_namespace(create_ns_request).await.unwrap();
3752
3753        let num_tables = 10;
3754
3755        // Phase 1: Create all tables concurrently using separate namespace instances
3756        let mut create_handles = Vec::new();
3757        for i in 0..num_tables {
3758            let path = temp_path.clone();
3759            let handle = async move {
3760                let ns = DirectoryNamespaceBuilder::new(&path)
3761                    .inline_optimization_enabled(inline_optimization)
3762                    .build()
3763                    .await
3764                    .unwrap();
3765
3766                let table_name = format!("cross_instance_table_{}", i);
3767                let table_id = vec!["test_ns".to_string(), table_name.clone()];
3768                let buffer = create_test_ipc_data();
3769
3770                let mut create_request = CreateTableRequest::new();
3771                create_request.id = Some(table_id);
3772                ns.create_table(create_request, Bytes::from(buffer))
3773                    .await
3774                    .unwrap_or_else(|e| panic!("Failed to create table {}: {}", table_name, e));
3775
3776                Ok::<_, lance_core::Error>(())
3777            };
3778            create_handles.push(handle);
3779        }
3780
3781        let create_results = join_all(create_handles).await;
3782        for result in create_results {
3783            assert!(result.is_ok(), "All create operations should succeed");
3784        }
3785
3786        // Phase 2: Drop all tables concurrently using NEW namespace instances
3787        let mut drop_handles = Vec::new();
3788        for i in 0..num_tables {
3789            let path = temp_path.clone();
3790            let handle = async move {
3791                let ns = DirectoryNamespaceBuilder::new(&path)
3792                    .inline_optimization_enabled(inline_optimization)
3793                    .build()
3794                    .await
3795                    .unwrap();
3796
3797                let table_name = format!("cross_instance_table_{}", i);
3798                let table_id = vec!["test_ns".to_string(), table_name.clone()];
3799
3800                let mut drop_request = DropTableRequest::new();
3801                drop_request.id = Some(table_id);
3802                ns.drop_table(drop_request)
3803                    .await
3804                    .unwrap_or_else(|e| panic!("Failed to drop table {}: {}", table_name, e));
3805
3806                Ok::<_, lance_core::Error>(())
3807            };
3808            drop_handles.push(handle);
3809        }
3810
3811        let drop_results = join_all(drop_handles).await;
3812        for result in drop_results {
3813            assert!(result.is_ok(), "All drop operations should succeed");
3814        }
3815
3816        // Verify all tables are dropped
3817        let verify_ns = DirectoryNamespaceBuilder::new(&temp_path)
3818            .inline_optimization_enabled(inline_optimization)
3819            .build()
3820            .await
3821            .unwrap();
3822
3823        let mut request = ListTablesRequest::new();
3824        request.id = Some(vec!["test_ns".to_string()]);
3825        let response = verify_ns.list_tables(request).await.unwrap();
3826        assert_eq!(response.tables.len(), 0, "All tables should be dropped");
3827    }
3828
3829    #[test]
3830    fn test_construct_full_uri_with_cloud_urls() {
3831        // Test S3-style URL with nested path (no trailing slash)
3832        let s3_result =
3833            ManifestNamespace::construct_full_uri("s3://bucket/path/subdir", "table.lance")
3834                .unwrap();
3835        assert_eq!(
3836            s3_result, "s3://bucket/path/subdir/table.lance",
3837            "S3 URL should correctly append table name to nested path"
3838        );
3839
3840        // Test Azure-style URL with nested path (no trailing slash)
3841        let az_result =
3842            ManifestNamespace::construct_full_uri("az://container/path/subdir", "table.lance")
3843                .unwrap();
3844        assert_eq!(
3845            az_result, "az://container/path/subdir/table.lance",
3846            "Azure URL should correctly append table name to nested path"
3847        );
3848
3849        // Test GCS-style URL with nested path (no trailing slash)
3850        let gs_result =
3851            ManifestNamespace::construct_full_uri("gs://bucket/path/subdir", "table.lance")
3852                .unwrap();
3853        assert_eq!(
3854            gs_result, "gs://bucket/path/subdir/table.lance",
3855            "GCS URL should correctly append table name to nested path"
3856        );
3857
3858        // Test with deeper nesting
3859        let deep_result =
3860            ManifestNamespace::construct_full_uri("s3://bucket/a/b/c/d", "my_table.lance").unwrap();
3861        assert_eq!(
3862            deep_result, "s3://bucket/a/b/c/d/my_table.lance",
3863            "Deeply nested path should work correctly"
3864        );
3865
3866        // Test with root-level path (single segment after bucket)
3867        let shallow_result =
3868            ManifestNamespace::construct_full_uri("s3://bucket", "table.lance").unwrap();
3869        assert_eq!(
3870            shallow_result, "s3://bucket/table.lance",
3871            "Single-level nested path should work correctly"
3872        );
3873
3874        // Test that URLs with trailing slash already work (no regression)
3875        let trailing_slash_result =
3876            ManifestNamespace::construct_full_uri("s3://bucket/path/subdir/", "table.lance")
3877                .unwrap();
3878        assert_eq!(
3879            trailing_slash_result, "s3://bucket/path/subdir/table.lance",
3880            "URL with existing trailing slash should still work"
3881        );
3882
3883        // Test that URLs with empty query string don't include trailing "?"
3884        // This is important because URL::to_string() can add "?" for empty queries
3885        let empty_query_result =
3886            ManifestNamespace::construct_full_uri("s3://bucket/path?", "table.lance").unwrap();
3887        assert_eq!(
3888            empty_query_result, "s3://bucket/path/table.lance",
3889            "URL with empty query string should not include trailing '?'"
3890        );
3891
3892        // Test that URLs with actual query parameters have them stripped
3893        // (query parameters are not meaningful for storage paths)
3894        let query_param_result =
3895            ManifestNamespace::construct_full_uri("s3://bucket/path?param=value", "table.lance")
3896                .unwrap();
3897        assert_eq!(
3898            query_param_result, "s3://bucket/path/table.lance",
3899            "URL with query parameters should have them stripped"
3900        );
3901    }
3902
3903    #[test]
3904    fn test_construct_full_uri_with_dollar_sign() {
3905        let result =
3906            ManifestNamespace::construct_full_uri("/tmp/root", "hash_workspace$test_table")
3907                .unwrap();
3908
3909        assert!(
3910            result.ends_with("/tmp/root/hash_workspace$test_table"),
3911            "local file URI should preserve dollar signs without adding empty path segments: {}",
3912            result
3913        );
3914        assert!(
3915            !result.contains("//hash_workspace$test_table"),
3916            "local file URI should not add a double slash before table directory: {}",
3917            result
3918        );
3919    }
3920
3921    #[test]
3922    fn test_construct_full_uri_with_nested_relative_location() {
3923        let result =
3924            ManifestNamespace::construct_full_uri("/tmp/root", "workspace/physical_table.lance")
3925                .unwrap();
3926
3927        assert!(
3928            result.ends_with("/tmp/root/workspace/physical_table.lance"),
3929            "nested relative location should preserve path separators: {}",
3930            result
3931        );
3932        assert!(
3933            !result.contains("%2Fphysical_table.lance"),
3934            "nested relative location should not encode path separators: {}",
3935            result
3936        );
3937    }
3938
3939    /// Test that concurrent create_table calls for the same table name don't
3940    /// create duplicate entries in the manifest. Uses two independent
3941    /// ManifestNamespace instances pointing at the same directory to simulate
3942    /// two separate OS processes racing on table creation. The conflict_retries
3943    /// setting on the MergeInsert ensures the second operation properly detects
3944    /// the duplicate via WhenMatched::Fail after retrying against the latest data.
3945    #[tokio::test]
3946    async fn test_concurrent_create_table_no_duplicates() {
3947        let temp_dir = TempStdDir::default();
3948        let temp_path = temp_dir.to_str().unwrap();
3949
3950        // Two independent namespace instances = two separate "processes"
3951        // sharing the same underlying filesystem directory.
3952        let ns1 = DirectoryNamespaceBuilder::new(temp_path)
3953            .inline_optimization_enabled(false)
3954            .build()
3955            .await
3956            .unwrap();
3957        let ns2 = DirectoryNamespaceBuilder::new(temp_path)
3958            .inline_optimization_enabled(false)
3959            .build()
3960            .await
3961            .unwrap();
3962
3963        let buffer = create_test_ipc_data();
3964
3965        let mut req1 = CreateTableRequest::new();
3966        req1.id = Some(vec!["race_table".to_string()]);
3967        let mut req2 = CreateTableRequest::new();
3968        req2.id = Some(vec!["race_table".to_string()]);
3969
3970        // Launch both create_table calls concurrently
3971        let (result1, result2) = tokio::join!(
3972            ns1.create_table(req1, Bytes::from(buffer.clone())),
3973            ns2.create_table(req2, Bytes::from(buffer.clone())),
3974        );
3975
3976        // Exactly one should succeed and one should fail
3977        let success_count = [&result1, &result2].iter().filter(|r| r.is_ok()).count();
3978        let failure_count = [&result1, &result2].iter().filter(|r| r.is_err()).count();
3979        assert_eq!(
3980            success_count, 1,
3981            "Exactly one create should succeed, got: result1={:?}, result2={:?}",
3982            result1, result2
3983        );
3984        assert_eq!(
3985            failure_count, 1,
3986            "Exactly one create should fail, got: result1={:?}, result2={:?}",
3987            result1, result2
3988        );
3989
3990        // Verify only one table entry exists in the manifest
3991        let ns_check = DirectoryNamespaceBuilder::new(temp_path)
3992            .inline_optimization_enabled(false)
3993            .build()
3994            .await
3995            .unwrap();
3996        let mut list_request = ListTablesRequest::new();
3997        list_request.id = Some(vec![]);
3998        let response = ns_check.list_tables(list_request).await.unwrap();
3999        assert_eq!(
4000            response.tables.len(),
4001            1,
4002            "Should have exactly 1 table, found: {:?}",
4003            response.tables
4004        );
4005        assert_eq!(response.tables[0], "race_table");
4006
4007        // Also verify describe_table works (no "found 2" error)
4008        let mut describe_request = DescribeTableRequest::new();
4009        describe_request.id = Some(vec!["race_table".to_string()]);
4010        let describe_result = ns_check.describe_table(describe_request).await;
4011        assert!(
4012            describe_result.is_ok(),
4013            "describe_table should not fail with duplicate entries: {:?}",
4014            describe_result
4015        );
4016    }
4017
4018    // --- apply_pagination unit tests ---
4019
4020    fn names(v: &[&str]) -> Vec<String> {
4021        v.iter().map(|s| s.to_string()).collect()
4022    }
4023
4024    #[test]
4025    fn test_apply_pagination_no_token_no_limit() {
4026        let mut n = names(&["b", "a", "c"]);
4027        let next = ManifestNamespace::apply_pagination(&mut n, None, None);
4028        assert_eq!(n, names(&["a", "b", "c"]));
4029        assert_eq!(next, None);
4030    }
4031
4032    #[test]
4033    fn test_apply_pagination_limit_truncates_and_returns_token() {
4034        let mut n = names(&["c", "a", "b"]);
4035        let next = ManifestNamespace::apply_pagination(&mut n, None, Some(2));
4036        assert_eq!(n, names(&["a", "b"]));
4037        assert_eq!(next, Some("b".to_string()));
4038    }
4039
4040    #[test]
4041    fn test_apply_pagination_limit_zero_returns_empty_no_token() {
4042        let mut n = names(&["a", "b", "c"]);
4043        let next = ManifestNamespace::apply_pagination(&mut n, None, Some(0));
4044        assert!(n.is_empty());
4045        assert_eq!(next, None);
4046    }
4047
4048    #[test]
4049    fn test_apply_pagination_page_token_in_list() {
4050        // "b" is in the list; should start from "c" (strict >)
4051        let mut n = names(&["a", "b", "c", "d"]);
4052        let next = ManifestNamespace::apply_pagination(&mut n, Some("b".to_string()), None);
4053        assert_eq!(n, names(&["c", "d"]));
4054        assert_eq!(next, None);
4055    }
4056
4057    #[test]
4058    fn test_apply_pagination_page_token_past_all_items() {
4059        let mut n = names(&["a", "b", "c"]);
4060        let next = ManifestNamespace::apply_pagination(&mut n, Some("z".to_string()), None);
4061        assert!(n.is_empty());
4062        assert_eq!(next, None);
4063    }
4064
4065    #[test]
4066    fn test_apply_pagination_token_and_limit_combined() {
4067        let mut n = names(&["a", "b", "c", "d", "e"]);
4068        let next = ManifestNamespace::apply_pagination(&mut n, Some("b".to_string()), Some(2));
4069        assert_eq!(n, names(&["c", "d"]));
4070        assert_eq!(next, Some("d".to_string()));
4071    }
4072}