// File: lance_namespace_impls/dir/manifest.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Manifest-based namespace implementation
//!
//! This module provides a namespace implementation that uses a manifest table
//! to track tables and nested namespaces.
8
9use arrow::array::builder::{ListBuilder, StringBuilder};
10use arrow::array::{Array, RecordBatch, RecordBatchIterator, StringArray};
11use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
12use arrow_ipc::reader::StreamReader;
13use async_trait::async_trait;
14use bytes::Bytes;
15use futures::{FutureExt, TryStreamExt, stream::StreamExt};
16use lance::dataset::optimize::{CompactionOptions, compact_files};
17use lance::dataset::{
18    DeleteBuilder, MergeInsertBuilder, ReadParams, WhenMatched, WhenNotMatched, WriteMode,
19    WriteParams, builder::DatasetBuilder,
20};
21use lance::index::DatasetIndexExt;
22use lance::session::Session;
23use lance::{Dataset, dataset::scanner::Scanner};
24use lance_core::Error as LanceError;
25use lance_core::datatypes::LANCE_UNENFORCED_PRIMARY_KEY_POSITION;
26use lance_core::{Error, Result};
27use lance_index::IndexType;
28use lance_index::optimize::OptimizeOptions;
29use lance_index::scalar::{BuiltinIndexType, ScalarIndexParams};
30use lance_io::object_store::{ObjectStore, ObjectStoreParams};
31use lance_namespace::LanceNamespace;
32use lance_namespace::error::NamespaceError;
33use lance_namespace::models::{
34    CreateNamespaceRequest, CreateNamespaceResponse, CreateTableRequest, CreateTableResponse,
35    DeclareTableRequest, DeclareTableResponse, DeregisterTableRequest, DeregisterTableResponse,
36    DescribeNamespaceRequest, DescribeNamespaceResponse, DescribeTableRequest,
37    DescribeTableResponse, DescribeTableVersionResponse, DropNamespaceRequest,
38    DropNamespaceResponse, DropTableRequest, DropTableResponse, ListNamespacesRequest,
39    ListNamespacesResponse, ListTableVersionsResponse, ListTablesRequest, ListTablesResponse,
40    NamespaceExistsRequest, RegisterTableRequest, RegisterTableResponse, TableExistsRequest,
41    TableVersion,
42};
43use lance_namespace::schema::arrow_schema_to_json;
44use object_store::{Error as ObjectStoreError, path::Path};
45use std::io::Cursor;
46use std::{
47    collections::HashMap,
48    hash::{DefaultHasher, Hash, Hasher},
49    ops::{Deref, DerefMut},
50    sync::Arc,
51};
52use tokio::sync::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
53
/// Name of the Lance table that records namespaces and tables.
const MANIFEST_TABLE_NAME: &str = "__manifest";
/// Separator placed between the components of a manifest `object_id` (e.g. `ns$table`).
const DELIMITER: &str = "$";

/// Upper bound on concurrent per-table `_versions/` probes when filtering declared tables.
///
/// Raising it lowers latency at the cost of larger request bursts against the object store.
pub(crate) const DECLARED_FILTER_CONCURRENCY: usize = 16;

// ---- Index names used on the `__manifest` table ----

/// Name of the BTREE index on `object_id` (point lookups).
const OBJECT_ID_INDEX_NAME: &str = "object_id_btree";
/// Name of the Bitmap index on `object_type` (filtering by kind of object).
const OBJECT_TYPE_INDEX_NAME: &str = "object_type_bitmap";
/// Name of the LabelList index on `base_objects` (view dependencies).
const BASE_OBJECTS_INDEX_NAME: &str = "base_objects_label_list";

/// Fragment count at which inline maintenance starts compacting files and merging indices.
///
/// Inline maintenance is expensive compared with a single-row manifest mutation, so it is
/// deferred until this many fragments have accumulated.
const MANIFEST_INLINE_OPTIMIZATION_FRAGMENT_THRESHOLD: usize = 8;
70
71/// Object types that can be stored in the manifest
72#[derive(Debug, Clone, Copy, PartialEq, Eq)]
73pub enum ObjectType {
74    Namespace,
75    Table,
76    TableVersion,
77}
78
79impl ObjectType {
80    pub fn as_str(&self) -> &str {
81        match self {
82            Self::Namespace => "namespace",
83            Self::Table => "table",
84            Self::TableVersion => "table_version",
85        }
86    }
87
88    pub fn parse(s: &str) -> Result<Self> {
89        match s {
90            "namespace" => Ok(Self::Namespace),
91            "table" => Ok(Self::Table),
92            "table_version" => Ok(Self::TableVersion),
93            _ => Err(NamespaceError::Internal {
94                message: format!("Invalid object type: {}", s),
95            }
96            .into()),
97        }
98    }
99}
100
101#[derive(Debug, Clone, Copy, PartialEq, Eq)]
102enum CreateTableMode {
103    Create,
104    ExistOk,
105    Overwrite,
106}
107
108impl CreateTableMode {
109    fn parse(mode: Option<&str>) -> Result<Self> {
110        match mode {
111            None => Ok(Self::Create),
112            Some(mode) if mode.eq_ignore_ascii_case("create") => Ok(Self::Create),
113            Some(mode)
114                if mode.eq_ignore_ascii_case("existok")
115                    || mode.eq_ignore_ascii_case("exist_ok") =>
116            {
117                Ok(Self::ExistOk)
118            }
119            Some(mode) if mode.eq_ignore_ascii_case("overwrite") => Ok(Self::Overwrite),
120            Some(mode) => Err(NamespaceError::InvalidInput {
121                message: format!(
122                    "Unsupported create_table mode '{}'. Supported modes are: 'Create', 'ExistOk', 'Overwrite'",
123                    mode
124                ),
125            }
126            .into()),
127        }
128    }
129
130    fn write_mode(self) -> WriteMode {
131        match self {
132            Self::Overwrite => WriteMode::Overwrite,
133            Self::Create | Self::ExistOk => WriteMode::Create,
134        }
135    }
136}
137
/// A table entry as recorded in the manifest.
#[derive(Clone, Debug)]
pub struct TableInfo {
    /// Namespace path the table belongs to.
    pub namespace: Vec<String>,
    /// Table name (the final component of its identifier).
    pub name: String,
    /// Storage location recorded for the table.
    pub location: String,
    /// Optional string key/value metadata attached to the table.
    pub metadata: Option<HashMap<String, String>>,
}
146
147/// An entry to be inserted into the manifest table.
148///
149/// This struct makes the meaning of each field explicit, replacing the
150/// previous tuple-based API `(String, ObjectType, Option<String>, Option<String>)`.
151#[derive(Debug, Clone)]
152pub struct ManifestEntry {
153    /// The unique object identifier (e.g., table name or version object_id)
154    pub object_id: String,
155    /// The type of the object (Namespace, Table, or TableVersion)
156    pub object_type: ObjectType,
157    /// The storage location (e.g., directory name for tables)
158    pub location: Option<String>,
159    /// Additional metadata serialized as JSON
160    pub metadata: Option<String>,
161}
162
/// A namespace entry as recorded in the manifest.
#[derive(Clone, Debug)]
pub struct NamespaceInfo {
    /// Path of the parent namespace.
    pub namespace: Vec<String>,
    /// Namespace name (the final component of its identifier).
    pub name: String,
    /// Optional string key/value metadata attached to the namespace.
    pub metadata: Option<HashMap<String, String>>,
}
170
171/// A wrapper around a Dataset that provides concurrent access.
172///
173/// This can be cloned cheaply. It supports concurrent reads or exclusive writes.
174/// The manifest dataset is always kept strongly consistent by reloading on each read.
175#[derive(Debug, Clone)]
176pub struct DatasetConsistencyWrapper(Arc<RwLock<Dataset>>);
177
178impl DatasetConsistencyWrapper {
179    /// Create a new wrapper with the given dataset.
180    pub fn new(dataset: Dataset) -> Self {
181        Self(Arc::new(RwLock::new(dataset)))
182    }
183
184    /// Get an immutable reference to the dataset.
185    /// Always reloads to ensure strong consistency.
186    pub async fn get(&self) -> Result<DatasetReadGuard<'_>> {
187        self.reload().await?;
188        Ok(DatasetReadGuard {
189            guard: self.0.read().await,
190        })
191    }
192
193    /// Get a mutable reference to the dataset.
194    /// Always reloads to ensure strong consistency.
195    pub async fn get_mut(&self) -> Result<DatasetWriteGuard<'_>> {
196        self.reload().await?;
197        Ok(DatasetWriteGuard {
198            guard: self.0.write().await,
199        })
200    }
201
202    /// Provide a known latest version of the dataset.
203    ///
204    /// This is usually done after some write operation, which inherently will
205    /// have the latest version.
206    pub async fn set_latest(&self, dataset: Dataset) {
207        let mut write_guard = self.0.write().await;
208        if dataset.manifest().version > write_guard.manifest().version {
209            *write_guard = dataset;
210        }
211    }
212
213    /// Reload the dataset to the latest version.
214    async fn reload(&self) -> Result<()> {
215        // First check if we need to reload (with read lock)
216        let read_guard = self.0.read().await;
217        let dataset_uri = read_guard.uri().to_string();
218        let current_version = read_guard.version().version;
219        log::debug!(
220            "Reload starting for uri={}, current_version={}",
221            dataset_uri,
222            current_version
223        );
224        let latest_version = read_guard.latest_version_id().await.map_err(|e| {
225            lance_core::Error::from(NamespaceError::Internal {
226                message: format!("Failed to get latest version: {:?}", e),
227            })
228        })?;
229        log::debug!(
230            "Reload got latest_version={} for uri={}, current_version={}",
231            latest_version,
232            dataset_uri,
233            current_version
234        );
235        drop(read_guard);
236
237        // If already up-to-date, return early
238        if latest_version == current_version {
239            log::debug!("Already up-to-date for uri={}", dataset_uri);
240            return Ok(());
241        }
242
243        // Need to reload, acquire write lock
244        let mut write_guard = self.0.write().await;
245
246        // Double-check after acquiring write lock (someone else might have reloaded)
247        let latest_version = write_guard.latest_version_id().await.map_err(|e| {
248            lance_core::Error::from(NamespaceError::Internal {
249                message: format!("Failed to get latest version: {:?}", e),
250            })
251        })?;
252
253        if latest_version != write_guard.version().version {
254            write_guard.checkout_latest().await.map_err(|e| {
255                lance_core::Error::from(NamespaceError::Internal {
256                    message: format!("Failed to checkout latest: {:?}", e),
257                })
258            })?;
259        }
260
261        Ok(())
262    }
263}
264
265pub struct DatasetReadGuard<'a> {
266    guard: RwLockReadGuard<'a, Dataset>,
267}
268
269impl Deref for DatasetReadGuard<'_> {
270    type Target = Dataset;
271
272    fn deref(&self) -> &Self::Target {
273        &self.guard
274    }
275}
276
277pub struct DatasetWriteGuard<'a> {
278    guard: RwLockWriteGuard<'a, Dataset>,
279}
280
281impl Deref for DatasetWriteGuard<'_> {
282    type Target = Dataset;
283
284    fn deref(&self) -> &Self::Target {
285        &self.guard
286    }
287}
288
289impl DerefMut for DatasetWriteGuard<'_> {
290    fn deref_mut(&mut self) -> &mut Self::Target {
291        &mut self.guard
292    }
293}
294
295/// Manifest-based namespace implementation
296///
297/// Uses a special `__manifest` Lance table to track tables and nested namespaces.
298pub struct ManifestNamespace {
299    root: String,
300    storage_options: Option<HashMap<String, String>>,
301    session: Option<Arc<Session>>,
302    object_store: Arc<ObjectStore>,
303    base_path: Path,
304    manifest_dataset: DatasetConsistencyWrapper,
305    /// Whether directory listing is enabled in dual mode
306    /// If true, root namespace tables use {table_name}.lance naming
307    /// If false, they use namespace-prefixed names
308    dir_listing_enabled: bool,
309    /// Whether to perform inline optimization (compaction and indexing) on the __manifest table
310    /// after every write. Defaults to true.
311    inline_optimization_enabled: bool,
312    /// Number of retries for commit operations on the manifest table.
313    /// If None, defaults to [`lance_table::io::commit::CommitConfig`] default (20).
314    commit_retries: Option<u32>,
315    /// Serialize manifest mutations within a single namespace instance so concurrent
316    /// create/drop calls do not compete with each other on the same in-memory snapshot.
317    manifest_mutation_lock: Arc<Mutex<()>>,
318}
319
320impl std::fmt::Debug for ManifestNamespace {
321    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
322        f.debug_struct("ManifestNamespace")
323            .field("root", &self.root)
324            .field("storage_options", &self.storage_options)
325            .field("dir_listing_enabled", &self.dir_listing_enabled)
326            .field(
327                "inline_optimization_enabled",
328                &self.inline_optimization_enabled,
329            )
330            .finish()
331    }
332}
333
334/// Convert a Lance commit error to an appropriate namespace error.
335///
336/// Maps lance commit errors to namespace errors:
337/// - `CommitConflict`: version collision retries exhausted -> Throttling (safe to retry)
338/// - `TooMuchWriteContention`: RetryableCommitConflict (semantic conflict) retries exhausted -> ConcurrentModification
339/// - `IncompatibleTransaction`: incompatible concurrent change -> ConcurrentModification
340/// - Errors containing "matched/duplicate/already exists": ConcurrentModification (from WhenMatched::Fail)
341/// - Other errors: IO error with the operation description
342fn convert_lance_commit_error(e: &LanceError, operation: &str, object_id: Option<&str>) -> Error {
343    match e {
344        // CommitConflict: version collision retries exhausted -> Throttling (safe to retry)
345        LanceError::CommitConflict { .. } => NamespaceError::Throttling {
346            message: format!("Too many concurrent writes, please retry later: {:?}", e),
347        }
348        .into(),
349        // TooMuchWriteContention: RetryableCommitConflict (semantic conflict) retries exhausted -> ConcurrentModification
350        // IncompatibleTransaction: incompatible concurrent change -> ConcurrentModification
351        LanceError::TooMuchWriteContention { .. } | LanceError::IncompatibleTransaction { .. } => {
352            let message = if let Some(id) = object_id {
353                format!(
354                    "Object '{}' was concurrently modified by another operation: {:?}",
355                    id, e
356                )
357            } else {
358                format!(
359                    "Object was concurrently modified by another operation: {:?}",
360                    e
361                )
362            };
363            NamespaceError::ConcurrentModification { message }.into()
364        }
365        // Other errors: check message for semantic conflicts (matched/duplicate from WhenMatched::Fail)
366        _ => {
367            let error_msg = e.to_string();
368            if error_msg.contains("matched")
369                || error_msg.contains("duplicate")
370                || error_msg.contains("already exists")
371            {
372                let message = if let Some(id) = object_id {
373                    format!(
374                        "Object '{}' was concurrently created by another operation: {:?}",
375                        id, e
376                    )
377                } else {
378                    format!(
379                        "Object was concurrently created by another operation: {:?}",
380                        e
381                    )
382                };
383                return NamespaceError::ConcurrentModification { message }.into();
384            }
385            lance_core::Error::from(NamespaceError::Internal {
386                message: format!("{}: {:?}", operation, e),
387            })
388        }
389    }
390}
391
392impl ManifestNamespace {
393    /// Create a new ManifestNamespace from an existing DirectoryNamespace
394    #[allow(clippy::too_many_arguments)]
395    pub async fn from_directory(
396        root: String,
397        storage_options: Option<HashMap<String, String>>,
398        session: Option<Arc<Session>>,
399        object_store: Arc<ObjectStore>,
400        base_path: Path,
401        dir_listing_enabled: bool,
402        inline_optimization_enabled: bool,
403        commit_retries: Option<u32>,
404        table_version_storage_enabled: bool,
405    ) -> Result<Self> {
406        let manifest_dataset = Self::ensure_manifest_table_up_to_date(
407            &root,
408            &storage_options,
409            session.clone(),
410            table_version_storage_enabled,
411        )
412        .await?;
413
414        Ok(Self {
415            root,
416            storage_options,
417            session,
418            object_store,
419            base_path,
420            manifest_dataset,
421            dir_listing_enabled,
422            inline_optimization_enabled,
423            commit_retries,
424            manifest_mutation_lock: Arc::new(Mutex::new(())),
425        })
426    }
427
428    /// Build object ID from namespace path and name
429    pub fn build_object_id(namespace: &[String], name: &str) -> String {
430        if namespace.is_empty() {
431            name.to_string()
432        } else {
433            let mut id = namespace.join(DELIMITER);
434            id.push_str(DELIMITER);
435            id.push_str(name);
436            id
437        }
438    }
439
440    /// Parse object ID into namespace path and name
441    pub fn parse_object_id(object_id: &str) -> (Vec<String>, String) {
442        let parts: Vec<&str> = object_id.split(DELIMITER).collect();
443        if parts.len() == 1 {
444            (Vec::new(), parts[0].to_string())
445        } else {
446            let namespace = parts[..parts.len() - 1]
447                .iter()
448                .map(|s| s.to_string())
449                .collect();
450            let name = parts[parts.len() - 1].to_string();
451            (namespace, name)
452        }
453    }
454
455    /// Split an object ID (vec of strings) into namespace and table name
456    pub fn split_object_id(object_id: &[String]) -> (Vec<String>, String) {
457        if object_id.len() == 1 {
458            (vec![], object_id[0].clone())
459        } else {
460            (
461                object_id[..object_id.len() - 1].to_vec(),
462                object_id[object_id.len() - 1].clone(),
463            )
464        }
465    }
466
467    /// Convert an ID (vec of strings) to an object_id string
468    pub fn str_object_id(object_id: &[String]) -> String {
469        object_id.join(DELIMITER)
470    }
471
472    fn format_table_id(table_id: &[String]) -> String {
473        format!("table id '{}'", Self::str_object_id(table_id))
474    }
475
476    /// Format a version number as a zero-padded lexicographically sortable string.
477    ///
478    /// Versions are stored as 20-digit zero-padded integers (e.g., `00000000000000000001`
479    /// for version 1) so that string-based range queries and sorting work correctly.
480    pub fn format_table_version(version: i64) -> String {
481        format!("{:020}", version)
482    }
483
484    /// Build the object_id for a table version entry.
485    ///
486    /// Format: `{table_object_id}${zero_padded_version}`
487    pub fn build_version_object_id(table_object_id: &str, version: i64) -> String {
488        format!(
489            "{}{}{}",
490            table_object_id,
491            DELIMITER,
492            Self::format_table_version(version)
493        )
494    }
495
496    /// Parse a version number from the version suffix of a table version object_id.
497    ///
498    /// The object_id is formatted as `{table_id}${zero_padded_version}`.
499    pub fn parse_version_from_object_id(object_id: &str) -> Option<i64> {
500        let (_namespace, name) = Self::parse_object_id(object_id);
501        name.parse::<i64>().ok()
502    }
503
504    /// Generate a new directory name in format: `<hash>_<object_id>`
505    /// The hash is used to (1) optimize object store throughput,
506    /// (2) have high enough entropy in a short period of time to prevent issues like
507    /// failed table creation, delete and create new table of the same name, etc.
508    /// The object_id is added after the hash to ensure
509    /// dir name uniqueness and make debugging easier.
510    pub fn generate_dir_name(object_id: &str) -> String {
511        // Generate a random number for uniqueness
512        let random_num: u64 = rand::random();
513
514        // Create hash from random number + object_id
515        let mut hasher = DefaultHasher::new();
516        random_num.hash(&mut hasher);
517        object_id.hash(&mut hasher);
518        let hash = hasher.finish();
519
520        // Format as lowercase hex (8 characters - sufficient entropy for uniqueness)
521        format!("{:08x}_{}", (hash & 0xFFFFFFFF) as u32, object_id)
522    }
523
524    /// Construct a full URI from root and relative location
525    pub(crate) fn construct_full_uri(root: &str, relative_location: &str) -> Result<String> {
526        let mut base_url = lance_io::object_store::uri_to_url(root)?;
527
528        // Ensure the base URL has a trailing slash so that path segment mutation
529        // appends rather than replaces the last path segment.
530        // Without this fix, appending "table.lance" to "s3://bucket/path/subdir"
531        // would incorrectly produce "s3://bucket/path/table.lance" (missing subdir).
532        if !base_url.path().ends_with('/') {
533            base_url.set_path(&format!("{}/", base_url.path()));
534        }
535
536        let mut full_url = base_url.clone();
537        full_url
538            .path_segments_mut()
539            .map_err(|_| {
540                lance_core::Error::from(NamespaceError::InvalidInput {
541                    message: format!("Cannot modify path segments for URI '{}'", root),
542                })
543            })?
544            .pop_if_empty()
545            .extend(
546                relative_location
547                    .split('/')
548                    .filter(|segment| !segment.is_empty()),
549            );
550
551        // Clear any query string to avoid trailing "?" in the URL.
552        // Use set_query(None) instead of set_query("") because the latter
553        // would still add a trailing '?' to the URL when serialized.
554        full_url.set_query(None);
555
556        Ok(full_url.to_string())
557    }
558
559    /// Perform inline optimization on the __manifest table.
560    ///
561    /// This method:
562    /// 1. Creates three indexes on the manifest table:
563    ///    - BTREE index on object_id for fast lookups
564    ///    - Bitmap index on object_type for filtering by type
565    ///    - LabelList index on base_objects for view dependencies
566    /// 2. Runs file compaction to merge small files
567    /// 3. Optimizes existing indices
568    ///
569    /// This is called automatically after writes when inline_optimization_enabled is true.
570    async fn run_inline_optimization(&self) -> Result<()> {
571        if !self.inline_optimization_enabled {
572            return Ok(());
573        }
574
575        // Get a mutable reference to the dataset to perform optimization
576        let mut dataset_guard = self.manifest_dataset.get_mut().await?;
577        let dataset: &mut Dataset = &mut dataset_guard;
578
579        // Step 1: Create indexes if they don't already exist
580        let indices = dataset.load_indices().await?;
581
582        // Check which indexes already exist
583        let has_object_id_index = indices.iter().any(|idx| idx.name == OBJECT_ID_INDEX_NAME);
584        let has_object_type_index = indices.iter().any(|idx| idx.name == OBJECT_TYPE_INDEX_NAME);
585        let has_base_objects_index = indices
586            .iter()
587            .any(|idx| idx.name == BASE_OBJECTS_INDEX_NAME);
588
589        // Create BTREE index on object_id
590        if !has_object_id_index {
591            log::debug!(
592                "Creating BTREE index '{}' on object_id for __manifest table",
593                OBJECT_ID_INDEX_NAME
594            );
595            let params = ScalarIndexParams::for_builtin(BuiltinIndexType::BTree);
596            if let Err(e) = dataset
597                .create_index(
598                    &["object_id"],
599                    IndexType::BTree,
600                    Some(OBJECT_ID_INDEX_NAME.to_string()),
601                    &params,
602                    true,
603                )
604                .await
605            {
606                log::warn!(
607                    "Failed to create BTREE index on object_id for __manifest table: {:?}. Query performance may be impacted.",
608                    e
609                );
610            } else {
611                log::info!(
612                    "Created BTREE index '{}' on object_id for __manifest table",
613                    OBJECT_ID_INDEX_NAME
614                );
615            }
616        }
617
618        // Create Bitmap index on object_type
619        if !has_object_type_index {
620            log::debug!(
621                "Creating Bitmap index '{}' on object_type for __manifest table",
622                OBJECT_TYPE_INDEX_NAME
623            );
624            let params = ScalarIndexParams::default();
625            if let Err(e) = dataset
626                .create_index(
627                    &["object_type"],
628                    IndexType::Bitmap,
629                    Some(OBJECT_TYPE_INDEX_NAME.to_string()),
630                    &params,
631                    true,
632                )
633                .await
634            {
635                log::warn!(
636                    "Failed to create Bitmap index on object_type for __manifest table: {:?}. Query performance may be impacted.",
637                    e
638                );
639            } else {
640                log::info!(
641                    "Created Bitmap index '{}' on object_type for __manifest table",
642                    OBJECT_TYPE_INDEX_NAME
643                );
644            }
645        }
646
647        // Create LabelList index on base_objects
648        if !has_base_objects_index {
649            log::debug!(
650                "Creating LabelList index '{}' on base_objects for __manifest table",
651                BASE_OBJECTS_INDEX_NAME
652            );
653            let params = ScalarIndexParams::default();
654            if let Err(e) = dataset
655                .create_index(
656                    &["base_objects"],
657                    IndexType::LabelList,
658                    Some(BASE_OBJECTS_INDEX_NAME.to_string()),
659                    &params,
660                    true,
661                )
662                .await
663            {
664                log::warn!(
665                    "Failed to create LabelList index on base_objects for __manifest table: {:?}. Query performance may be impacted.",
666                    e
667                );
668            } else {
669                log::info!(
670                    "Created LabelList index '{}' on base_objects for __manifest table",
671                    BASE_OBJECTS_INDEX_NAME
672                );
673            }
674        }
675
676        let should_compact_and_optimize =
677            dataset.count_fragments() >= MANIFEST_INLINE_OPTIMIZATION_FRAGMENT_THRESHOLD;
678
679        if !should_compact_and_optimize {
680            return Ok(());
681        }
682
683        // Step 2: Run file compaction
684        log::debug!("Running file compaction on __manifest table");
685        match compact_files(dataset, CompactionOptions::default(), None).await {
686            Ok(compaction_metrics) => {
687                if compaction_metrics.fragments_removed > 0 {
688                    log::info!(
689                        "Compacted __manifest table: removed {} fragments, added {} fragments",
690                        compaction_metrics.fragments_removed,
691                        compaction_metrics.fragments_added
692                    );
693                }
694            }
695            Err(e) => {
696                log::warn!(
697                    "Failed to compact files for __manifest table: {:?}. Continuing with optimization.",
698                    e
699                );
700            }
701        }
702
703        // Step 3: Optimize indices
704        log::debug!("Optimizing indices on __manifest table");
705        match dataset.optimize_indices(&OptimizeOptions::default()).await {
706            Ok(_) => {
707                log::info!("Successfully optimized indices on __manifest table");
708            }
709            Err(e) => {
710                log::warn!(
711                    "Failed to optimize indices on __manifest table: {:?}. Continuing anyway.",
712                    e
713                );
714            }
715        }
716
717        Ok(())
718    }
719
720    /// Get the manifest schema
721    fn manifest_schema() -> Arc<ArrowSchema> {
722        Arc::new(ArrowSchema::new(vec![
723            // Set unenforced primary key on object_id for bloom filter conflict detection
724            Field::new("object_id", DataType::Utf8, false).with_metadata(
725                [(
726                    LANCE_UNENFORCED_PRIMARY_KEY_POSITION.to_string(),
727                    "0".to_string(),
728                )]
729                .into_iter()
730                .collect(),
731            ),
732            Field::new("object_type", DataType::Utf8, false),
733            Field::new("location", DataType::Utf8, true),
734            Field::new("metadata", DataType::Utf8, true),
735            Field::new(
736                "base_objects",
737                DataType::List(Arc::new(Field::new("object_id", DataType::Utf8, true))),
738                true,
739            ),
740        ]))
741    }
742
743    /// Get a scanner for the manifest dataset
744    async fn manifest_scanner(&self) -> Result<Scanner> {
745        let dataset_guard = self.manifest_dataset.get().await?;
746        Ok(dataset_guard.scan())
747    }
748
749    /// Helper to execute a scanner and collect results into a Vec
750    async fn execute_scanner(scanner: Scanner) -> Result<Vec<RecordBatch>> {
751        let mut stream = scanner.try_into_stream().await.map_err(|e| {
752            lance_core::Error::from(NamespaceError::Internal {
753                message: format!("Failed to create stream: {:?}", e),
754            })
755        })?;
756
757        let mut batches = Vec::new();
758        while let Some(batch) = stream.next().await {
759            batches.push(batch.map_err(|e| {
760                lance_core::Error::from(NamespaceError::Internal {
761                    message: format!("Failed to read batch: {:?}", e),
762                })
763            })?);
764        }
765
766        Ok(batches)
767    }
768
769    /// Helper to get a string column from a record batch
770    fn get_string_column<'a>(batch: &'a RecordBatch, column_name: &str) -> Result<&'a StringArray> {
771        let column = batch.column_by_name(column_name).ok_or_else(|| {
772            lance_core::Error::from(NamespaceError::Internal {
773                message: format!("Column '{}' not found", column_name),
774            })
775        })?;
776        column
777            .as_any()
778            .downcast_ref::<StringArray>()
779            .ok_or_else(|| {
780                lance_core::Error::from(NamespaceError::Internal {
781                    message: format!("Column '{}' is not a string array", column_name),
782                })
783            })
784    }
785
786    /// Check if the manifest contains an object with the given ID
787    async fn manifest_contains_object(&self, object_id: &str) -> Result<bool> {
788        let escaped_id = object_id.replace('\'', "''");
789        let filter = format!("object_id = '{}'", escaped_id);
790
791        let dataset_guard = self.manifest_dataset.get().await?;
792        let mut scanner = dataset_guard.scan();
793
794        scanner.filter(&filter).map_err(|e| {
795            lance_core::Error::from(NamespaceError::Internal {
796                message: format!("Failed to filter: {:?}", e),
797            })
798        })?;
799
800        // Project no columns and enable row IDs for count_rows to work
801        scanner.project::<&str>(&[]).map_err(|e| {
802            lance_core::Error::from(NamespaceError::Internal {
803                message: format!("Failed to project: {:?}", e),
804            })
805        })?;
806
807        scanner.with_row_id();
808
809        let count = scanner.count_rows().await.map_err(|e| {
810            lance_core::Error::from(NamespaceError::Internal {
811                message: format!("Failed to count rows: {:?}", e),
812            })
813        })?;
814
815        Ok(count > 0)
816    }
817
818    /// Query the manifest for a table with the given object ID
819    async fn query_manifest_for_table(&self, object_id: &str) -> Result<Option<TableInfo>> {
820        let escaped_id = object_id.replace('\'', "''");
821        let filter = format!("object_id = '{}' AND object_type = 'table'", escaped_id);
822        let mut scanner = self.manifest_scanner().await?;
823        scanner.filter(&filter).map_err(|e| {
824            lance_core::Error::from(NamespaceError::Internal {
825                message: format!("Failed to filter: {:?}", e),
826            })
827        })?;
828        scanner
829            .project(&["object_id", "location", "metadata"])
830            .map_err(|e| {
831                lance_core::Error::from(NamespaceError::Internal {
832                    message: format!("Failed to project: {:?}", e),
833                })
834            })?;
835        let batches = Self::execute_scanner(scanner).await?;
836
837        let mut found_result: Option<TableInfo> = None;
838        let mut total_rows = 0;
839
840        for batch in batches {
841            if batch.num_rows() == 0 {
842                continue;
843            }
844
845            total_rows += batch.num_rows();
846            if total_rows > 1 {
847                return Err(NamespaceError::Internal {
848                    message: format!(
849                        "Expected exactly 1 table with id '{}', found {}",
850                        object_id, total_rows
851                    ),
852                }
853                .into());
854            }
855
856            let object_id_array = Self::get_string_column(&batch, "object_id")?;
857            let location_array = Self::get_string_column(&batch, "location")?;
858            let metadata_array = Self::get_string_column(&batch, "metadata")?;
859            let location = location_array.value(0).to_string();
860            let metadata = if !metadata_array.is_null(0) {
861                let metadata_str = metadata_array.value(0);
862                match serde_json::from_str::<HashMap<String, String>>(metadata_str) {
863                    Ok(map) => Some(map),
864                    Err(e) => {
865                        return Err(NamespaceError::Internal {
866                            message: format!(
867                                "Failed to deserialize metadata for table '{}': {}",
868                                object_id, e
869                            ),
870                        }
871                        .into());
872                    }
873                }
874            } else {
875                None
876            };
877            let (namespace, name) = Self::parse_object_id(object_id_array.value(0));
878            found_result = Some(TableInfo {
879                namespace,
880                name,
881                location,
882                metadata,
883            });
884        }
885
886        Ok(found_result)
887    }
888
889    fn serialize_metadata(
890        properties: Option<&HashMap<String, String>>,
891        object_type: &str,
892        object_id: &str,
893    ) -> Result<Option<String>> {
894        match properties {
895            Some(properties) if !properties.is_empty() => {
896                serde_json::to_string(properties).map(Some).map_err(|e| {
897                    LanceError::from(NamespaceError::Internal {
898                        message: format!(
899                            "Failed to serialize {} metadata for '{}': {}",
900                            object_type, object_id, e
901                        ),
902                    })
903                })
904            }
905            _ => Ok(None),
906        }
907    }
908
909    pub(crate) async fn path_has_actual_manifests(
910        object_store: &ObjectStore,
911        table_path: &Path,
912    ) -> Result<bool> {
913        let versions_path = table_path.child(lance_table::io::commit::VERSIONS_DIR);
914        // `_versions/` should only contain manifest files, so probing the first entry is enough
915        // to distinguish declared-only tables (empty `_versions/`) from created tables.
916        Ok(object_store
917            .list(Some(versions_path))
918            .try_next()
919            .await?
920            .is_some())
921    }
922
923    async fn location_has_actual_manifests(&self, location: &str) -> Result<bool> {
924        Self::path_has_actual_manifests(&self.object_store, &self.base_path.child(location)).await
925    }
926
927    pub(crate) fn is_not_found_load_error(err: &LanceError) -> bool {
928        match err {
929            LanceError::NotFound { .. } => true,
930            LanceError::IO { source, .. } => source
931                .downcast_ref::<ObjectStoreError>()
932                .is_some_and(|source| matches!(source, ObjectStoreError::NotFound { .. })),
933            LanceError::DatasetNotFound { source, .. } => {
934                source
935                    .downcast_ref::<LanceError>()
936                    .is_some_and(|source| matches!(source, LanceError::NotFound { .. }))
937                    || source
938                        .downcast_ref::<ObjectStoreError>()
939                        .is_some_and(|source| matches!(source, ObjectStoreError::NotFound { .. }))
940            }
941            _ => false,
942        }
943    }
944
945    /// List all table locations in the manifest (for root namespace only)
946    /// Returns a set of table locations (e.g., "table_name.lance")
947    pub async fn list_manifest_table_locations(&self) -> Result<std::collections::HashSet<String>> {
948        let filter = "object_type = 'table' AND NOT contains(object_id, '$')";
949        let mut scanner = self.manifest_scanner().await?;
950        scanner.filter(filter).map_err(|e| {
951            lance_core::Error::from(NamespaceError::Internal {
952                message: format!("Failed to filter: {:?}", e),
953            })
954        })?;
955        scanner.project(&["location"]).map_err(|e| {
956            lance_core::Error::from(NamespaceError::Internal {
957                message: format!("Failed to project: {:?}", e),
958            })
959        })?;
960
961        let batches = Self::execute_scanner(scanner).await?;
962        let mut locations = std::collections::HashSet::new();
963
964        for batch in batches {
965            if batch.num_rows() == 0 {
966                continue;
967            }
968            let location_array = Self::get_string_column(&batch, "location")?;
969            for i in 0..location_array.len() {
970                locations.insert(location_array.value(i).to_string());
971            }
972        }
973
974        Ok(locations)
975    }
976
977    /// Insert an entry into the manifest table
978    async fn insert_into_manifest(
979        &self,
980        object_id: String,
981        object_type: ObjectType,
982        location: Option<String>,
983    ) -> Result<()> {
984        self.insert_into_manifest_with_metadata(
985            vec![ManifestEntry {
986                object_id,
987                object_type,
988                location,
989                metadata: None,
990            }],
991            None,
992        )
993        .await
994    }
995
996    /// Insert one or more entries into the manifest table with metadata and base_objects.
997    ///
998    /// This is the unified entry point for both single and batch inserts.
999    /// Uses a single MergeInsert operation to insert all entries at once.
1000    /// If any entry already exists (matching object_id), the entire batch fails.
1001    pub async fn insert_into_manifest_with_metadata(
1002        &self,
1003        entries: Vec<ManifestEntry>,
1004        base_objects: Option<Vec<String>>,
1005    ) -> Result<()> {
1006        self.merge_into_manifest_with_metadata(entries, base_objects, WhenMatched::Fail)
1007            .await
1008    }
1009
1010    async fn upsert_into_manifest_with_metadata(
1011        &self,
1012        entries: Vec<ManifestEntry>,
1013        base_objects: Option<Vec<String>>,
1014    ) -> Result<()> {
1015        self.merge_into_manifest_with_metadata(entries, base_objects, WhenMatched::UpdateAll)
1016            .await
1017    }
1018
    /// Write `entries` into the manifest with a single MergeInsert keyed on `object_id`.
    ///
    /// * `entries` - rows to write. An empty vector returns `Ok(())` without touching
    ///   the manifest.
    /// * `base_objects` - attached to the FIRST entry's `base_objects` list cell only.
    ///   Every other entry (or the first, when `None`) gets a null cell.
    /// * `when_matched` - what to do when a row with the same `object_id` already exists
    ///   (e.g. `Fail` for inserts, `UpdateAll` for upserts). Unmatched rows are always
    ///   inserted.
    ///
    /// `manifest_mutation_lock` is held from before the dataset snapshot is taken until
    /// this function returns, including the inline optimization step. After a successful
    /// commit the cached manifest dataset is replaced with the new version. Inline
    /// optimization failures are logged and do not fail the call.
    ///
    /// # Errors
    /// `NamespaceError::Internal` if the record batch or merge builder cannot be built.
    /// Commit/execution failures are converted by `convert_lance_commit_error`.
    async fn merge_into_manifest_with_metadata(
        &self,
        entries: Vec<ManifestEntry>,
        base_objects: Option<Vec<String>>,
        when_matched: WhenMatched,
    ) -> Result<()> {
        if entries.is_empty() {
            return Ok(());
        }

        let schema = Self::manifest_schema();

        // Column buffers, one slot per entry.
        let mut object_ids = Vec::with_capacity(entries.len());
        let mut object_types = Vec::with_capacity(entries.len());
        let mut locations: Vec<Option<String>> = Vec::with_capacity(entries.len());
        let mut metadatas: Vec<Option<String>> = Vec::with_capacity(entries.len());

        // Builder for the List<Utf8> base_objects column. The inner field is named
        // "object_id" and is nullable; presumably this must match `manifest_schema()` so
        // `RecordBatch::try_new` accepts it - confirm there.
        let string_builder = StringBuilder::new();
        let mut list_builder = ListBuilder::new(string_builder).with_field(Arc::new(Field::new(
            "object_id",
            DataType::Utf8,
            true,
        )));

        for (i, entry) in entries.iter().enumerate() {
            object_ids.push(entry.object_id.as_str());
            object_types.push(entry.object_type.as_str());
            locations.push(entry.location.clone());
            metadatas.push(entry.metadata.clone());

            // Only the first entry gets the base_objects (for single-entry inserts
            // with base_objects like view creation); batch entries use null.
            if i == 0 {
                match &base_objects {
                    Some(objects) => {
                        for obj in objects {
                            list_builder.values().append_value(obj);
                        }
                        list_builder.append(true);
                    }
                    None => {
                        list_builder.append_null();
                    }
                }
            } else {
                list_builder.append_null();
            }
        }

        let base_objects_array = list_builder.finish();

        // `None` locations/metadata become null cells in the Utf8 columns.
        let location_array: Arc<dyn Array> = Arc::new(StringArray::from(
            locations.iter().map(|l| l.as_deref()).collect::<Vec<_>>(),
        ));

        let metadata_array: Arc<dyn Array> = Arc::new(StringArray::from(
            metadatas.iter().map(|m| m.as_deref()).collect::<Vec<_>>(),
        ));

        // Column order here is assumed to match the field order of `manifest_schema()`
        // (object_id, object_type, location, metadata, base_objects) - confirm there.
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(object_ids)),
                Arc::new(StringArray::from(object_types.to_vec())),
                location_array,
                metadata_array,
                Arc::new(base_objects_array),
            ],
        )
        .map_err(|e| {
            lance_core::Error::from(NamespaceError::Internal {
                message: format!("Failed to create manifest entries: {:?}", e),
            })
        })?;

        let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());

        // Use MergeInsert so callers can choose fail-on-existing inserts or metadata upserts.
        // The mutation lock serializes this write with other in-process manifest mutations;
        // `_mutation_guard` (not `_`) keeps it held until the function returns.
        let _mutation_guard = self.manifest_mutation_lock.lock().await;
        let dataset_guard = self.manifest_dataset.get().await?;
        let dataset_arc = Arc::new(dataset_guard.clone());
        drop(dataset_guard); // Drop read guard before merge insert

        let mut merge_builder =
            MergeInsertBuilder::try_new(dataset_arc, vec!["object_id".to_string()]).map_err(
                |e| {
                    lance_core::Error::from(NamespaceError::Internal {
                        message: format!("Failed to create merge builder: {:?}", e),
                    })
                },
            )?;
        merge_builder.when_matched(when_matched);
        merge_builder.when_not_matched(WhenNotMatched::InsertAll);
        // Use conflict_retries to handle cross-process races on manifest mutations.
        merge_builder.conflict_retries(5);
        // TODO: after BTREE index creation on object_id, has_scalar_index=true causes
        // MergeInsert to use V1 path which lacks bloom filters for conflict detection. This
        // results in (Some, None) filter mismatch when rebasing against V2 operations.
        // Setting use_index=false ensures all operations consistently use V2 path.
        merge_builder.use_index(false);
        // Only override commit retries when the namespace was configured with a value.
        if let Some(retries) = self.commit_retries {
            merge_builder.commit_retries(retries);
        }

        let (new_dataset_arc, _merge_stats) = merge_builder
            .try_build()
            .map_err(|e| {
                lance_core::Error::from(NamespaceError::Internal {
                    message: format!("Failed to build merge: {:?}", e),
                })
            })?
            .execute_reader(Box::new(reader))
            .await
            .map_err(|e| {
                convert_lance_commit_error(&e, "Failed to execute merge insert into manifest", None)
            })?;

        // Take ownership of the new dataset without copying when we hold the only Arc;
        // otherwise fall back to a clone.
        let new_dataset = Arc::try_unwrap(new_dataset_arc).unwrap_or_else(|arc| (*arc).clone());
        self.manifest_dataset.set_latest(new_dataset).await;

        // Run inline optimization after write
        // Best-effort: the merge has already committed, so a failure here is only logged.
        if let Err(e) = self.run_inline_optimization().await {
            log::warn!(
                "Unexpected failure when running inline optimization: {:?}",
                e
            );
        }

        Ok(())
    }
1149
1150    /// Delete an entry from the manifest table
1151    pub async fn delete_from_manifest(&self, object_id: &str) -> Result<()> {
1152        let predicate = format!("object_id = '{}'", object_id);
1153
1154        // Get dataset and use DeleteBuilder with configured retries
1155        let _mutation_guard = self.manifest_mutation_lock.lock().await;
1156        let dataset_guard = self.manifest_dataset.get().await?;
1157        let dataset = Arc::new(dataset_guard.clone());
1158        drop(dataset_guard); // Drop read guard before delete
1159
1160        let new_dataset = DeleteBuilder::new(dataset, &predicate)
1161            .execute()
1162            .await
1163            .map_err(|e| convert_lance_commit_error(&e, "Failed to delete", None))?;
1164
1165        // Update the wrapper with the new dataset
1166        self.manifest_dataset
1167            .set_latest(
1168                Arc::try_unwrap(new_dataset.new_dataset).unwrap_or_else(|arc| (*arc).clone()),
1169            )
1170            .await;
1171
1172        // Run inline optimization after delete
1173        if let Err(e) = self.run_inline_optimization().await {
1174            log::warn!(
1175                "Unexpected failure when running inline optimization: {:?}",
1176                e
1177            );
1178        }
1179
1180        Ok(())
1181    }
1182
1183    /// Query the manifest for all versions of a table, sorted by version.
1184    ///
1185    /// Returns a list of (version, metadata_json_string) tuples where metadata_json_string
1186    /// contains the full metadata JSON stored in the manifest (manifest_path, manifest_size,
1187    /// e_tag, naming_scheme).
1188    ///
1189    /// **Known limitation**: All matching rows are loaded into memory, sorted in Rust,
1190    /// and then truncated. For tables with a very large number of versions this may be
1191    /// expensive. Pushing sort/limit into the scan is not yet supported by Lance.
1192    pub async fn query_table_versions(
1193        &self,
1194        object_id: &str,
1195        descending: bool,
1196        limit: Option<i32>,
1197    ) -> Result<Vec<(i64, String)>> {
1198        let escaped_id = object_id.replace('\'', "''");
1199        // table_version object_ids are formatted as "{object_id}${zero_padded_version}"
1200        let filter = format!(
1201            "object_type = 'table_version' AND starts_with(object_id, '{}{}')",
1202            escaped_id, DELIMITER
1203        );
1204        let mut scanner = self.manifest_scanner().await?;
1205        scanner.filter(&filter).map_err(|e| {
1206            lance_core::Error::from(NamespaceError::Internal {
1207                message: format!("Failed to filter: {:?}", e),
1208            })
1209        })?;
1210        scanner.project(&["object_id", "metadata"]).map_err(|e| {
1211            lance_core::Error::from(NamespaceError::Internal {
1212                message: format!("Failed to project: {:?}", e),
1213            })
1214        })?;
1215        let batches = Self::execute_scanner(scanner).await?;
1216
1217        let mut versions: Vec<(i64, String)> = Vec::new();
1218        for batch in batches {
1219            if batch.num_rows() == 0 {
1220                continue;
1221            }
1222            let object_id_array = Self::get_string_column(&batch, "object_id")?;
1223            let metadata_array = Self::get_string_column(&batch, "metadata")?;
1224            for i in 0..batch.num_rows() {
1225                let oid = object_id_array.value(i);
1226                // Parse version from object_id
1227                if let Some(version) = Self::parse_version_from_object_id(oid) {
1228                    let metadata_str = metadata_array.value(i).to_string();
1229                    versions.push((version, metadata_str));
1230                }
1231            }
1232        }
1233
1234        if descending {
1235            versions.sort_by(|a, b| b.0.cmp(&a.0));
1236        } else {
1237            versions.sort_by(|a, b| a.0.cmp(&b.0));
1238        }
1239
1240        if let Some(limit) = limit {
1241            versions.truncate(limit as usize);
1242        }
1243
1244        Ok(versions)
1245    }
1246
1247    /// Query the manifest for a specific version of a table.
1248    ///
1249    /// Returns the full metadata JSON string if found, which contains
1250    /// manifest_path, manifest_size, e_tag, and naming_scheme.
1251    ///
1252    pub async fn query_table_version(
1253        &self,
1254        object_id: &str,
1255        version: i64,
1256    ) -> Result<Option<String>> {
1257        let version_object_id = Self::build_version_object_id(object_id, version);
1258        self.query_table_version_by_object_id(&version_object_id)
1259            .await
1260    }
1261
1262    /// Query a specific table version by its exact object_id.
1263    async fn query_table_version_by_object_id(
1264        &self,
1265        version_object_id: &str,
1266    ) -> Result<Option<String>> {
1267        let escaped_id = version_object_id.replace('\'', "''");
1268        let filter = format!(
1269            "object_id = '{}' AND object_type = 'table_version'",
1270            escaped_id
1271        );
1272        let mut scanner = self.manifest_scanner().await?;
1273        scanner.filter(&filter).map_err(|e| {
1274            lance_core::Error::from(NamespaceError::Internal {
1275                message: format!("Failed to filter: {:?}", e),
1276            })
1277        })?;
1278        scanner.project(&["metadata"]).map_err(|e| {
1279            lance_core::Error::from(NamespaceError::Internal {
1280                message: format!("Failed to project: {:?}", e),
1281            })
1282        })?;
1283        let batches = Self::execute_scanner(scanner).await?;
1284
1285        for batch in batches {
1286            if batch.num_rows() == 0 {
1287                continue;
1288            }
1289            let metadata_array = Self::get_string_column(&batch, "metadata")?;
1290            return Ok(Some(metadata_array.value(0).to_string()));
1291        }
1292
1293        Ok(None)
1294    }
1295
1296    /// Delete table version entries from the manifest for a given table and version ranges.
1297    ///
1298    /// Each range is (start_version, end_version) inclusive. Deletes all matching
1299    /// `object_type = 'table_version'` entries whose object_id matches
1300    /// `{object_id}${zero_padded_version}`.
1301    ///
1302    /// Builds a single filter expression covering all version ranges and executes
1303    /// one bulk delete operation instead of deleting versions one at a time.
1304    pub async fn delete_table_versions(
1305        &self,
1306        object_id: &str,
1307        ranges: &[(i64, i64)],
1308    ) -> Result<i64> {
1309        if ranges.is_empty() {
1310            return Ok(0);
1311        }
1312
1313        // Collect all object_ids to delete (both new zero-padded and legacy formats)
1314        let mut object_id_conditions: Vec<String> = Vec::new();
1315        for (start, end) in ranges {
1316            for version in *start..=*end {
1317                let oid = Self::build_version_object_id(object_id, version);
1318                let escaped = oid.replace('\'', "''");
1319                object_id_conditions.push(format!("'{}'", escaped));
1320            }
1321        }
1322
1323        if object_id_conditions.is_empty() {
1324            return Ok(0);
1325        }
1326
1327        // First, count how many entries exist so we can report the deleted count
1328        let in_list = object_id_conditions.join(", ");
1329        let filter = format!(
1330            "object_type = 'table_version' AND object_id IN ({})",
1331            in_list
1332        );
1333
1334        let mut scanner = self.manifest_scanner().await?;
1335        scanner.filter(&filter).map_err(|e| {
1336            lance_core::Error::from(NamespaceError::Internal {
1337                message: format!("Failed to filter: {:?}", e),
1338            })
1339        })?;
1340        scanner.project(&["object_id", "location"]).map_err(|e| {
1341            lance_core::Error::from(NamespaceError::Internal {
1342                message: format!("Failed to project: {:?}", e),
1343            })
1344        })?;
1345        let batches = Self::execute_scanner(scanner).await?;
1346        let deleted_count: i64 = batches.iter().map(|b| b.num_rows() as i64).sum();
1347
1348        if deleted_count == 0 {
1349            return Ok(0);
1350        }
1351
1352        // Execute a single bulk delete with the combined filter
1353        let _mutation_guard = self.manifest_mutation_lock.lock().await;
1354        let dataset_guard = self.manifest_dataset.get().await?;
1355        let dataset = Arc::new(dataset_guard.clone());
1356        drop(dataset_guard);
1357
1358        let new_dataset = DeleteBuilder::new(dataset, &filter)
1359            .execute()
1360            .await
1361            .map_err(|e| {
1362                convert_lance_commit_error(&e, "Failed to batch delete table versions", None)
1363            })?;
1364
1365        self.manifest_dataset
1366            .set_latest(
1367                Arc::try_unwrap(new_dataset.new_dataset).unwrap_or_else(|arc| (*arc).clone()),
1368            )
1369            .await;
1370
1371        if let Err(e) = self.run_inline_optimization().await {
1372            log::warn!(
1373                "Unexpected failure when running inline optimization: {:?}",
1374                e
1375            );
1376        }
1377
1378        Ok(deleted_count)
1379    }
1380
1381    /// Atomically delete table version entries from the manifest by their object_ids.
1382    ///
1383    /// This method supports multi-table transactional deletion: all specified
1384    /// object_ids (which may span multiple tables) are deleted in a single atomic
1385    /// `DeleteBuilder` operation. Either all entries are removed or none are.
1386    ///
1387    /// Object IDs are formatted as `{table_id}${version}`.
1388    pub async fn batch_delete_table_versions_by_object_ids(
1389        &self,
1390        object_ids: &[String],
1391    ) -> Result<i64> {
1392        if object_ids.is_empty() {
1393            return Ok(0);
1394        }
1395
1396        let in_list: String = object_ids
1397            .iter()
1398            .map(|oid| {
1399                let escaped = oid.replace('\'', "''");
1400                format!("'{}'", escaped)
1401            })
1402            .collect::<Vec<_>>()
1403            .join(", ");
1404
1405        let filter = format!(
1406            "object_type = 'table_version' AND object_id IN ({})",
1407            in_list
1408        );
1409
1410        // Count how many entries exist so we can report the deleted count
1411        let mut scanner = self.manifest_scanner().await?;
1412        scanner.filter(&filter).map_err(|e| {
1413            lance_core::Error::from(NamespaceError::Internal {
1414                message: format!("Failed to filter: {:?}", e),
1415            })
1416        })?;
1417        scanner.project(&["object_id", "location"]).map_err(|e| {
1418            lance_core::Error::from(NamespaceError::Internal {
1419                message: format!("Failed to project: {:?}", e),
1420            })
1421        })?;
1422        let batches = Self::execute_scanner(scanner).await?;
1423        let deleted_count: i64 = batches.iter().map(|b| b.num_rows() as i64).sum();
1424
1425        if deleted_count == 0 {
1426            return Ok(0);
1427        }
1428
1429        // Execute a single atomic bulk delete covering all tables
1430        let _mutation_guard = self.manifest_mutation_lock.lock().await;
1431        let dataset_guard = self.manifest_dataset.get().await?;
1432        let dataset = Arc::new(dataset_guard.clone());
1433        drop(dataset_guard);
1434
1435        let new_dataset = DeleteBuilder::new(dataset, &filter)
1436            .execute()
1437            .await
1438            .map_err(|e| {
1439                convert_lance_commit_error(
1440                    &e,
1441                    "Failed to batch delete table versions across multiple tables",
1442                    None,
1443                )
1444            })?;
1445
1446        self.manifest_dataset
1447            .set_latest(
1448                Arc::try_unwrap(new_dataset.new_dataset).unwrap_or_else(|arc| (*arc).clone()),
1449            )
1450            .await;
1451
1452        if let Err(e) = self.run_inline_optimization().await {
1453            log::warn!(
1454                "Unexpected failure when running inline optimization: {:?}",
1455                e
1456            );
1457        }
1458
1459        Ok(deleted_count)
1460    }
1461
1462    /// Set a property flag in the __manifest table's metadata key-value map.
1463    ///
1464    /// This uses `dataset.update_metadata()` to persist the flag in the
1465    /// __manifest dataset's table metadata, rather than inserting a row.
1466    /// If the property already exists with the same value, this is a no-op.
1467    pub async fn set_property(&self, name: &str, value: &str) -> Result<()> {
1468        let _mutation_guard = self.manifest_mutation_lock.lock().await;
1469        let dataset_guard = self.manifest_dataset.get().await?;
1470        if dataset_guard.metadata().get(name) == Some(&value.to_string()) {
1471            return Ok(());
1472        }
1473        drop(dataset_guard);
1474
1475        let mut dataset_guard = self.manifest_dataset.get_mut().await?;
1476        dataset_guard
1477            .update_metadata([(name, value)])
1478            .await
1479            .map_err(|e| {
1480                lance_core::Error::from(NamespaceError::Internal {
1481                    message: format!(
1482                        "Failed to set property '{}' in __manifest metadata: {}",
1483                        name, e
1484                    ),
1485                })
1486            })?;
1487        Ok(())
1488    }
1489
1490    /// Check if a property flag exists in the __manifest table's metadata key-value map.
1491    pub async fn has_property(&self, name: &str) -> Result<bool> {
1492        let dataset_guard = self.manifest_dataset.get().await?;
1493        Ok(dataset_guard.metadata().contains_key(name))
1494    }
1495
1496    /// Parse metadata JSON into a `TableVersion`.
1497    ///
1498    /// Returns `None` if metadata is invalid or missing required fields.
1499    fn parse_table_version(version: i64, metadata_str: &str) -> Option<TableVersion> {
1500        let meta: serde_json::Value = match serde_json::from_str(metadata_str) {
1501            Ok(v) => v,
1502            Err(e) => {
1503                log::warn!(
1504                    "Skipping version {} due to invalid metadata JSON: {}",
1505                    version,
1506                    e
1507                );
1508                return None;
1509            }
1510        };
1511        let manifest_path = match meta.get("manifest_path").and_then(|v| v.as_str()) {
1512            Some(p) => p.to_string(),
1513            None => {
1514                log::warn!(
1515                    "Skipping version {} due to missing 'manifest_path' in metadata — \
1516                     this may indicate data corruption",
1517                    version
1518                );
1519                return None;
1520            }
1521        };
1522        let manifest_size = meta.get("manifest_size").and_then(|v| v.as_i64());
1523        let e_tag = meta
1524            .get("e_tag")
1525            .and_then(|v| v.as_str())
1526            .map(|s| s.to_string());
1527        Some(TableVersion {
1528            version,
1529            manifest_path,
1530            manifest_size,
1531            e_tag,
1532            timestamp_millis: None,
1533            metadata: None,
1534        })
1535    }
1536
1537    /// List table versions from the __manifest table.
1538    ///
1539    /// Queries the manifest for all versions of the given table and returns
1540    /// them as a `ListTableVersionsResponse`.
1541    pub async fn list_table_versions(
1542        &self,
1543        table_id: &[String],
1544        descending: bool,
1545        limit: Option<i32>,
1546    ) -> Result<ListTableVersionsResponse> {
1547        let object_id = Self::str_object_id(table_id);
1548        let manifest_versions = self
1549            .query_table_versions(&object_id, descending, limit)
1550            .await?;
1551
1552        let table_versions: Vec<TableVersion> = manifest_versions
1553            .into_iter()
1554            .filter_map(|(version, metadata_str)| Self::parse_table_version(version, &metadata_str))
1555            .collect();
1556
1557        Ok(ListTableVersionsResponse {
1558            versions: table_versions,
1559            page_token: None,
1560        })
1561    }
1562
1563    /// Describe a specific table version from the __manifest table.
1564    ///
1565    /// Queries the manifest for a specific version and returns it as a
1566    /// `DescribeTableVersionResponse`. Returns an error if the version is not found.
1567    pub async fn describe_table_version(
1568        &self,
1569        table_id: &[String],
1570        version: i64,
1571    ) -> Result<DescribeTableVersionResponse> {
1572        let object_id = Self::str_object_id(table_id);
1573        if let Some(metadata_str) = self.query_table_version(&object_id, version).await?
1574            && let Some(tv) = Self::parse_table_version(version, &metadata_str)
1575        {
1576            return Ok(DescribeTableVersionResponse {
1577                version: Box::new(tv),
1578            });
1579        }
1580        Err(NamespaceError::TableVersionNotFound {
1581            message: format!("version {} for table {:?}", version, table_id),
1582        }
1583        .into())
1584    }
1585
1586    /// Register a table in the manifest without creating the physical table (internal helper for migration)
1587    pub async fn register_table(&self, name: &str, location: String) -> Result<()> {
1588        let object_id = Self::build_object_id(&[], name);
1589        if self.manifest_contains_object(&object_id).await? {
1590            return Err(NamespaceError::Internal {
1591                message: format!("Table '{}' already exists", name),
1592            }
1593            .into());
1594        }
1595
1596        self.insert_into_manifest(object_id, ObjectType::Table, Some(location))
1597            .await
1598    }
1599
1600    /// Validate that all levels of a namespace path exist
1601    async fn validate_namespace_levels_exist(&self, namespace_path: &[String]) -> Result<()> {
1602        for i in 1..=namespace_path.len() {
1603            let partial_path = &namespace_path[..i];
1604            let object_id = partial_path.join(DELIMITER);
1605            if !self.manifest_contains_object(&object_id).await? {
1606                return Err(NamespaceError::NamespaceNotFound {
1607                    message: format!("parent namespace '{}'", object_id),
1608                }
1609                .into());
1610            }
1611        }
1612        Ok(())
1613    }
1614
1615    /// Query the manifest for a namespace with the given object ID
1616    async fn query_manifest_for_namespace(&self, object_id: &str) -> Result<Option<NamespaceInfo>> {
1617        let escaped_id = object_id.replace('\'', "''");
1618        let filter = format!("object_id = '{}' AND object_type = 'namespace'", escaped_id);
1619        let mut scanner = self.manifest_scanner().await?;
1620        scanner.filter(&filter).map_err(|e| {
1621            lance_core::Error::from(NamespaceError::Internal {
1622                message: format!("Failed to filter: {:?}", e),
1623            })
1624        })?;
1625        scanner.project(&["object_id", "metadata"]).map_err(|e| {
1626            lance_core::Error::from(NamespaceError::Internal {
1627                message: format!("Failed to project: {:?}", e),
1628            })
1629        })?;
1630        let batches = Self::execute_scanner(scanner).await?;
1631
1632        let mut found_result: Option<NamespaceInfo> = None;
1633        let mut total_rows = 0;
1634
1635        for batch in batches {
1636            if batch.num_rows() == 0 {
1637                continue;
1638            }
1639
1640            total_rows += batch.num_rows();
1641            if total_rows > 1 {
1642                return Err(NamespaceError::Internal {
1643                    message: format!(
1644                        "Expected exactly 1 namespace with id '{}', found {}",
1645                        object_id, total_rows
1646                    ),
1647                }
1648                .into());
1649            }
1650
1651            let object_id_array = Self::get_string_column(&batch, "object_id")?;
1652            let metadata_array = Self::get_string_column(&batch, "metadata")?;
1653
1654            let object_id_str = object_id_array.value(0);
1655            let metadata = if !metadata_array.is_null(0) {
1656                let metadata_str = metadata_array.value(0);
1657                match serde_json::from_str::<HashMap<String, String>>(metadata_str) {
1658                    Ok(map) => Some(map),
1659                    Err(e) => {
1660                        return Err(NamespaceError::Internal {
1661                            message: format!(
1662                                "Failed to deserialize metadata for namespace '{}': {}",
1663                                object_id, e
1664                            ),
1665                        }
1666                        .into());
1667                    }
1668                }
1669            } else {
1670                None
1671            };
1672
1673            let (namespace, name) = Self::parse_object_id(object_id_str);
1674            found_result = Some(NamespaceInfo {
1675                namespace,
1676                name,
1677                metadata,
1678            });
1679        }
1680
1681        Ok(found_result)
1682    }
1683
1684    /// Create or load the manifest dataset, ensuring it has the latest schema setup.
1685    ///
1686    /// This function will:
1687    /// 1. Try to load an existing manifest table
1688    /// 2. If it exists, check and migrate the schema if needed (e.g., add primary key metadata)
1689    /// 3. If it doesn't exist, create a new manifest table with the current schema
1690    /// 4. Persist feature flags (e.g., table_version_storage_enabled) if requested
1691    async fn ensure_manifest_table_up_to_date(
1692        root: &str,
1693        storage_options: &Option<HashMap<String, String>>,
1694        session: Option<Arc<Session>>,
1695        table_version_storage_enabled: bool,
1696    ) -> Result<DatasetConsistencyWrapper> {
1697        let manifest_path = format!("{}/{}", root, MANIFEST_TABLE_NAME);
1698        log::debug!("Attempting to load manifest from {}", manifest_path);
1699        let store_options = ObjectStoreParams {
1700            storage_options_accessor: storage_options.as_ref().map(|opts| {
1701                Arc::new(
1702                    lance_io::object_store::StorageOptionsAccessor::with_static_options(
1703                        opts.clone(),
1704                    ),
1705                )
1706            }),
1707            ..Default::default()
1708        };
1709        let read_params = ReadParams {
1710            session: session.clone(),
1711            store_options: Some(store_options.clone()),
1712            ..Default::default()
1713        };
1714        let dataset_result = DatasetBuilder::from_uri(&manifest_path)
1715            .with_read_params(read_params)
1716            .load()
1717            .await;
1718        if let Ok(mut dataset) = dataset_result {
1719            // Check if the object_id field has primary key metadata, migrate if not
1720            let needs_pk_migration = dataset
1721                .schema()
1722                .field("object_id")
1723                .map(|f| {
1724                    !f.metadata
1725                        .contains_key(LANCE_UNENFORCED_PRIMARY_KEY_POSITION)
1726                })
1727                .unwrap_or(false);
1728
1729            if needs_pk_migration {
1730                log::info!("Migrating __manifest table to add primary key metadata on object_id");
1731                dataset
1732                    .update_field_metadata()
1733                    .update("object_id", [(LANCE_UNENFORCED_PRIMARY_KEY_POSITION, "0")])
1734                    .map_err(|e| {
1735                        lance_core::Error::from(NamespaceError::Internal {
1736                            message: format!(
1737                                "Failed to find object_id field for migration: {:?}",
1738                                e
1739                            ),
1740                        })
1741                    })?
1742                    .await
1743                    .map_err(|e| {
1744                        lance_core::Error::from(NamespaceError::Internal {
1745                            message: format!("Failed to migrate primary key metadata: {:?}", e),
1746                        })
1747                    })?;
1748            }
1749
1750            // Persist table_version_storage_enabled flag in __manifest so that once
1751            // enabled, it becomes a permanent property of this namespace.
1752            if table_version_storage_enabled {
1753                let needs_flag = dataset
1754                    .metadata()
1755                    .get("table_version_storage_enabled")
1756                    .map(|v| v != "true")
1757                    .unwrap_or(true);
1758
1759                if needs_flag
1760                    && let Err(e) = dataset
1761                        .update_metadata([("table_version_storage_enabled", "true")])
1762                        .await
1763                {
1764                    log::warn!(
1765                        "Failed to persist table_version_storage_enabled flag in __manifest: {:?}",
1766                        e
1767                    );
1768                }
1769            }
1770
1771            Ok(DatasetConsistencyWrapper::new(dataset))
1772        } else {
1773            log::info!("Creating new manifest table at {}", manifest_path);
1774            let schema = Self::manifest_schema();
1775            let empty_batch = RecordBatch::new_empty(schema.clone());
1776            let reader = RecordBatchIterator::new(vec![Ok(empty_batch)], schema.clone());
1777
1778            let store_params = ObjectStoreParams {
1779                storage_options_accessor: storage_options.as_ref().map(|opts| {
1780                    Arc::new(
1781                        lance_io::object_store::StorageOptionsAccessor::with_static_options(
1782                            opts.clone(),
1783                        ),
1784                    )
1785                }),
1786                ..Default::default()
1787            };
1788            let write_params = WriteParams {
1789                session: session.clone(),
1790                store_params: Some(store_params),
1791                ..Default::default()
1792            };
1793
1794            let dataset =
1795                Dataset::write(Box::new(reader), &manifest_path, Some(write_params)).await;
1796
1797            // Handle race condition where another process created the manifest concurrently
1798            match dataset {
1799                Ok(dataset) => {
1800                    log::info!(
1801                        "Successfully created manifest table at {}, version={}, uri={}",
1802                        manifest_path,
1803                        dataset.version().version,
1804                        dataset.uri()
1805                    );
1806                    Ok(DatasetConsistencyWrapper::new(dataset))
1807                }
1808                Err(ref e)
1809                    if matches!(
1810                        e,
1811                        LanceError::DatasetAlreadyExists { .. }
1812                            | LanceError::CommitConflict { .. }
1813                            | LanceError::IncompatibleTransaction { .. }
1814                            | LanceError::RetryableCommitConflict { .. }
1815                    ) =>
1816                {
1817                    // Another process created the manifest concurrently, try to load it
1818                    log::info!(
1819                        "Manifest table was created by another process, loading it: {}",
1820                        manifest_path
1821                    );
1822                    let recovery_store_options = ObjectStoreParams {
1823                        storage_options_accessor: storage_options.as_ref().map(|opts| {
1824                            Arc::new(
1825                                lance_io::object_store::StorageOptionsAccessor::with_static_options(
1826                                    opts.clone(),
1827                                ),
1828                            )
1829                        }),
1830                        ..Default::default()
1831                    };
1832                    let recovery_read_params = ReadParams {
1833                        session,
1834                        store_options: Some(recovery_store_options),
1835                        ..Default::default()
1836                    };
1837                    let dataset = DatasetBuilder::from_uri(&manifest_path)
1838                        .with_read_params(recovery_read_params)
1839                        .load()
1840                        .await
1841                        .map_err(|e| {
1842                            lance_core::Error::from(NamespaceError::Internal {
1843                                message: format!(
1844                                    "Failed to load manifest dataset after creation conflict: {}",
1845                                    e
1846                                ),
1847                            })
1848                        })?;
1849                    Ok(DatasetConsistencyWrapper::new(dataset))
1850                }
1851                Err(e) => Err(lance_core::Error::from(NamespaceError::Internal {
1852                    message: format!("Failed to create manifest dataset: {:?}", e),
1853                })),
1854            }
1855        }
1856    }
1857
1858    /// Sorts names alphabetically and applies pagination using page_token (start_after) and limit.
1859    ///
1860    /// Returns the next page token (last item in this page) if more results exist beyond the limit,
1861    /// or `None` if this is the last page.
1862    fn apply_pagination(
1863        names: &mut Vec<String>,
1864        page_token: Option<String>,
1865        limit: Option<i32>,
1866    ) -> Option<String> {
1867        names.sort();
1868
1869        if let Some(start_after) = page_token {
1870            if let Some(index) = names
1871                .iter()
1872                .position(|name| name.as_str() > start_after.as_str())
1873            {
1874                names.drain(0..index);
1875            } else {
1876                names.clear();
1877            }
1878        }
1879
1880        if let Some(limit) = limit
1881            && limit >= 0
1882        {
1883            let limit = limit as usize;
1884            if names.len() > limit {
1885                let next_page_token = if limit > 0 {
1886                    Some(names[limit - 1].clone())
1887                } else {
1888                    None
1889                };
1890                names.truncate(limit);
1891                return next_page_token;
1892            }
1893        }
1894
1895        None
1896    }
1897}
1898
1899#[async_trait]
1900impl LanceNamespace for ManifestNamespace {
1901    fn namespace_id(&self) -> String {
1902        self.root.clone()
1903    }
1904
1905    async fn list_tables(&self, request: ListTablesRequest) -> Result<ListTablesResponse> {
1906        let namespace_id = request.id.as_ref().ok_or_else(|| {
1907            lance_core::Error::from(NamespaceError::InvalidInput {
1908                message: "Namespace ID is required".to_string(),
1909            })
1910        })?;
1911
1912        // Build filter to find tables in this namespace
1913        let filter = if namespace_id.is_empty() {
1914            // Root namespace: find tables without a namespace prefix
1915            "object_type = 'table' AND NOT contains(object_id, '$')".to_string()
1916        } else {
1917            // Namespaced: find tables that start with namespace$ but have no additional $
1918            let prefix = namespace_id.join(DELIMITER);
1919            format!(
1920                "object_type = 'table' AND starts_with(object_id, '{}{}') AND NOT contains(substring(object_id, {}), '$')",
1921                prefix,
1922                DELIMITER,
1923                prefix.len() + 2
1924            )
1925        };
1926
1927        let mut scanner = self.manifest_scanner().await?;
1928        scanner.filter(&filter).map_err(|e| {
1929            lance_core::Error::from(NamespaceError::Internal {
1930                message: format!("Failed to filter: {:?}", e),
1931            })
1932        })?;
1933        scanner.project(&["object_id", "location"]).map_err(|e| {
1934            lance_core::Error::from(NamespaceError::Internal {
1935                message: format!("Failed to project: {:?}", e),
1936            })
1937        })?;
1938
1939        let batches = Self::execute_scanner(scanner).await?;
1940
1941        let mut table_entries = Vec::new();
1942        for batch in batches {
1943            if batch.num_rows() == 0 {
1944                continue;
1945            }
1946
1947            let object_id_array = Self::get_string_column(&batch, "object_id")?;
1948            let location_array = Self::get_string_column(&batch, "location")?;
1949            for i in 0..batch.num_rows() {
1950                let object_id = object_id_array.value(i);
1951                let location = location_array.value(i);
1952                let (_namespace, name) = Self::parse_object_id(object_id);
1953                table_entries.push((name, location.to_string()));
1954            }
1955        }
1956
1957        let mut tables: Vec<String> = if request.include_declared.unwrap_or(true) {
1958            table_entries.into_iter().map(|(name, _)| name).collect()
1959        } else {
1960            let mut stream = futures::stream::iter(table_entries.into_iter().map(
1961                |(name, location)| async move {
1962                    // `include_declared=false` is an explicit opt-in. We still pay one
1963                    // `_versions/` probe per table so declared-state is derived from actual
1964                    // manifests. This is linear in the total number of listed tables, and we do
1965                    // the probes with bounded concurrency before pagination.
1966                    if self.location_has_actual_manifests(&location).await? {
1967                        Ok::<Option<String>, Error>(Some(name))
1968                    } else {
1969                        Ok::<Option<String>, Error>(None)
1970                    }
1971                },
1972            ))
1973            .buffered(DECLARED_FILTER_CONCURRENCY);
1974
1975            let mut filtered = Vec::new();
1976            while let Some(result) = stream.next().await {
1977                if let Some(name) = result? {
1978                    filtered.push(name);
1979                }
1980            }
1981            filtered
1982        };
1983
1984        let next_page_token =
1985            Self::apply_pagination(&mut tables, request.page_token, request.limit);
1986        let mut response = ListTablesResponse::new(tables);
1987        response.page_token = next_page_token;
1988        Ok(response)
1989    }
1990
1991    async fn describe_table(&self, request: DescribeTableRequest) -> Result<DescribeTableResponse> {
1992        let table_id = request.id.as_ref().ok_or_else(|| {
1993            lance_core::Error::from(NamespaceError::InvalidInput {
1994                message: "Table ID is required".to_string(),
1995            })
1996        })?;
1997
1998        if table_id.is_empty() {
1999            return Err(NamespaceError::InvalidInput {
2000                message: "Table ID cannot be empty".to_string(),
2001            }
2002            .into());
2003        }
2004
2005        let object_id = Self::str_object_id(table_id);
2006        let table_info = self.query_manifest_for_table(&object_id).boxed().await?;
2007
2008        // Extract table name and namespace from table_id
2009        let table_name = table_id.last().cloned().unwrap_or_default();
2010        let namespace_id: Vec<String> = if table_id.len() > 1 {
2011            table_id[..table_id.len() - 1].to_vec()
2012        } else {
2013            vec![]
2014        };
2015
2016        let load_detailed_metadata = request.load_detailed_metadata.unwrap_or(false);
2017        let should_check_declared =
2018            load_detailed_metadata || request.check_declared.unwrap_or(false);
2019        // For backwards compatibility, only skip vending credentials when explicitly set to false
2020        let vend_credentials = request.vend_credentials.unwrap_or(true);
2021
2022        match table_info {
2023            Some(info) => {
2024                // Construct full URI from relative location
2025                let table_uri = Self::construct_full_uri(&self.root, &info.location)?;
2026
2027                let storage_options = if vend_credentials {
2028                    self.storage_options.clone()
2029                } else {
2030                    None
2031                };
2032                let is_only_declared = if should_check_declared {
2033                    Some(!self.location_has_actual_manifests(&info.location).await?)
2034                } else {
2035                    None
2036                };
2037
2038                if !load_detailed_metadata {
2039                    return Ok(DescribeTableResponse {
2040                        table: Some(table_name),
2041                        namespace: Some(namespace_id),
2042                        location: Some(table_uri.clone()),
2043                        table_uri: Some(table_uri),
2044                        storage_options,
2045                        properties: info.metadata,
2046                        is_only_declared,
2047                        ..Default::default()
2048                    });
2049                }
2050
2051                if is_only_declared == Some(true) {
2052                    return Ok(DescribeTableResponse {
2053                        table: Some(table_name),
2054                        namespace: Some(namespace_id),
2055                        location: Some(table_uri.clone()),
2056                        table_uri: Some(table_uri),
2057                        storage_options,
2058                        properties: info.metadata,
2059                        is_only_declared,
2060                        ..Default::default()
2061                    });
2062                }
2063
2064                let mut builder = DatasetBuilder::from_uri(&table_uri);
2065                if let Some(opts) = &self.storage_options {
2066                    builder = builder.with_storage_options(opts.clone());
2067                }
2068                if let Some(session) = &self.session {
2069                    builder = builder.with_session(session.clone());
2070                }
2071
2072                match builder.load().await {
2073                    Ok(mut dataset) => {
2074                        // If a specific version is requested, checkout that version
2075                        if let Some(requested_version) = request.version {
2076                            dataset = dataset.checkout_version(requested_version as u64).await?;
2077                        }
2078
2079                        let version = dataset.version().version;
2080                        let lance_schema = dataset.schema();
2081                        let arrow_schema: arrow_schema::Schema = lance_schema.into();
2082                        let json_schema = arrow_schema_to_json(&arrow_schema)?;
2083
2084                        Ok(DescribeTableResponse {
2085                            table: Some(table_name.clone()),
2086                            namespace: Some(namespace_id.clone()),
2087                            version: Some(version as i64),
2088                            location: Some(table_uri.clone()),
2089                            table_uri: Some(table_uri),
2090                            schema: Some(Box::new(json_schema)),
2091                            storage_options,
2092                            properties: info.metadata.clone(),
2093                            is_only_declared,
2094                            ..Default::default()
2095                        })
2096                    }
2097                    Err(err) => Err(NamespaceError::Internal {
2098                        message: format!(
2099                            "Table exists in manifest but failed to load dataset '{}': {}",
2100                            object_id, err
2101                        ),
2102                    }
2103                    .into()),
2104                }
2105            }
2106            None => Err(NamespaceError::TableNotFound {
2107                message: Self::format_table_id(table_id),
2108            }
2109            .into()),
2110        }
2111    }
2112
2113    async fn table_exists(&self, request: TableExistsRequest) -> Result<()> {
2114        let table_id = request.id.as_ref().ok_or_else(|| {
2115            lance_core::Error::from(NamespaceError::InvalidInput {
2116                message: "Table ID is required".to_string(),
2117            })
2118        })?;
2119
2120        if table_id.is_empty() {
2121            return Err(NamespaceError::InvalidInput {
2122                message: "Table ID cannot be empty".to_string(),
2123            }
2124            .into());
2125        }
2126
2127        let object_id = Self::str_object_id(table_id);
2128        let exists = self.manifest_contains_object(&object_id).await?;
2129        if exists {
2130            Ok(())
2131        } else {
2132            Err(NamespaceError::TableNotFound {
2133                message: Self::format_table_id(table_id),
2134            }
2135            .into())
2136        }
2137    }
2138
2139    async fn create_table(
2140        &self,
2141        request: CreateTableRequest,
2142        data: Bytes,
2143    ) -> Result<CreateTableResponse> {
2144        let table_id = request.id.as_ref().ok_or_else(|| {
2145            lance_core::Error::from(NamespaceError::InvalidInput {
2146                message: "Table ID is required".to_string(),
2147            })
2148        })?;
2149
2150        if table_id.is_empty() {
2151            return Err(NamespaceError::InvalidInput {
2152                message: "Table ID cannot be empty".to_string(),
2153            }
2154            .into());
2155        }
2156
2157        let (namespace, table_name) = Self::split_object_id(table_id);
2158        let object_id = Self::build_object_id(&namespace, &table_name);
2159
2160        let existing_table = self.query_manifest_for_table(&object_id).await?;
2161        let existing_has_manifests = if let Some(existing_table) = &existing_table {
2162            Some(
2163                self.location_has_actual_manifests(&existing_table.location)
2164                    .await?,
2165            )
2166        } else {
2167            None
2168        };
2169
2170        if existing_has_manifests == Some(false)
2171            && request
2172                .properties
2173                .as_ref()
2174                .is_some_and(|properties| !properties.is_empty())
2175        {
2176            return Err(NamespaceError::InvalidInput {
2177                message: format!(
2178                    "create_table cannot set properties for already declared table '{}'",
2179                    object_id
2180                ),
2181            }
2182            .into());
2183        }
2184
2185        let create_mode = if existing_has_manifests == Some(false) {
2186            CreateTableMode::Create
2187        } else {
2188            CreateTableMode::parse(request.mode.as_deref())?
2189        };
2190        let dir_name = if let Some(existing_table) = &existing_table {
2191            existing_table.location.clone()
2192        } else if namespace.is_empty() && self.dir_listing_enabled {
2193            format!("{}.lance", table_name)
2194        } else {
2195            Self::generate_dir_name(&object_id)
2196        };
2197        let table_uri = Self::construct_full_uri(&self.root, &dir_name)?;
2198        let overwriting_existing_table =
2199            existing_has_manifests == Some(true) && create_mode == CreateTableMode::Overwrite;
2200
2201        if existing_has_manifests == Some(true) {
2202            match create_mode {
2203                CreateTableMode::Create => {
2204                    return Err(NamespaceError::TableAlreadyExists {
2205                        message: table_name.clone(),
2206                    }
2207                    .into());
2208                }
2209                CreateTableMode::ExistOk => {
2210                    let properties = existing_table
2211                        .as_ref()
2212                        .and_then(|table| table.metadata.clone());
2213                    return Ok(CreateTableResponse {
2214                        location: Some(table_uri),
2215                        storage_options: self.storage_options.clone(),
2216                        properties,
2217                        ..Default::default()
2218                    });
2219                }
2220                CreateTableMode::Overwrite => {}
2221            }
2222        }
2223
2224        // Validate that request_data is provided
2225        if data.is_empty() {
2226            return Err(NamespaceError::InvalidInput {
2227                message: "Request data (Arrow IPC stream) is required for create_table".to_string(),
2228            }
2229            .into());
2230        }
2231
2232        // Write the data using Lance Dataset
2233        let cursor = Cursor::new(data.to_vec());
2234        let stream_reader = StreamReader::try_new(cursor, None).map_err(|e| {
2235            lance_core::Error::from(NamespaceError::Internal {
2236                message: format!("Failed to read IPC stream: {:?}", e),
2237            })
2238        })?;
2239
2240        let batches: Vec<RecordBatch> = stream_reader
2241            .collect::<std::result::Result<Vec<_>, _>>()
2242            .map_err(|e| {
2243            lance_core::Error::from(NamespaceError::Internal {
2244                message: format!("Failed to collect batches: {:?}", e),
2245            })
2246        })?;
2247
2248        if batches.is_empty() {
2249            return Err(NamespaceError::Internal {
2250                message: "No data provided for table creation".to_string(),
2251            }
2252            .into());
2253        }
2254
2255        let schema = batches[0].schema();
2256        let batch_results: Vec<std::result::Result<RecordBatch, arrow_schema::ArrowError>> =
2257            batches.into_iter().map(Ok).collect();
2258        let reader = RecordBatchIterator::new(batch_results, schema);
2259
2260        let mut write_storage_options = self.storage_options.clone().unwrap_or_default();
2261        if let Some(request_storage_options) = request.storage_options.as_ref() {
2262            write_storage_options.extend(request_storage_options.clone());
2263        }
2264
2265        let store_params = ObjectStoreParams {
2266            storage_options_accessor: (!write_storage_options.is_empty()).then(|| {
2267                Arc::new(
2268                    lance_io::object_store::StorageOptionsAccessor::with_static_options(
2269                        write_storage_options,
2270                    ),
2271                )
2272            }),
2273            ..Default::default()
2274        };
2275        let write_params = WriteParams {
2276            mode: create_mode.write_mode(),
2277            session: self.session.clone(),
2278            store_params: Some(store_params),
2279            ..Default::default()
2280        };
2281        let dataset = Dataset::write(Box::new(reader), &table_uri, Some(write_params))
2282            .await
2283            .map_err(|e| {
2284                lance_core::Error::from(NamespaceError::Internal {
2285                    message: format!("Failed to write dataset: {:?}", e),
2286                })
2287            })?;
2288        let version = dataset.version().version as i64;
2289
2290        if overwriting_existing_table {
2291            let metadata =
2292                Self::serialize_metadata(request.properties.as_ref(), "table", &object_id)?;
2293            self.upsert_into_manifest_with_metadata(
2294                vec![ManifestEntry {
2295                    object_id,
2296                    object_type: ObjectType::Table,
2297                    location: Some(dir_name),
2298                    metadata,
2299                }],
2300                None,
2301            )
2302            .await?;
2303
2304            Ok(CreateTableResponse {
2305                version: Some(version),
2306                location: Some(table_uri),
2307                storage_options: self.storage_options.clone(),
2308                properties: request.properties,
2309                ..Default::default()
2310            })
2311        } else {
2312            match existing_table {
2313                Some(existing_table) => Ok(CreateTableResponse {
2314                    version: Some(version),
2315                    location: Some(table_uri),
2316                    storage_options: self.storage_options.clone(),
2317                    properties: existing_table.metadata,
2318                    ..Default::default()
2319                }),
2320                None => {
2321                    let metadata =
2322                        Self::serialize_metadata(request.properties.as_ref(), "table", &object_id)?;
2323                    // Register in manifest (store dir_name, not full URI)
2324                    self.insert_into_manifest_with_metadata(
2325                        vec![ManifestEntry {
2326                            object_id,
2327                            object_type: ObjectType::Table,
2328                            location: Some(dir_name.clone()),
2329                            metadata,
2330                        }],
2331                        None,
2332                    )
2333                    .await?;
2334
2335                    Ok(CreateTableResponse {
2336                        version: Some(version),
2337                        location: Some(table_uri),
2338                        storage_options: self.storage_options.clone(),
2339                        properties: request.properties,
2340                        ..Default::default()
2341                    })
2342                }
2343            }
2344        }
2345    }
2346
2347    async fn drop_table(&self, request: DropTableRequest) -> Result<DropTableResponse> {
2348        let table_id = request.id.as_ref().ok_or_else(|| {
2349            lance_core::Error::from(NamespaceError::InvalidInput {
2350                message: "Table ID is required".to_string(),
2351            })
2352        })?;
2353
2354        if table_id.is_empty() {
2355            return Err(NamespaceError::InvalidInput {
2356                message: "Table ID cannot be empty".to_string(),
2357            }
2358            .into());
2359        }
2360
2361        let (namespace, table_name) = Self::split_object_id(table_id);
2362        let object_id = Self::build_object_id(&namespace, &table_name);
2363
2364        // Query manifest for table location
2365        let table_info = self.query_manifest_for_table(&object_id).boxed().await?;
2366
2367        match table_info {
2368            Some(info) => {
2369                // Delete from manifest first
2370                self.delete_from_manifest(&object_id).boxed().await?;
2371
2372                // Delete physical data directory using the dir_name from manifest
2373                let table_path = self.base_path.child(info.location.as_str());
2374                let table_uri = Self::construct_full_uri(&self.root, &info.location)?;
2375
2376                // Remove the table directory
2377                self.object_store
2378                    .remove_dir_all(table_path)
2379                    .boxed()
2380                    .await
2381                    .map_err(|e| {
2382                        lance_core::Error::from(NamespaceError::Internal {
2383                            message: format!("Failed to delete table directory: {:?}", e),
2384                        })
2385                    })?;
2386
2387                Ok(DropTableResponse {
2388                    id: request.id.clone(),
2389                    location: Some(table_uri),
2390                    ..Default::default()
2391                })
2392            }
2393            None => Err(NamespaceError::TableNotFound {
2394                message: table_name.to_string(),
2395            }
2396            .into()),
2397        }
2398    }
2399
2400    async fn list_namespaces(
2401        &self,
2402        request: ListNamespacesRequest,
2403    ) -> Result<ListNamespacesResponse> {
2404        let parent_namespace = request.id.as_ref().ok_or_else(|| {
2405            lance_core::Error::from(NamespaceError::InvalidInput {
2406                message: "Namespace ID is required".to_string(),
2407            })
2408        })?;
2409
2410        // Build filter to find direct child namespaces
2411        let filter = if parent_namespace.is_empty() {
2412            // Root namespace: find all namespaces without a parent
2413            "object_type = 'namespace' AND NOT contains(object_id, '$')".to_string()
2414        } else {
2415            // Non-root: find namespaces that start with parent$ but have no additional $
2416            let prefix = parent_namespace.join(DELIMITER);
2417            format!(
2418                "object_type = 'namespace' AND starts_with(object_id, '{}{}') AND NOT contains(substring(object_id, {}), '$')",
2419                prefix,
2420                DELIMITER,
2421                prefix.len() + 2
2422            )
2423        };
2424
2425        let mut scanner = self.manifest_scanner().await?;
2426        scanner.filter(&filter).map_err(|e| {
2427            lance_core::Error::from(NamespaceError::Internal {
2428                message: format!("Failed to filter: {:?}", e),
2429            })
2430        })?;
2431        scanner.project(&["object_id"]).map_err(|e| {
2432            lance_core::Error::from(NamespaceError::Internal {
2433                message: format!("Failed to project: {:?}", e),
2434            })
2435        })?;
2436
2437        let batches = Self::execute_scanner(scanner).await?;
2438        let mut namespaces = Vec::new();
2439
2440        for batch in batches {
2441            if batch.num_rows() == 0 {
2442                continue;
2443            }
2444
2445            let object_id_array = Self::get_string_column(&batch, "object_id")?;
2446            for i in 0..batch.num_rows() {
2447                let object_id = object_id_array.value(i);
2448                let (_namespace, name) = Self::parse_object_id(object_id);
2449                namespaces.push(name);
2450            }
2451        }
2452
2453        let next_page_token =
2454            Self::apply_pagination(&mut namespaces, request.page_token, request.limit);
2455        let mut response = ListNamespacesResponse::new(namespaces);
2456        response.page_token = next_page_token;
2457        Ok(response)
2458    }
2459
2460    async fn describe_namespace(
2461        &self,
2462        request: DescribeNamespaceRequest,
2463    ) -> Result<DescribeNamespaceResponse> {
2464        let namespace_id = request.id.as_ref().ok_or_else(|| {
2465            lance_core::Error::from(NamespaceError::InvalidInput {
2466                message: "Namespace ID is required".to_string(),
2467            })
2468        })?;
2469
2470        // Root namespace always exists
2471        if namespace_id.is_empty() {
2472            #[allow(clippy::needless_update)]
2473            return Ok(DescribeNamespaceResponse {
2474                properties: Some(HashMap::new()),
2475                ..Default::default()
2476            });
2477        }
2478
2479        // Check if namespace exists in manifest
2480        let object_id = namespace_id.join(DELIMITER);
2481        let namespace_info = self.query_manifest_for_namespace(&object_id).await?;
2482
2483        match namespace_info {
2484            #[allow(clippy::needless_update)]
2485            Some(info) => Ok(DescribeNamespaceResponse {
2486                properties: info.metadata,
2487                ..Default::default()
2488            }),
2489            None => Err(NamespaceError::NamespaceNotFound {
2490                message: object_id.to_string(),
2491            }
2492            .into()),
2493        }
2494    }
2495
2496    async fn create_namespace(
2497        &self,
2498        request: CreateNamespaceRequest,
2499    ) -> Result<CreateNamespaceResponse> {
2500        let namespace_id = request.id.as_ref().ok_or_else(|| {
2501            lance_core::Error::from(NamespaceError::InvalidInput {
2502                message: "Namespace ID is required".to_string(),
2503            })
2504        })?;
2505
2506        // Root namespace always exists and cannot be created
2507        if namespace_id.is_empty() {
2508            return Err(NamespaceError::NamespaceAlreadyExists {
2509                message: "root namespace".to_string(),
2510            }
2511            .into());
2512        }
2513
2514        // Validate parent namespaces exist (but not the namespace being created)
2515        if namespace_id.len() > 1 {
2516            self.validate_namespace_levels_exist(&namespace_id[..namespace_id.len() - 1])
2517                .await?;
2518        }
2519
2520        let object_id = namespace_id.join(DELIMITER);
2521        if self.manifest_contains_object(&object_id).await? {
2522            return Err(NamespaceError::NamespaceAlreadyExists {
2523                message: object_id.to_string(),
2524            }
2525            .into());
2526        }
2527
2528        let metadata =
2529            Self::serialize_metadata(request.properties.as_ref(), "namespace", &object_id)?;
2530
2531        self.insert_into_manifest_with_metadata(
2532            vec![ManifestEntry {
2533                object_id,
2534                object_type: ObjectType::Namespace,
2535                location: None,
2536                metadata,
2537            }],
2538            None,
2539        )
2540        .await?;
2541
2542        Ok(CreateNamespaceResponse {
2543            properties: request.properties,
2544            ..Default::default()
2545        })
2546    }
2547
2548    async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<DropNamespaceResponse> {
2549        let namespace_id = request.id.as_ref().ok_or_else(|| {
2550            lance_core::Error::from(NamespaceError::InvalidInput {
2551                message: "Namespace ID is required".to_string(),
2552            })
2553        })?;
2554
2555        // Root namespace always exists and cannot be dropped
2556        if namespace_id.is_empty() {
2557            return Err(NamespaceError::InvalidInput {
2558                message: "Root namespace cannot be dropped".to_string(),
2559            }
2560            .into());
2561        }
2562
2563        let object_id = namespace_id.join(DELIMITER);
2564
2565        // Check if namespace exists
2566        if !self.manifest_contains_object(&object_id).boxed().await? {
2567            return Err(NamespaceError::NamespaceNotFound {
2568                message: object_id.to_string(),
2569            }
2570            .into());
2571        }
2572
2573        // Check for child namespaces
2574        let escaped_id = object_id.replace('\'', "''");
2575        let prefix = format!("{}{}", escaped_id, DELIMITER);
2576        let filter = format!("starts_with(object_id, '{}')", prefix);
2577        let mut scanner = self.manifest_scanner().boxed().await?;
2578        scanner.filter(&filter).map_err(|e| {
2579            lance_core::Error::from(NamespaceError::Internal {
2580                message: format!("Failed to filter: {:?}", e),
2581            })
2582        })?;
2583        scanner.project::<&str>(&[]).map_err(|e| {
2584            lance_core::Error::from(NamespaceError::Internal {
2585                message: format!("Failed to project: {:?}", e),
2586            })
2587        })?;
2588        scanner.with_row_id();
2589        let count = scanner.count_rows().boxed().await.map_err(|e| {
2590            lance_core::Error::from(NamespaceError::Internal {
2591                message: format!("Failed to count rows: {:?}", e),
2592            })
2593        })?;
2594
2595        if count > 0 {
2596            return Err(NamespaceError::NamespaceNotEmpty {
2597                message: format!("'{}' (contains {} child objects)", object_id, count),
2598            }
2599            .into());
2600        }
2601
2602        self.delete_from_manifest(&object_id).boxed().await?;
2603
2604        Ok(DropNamespaceResponse::default())
2605    }
2606
2607    async fn namespace_exists(&self, request: NamespaceExistsRequest) -> Result<()> {
2608        let namespace_id = request.id.as_ref().ok_or_else(|| {
2609            lance_core::Error::from(NamespaceError::InvalidInput {
2610                message: "Namespace ID is required".to_string(),
2611            })
2612        })?;
2613
2614        // Root namespace always exists
2615        if namespace_id.is_empty() {
2616            return Ok(());
2617        }
2618
2619        let object_id = namespace_id.join(DELIMITER);
2620        if self.manifest_contains_object(&object_id).await? {
2621            Ok(())
2622        } else {
2623            Err(NamespaceError::NamespaceNotFound {
2624                message: object_id.to_string(),
2625            }
2626            .into())
2627        }
2628    }
2629
2630    async fn declare_table(&self, request: DeclareTableRequest) -> Result<DeclareTableResponse> {
2631        let table_id = request.id.as_ref().ok_or_else(|| {
2632            lance_core::Error::from(NamespaceError::InvalidInput {
2633                message: "Table ID is required".to_string(),
2634            })
2635        })?;
2636
2637        if table_id.is_empty() {
2638            return Err(NamespaceError::InvalidInput {
2639                message: "Table ID cannot be empty".to_string(),
2640            }
2641            .into());
2642        }
2643
2644        let (namespace, table_name) = Self::split_object_id(table_id);
2645        let object_id = Self::build_object_id(&namespace, &table_name);
2646
2647        // Check if table already exists in manifest
2648        let existing = self.query_manifest_for_table(&object_id).await?;
2649        if existing.is_some() {
2650            return Err(NamespaceError::TableAlreadyExists {
2651                message: table_name.to_string(),
2652            }
2653            .into());
2654        }
2655
2656        // Create table location path with hash-based naming
2657        // When dir_listing_enabled is true and it's a root table, use directory-style naming: {table_name}.lance
2658        // Otherwise, use hash-based naming: {hash}_{object_id}
2659        let dir_name = if namespace.is_empty() && self.dir_listing_enabled {
2660            // Root table with directory listing enabled: use {table_name}.lance
2661            format!("{}.lance", table_name)
2662        } else {
2663            // Child namespace table or dir listing disabled: use hash-based naming
2664            Self::generate_dir_name(&object_id)
2665        };
2666        let table_path = self.base_path.child(dir_name.as_str());
2667        let table_uri = Self::construct_full_uri(&self.root, &dir_name)?;
2668
2669        // Validate location if provided
2670        if let Some(req_location) = &request.location {
2671            let req_location = req_location.trim_end_matches('/');
2672            if req_location != table_uri {
2673                return Err(NamespaceError::InvalidInput {
2674                    message: format!(
2675                        "Cannot declare table {} at location {}, must be at location {}",
2676                        table_name, req_location, table_uri
2677                    ),
2678                }
2679                .into());
2680            }
2681        }
2682
2683        // Create the .lance-reserved file to mark the table as existing
2684        let reserved_file_path = table_path.child(".lance-reserved");
2685
2686        self.object_store
2687            .create(&reserved_file_path)
2688            .await
2689            .map_err(|e| {
2690                lance_core::Error::from(NamespaceError::Internal {
2691                    message: format!(
2692                        "Failed to create .lance-reserved file for table {}: {}",
2693                        table_name, e
2694                    ),
2695                })
2696            })?
2697            .shutdown()
2698            .await
2699            .map_err(|e| {
2700                lance_core::Error::from(NamespaceError::Internal {
2701                    message: format!(
2702                        "Failed to finalize .lance-reserved file for table {}: {}",
2703                        table_name, e
2704                    ),
2705                })
2706            })?;
2707
2708        let metadata = Self::serialize_metadata(request.properties.as_ref(), "table", &object_id)?;
2709
2710        // Add entry to manifest marking this as a declared table (store dir_name, not full path)
2711        self.insert_into_manifest_with_metadata(
2712            vec![ManifestEntry {
2713                object_id,
2714                object_type: ObjectType::Table,
2715                location: Some(dir_name),
2716                metadata,
2717            }],
2718            None,
2719        )
2720        .await?;
2721
2722        log::info!(
2723            "Declared table '{}' in manifest at {}",
2724            table_name,
2725            table_uri
2726        );
2727
2728        // For backwards compatibility, only skip vending credentials when explicitly set to false
2729        let vend_credentials = request.vend_credentials.unwrap_or(true);
2730        let storage_options = if vend_credentials {
2731            self.storage_options.clone()
2732        } else {
2733            None
2734        };
2735
2736        Ok(DeclareTableResponse {
2737            location: Some(table_uri),
2738            storage_options,
2739            properties: request.properties,
2740            ..Default::default()
2741        })
2742    }
2743
2744    async fn register_table(&self, request: RegisterTableRequest) -> Result<RegisterTableResponse> {
2745        let table_id = request.id.as_ref().ok_or_else(|| {
2746            lance_core::Error::from(NamespaceError::InvalidInput {
2747                message: "Table ID is required".to_string(),
2748            })
2749        })?;
2750
2751        if table_id.is_empty() {
2752            return Err(NamespaceError::InvalidInput {
2753                message: "Table ID cannot be empty".to_string(),
2754            }
2755            .into());
2756        }
2757
2758        let location = request.location.clone();
2759
2760        // Validate that location is a relative path within the root directory
2761        // We don't allow absolute URIs or paths that escape the root
2762        if location.contains("://") {
2763            return Err(NamespaceError::InvalidInput {
2764                message: format!(
2765                    "Absolute URIs are not allowed for register_table. Location must be a relative path within the root directory: {}",
2766                    location
2767                ),
2768            }
2769            .into());
2770        }
2771
2772        if location.starts_with('/') {
2773            return Err(NamespaceError::InvalidInput {
2774                message: format!(
2775                    "Absolute paths are not allowed for register_table. Location must be a relative path within the root directory: {}",
2776                    location
2777                ),
2778            }
2779            .into());
2780        }
2781
2782        // Check for path traversal attempts
2783        if location.contains("..") {
2784            return Err(NamespaceError::InvalidInput {
2785                message: format!(
2786                    "Path traversal is not allowed. Location must be a relative path within the root directory: {}",
2787                    location
2788                ),
2789            }
2790            .into());
2791        }
2792
2793        let (namespace, table_name) = Self::split_object_id(table_id);
2794        let object_id = Self::build_object_id(&namespace, &table_name);
2795
2796        // Validate that parent namespaces exist (if not root)
2797        if !namespace.is_empty() {
2798            self.validate_namespace_levels_exist(&namespace).await?;
2799        }
2800
2801        // Check if table already exists
2802        if self.manifest_contains_object(&object_id).await? {
2803            return Err(NamespaceError::TableAlreadyExists {
2804                message: object_id.to_string(),
2805            }
2806            .into());
2807        }
2808
2809        // Register the table with its location in the manifest
2810        self.insert_into_manifest(object_id, ObjectType::Table, Some(location.clone()))
2811            .await?;
2812
2813        Ok(RegisterTableResponse {
2814            location: Some(location),
2815            ..Default::default()
2816        })
2817    }
2818
2819    async fn deregister_table(
2820        &self,
2821        request: DeregisterTableRequest,
2822    ) -> Result<DeregisterTableResponse> {
2823        let table_id = request.id.as_ref().ok_or_else(|| {
2824            lance_core::Error::from(NamespaceError::InvalidInput {
2825                message: "Table ID is required".to_string(),
2826            })
2827        })?;
2828
2829        if table_id.is_empty() {
2830            return Err(NamespaceError::InvalidInput {
2831                message: "Table ID cannot be empty".to_string(),
2832            }
2833            .into());
2834        }
2835
2836        let (namespace, table_name) = Self::split_object_id(table_id);
2837        let object_id = Self::build_object_id(&namespace, &table_name);
2838
2839        // Get table info before deleting
2840        let table_info = self.query_manifest_for_table(&object_id).await?;
2841
2842        let table_uri = match table_info {
2843            Some(info) => {
2844                // Delete from manifest only (leave physical data intact)
2845                self.delete_from_manifest(&object_id).boxed().await?;
2846                Self::construct_full_uri(&self.root, &info.location)?
2847            }
2848            None => {
2849                return Err(NamespaceError::TableNotFound {
2850                    message: object_id.to_string(),
2851                }
2852                .into());
2853            }
2854        };
2855
2856        Ok(DeregisterTableResponse {
2857            id: request.id.clone(),
2858            location: Some(table_uri),
2859            ..Default::default()
2860        })
2861    }
2862}
2863
2864#[cfg(test)]
2865mod tests {
2866    use crate::{DirectoryNamespaceBuilder, ManifestNamespace};
2867    use bytes::Bytes;
2868    use lance_core::utils::tempfile::TempStdDir;
2869    use lance_namespace::LanceNamespace;
2870    use lance_namespace::models::{
2871        CreateNamespaceRequest, CreateTableRequest, DescribeTableRequest, DropTableRequest,
2872        ListTablesRequest, TableExistsRequest,
2873    };
2874    use rstest::rstest;
2875
2876    fn create_test_ipc_data() -> Vec<u8> {
2877        use arrow::array::{Int32Array, StringArray};
2878        use arrow::datatypes::{DataType, Field, Schema};
2879        use arrow::ipc::writer::StreamWriter;
2880        use arrow::record_batch::RecordBatch;
2881        use std::sync::Arc;
2882
2883        let schema = Arc::new(Schema::new(vec![
2884            Field::new("id", DataType::Int32, false),
2885            Field::new("name", DataType::Utf8, false),
2886        ]));
2887
2888        let batch = RecordBatch::try_new(
2889            schema.clone(),
2890            vec![
2891                Arc::new(Int32Array::from(vec![1, 2, 3])),
2892                Arc::new(StringArray::from(vec!["a", "b", "c"])),
2893            ],
2894        )
2895        .unwrap();
2896
2897        let mut buffer = Vec::new();
2898        {
2899            let mut writer = StreamWriter::try_new(&mut buffer, &schema).unwrap();
2900            writer.write(&batch).unwrap();
2901            writer.finish().unwrap();
2902        }
2903        buffer
2904    }
2905
2906    #[rstest]
2907    #[case::with_optimization(true)]
2908    #[case::without_optimization(false)]
2909    #[tokio::test]
2910    async fn test_manifest_namespace_basic_create_and_list(#[case] inline_optimization: bool) {
2911        let temp_dir = TempStdDir::default();
2912        let temp_path = temp_dir.to_str().unwrap();
2913
2914        // Create a DirectoryNamespace with manifest enabled (default)
2915        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2916            .inline_optimization_enabled(inline_optimization)
2917            .build()
2918            .await
2919            .unwrap();
2920
2921        // Verify we can list tables (should be empty)
2922        let mut request = ListTablesRequest::new();
2923        request.id = Some(vec![]);
2924        let response = dir_namespace.list_tables(request).await.unwrap();
2925        assert_eq!(response.tables.len(), 0);
2926
2927        // Create a test table
2928        let buffer = create_test_ipc_data();
2929        let mut create_request = CreateTableRequest::new();
2930        create_request.id = Some(vec!["test_table".to_string()]);
2931
2932        let _response = dir_namespace
2933            .create_table(create_request, Bytes::from(buffer))
2934            .await
2935            .unwrap();
2936
2937        // List tables again - should see our new table
2938        let mut request = ListTablesRequest::new();
2939        request.id = Some(vec![]);
2940        let response = dir_namespace.list_tables(request).await.unwrap();
2941        assert_eq!(response.tables.len(), 1);
2942        assert_eq!(response.tables[0], "test_table");
2943    }
2944
2945    #[rstest]
2946    #[case::with_optimization(true)]
2947    #[case::without_optimization(false)]
2948    #[tokio::test]
2949    async fn test_manifest_namespace_table_exists(#[case] inline_optimization: bool) {
2950        let temp_dir = TempStdDir::default();
2951        let temp_path = temp_dir.to_str().unwrap();
2952
2953        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2954            .inline_optimization_enabled(inline_optimization)
2955            .build()
2956            .await
2957            .unwrap();
2958
2959        // Check non-existent table
2960        let mut request = TableExistsRequest::new();
2961        request.id = Some(vec!["nonexistent".to_string()]);
2962        let result = dir_namespace.table_exists(request).await;
2963        assert!(result.is_err());
2964
2965        // Create table
2966        let buffer = create_test_ipc_data();
2967        let mut create_request = CreateTableRequest::new();
2968        create_request.id = Some(vec!["test_table".to_string()]);
2969        dir_namespace
2970            .create_table(create_request, Bytes::from(buffer))
2971            .await
2972            .unwrap();
2973
2974        // Check existing table
2975        let mut request = TableExistsRequest::new();
2976        request.id = Some(vec!["test_table".to_string()]);
2977        let result = dir_namespace.table_exists(request).await;
2978        assert!(result.is_ok());
2979    }
2980
2981    #[rstest]
2982    #[case::with_optimization(true)]
2983    #[case::without_optimization(false)]
2984    #[tokio::test]
2985    async fn test_manifest_namespace_describe_table(#[case] inline_optimization: bool) {
2986        let temp_dir = TempStdDir::default();
2987        let temp_path = temp_dir.to_str().unwrap();
2988
2989        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2990            .inline_optimization_enabled(inline_optimization)
2991            .build()
2992            .await
2993            .unwrap();
2994
2995        // Describe non-existent table
2996        let mut request = DescribeTableRequest::new();
2997        request.id = Some(vec!["nonexistent".to_string()]);
2998        let result = dir_namespace.describe_table(request).await;
2999        assert!(result.is_err());
3000
3001        // Create table
3002        let buffer = create_test_ipc_data();
3003        let mut create_request = CreateTableRequest::new();
3004        create_request.id = Some(vec!["test_table".to_string()]);
3005        dir_namespace
3006            .create_table(create_request, Bytes::from(buffer))
3007            .await
3008            .unwrap();
3009
3010        // Describe existing table
3011        let mut request = DescribeTableRequest::new();
3012        request.id = Some(vec!["test_table".to_string()]);
3013        let response = dir_namespace.describe_table(request).await.unwrap();
3014        assert!(response.location.is_some());
3015        assert!(response.location.unwrap().contains("test_table"));
3016    }
3017
3018    #[rstest]
3019    #[case::with_optimization(true)]
3020    #[case::without_optimization(false)]
3021    #[tokio::test]
3022    async fn test_manifest_namespace_drop_table(#[case] inline_optimization: bool) {
3023        let temp_dir = TempStdDir::default();
3024        let temp_path = temp_dir.to_str().unwrap();
3025
3026        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3027            .inline_optimization_enabled(inline_optimization)
3028            .build()
3029            .await
3030            .unwrap();
3031
3032        // Create table
3033        let buffer = create_test_ipc_data();
3034        let mut create_request = CreateTableRequest::new();
3035        create_request.id = Some(vec!["test_table".to_string()]);
3036        dir_namespace
3037            .create_table(create_request, Bytes::from(buffer))
3038            .await
3039            .unwrap();
3040
3041        // Verify table exists
3042        let mut request = ListTablesRequest::new();
3043        request.id = Some(vec![]);
3044        let response = dir_namespace.list_tables(request).await.unwrap();
3045        assert_eq!(response.tables.len(), 1);
3046
3047        // Drop table
3048        let mut drop_request = DropTableRequest::new();
3049        drop_request.id = Some(vec!["test_table".to_string()]);
3050        let _response = dir_namespace.drop_table(drop_request).await.unwrap();
3051
3052        // Verify table is gone
3053        let mut request = ListTablesRequest::new();
3054        request.id = Some(vec![]);
3055        let response = dir_namespace.list_tables(request).await.unwrap();
3056        assert_eq!(response.tables.len(), 0);
3057    }
3058
3059    #[tokio::test]
3060    async fn test_list_tables_pagination_limit_zero() {
3061        let temp_dir = TempStdDir::default();
3062        let temp_path = temp_dir.to_str().unwrap();
3063
3064        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3065            .build()
3066            .await
3067            .unwrap();
3068
3069        let buffer = create_test_ipc_data();
3070        let mut create_request = CreateTableRequest::new();
3071        create_request.id = Some(vec!["alpha".to_string()]);
3072        dir_namespace
3073            .create_table(create_request, Bytes::from(buffer))
3074            .await
3075            .unwrap();
3076
3077        let response = dir_namespace
3078            .list_tables(ListTablesRequest {
3079                id: Some(vec![]),
3080                limit: Some(0),
3081                ..Default::default()
3082            })
3083            .await
3084            .unwrap();
3085
3086        assert!(response.tables.is_empty());
3087        assert!(response.page_token.is_none());
3088    }
3089
3090    #[rstest]
3091    #[case::with_optimization(true)]
3092    #[case::without_optimization(false)]
3093    #[tokio::test]
3094    async fn test_manifest_namespace_multiple_tables(#[case] inline_optimization: bool) {
3095        let temp_dir = TempStdDir::default();
3096        let temp_path = temp_dir.to_str().unwrap();
3097
3098        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3099            .inline_optimization_enabled(inline_optimization)
3100            .build()
3101            .await
3102            .unwrap();
3103
3104        // Create multiple tables
3105        let buffer = create_test_ipc_data();
3106        for i in 1..=3 {
3107            let mut create_request = CreateTableRequest::new();
3108            create_request.id = Some(vec![format!("table{}", i)]);
3109            dir_namespace
3110                .create_table(create_request, Bytes::from(buffer.clone()))
3111                .await
3112                .unwrap();
3113        }
3114
3115        // List all tables
3116        let mut request = ListTablesRequest::new();
3117        request.id = Some(vec![]);
3118        let response = dir_namespace.list_tables(request).await.unwrap();
3119        assert_eq!(response.tables.len(), 3);
3120        assert!(response.tables.contains(&"table1".to_string()));
3121        assert!(response.tables.contains(&"table2".to_string()));
3122        assert!(response.tables.contains(&"table3".to_string()));
3123    }
3124
3125    #[rstest]
3126    #[case::with_optimization(true)]
3127    #[case::without_optimization(false)]
3128    #[tokio::test]
3129    async fn test_directory_only_mode(#[case] inline_optimization: bool) {
3130        let temp_dir = TempStdDir::default();
3131        let temp_path = temp_dir.to_str().unwrap();
3132
3133        // Create a DirectoryNamespace with manifest disabled
3134        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3135            .manifest_enabled(false)
3136            .inline_optimization_enabled(inline_optimization)
3137            .build()
3138            .await
3139            .unwrap();
3140
3141        // Verify we can list tables (should be empty)
3142        let mut request = ListTablesRequest::new();
3143        request.id = Some(vec![]);
3144        let response = dir_namespace.list_tables(request).await.unwrap();
3145        assert_eq!(response.tables.len(), 0);
3146
3147        // Create a test table
3148        let buffer = create_test_ipc_data();
3149        let mut create_request = CreateTableRequest::new();
3150        create_request.id = Some(vec!["test_table".to_string()]);
3151
3152        // Create table - this should use directory-only mode
3153        let _response = dir_namespace
3154            .create_table(create_request, Bytes::from(buffer))
3155            .await
3156            .unwrap();
3157
3158        // List tables - should see our new table
3159        let mut request = ListTablesRequest::new();
3160        request.id = Some(vec![]);
3161        let response = dir_namespace.list_tables(request).await.unwrap();
3162        assert_eq!(response.tables.len(), 1);
3163        assert_eq!(response.tables[0], "test_table");
3164    }
3165
3166    #[rstest]
3167    #[case::with_optimization(true)]
3168    #[case::without_optimization(false)]
3169    #[tokio::test]
3170    async fn test_dual_mode_merge(#[case] inline_optimization: bool) {
3171        let temp_dir = TempStdDir::default();
3172        let temp_path = temp_dir.to_str().unwrap();
3173
3174        // Create a DirectoryNamespace with both manifest and directory enabled
3175        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3176            .manifest_enabled(true)
3177            .dir_listing_enabled(true)
3178            .inline_optimization_enabled(inline_optimization)
3179            .build()
3180            .await
3181            .unwrap();
3182
3183        // Create tables through manifest
3184        let buffer = create_test_ipc_data();
3185        let mut create_request = CreateTableRequest::new();
3186        create_request.id = Some(vec!["table1".to_string()]);
3187        dir_namespace
3188            .create_table(create_request, Bytes::from(buffer))
3189            .await
3190            .unwrap();
3191
3192        // List tables - should see table from both manifest and directory
3193        let mut request = ListTablesRequest::new();
3194        request.id = Some(vec![]);
3195        let response = dir_namespace.list_tables(request).await.unwrap();
3196        assert_eq!(response.tables.len(), 1);
3197        assert_eq!(response.tables[0], "table1");
3198    }
3199
3200    #[rstest]
3201    #[case::with_optimization(true)]
3202    #[case::without_optimization(false)]
3203    #[tokio::test]
3204    async fn test_manifest_only_mode(#[case] inline_optimization: bool) {
3205        let temp_dir = TempStdDir::default();
3206        let temp_path = temp_dir.to_str().unwrap();
3207
3208        // Create a DirectoryNamespace with only manifest enabled
3209        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3210            .manifest_enabled(true)
3211            .dir_listing_enabled(false)
3212            .inline_optimization_enabled(inline_optimization)
3213            .build()
3214            .await
3215            .unwrap();
3216
3217        // Create table
3218        let buffer = create_test_ipc_data();
3219        let mut create_request = CreateTableRequest::new();
3220        create_request.id = Some(vec!["test_table".to_string()]);
3221        dir_namespace
3222            .create_table(create_request, Bytes::from(buffer))
3223            .await
3224            .unwrap();
3225
3226        // List tables - should only use manifest
3227        let mut request = ListTablesRequest::new();
3228        request.id = Some(vec![]);
3229        let response = dir_namespace.list_tables(request).await.unwrap();
3230        assert_eq!(response.tables.len(), 1);
3231        assert_eq!(response.tables[0], "test_table");
3232    }
3233
3234    #[rstest]
3235    #[case::with_optimization(true)]
3236    #[case::without_optimization(false)]
3237    #[tokio::test]
3238    async fn test_drop_nonexistent_table(#[case] inline_optimization: bool) {
3239        let temp_dir = TempStdDir::default();
3240        let temp_path = temp_dir.to_str().unwrap();
3241
3242        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3243            .inline_optimization_enabled(inline_optimization)
3244            .build()
3245            .await
3246            .unwrap();
3247
3248        // Try to drop non-existent table
3249        let mut drop_request = DropTableRequest::new();
3250        drop_request.id = Some(vec!["nonexistent".to_string()]);
3251        let result = dir_namespace.drop_table(drop_request).await;
3252        assert!(result.is_err());
3253    }
3254
3255    #[rstest]
3256    #[case::with_optimization(true)]
3257    #[case::without_optimization(false)]
3258    #[tokio::test]
3259    async fn test_create_duplicate_table_fails(#[case] inline_optimization: bool) {
3260        let temp_dir = TempStdDir::default();
3261        let temp_path = temp_dir.to_str().unwrap();
3262
3263        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3264            .inline_optimization_enabled(inline_optimization)
3265            .build()
3266            .await
3267            .unwrap();
3268
3269        // Create table
3270        let buffer = create_test_ipc_data();
3271        let mut create_request = CreateTableRequest::new();
3272        create_request.id = Some(vec!["test_table".to_string()]);
3273        dir_namespace
3274            .create_table(create_request, Bytes::from(buffer.clone()))
3275            .await
3276            .unwrap();
3277
3278        // Try to create table with same name - should fail
3279        let mut create_request = CreateTableRequest::new();
3280        create_request.id = Some(vec!["test_table".to_string()]);
3281        let result = dir_namespace
3282            .create_table(create_request, Bytes::from(buffer))
3283            .await;
3284        assert!(result.is_err());
3285    }
3286
3287    #[rstest]
3288    #[case::with_optimization(true)]
3289    #[case::without_optimization(false)]
3290    #[tokio::test]
3291    async fn test_create_child_namespace(#[case] inline_optimization: bool) {
3292        use lance_namespace::models::{
3293            CreateNamespaceRequest, ListNamespacesRequest, NamespaceExistsRequest,
3294        };
3295
3296        let temp_dir = TempStdDir::default();
3297        let temp_path = temp_dir.to_str().unwrap();
3298
3299        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3300            .inline_optimization_enabled(inline_optimization)
3301            .build()
3302            .await
3303            .unwrap();
3304
3305        // Create a child namespace
3306        let mut create_req = CreateNamespaceRequest::new();
3307        create_req.id = Some(vec!["ns1".to_string()]);
3308        let result = dir_namespace.create_namespace(create_req).await;
3309        assert!(
3310            result.is_ok(),
3311            "Failed to create child namespace: {:?}",
3312            result.err()
3313        );
3314
3315        // Verify namespace exists
3316        let exists_req = NamespaceExistsRequest {
3317            id: Some(vec!["ns1".to_string()]),
3318            ..Default::default()
3319        };
3320        let result = dir_namespace.namespace_exists(exists_req).await;
3321        assert!(result.is_ok(), "Namespace should exist");
3322
3323        // List child namespaces of root
3324        let list_req = ListNamespacesRequest {
3325            id: Some(vec![]),
3326            page_token: None,
3327            limit: None,
3328            ..Default::default()
3329        };
3330        let result = dir_namespace.list_namespaces(list_req).await;
3331        assert!(result.is_ok());
3332        let namespaces = result.unwrap();
3333        assert_eq!(namespaces.namespaces.len(), 1);
3334        assert_eq!(namespaces.namespaces[0], "ns1");
3335    }
3336
3337    #[rstest]
3338    #[case::with_optimization(true)]
3339    #[case::without_optimization(false)]
3340    #[tokio::test]
3341    async fn test_create_nested_namespace(#[case] inline_optimization: bool) {
3342        use lance_namespace::models::{
3343            CreateNamespaceRequest, ListNamespacesRequest, NamespaceExistsRequest,
3344        };
3345
3346        let temp_dir = TempStdDir::default();
3347        let temp_path = temp_dir.to_str().unwrap();
3348
3349        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3350            .inline_optimization_enabled(inline_optimization)
3351            .build()
3352            .await
3353            .unwrap();
3354
3355        // Create parent namespace
3356        let mut create_req = CreateNamespaceRequest::new();
3357        create_req.id = Some(vec!["parent".to_string()]);
3358        dir_namespace.create_namespace(create_req).await.unwrap();
3359
3360        // Create nested child namespace
3361        let mut create_req = CreateNamespaceRequest::new();
3362        create_req.id = Some(vec!["parent".to_string(), "child".to_string()]);
3363        let result = dir_namespace.create_namespace(create_req).await;
3364        assert!(
3365            result.is_ok(),
3366            "Failed to create nested namespace: {:?}",
3367            result.err()
3368        );
3369
3370        // Verify nested namespace exists
3371        let exists_req = NamespaceExistsRequest {
3372            id: Some(vec!["parent".to_string(), "child".to_string()]),
3373            ..Default::default()
3374        };
3375        let result = dir_namespace.namespace_exists(exists_req).await;
3376        assert!(result.is_ok(), "Nested namespace should exist");
3377
3378        // List child namespaces of parent
3379        let list_req = ListNamespacesRequest {
3380            id: Some(vec!["parent".to_string()]),
3381            page_token: None,
3382            limit: None,
3383            ..Default::default()
3384        };
3385        let result = dir_namespace.list_namespaces(list_req).await;
3386        assert!(result.is_ok());
3387        let namespaces = result.unwrap();
3388        assert_eq!(namespaces.namespaces.len(), 1);
3389        assert_eq!(namespaces.namespaces[0], "child");
3390    }
3391
3392    #[rstest]
3393    #[case::with_optimization(true)]
3394    #[case::without_optimization(false)]
3395    #[tokio::test]
3396    async fn test_create_namespace_without_parent_fails(#[case] inline_optimization: bool) {
3397        use lance_namespace::models::CreateNamespaceRequest;
3398
3399        let temp_dir = TempStdDir::default();
3400        let temp_path = temp_dir.to_str().unwrap();
3401
3402        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3403            .inline_optimization_enabled(inline_optimization)
3404            .build()
3405            .await
3406            .unwrap();
3407
3408        // Try to create nested namespace without parent
3409        let mut create_req = CreateNamespaceRequest::new();
3410        create_req.id = Some(vec!["nonexistent_parent".to_string(), "child".to_string()]);
3411        let result = dir_namespace.create_namespace(create_req).await;
3412        assert!(result.is_err(), "Should fail when parent doesn't exist");
3413    }
3414
3415    #[rstest]
3416    #[case::with_optimization(true)]
3417    #[case::without_optimization(false)]
3418    #[tokio::test]
3419    async fn test_drop_child_namespace(#[case] inline_optimization: bool) {
3420        use lance_namespace::models::{
3421            CreateNamespaceRequest, DropNamespaceRequest, NamespaceExistsRequest,
3422        };
3423
3424        let temp_dir = TempStdDir::default();
3425        let temp_path = temp_dir.to_str().unwrap();
3426
3427        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3428            .inline_optimization_enabled(inline_optimization)
3429            .build()
3430            .await
3431            .unwrap();
3432
3433        // Create a child namespace
3434        let mut create_req = CreateNamespaceRequest::new();
3435        create_req.id = Some(vec!["ns1".to_string()]);
3436        dir_namespace.create_namespace(create_req).await.unwrap();
3437
3438        // Drop the namespace
3439        let mut drop_req = DropNamespaceRequest::new();
3440        drop_req.id = Some(vec!["ns1".to_string()]);
3441        let result = dir_namespace.drop_namespace(drop_req).await;
3442        assert!(
3443            result.is_ok(),
3444            "Failed to drop namespace: {:?}",
3445            result.err()
3446        );
3447
3448        // Verify namespace no longer exists
3449        let exists_req = NamespaceExistsRequest {
3450            id: Some(vec!["ns1".to_string()]),
3451            ..Default::default()
3452        };
3453        let result = dir_namespace.namespace_exists(exists_req).await;
3454        assert!(result.is_err(), "Namespace should not exist after drop");
3455    }
3456
3457    #[rstest]
3458    #[case::with_optimization(true)]
3459    #[case::without_optimization(false)]
3460    #[tokio::test]
3461    async fn test_drop_namespace_with_children_fails(#[case] inline_optimization: bool) {
3462        use lance_namespace::models::{CreateNamespaceRequest, DropNamespaceRequest};
3463
3464        let temp_dir = TempStdDir::default();
3465        let temp_path = temp_dir.to_str().unwrap();
3466
3467        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3468            .inline_optimization_enabled(inline_optimization)
3469            .build()
3470            .await
3471            .unwrap();
3472
3473        // Create parent and child namespaces
3474        let mut create_req = CreateNamespaceRequest::new();
3475        create_req.id = Some(vec!["parent".to_string()]);
3476        dir_namespace.create_namespace(create_req).await.unwrap();
3477
3478        let mut create_req = CreateNamespaceRequest::new();
3479        create_req.id = Some(vec!["parent".to_string(), "child".to_string()]);
3480        dir_namespace.create_namespace(create_req).await.unwrap();
3481
3482        // Try to drop parent namespace - should fail because it has children
3483        let mut drop_req = DropNamespaceRequest::new();
3484        drop_req.id = Some(vec!["parent".to_string()]);
3485        let result = dir_namespace.drop_namespace(drop_req).await;
3486        assert!(result.is_err(), "Should fail when namespace has children");
3487    }
3488
3489    #[rstest]
3490    #[case::with_optimization(true)]
3491    #[case::without_optimization(false)]
3492    #[tokio::test]
3493    async fn test_create_table_in_child_namespace(#[case] inline_optimization: bool) {
3494        use lance_namespace::models::{
3495            CreateNamespaceRequest, CreateTableRequest, ListTablesRequest,
3496        };
3497
3498        let temp_dir = TempStdDir::default();
3499        let temp_path = temp_dir.to_str().unwrap();
3500
3501        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3502            .inline_optimization_enabled(inline_optimization)
3503            .build()
3504            .await
3505            .unwrap();
3506
3507        // Create a child namespace
3508        let mut create_ns_req = CreateNamespaceRequest::new();
3509        create_ns_req.id = Some(vec!["ns1".to_string()]);
3510        dir_namespace.create_namespace(create_ns_req).await.unwrap();
3511
3512        // Create a table in the child namespace
3513        let buffer = create_test_ipc_data();
3514        let mut create_table_req = CreateTableRequest::new();
3515        create_table_req.id = Some(vec!["ns1".to_string(), "table1".to_string()]);
3516        let result = dir_namespace
3517            .create_table(create_table_req, Bytes::from(buffer))
3518            .await;
3519        assert!(
3520            result.is_ok(),
3521            "Failed to create table in child namespace: {:?}",
3522            result.err()
3523        );
3524
3525        // List tables in the namespace
3526        let list_req = ListTablesRequest {
3527            id: Some(vec!["ns1".to_string()]),
3528            page_token: None,
3529            limit: None,
3530            ..Default::default()
3531        };
3532        let result = dir_namespace.list_tables(list_req).await;
3533        assert!(result.is_ok());
3534        let tables = result.unwrap();
3535        assert_eq!(tables.tables.len(), 1);
3536        assert_eq!(tables.tables[0], "table1");
3537    }
3538
3539    #[rstest]
3540    #[case::with_optimization(true)]
3541    #[case::without_optimization(false)]
3542    #[tokio::test]
3543    async fn test_describe_child_namespace(#[case] inline_optimization: bool) {
3544        use lance_namespace::models::{CreateNamespaceRequest, DescribeNamespaceRequest};
3545
3546        let temp_dir = TempStdDir::default();
3547        let temp_path = temp_dir.to_str().unwrap();
3548
3549        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3550            .inline_optimization_enabled(inline_optimization)
3551            .build()
3552            .await
3553            .unwrap();
3554
3555        // Create a child namespace with properties
3556        let mut properties = std::collections::HashMap::new();
3557        properties.insert("key1".to_string(), "value1".to_string());
3558
3559        let mut create_req = CreateNamespaceRequest::new();
3560        create_req.id = Some(vec!["ns1".to_string()]);
3561        create_req.properties = Some(properties.clone());
3562        dir_namespace.create_namespace(create_req).await.unwrap();
3563
3564        // Describe the namespace
3565        let describe_req = DescribeNamespaceRequest {
3566            id: Some(vec!["ns1".to_string()]),
3567            ..Default::default()
3568        };
3569        let result = dir_namespace.describe_namespace(describe_req).await;
3570        assert!(
3571            result.is_ok(),
3572            "Failed to describe namespace: {:?}",
3573            result.err()
3574        );
3575        let response = result.unwrap();
3576        assert!(response.properties.is_some());
3577        assert_eq!(
3578            response.properties.unwrap().get("key1"),
3579            Some(&"value1".to_string())
3580        );
3581    }
3582
3583    #[rstest]
3584    #[case::with_optimization(true)]
3585    #[case::without_optimization(false)]
3586    #[tokio::test]
3587    async fn test_concurrent_create_and_drop_single_instance(#[case] inline_optimization: bool) {
3588        use futures::future::join_all;
3589        use std::sync::Arc;
3590
3591        let temp_dir = TempStdDir::default();
3592        let temp_path = temp_dir.to_str().unwrap();
3593
3594        let dir_namespace = Arc::new(
3595            DirectoryNamespaceBuilder::new(temp_path)
3596                .inline_optimization_enabled(inline_optimization)
3597                .build()
3598                .await
3599                .unwrap(),
3600        );
3601
3602        // Initialize namespace first - create parent namespace to ensure __manifest table
3603        // is created before concurrent operations
3604        let mut create_ns_request = CreateNamespaceRequest::new();
3605        create_ns_request.id = Some(vec!["test_ns".to_string()]);
3606        dir_namespace
3607            .create_namespace(create_ns_request)
3608            .await
3609            .unwrap();
3610
3611        let num_tables = 10;
3612        let mut handles = Vec::new();
3613
3614        for i in 0..num_tables {
3615            let ns = dir_namespace.clone();
3616            let handle = async move {
3617                let table_name = format!("concurrent_table_{}", i);
3618                let table_id = vec!["test_ns".to_string(), table_name.clone()];
3619                let buffer = create_test_ipc_data();
3620
3621                // Create table
3622                let mut create_request = CreateTableRequest::new();
3623                create_request.id = Some(table_id.clone());
3624                ns.create_table(create_request, Bytes::from(buffer))
3625                    .await
3626                    .unwrap_or_else(|e| panic!("Failed to create table {}: {}", table_name, e));
3627
3628                // Drop table
3629                let mut drop_request = DropTableRequest::new();
3630                drop_request.id = Some(table_id);
3631                ns.drop_table(drop_request)
3632                    .await
3633                    .unwrap_or_else(|e| panic!("Failed to drop table {}: {}", table_name, e));
3634
3635                Ok::<_, lance_core::Error>(())
3636            };
3637            handles.push(handle);
3638        }
3639
3640        let results = join_all(handles).await;
3641        for result in results {
3642            assert!(result.is_ok(), "All concurrent operations should succeed");
3643        }
3644
3645        // Verify all tables are dropped
3646        let mut request = ListTablesRequest::new();
3647        request.id = Some(vec!["test_ns".to_string()]);
3648        let response = dir_namespace.list_tables(request).await.unwrap();
3649        assert_eq!(response.tables.len(), 0, "All tables should be dropped");
3650    }
3651
3652    #[rstest]
3653    #[case::with_optimization(true)]
3654    #[case::without_optimization(false)]
3655    #[tokio::test]
3656    async fn test_concurrent_create_and_drop_multiple_instances(#[case] inline_optimization: bool) {
3657        use futures::future::join_all;
3658
3659        let temp_dir = TempStdDir::default();
3660        let temp_path = temp_dir.to_str().unwrap().to_string();
3661
3662        // Initialize namespace first with a single instance to ensure __manifest
3663        // table is created and parent namespace exists before concurrent operations
3664        let init_ns = DirectoryNamespaceBuilder::new(&temp_path)
3665            .inline_optimization_enabled(inline_optimization)
3666            .build()
3667            .await
3668            .unwrap();
3669        let mut create_ns_request = CreateNamespaceRequest::new();
3670        create_ns_request.id = Some(vec!["test_ns".to_string()]);
3671        init_ns.create_namespace(create_ns_request).await.unwrap();
3672
3673        let num_tables = 10;
3674        let mut handles = Vec::new();
3675
3676        for i in 0..num_tables {
3677            let path = temp_path.clone();
3678            let handle = async move {
3679                // Each task creates its own namespace instance
3680                let ns = DirectoryNamespaceBuilder::new(&path)
3681                    .inline_optimization_enabled(inline_optimization)
3682                    .build()
3683                    .await
3684                    .unwrap();
3685
3686                let table_name = format!("multi_ns_table_{}", i);
3687                let table_id = vec!["test_ns".to_string(), table_name.clone()];
3688                let buffer = create_test_ipc_data();
3689
3690                // Create table
3691                let mut create_request = CreateTableRequest::new();
3692                create_request.id = Some(table_id.clone());
3693                ns.create_table(create_request, Bytes::from(buffer))
3694                    .await
3695                    .unwrap_or_else(|e| panic!("Failed to create table {}: {}", table_name, e));
3696
3697                // Drop table
3698                let mut drop_request = DropTableRequest::new();
3699                drop_request.id = Some(table_id);
3700                ns.drop_table(drop_request)
3701                    .await
3702                    .unwrap_or_else(|e| panic!("Failed to drop table {}: {}", table_name, e));
3703
3704                Ok::<_, lance_core::Error>(())
3705            };
3706            handles.push(handle);
3707        }
3708
3709        let results = join_all(handles).await;
3710        for result in results {
3711            assert!(result.is_ok(), "All concurrent operations should succeed");
3712        }
3713
3714        // Verify with a fresh namespace instance
3715        let verify_ns = DirectoryNamespaceBuilder::new(&temp_path)
3716            .inline_optimization_enabled(inline_optimization)
3717            .build()
3718            .await
3719            .unwrap();
3720
3721        let mut request = ListTablesRequest::new();
3722        request.id = Some(vec!["test_ns".to_string()]);
3723        let response = verify_ns.list_tables(request).await.unwrap();
3724        assert_eq!(response.tables.len(), 0, "All tables should be dropped");
3725    }
3726
3727    #[rstest]
3728    #[case::with_optimization(true)]
3729    #[case::without_optimization(false)]
3730    #[tokio::test]
3731    async fn test_concurrent_create_then_drop_from_different_instance(
3732        #[case] inline_optimization: bool,
3733    ) {
3734        use futures::future::join_all;
3735
3736        let temp_dir = TempStdDir::default();
3737        let temp_path = temp_dir.to_str().unwrap().to_string();
3738
3739        // Initialize namespace first with a single instance to ensure __manifest
3740        // table is created and parent namespace exists before concurrent operations
3741        let init_ns = DirectoryNamespaceBuilder::new(&temp_path)
3742            .inline_optimization_enabled(inline_optimization)
3743            .build()
3744            .await
3745            .unwrap();
3746        let mut create_ns_request = CreateNamespaceRequest::new();
3747        create_ns_request.id = Some(vec!["test_ns".to_string()]);
3748        init_ns.create_namespace(create_ns_request).await.unwrap();
3749
3750        let num_tables = 10;
3751
3752        // Phase 1: Create all tables concurrently using separate namespace instances
3753        let mut create_handles = Vec::new();
3754        for i in 0..num_tables {
3755            let path = temp_path.clone();
3756            let handle = async move {
3757                let ns = DirectoryNamespaceBuilder::new(&path)
3758                    .inline_optimization_enabled(inline_optimization)
3759                    .build()
3760                    .await
3761                    .unwrap();
3762
3763                let table_name = format!("cross_instance_table_{}", i);
3764                let table_id = vec!["test_ns".to_string(), table_name.clone()];
3765                let buffer = create_test_ipc_data();
3766
3767                let mut create_request = CreateTableRequest::new();
3768                create_request.id = Some(table_id);
3769                ns.create_table(create_request, Bytes::from(buffer))
3770                    .await
3771                    .unwrap_or_else(|e| panic!("Failed to create table {}: {}", table_name, e));
3772
3773                Ok::<_, lance_core::Error>(())
3774            };
3775            create_handles.push(handle);
3776        }
3777
3778        let create_results = join_all(create_handles).await;
3779        for result in create_results {
3780            assert!(result.is_ok(), "All create operations should succeed");
3781        }
3782
3783        // Phase 2: Drop all tables concurrently using NEW namespace instances
3784        let mut drop_handles = Vec::new();
3785        for i in 0..num_tables {
3786            let path = temp_path.clone();
3787            let handle = async move {
3788                let ns = DirectoryNamespaceBuilder::new(&path)
3789                    .inline_optimization_enabled(inline_optimization)
3790                    .build()
3791                    .await
3792                    .unwrap();
3793
3794                let table_name = format!("cross_instance_table_{}", i);
3795                let table_id = vec!["test_ns".to_string(), table_name.clone()];
3796
3797                let mut drop_request = DropTableRequest::new();
3798                drop_request.id = Some(table_id);
3799                ns.drop_table(drop_request)
3800                    .await
3801                    .unwrap_or_else(|e| panic!("Failed to drop table {}: {}", table_name, e));
3802
3803                Ok::<_, lance_core::Error>(())
3804            };
3805            drop_handles.push(handle);
3806        }
3807
3808        let drop_results = join_all(drop_handles).await;
3809        for result in drop_results {
3810            assert!(result.is_ok(), "All drop operations should succeed");
3811        }
3812
3813        // Verify all tables are dropped
3814        let verify_ns = DirectoryNamespaceBuilder::new(&temp_path)
3815            .inline_optimization_enabled(inline_optimization)
3816            .build()
3817            .await
3818            .unwrap();
3819
3820        let mut request = ListTablesRequest::new();
3821        request.id = Some(vec!["test_ns".to_string()]);
3822        let response = verify_ns.list_tables(request).await.unwrap();
3823        assert_eq!(response.tables.len(), 0, "All tables should be dropped");
3824    }
3825
3826    #[test]
3827    fn test_construct_full_uri_with_cloud_urls() {
3828        // Test S3-style URL with nested path (no trailing slash)
3829        let s3_result =
3830            ManifestNamespace::construct_full_uri("s3://bucket/path/subdir", "table.lance")
3831                .unwrap();
3832        assert_eq!(
3833            s3_result, "s3://bucket/path/subdir/table.lance",
3834            "S3 URL should correctly append table name to nested path"
3835        );
3836
3837        // Test Azure-style URL with nested path (no trailing slash)
3838        let az_result =
3839            ManifestNamespace::construct_full_uri("az://container/path/subdir", "table.lance")
3840                .unwrap();
3841        assert_eq!(
3842            az_result, "az://container/path/subdir/table.lance",
3843            "Azure URL should correctly append table name to nested path"
3844        );
3845
3846        // Test GCS-style URL with nested path (no trailing slash)
3847        let gs_result =
3848            ManifestNamespace::construct_full_uri("gs://bucket/path/subdir", "table.lance")
3849                .unwrap();
3850        assert_eq!(
3851            gs_result, "gs://bucket/path/subdir/table.lance",
3852            "GCS URL should correctly append table name to nested path"
3853        );
3854
3855        // Test with deeper nesting
3856        let deep_result =
3857            ManifestNamespace::construct_full_uri("s3://bucket/a/b/c/d", "my_table.lance").unwrap();
3858        assert_eq!(
3859            deep_result, "s3://bucket/a/b/c/d/my_table.lance",
3860            "Deeply nested path should work correctly"
3861        );
3862
3863        // Test with root-level path (single segment after bucket)
3864        let shallow_result =
3865            ManifestNamespace::construct_full_uri("s3://bucket", "table.lance").unwrap();
3866        assert_eq!(
3867            shallow_result, "s3://bucket/table.lance",
3868            "Single-level nested path should work correctly"
3869        );
3870
3871        // Test that URLs with trailing slash already work (no regression)
3872        let trailing_slash_result =
3873            ManifestNamespace::construct_full_uri("s3://bucket/path/subdir/", "table.lance")
3874                .unwrap();
3875        assert_eq!(
3876            trailing_slash_result, "s3://bucket/path/subdir/table.lance",
3877            "URL with existing trailing slash should still work"
3878        );
3879
3880        // Test that URLs with empty query string don't include trailing "?"
3881        // This is important because URL::to_string() can add "?" for empty queries
3882        let empty_query_result =
3883            ManifestNamespace::construct_full_uri("s3://bucket/path?", "table.lance").unwrap();
3884        assert_eq!(
3885            empty_query_result, "s3://bucket/path/table.lance",
3886            "URL with empty query string should not include trailing '?'"
3887        );
3888
3889        // Test that URLs with actual query parameters have them stripped
3890        // (query parameters are not meaningful for storage paths)
3891        let query_param_result =
3892            ManifestNamespace::construct_full_uri("s3://bucket/path?param=value", "table.lance")
3893                .unwrap();
3894        assert_eq!(
3895            query_param_result, "s3://bucket/path/table.lance",
3896            "URL with query parameters should have them stripped"
3897        );
3898    }
3899
3900    #[test]
3901    fn test_construct_full_uri_with_dollar_sign() {
3902        let result =
3903            ManifestNamespace::construct_full_uri("/tmp/root", "hash_workspace$test_table")
3904                .unwrap();
3905
3906        assert!(
3907            result.ends_with("/tmp/root/hash_workspace$test_table"),
3908            "local file URI should preserve dollar signs without adding empty path segments: {}",
3909            result
3910        );
3911        assert!(
3912            !result.contains("//hash_workspace$test_table"),
3913            "local file URI should not add a double slash before table directory: {}",
3914            result
3915        );
3916    }
3917
3918    #[test]
3919    fn test_construct_full_uri_with_nested_relative_location() {
3920        let result =
3921            ManifestNamespace::construct_full_uri("/tmp/root", "workspace/physical_table.lance")
3922                .unwrap();
3923
3924        assert!(
3925            result.ends_with("/tmp/root/workspace/physical_table.lance"),
3926            "nested relative location should preserve path separators: {}",
3927            result
3928        );
3929        assert!(
3930            !result.contains("%2Fphysical_table.lance"),
3931            "nested relative location should not encode path separators: {}",
3932            result
3933        );
3934    }
3935
3936    /// Test that concurrent create_table calls for the same table name don't
3937    /// create duplicate entries in the manifest. Uses two independent
3938    /// ManifestNamespace instances pointing at the same directory to simulate
3939    /// two separate OS processes racing on table creation. The conflict_retries
3940    /// setting on the MergeInsert ensures the second operation properly detects
3941    /// the duplicate via WhenMatched::Fail after retrying against the latest data.
3942    #[tokio::test]
3943    async fn test_concurrent_create_table_no_duplicates() {
3944        let temp_dir = TempStdDir::default();
3945        let temp_path = temp_dir.to_str().unwrap();
3946
3947        // Two independent namespace instances = two separate "processes"
3948        // sharing the same underlying filesystem directory.
3949        let ns1 = DirectoryNamespaceBuilder::new(temp_path)
3950            .inline_optimization_enabled(false)
3951            .build()
3952            .await
3953            .unwrap();
3954        let ns2 = DirectoryNamespaceBuilder::new(temp_path)
3955            .inline_optimization_enabled(false)
3956            .build()
3957            .await
3958            .unwrap();
3959
3960        let buffer = create_test_ipc_data();
3961
3962        let mut req1 = CreateTableRequest::new();
3963        req1.id = Some(vec!["race_table".to_string()]);
3964        let mut req2 = CreateTableRequest::new();
3965        req2.id = Some(vec!["race_table".to_string()]);
3966
3967        // Launch both create_table calls concurrently
3968        let (result1, result2) = tokio::join!(
3969            ns1.create_table(req1, Bytes::from(buffer.clone())),
3970            ns2.create_table(req2, Bytes::from(buffer.clone())),
3971        );
3972
3973        // Exactly one should succeed and one should fail
3974        let success_count = [&result1, &result2].iter().filter(|r| r.is_ok()).count();
3975        let failure_count = [&result1, &result2].iter().filter(|r| r.is_err()).count();
3976        assert_eq!(
3977            success_count, 1,
3978            "Exactly one create should succeed, got: result1={:?}, result2={:?}",
3979            result1, result2
3980        );
3981        assert_eq!(
3982            failure_count, 1,
3983            "Exactly one create should fail, got: result1={:?}, result2={:?}",
3984            result1, result2
3985        );
3986
3987        // Verify only one table entry exists in the manifest
3988        let ns_check = DirectoryNamespaceBuilder::new(temp_path)
3989            .inline_optimization_enabled(false)
3990            .build()
3991            .await
3992            .unwrap();
3993        let mut list_request = ListTablesRequest::new();
3994        list_request.id = Some(vec![]);
3995        let response = ns_check.list_tables(list_request).await.unwrap();
3996        assert_eq!(
3997            response.tables.len(),
3998            1,
3999            "Should have exactly 1 table, found: {:?}",
4000            response.tables
4001        );
4002        assert_eq!(response.tables[0], "race_table");
4003
4004        // Also verify describe_table works (no "found 2" error)
4005        let mut describe_request = DescribeTableRequest::new();
4006        describe_request.id = Some(vec!["race_table".to_string()]);
4007        let describe_result = ns_check.describe_table(describe_request).await;
4008        assert!(
4009            describe_result.is_ok(),
4010            "describe_table should not fail with duplicate entries: {:?}",
4011            describe_result
4012        );
4013    }
4014
4015    // --- apply_pagination unit tests ---
4016
4017    fn names(v: &[&str]) -> Vec<String> {
4018        v.iter().map(|s| s.to_string()).collect()
4019    }
4020
4021    #[test]
4022    fn test_apply_pagination_no_token_no_limit() {
4023        let mut n = names(&["b", "a", "c"]);
4024        let next = ManifestNamespace::apply_pagination(&mut n, None, None);
4025        assert_eq!(n, names(&["a", "b", "c"]));
4026        assert_eq!(next, None);
4027    }
4028
4029    #[test]
4030    fn test_apply_pagination_limit_truncates_and_returns_token() {
4031        let mut n = names(&["c", "a", "b"]);
4032        let next = ManifestNamespace::apply_pagination(&mut n, None, Some(2));
4033        assert_eq!(n, names(&["a", "b"]));
4034        assert_eq!(next, Some("b".to_string()));
4035    }
4036
4037    #[test]
4038    fn test_apply_pagination_limit_zero_returns_empty_no_token() {
4039        let mut n = names(&["a", "b", "c"]);
4040        let next = ManifestNamespace::apply_pagination(&mut n, None, Some(0));
4041        assert!(n.is_empty());
4042        assert_eq!(next, None);
4043    }
4044
4045    #[test]
4046    fn test_apply_pagination_page_token_in_list() {
4047        // "b" is in the list; should start from "c" (strict >)
4048        let mut n = names(&["a", "b", "c", "d"]);
4049        let next = ManifestNamespace::apply_pagination(&mut n, Some("b".to_string()), None);
4050        assert_eq!(n, names(&["c", "d"]));
4051        assert_eq!(next, None);
4052    }
4053
4054    #[test]
4055    fn test_apply_pagination_page_token_past_all_items() {
4056        let mut n = names(&["a", "b", "c"]);
4057        let next = ManifestNamespace::apply_pagination(&mut n, Some("z".to_string()), None);
4058        assert!(n.is_empty());
4059        assert_eq!(next, None);
4060    }
4061
4062    #[test]
4063    fn test_apply_pagination_token_and_limit_combined() {
4064        let mut n = names(&["a", "b", "c", "d", "e"]);
4065        let next = ManifestNamespace::apply_pagination(&mut n, Some("b".to_string()), Some(2));
4066        assert_eq!(n, names(&["c", "d"]));
4067        assert_eq!(next, Some("d".to_string()));
4068    }
4069}