Skip to main content

lance_namespace_impls/dir/
manifest.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Manifest-based namespace implementation
5//!
6//! This module provides a namespace implementation that uses a manifest table
7//! to track tables and nested namespaces.
8
9use arrow::array::{Array, RecordBatch, RecordBatchIterator, StringArray};
10use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
11use arrow_ipc::reader::StreamReader;
12use async_trait::async_trait;
13use bytes::Bytes;
14use futures::{stream::StreamExt, FutureExt};
15use lance::dataset::optimize::{compact_files, CompactionOptions};
16use lance::dataset::{builder::DatasetBuilder, WriteParams};
17use lance::session::Session;
18use lance::{dataset::scanner::Scanner, Dataset};
19use lance_core::{box_error, Error, Result};
20use lance_index::optimize::OptimizeOptions;
21use lance_index::scalar::{BuiltinIndexType, ScalarIndexParams};
22use lance_index::traits::DatasetIndexExt;
23use lance_index::IndexType;
24use lance_io::object_store::{ObjectStore, ObjectStoreParams};
25use lance_namespace::models::{
26    CreateEmptyTableRequest, CreateEmptyTableResponse, CreateNamespaceRequest,
27    CreateNamespaceResponse, CreateTableRequest, CreateTableResponse, DeclareTableRequest,
28    DeclareTableResponse, DeregisterTableRequest, DeregisterTableResponse,
29    DescribeNamespaceRequest, DescribeNamespaceResponse, DescribeTableRequest,
30    DescribeTableResponse, DropNamespaceRequest, DropNamespaceResponse, DropTableRequest,
31    DropTableResponse, ListNamespacesRequest, ListNamespacesResponse, ListTablesRequest,
32    ListTablesResponse, NamespaceExistsRequest, RegisterTableRequest, RegisterTableResponse,
33    TableExistsRequest,
34};
35use lance_namespace::schema::arrow_schema_to_json;
36use lance_namespace::LanceNamespace;
37use object_store::path::Path;
38use snafu::location;
39use std::io::Cursor;
40use std::{
41    collections::HashMap,
42    hash::{DefaultHasher, Hash, Hasher},
43    ops::{Deref, DerefMut},
44    sync::Arc,
45};
46use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
47
/// Name of the special Lance table that tracks tables and nested namespaces.
const MANIFEST_TABLE_NAME: &str = "__manifest";
/// Separator joining namespace levels and the object name into one `object_id`
/// string, e.g. namespace `["ns1", "ns2"]` + name `"t"` -> `"ns1$ns2$t"`.
const DELIMITER: &str = "$";

// Index names for the __manifest table (created on demand by
// `run_inline_optimization` when inline optimization is enabled)
/// BTREE index on the object_id column for fast lookups
const OBJECT_ID_INDEX_NAME: &str = "object_id_btree";
/// Bitmap index on the object_type column for filtering by type
const OBJECT_TYPE_INDEX_NAME: &str = "object_type_bitmap";
/// LabelList index on the base_objects column for view dependencies
const BASE_OBJECTS_INDEX_NAME: &str = "base_objects_label_list";
58
59/// Object types that can be stored in the manifest
60#[derive(Debug, Clone, Copy, PartialEq, Eq)]
61pub enum ObjectType {
62    Namespace,
63    Table,
64}
65
66impl ObjectType {
67    pub fn as_str(&self) -> &str {
68        match self {
69            Self::Namespace => "namespace",
70            Self::Table => "table",
71        }
72    }
73
74    pub fn parse(s: &str) -> Result<Self> {
75        match s {
76            "namespace" => Ok(Self::Namespace),
77            "table" => Ok(Self::Table),
78            _ => Err(Error::io(
79                format!("Invalid object type: {}", s),
80                location!(),
81            )),
82        }
83    }
84}
85
/// Information about a table stored in the manifest
#[derive(Debug, Clone)]
pub struct TableInfo {
    /// Namespace path containing the table (empty for the root namespace),
    /// i.e. every component of the object ID except the last.
    pub namespace: Vec<String>,
    /// Table name: the last component of the object ID.
    pub name: String,
    /// Value of the manifest's `location` column for this table.
    /// NOTE(review): presumably relative to the namespace root
    /// (e.g. `table_name.lance`) — confirm.
    pub location: String,
}
93
/// Information about a namespace stored in the manifest
#[derive(Debug, Clone)]
pub struct NamespaceInfo {
    /// NOTE(review): presumably the parent namespace path (empty when the
    /// namespace sits directly under the root), mirroring `TableInfo` — confirm.
    pub namespace: Vec<String>,
    /// Name of this namespace.
    pub name: String,
    /// Optional key/value properties of the namespace.
    /// NOTE(review): presumably decoded from the manifest's `metadata`
    /// column — confirm.
    pub metadata: Option<HashMap<String, String>>,
}
101
/// A wrapper around a Dataset that provides concurrent access.
///
/// This can be cloned cheaply: clones share the same `Arc`, and therefore the
/// same dataset. It supports concurrent reads or exclusive writes via a tokio
/// `RwLock`. The manifest dataset is kept strongly consistent by reloading it
/// to the latest committed version in `get`/`get_mut` before a guard is
/// handed out.
#[derive(Debug, Clone)]
pub struct DatasetConsistencyWrapper(Arc<RwLock<Dataset>>);
108
109impl DatasetConsistencyWrapper {
110    /// Create a new wrapper with the given dataset.
111    pub fn new(dataset: Dataset) -> Self {
112        Self(Arc::new(RwLock::new(dataset)))
113    }
114
115    /// Get an immutable reference to the dataset.
116    /// Always reloads to ensure strong consistency.
117    pub async fn get(&self) -> Result<DatasetReadGuard<'_>> {
118        self.reload().await?;
119        Ok(DatasetReadGuard {
120            guard: self.0.read().await,
121        })
122    }
123
124    /// Get a mutable reference to the dataset.
125    /// Always reloads to ensure strong consistency.
126    pub async fn get_mut(&self) -> Result<DatasetWriteGuard<'_>> {
127        self.reload().await?;
128        Ok(DatasetWriteGuard {
129            guard: self.0.write().await,
130        })
131    }
132
133    /// Provide a known latest version of the dataset.
134    ///
135    /// This is usually done after some write operation, which inherently will
136    /// have the latest version.
137    pub async fn set_latest(&self, dataset: Dataset) {
138        let mut write_guard = self.0.write().await;
139        if dataset.manifest().version > write_guard.manifest().version {
140            *write_guard = dataset;
141        }
142    }
143
144    /// Reload the dataset to the latest version.
145    async fn reload(&self) -> Result<()> {
146        // First check if we need to reload (with read lock)
147        let read_guard = self.0.read().await;
148        let dataset_uri = read_guard.uri().to_string();
149        let current_version = read_guard.version().version;
150        log::debug!(
151            "Reload starting for uri={}, current_version={}",
152            dataset_uri,
153            current_version
154        );
155        let latest_version = read_guard
156            .latest_version_id()
157            .await
158            .map_err(|e| Error::IO {
159                source: box_error(std::io::Error::other(format!(
160                    "Failed to get latest version: {}",
161                    e
162                ))),
163                location: location!(),
164            })?;
165        log::debug!(
166            "Reload got latest_version={} for uri={}, current_version={}",
167            latest_version,
168            dataset_uri,
169            current_version
170        );
171        drop(read_guard);
172
173        // If already up-to-date, return early
174        if latest_version == current_version {
175            log::debug!("Already up-to-date for uri={}", dataset_uri);
176            return Ok(());
177        }
178
179        // Need to reload, acquire write lock
180        let mut write_guard = self.0.write().await;
181
182        // Double-check after acquiring write lock (someone else might have reloaded)
183        let latest_version = write_guard
184            .latest_version_id()
185            .await
186            .map_err(|e| Error::IO {
187                source: box_error(std::io::Error::other(format!(
188                    "Failed to get latest version: {}",
189                    e
190                ))),
191                location: location!(),
192            })?;
193
194        if latest_version != write_guard.version().version {
195            write_guard.checkout_latest().await.map_err(|e| Error::IO {
196                source: box_error(std::io::Error::other(format!(
197                    "Failed to checkout latest: {}",
198                    e
199                ))),
200                location: location!(),
201            })?;
202        }
203
204        Ok(())
205    }
206}
207
208pub struct DatasetReadGuard<'a> {
209    guard: RwLockReadGuard<'a, Dataset>,
210}
211
212impl Deref for DatasetReadGuard<'_> {
213    type Target = Dataset;
214
215    fn deref(&self) -> &Self::Target {
216        &self.guard
217    }
218}
219
220pub struct DatasetWriteGuard<'a> {
221    guard: RwLockWriteGuard<'a, Dataset>,
222}
223
224impl Deref for DatasetWriteGuard<'_> {
225    type Target = Dataset;
226
227    fn deref(&self) -> &Self::Target {
228        &self.guard
229    }
230}
231
232impl DerefMut for DatasetWriteGuard<'_> {
233    fn deref_mut(&mut self) -> &mut Self::Target {
234        &mut self.guard
235    }
236}
237
/// Manifest-based namespace implementation
///
/// Uses a special `__manifest` Lance table to track tables and nested namespaces.
/// Every row in that table is keyed by an `object_id` built from the namespace
/// path and the object name (see `build_object_id`).
#[derive(Debug)]
pub struct ManifestNamespace {
    /// Root URI of the namespace; passed to `create_or_get_manifest` when the
    /// manifest is opened in `from_directory`.
    root: String,
    /// Storage options forwarded to `create_or_get_manifest`.
    storage_options: Option<HashMap<String, String>>,
    /// Optional shared Lance session, forwarded to `create_or_get_manifest`.
    // NOTE(review): `dead_code` is allowed because no code visible here reads
    // this field after construction; presumably kept for future use — confirm.
    #[allow(dead_code)]
    session: Option<Arc<Session>>,
    // NOTE(review): stored at construction but not read by any code visible
    // here, hence `allow(dead_code)` — confirm whether it is still needed.
    #[allow(dead_code)]
    object_store: Arc<ObjectStore>,
    // NOTE(review): stored at construction but not read by any code visible
    // here, hence `allow(dead_code)` — confirm whether it is still needed.
    #[allow(dead_code)]
    base_path: Path,
    /// Handle to the `__manifest` dataset; refreshed to the latest version on
    /// every `get`/`get_mut`.
    manifest_dataset: DatasetConsistencyWrapper,
    /// Whether directory listing is enabled in dual mode
    /// If true, root namespace tables use {table_name}.lance naming
    /// If false, they use namespace-prefixed names
    dir_listing_enabled: bool,
    /// Whether to perform inline optimization (compaction and indexing) on the __manifest table
    /// after every write. Defaults to true.
    inline_optimization_enabled: bool,
}
260
261impl ManifestNamespace {
262    /// Create a new ManifestNamespace from an existing DirectoryNamespace
263    pub async fn from_directory(
264        root: String,
265        storage_options: Option<HashMap<String, String>>,
266        session: Option<Arc<Session>>,
267        object_store: Arc<ObjectStore>,
268        base_path: Path,
269        dir_listing_enabled: bool,
270        inline_optimization_enabled: bool,
271    ) -> Result<Self> {
272        let manifest_dataset =
273            Self::create_or_get_manifest(&root, &storage_options, session.clone()).await?;
274
275        Ok(Self {
276            root,
277            storage_options,
278            session,
279            object_store,
280            base_path,
281            manifest_dataset,
282            dir_listing_enabled,
283            inline_optimization_enabled,
284        })
285    }
286
287    /// Build object ID from namespace path and name
288    pub fn build_object_id(namespace: &[String], name: &str) -> String {
289        if namespace.is_empty() {
290            name.to_string()
291        } else {
292            let mut id = namespace.join(DELIMITER);
293            id.push_str(DELIMITER);
294            id.push_str(name);
295            id
296        }
297    }
298
299    /// Parse object ID into namespace path and name
300    pub fn parse_object_id(object_id: &str) -> (Vec<String>, String) {
301        let parts: Vec<&str> = object_id.split(DELIMITER).collect();
302        if parts.len() == 1 {
303            (Vec::new(), parts[0].to_string())
304        } else {
305            let namespace = parts[..parts.len() - 1]
306                .iter()
307                .map(|s| s.to_string())
308                .collect();
309            let name = parts[parts.len() - 1].to_string();
310            (namespace, name)
311        }
312    }
313
314    /// Split an object ID (table_id as vec of strings) into namespace and table name
315    fn split_object_id(table_id: &[String]) -> (Vec<String>, String) {
316        if table_id.len() == 1 {
317            (vec![], table_id[0].clone())
318        } else {
319            (
320                table_id[..table_id.len() - 1].to_vec(),
321                table_id[table_id.len() - 1].clone(),
322            )
323        }
324    }
325
326    /// Convert a table ID (vec of strings) to an object_id string
327    fn str_object_id(table_id: &[String]) -> String {
328        table_id.join(DELIMITER)
329    }
330
331    /// Generate a new directory name in format: <hash>_<object_id>
332    /// The hash is used to (1) optimize object store throughput,
333    /// (2) have high enough entropy in a short period of time to prevent issues like
334    /// failed table creation, delete and create new table of the same name, etc.
335    /// The object_id is added after the hash to ensure
336    /// dir name uniqueness and make debugging easier.
337    fn generate_dir_name(object_id: &str) -> String {
338        // Generate a random number for uniqueness
339        let random_num: u64 = rand::random();
340
341        // Create hash from random number + object_id
342        let mut hasher = DefaultHasher::new();
343        random_num.hash(&mut hasher);
344        object_id.hash(&mut hasher);
345        let hash = hasher.finish();
346
347        // Format as lowercase hex (8 characters - sufficient entropy for uniqueness)
348        format!("{:08x}_{}", (hash & 0xFFFFFFFF) as u32, object_id)
349    }
350
351    /// Construct a full URI from root and relative location
352    pub(crate) fn construct_full_uri(root: &str, relative_location: &str) -> Result<String> {
353        let mut base_url = lance_io::object_store::uri_to_url(root)?;
354
355        // Ensure the base URL has a trailing slash so that URL.join() appends
356        // rather than replaces the last path segment.
357        // Without this fix, "s3://bucket/path/subdir".join("table.lance")
358        // would incorrectly produce "s3://bucket/path/table.lance" (missing subdir).
359        if !base_url.path().ends_with('/') {
360            base_url.set_path(&format!("{}/", base_url.path()));
361        }
362
363        let full_url = base_url
364            .join(relative_location)
365            .map_err(|e| Error::InvalidInput {
366                source: format!(
367                    "Failed to join URI '{}' with '{}': {:?}",
368                    root, relative_location, e
369                )
370                .into(),
371                location: location!(),
372            })?;
373
374        Ok(full_url.to_string())
375    }
376
377    /// Perform inline optimization on the __manifest table.
378    ///
379    /// This method:
380    /// 1. Creates three indexes on the manifest table:
381    ///    - BTREE index on object_id for fast lookups
382    ///    - Bitmap index on object_type for filtering by type
383    ///    - LabelList index on base_objects for view dependencies
384    /// 2. Runs file compaction to merge small files
385    /// 3. Optimizes existing indices
386    ///
387    /// This is called automatically after writes when inline_optimization_enabled is true.
388    async fn run_inline_optimization(&self) -> Result<()> {
389        if !self.inline_optimization_enabled {
390            return Ok(());
391        }
392
393        // Get a mutable reference to the dataset to perform optimization
394        let mut dataset_guard = self.manifest_dataset.get_mut().await?;
395        let dataset: &mut Dataset = &mut dataset_guard;
396
397        // Step 1: Create indexes if they don't already exist
398        let indices = dataset.load_indices().await?;
399
400        // Check which indexes already exist
401        let has_object_id_index = indices.iter().any(|idx| idx.name == OBJECT_ID_INDEX_NAME);
402        let has_object_type_index = indices.iter().any(|idx| idx.name == OBJECT_TYPE_INDEX_NAME);
403        let has_base_objects_index = indices
404            .iter()
405            .any(|idx| idx.name == BASE_OBJECTS_INDEX_NAME);
406
407        // Create BTREE index on object_id
408        if !has_object_id_index {
409            log::debug!(
410                "Creating BTREE index '{}' on object_id for __manifest table",
411                OBJECT_ID_INDEX_NAME
412            );
413            let params = ScalarIndexParams::for_builtin(BuiltinIndexType::BTree);
414            if let Err(e) = dataset
415                .create_index(
416                    &["object_id"],
417                    IndexType::BTree,
418                    Some(OBJECT_ID_INDEX_NAME.to_string()),
419                    &params,
420                    true,
421                )
422                .await
423            {
424                log::warn!("Failed to create BTREE index on object_id for __manifest table: {:?}. Query performance may be impacted.", e);
425            } else {
426                log::info!(
427                    "Created BTREE index '{}' on object_id for __manifest table",
428                    OBJECT_ID_INDEX_NAME
429                );
430            }
431        }
432
433        // Create Bitmap index on object_type
434        if !has_object_type_index {
435            log::debug!(
436                "Creating Bitmap index '{}' on object_type for __manifest table",
437                OBJECT_TYPE_INDEX_NAME
438            );
439            let params = ScalarIndexParams::default();
440            if let Err(e) = dataset
441                .create_index(
442                    &["object_type"],
443                    IndexType::Bitmap,
444                    Some(OBJECT_TYPE_INDEX_NAME.to_string()),
445                    &params,
446                    true,
447                )
448                .await
449            {
450                log::warn!("Failed to create Bitmap index on object_type for __manifest table: {:?}. Query performance may be impacted.", e);
451            } else {
452                log::info!(
453                    "Created Bitmap index '{}' on object_type for __manifest table",
454                    OBJECT_TYPE_INDEX_NAME
455                );
456            }
457        }
458
459        // Create LabelList index on base_objects
460        if !has_base_objects_index {
461            log::debug!(
462                "Creating LabelList index '{}' on base_objects for __manifest table",
463                BASE_OBJECTS_INDEX_NAME
464            );
465            let params = ScalarIndexParams::default();
466            if let Err(e) = dataset
467                .create_index(
468                    &["base_objects"],
469                    IndexType::LabelList,
470                    Some(BASE_OBJECTS_INDEX_NAME.to_string()),
471                    &params,
472                    true,
473                )
474                .await
475            {
476                log::warn!("Failed to create LabelList index on base_objects for __manifest table: {:?}. Query performance may be impacted.", e);
477            } else {
478                log::info!(
479                    "Created LabelList index '{}' on base_objects for __manifest table",
480                    BASE_OBJECTS_INDEX_NAME
481                );
482            }
483        }
484
485        // Step 2: Run file compaction
486        log::debug!("Running file compaction on __manifest table");
487        match compact_files(dataset, CompactionOptions::default(), None).await {
488            Ok(compaction_metrics) => {
489                if compaction_metrics.fragments_removed > 0 {
490                    log::info!(
491                        "Compacted __manifest table: removed {} fragments, added {} fragments",
492                        compaction_metrics.fragments_removed,
493                        compaction_metrics.fragments_added
494                    );
495                }
496            }
497            Err(e) => {
498                log::warn!("Failed to compact files for __manifest table: {:?}. Continuing with optimization.", e);
499            }
500        }
501
502        // Step 3: Optimize indices
503        log::debug!("Optimizing indices on __manifest table");
504        match dataset.optimize_indices(&OptimizeOptions::default()).await {
505            Ok(_) => {
506                log::info!("Successfully optimized indices on __manifest table");
507            }
508            Err(e) => {
509                log::warn!(
510                    "Failed to optimize indices on __manifest table: {:?}. Continuing anyway.",
511                    e
512                );
513            }
514        }
515
516        Ok(())
517    }
518
519    /// Get the manifest schema
520    fn manifest_schema() -> Arc<ArrowSchema> {
521        Arc::new(ArrowSchema::new(vec![
522            Field::new("object_id", DataType::Utf8, false),
523            Field::new("object_type", DataType::Utf8, false),
524            Field::new("location", DataType::Utf8, true),
525            Field::new("metadata", DataType::Utf8, true),
526            Field::new(
527                "base_objects",
528                DataType::List(Arc::new(Field::new("object_id", DataType::Utf8, true))),
529                true,
530            ),
531        ]))
532    }
533
534    /// Get a scanner for the manifest dataset
535    async fn manifest_scanner(&self) -> Result<Scanner> {
536        let dataset_guard = self.manifest_dataset.get().await?;
537        Ok(dataset_guard.scan())
538    }
539
540    /// Helper to execute a scanner and collect results into a Vec
541    async fn execute_scanner(scanner: Scanner) -> Result<Vec<RecordBatch>> {
542        let mut stream = scanner.try_into_stream().await.map_err(|e| Error::IO {
543            source: box_error(std::io::Error::other(format!(
544                "Failed to create stream: {}",
545                e
546            ))),
547            location: location!(),
548        })?;
549
550        let mut batches = Vec::new();
551        while let Some(batch) = stream.next().await {
552            batches.push(batch.map_err(|e| Error::IO {
553                source: box_error(std::io::Error::other(format!(
554                    "Failed to read batch: {}",
555                    e
556                ))),
557                location: location!(),
558            })?);
559        }
560
561        Ok(batches)
562    }
563
564    /// Helper to get a string column from a record batch
565    fn get_string_column<'a>(batch: &'a RecordBatch, column_name: &str) -> Result<&'a StringArray> {
566        let column = batch
567            .column_by_name(column_name)
568            .ok_or_else(|| Error::io(format!("Column '{}' not found", column_name), location!()))?;
569        column
570            .as_any()
571            .downcast_ref::<StringArray>()
572            .ok_or_else(|| {
573                Error::io(
574                    format!("Column '{}' is not a string array", column_name),
575                    location!(),
576                )
577            })
578    }
579
580    /// Check if the manifest contains an object with the given ID
581    async fn manifest_contains_object(&self, object_id: &str) -> Result<bool> {
582        let filter = format!("object_id = '{}'", object_id);
583
584        let dataset_guard = self.manifest_dataset.get().await?;
585        let mut scanner = dataset_guard.scan();
586
587        scanner.filter(&filter).map_err(|e| Error::IO {
588            source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
589            location: location!(),
590        })?;
591
592        // Project no columns and enable row IDs for count_rows to work
593        scanner.project::<&str>(&[]).map_err(|e| Error::IO {
594            source: box_error(std::io::Error::other(format!("Failed to project: {}", e))),
595            location: location!(),
596        })?;
597
598        scanner.with_row_id();
599
600        let count = scanner.count_rows().await.map_err(|e| Error::IO {
601            source: box_error(std::io::Error::other(format!(
602                "Failed to count rows: {}",
603                e
604            ))),
605            location: location!(),
606        })?;
607
608        Ok(count > 0)
609    }
610
611    /// Query the manifest for a table with the given object ID
612    async fn query_manifest_for_table(&self, object_id: &str) -> Result<Option<TableInfo>> {
613        let filter = format!("object_id = '{}' AND object_type = 'table'", object_id);
614        let mut scanner = self.manifest_scanner().await?;
615        scanner.filter(&filter).map_err(|e| Error::IO {
616            source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
617            location: location!(),
618        })?;
619        scanner
620            .project(&["object_id", "location"])
621            .map_err(|e| Error::IO {
622                source: box_error(std::io::Error::other(format!("Failed to project: {}", e))),
623                location: location!(),
624            })?;
625        let batches = Self::execute_scanner(scanner).await?;
626
627        let mut found_result: Option<TableInfo> = None;
628        let mut total_rows = 0;
629
630        for batch in batches {
631            if batch.num_rows() == 0 {
632                continue;
633            }
634
635            total_rows += batch.num_rows();
636            if total_rows > 1 {
637                return Err(Error::io(
638                    format!(
639                        "Expected exactly 1 table with id '{}', found {}",
640                        object_id, total_rows
641                    ),
642                    location!(),
643                ));
644            }
645
646            let object_id_array = Self::get_string_column(&batch, "object_id")?;
647            let location_array = Self::get_string_column(&batch, "location")?;
648            let location = location_array.value(0).to_string();
649            let (namespace, name) = Self::parse_object_id(object_id_array.value(0));
650            found_result = Some(TableInfo {
651                namespace,
652                name,
653                location,
654            });
655        }
656
657        Ok(found_result)
658    }
659
660    /// List all table locations in the manifest (for root namespace only)
661    /// Returns a set of table locations (e.g., "table_name.lance")
662    pub async fn list_manifest_table_locations(&self) -> Result<std::collections::HashSet<String>> {
663        let filter = "object_type = 'table' AND NOT contains(object_id, '$')";
664        let mut scanner = self.manifest_scanner().await?;
665        scanner.filter(filter).map_err(|e| Error::IO {
666            source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
667            location: location!(),
668        })?;
669        scanner.project(&["location"]).map_err(|e| Error::IO {
670            source: box_error(std::io::Error::other(format!("Failed to project: {}", e))),
671            location: location!(),
672        })?;
673
674        let batches = Self::execute_scanner(scanner).await?;
675        let mut locations = std::collections::HashSet::new();
676
677        for batch in batches {
678            if batch.num_rows() == 0 {
679                continue;
680            }
681            let location_array = Self::get_string_column(&batch, "location")?;
682            for i in 0..location_array.len() {
683                locations.insert(location_array.value(i).to_string());
684            }
685        }
686
687        Ok(locations)
688    }
689
690    /// Insert an entry into the manifest table
691    async fn insert_into_manifest(
692        &self,
693        object_id: String,
694        object_type: ObjectType,
695        location: Option<String>,
696    ) -> Result<()> {
697        self.insert_into_manifest_with_metadata(object_id, object_type, location, None, None)
698            .await
699    }
700
701    /// Insert an entry into the manifest table with metadata and base_objects
702    async fn insert_into_manifest_with_metadata(
703        &self,
704        object_id: String,
705        object_type: ObjectType,
706        location: Option<String>,
707        metadata: Option<String>,
708        base_objects: Option<Vec<String>>,
709    ) -> Result<()> {
710        use arrow::array::builder::{ListBuilder, StringBuilder};
711
712        let schema = Self::manifest_schema();
713
714        // Create base_objects array from the provided list
715        let string_builder = StringBuilder::new();
716        let mut list_builder = ListBuilder::new(string_builder).with_field(Arc::new(Field::new(
717            "object_id",
718            DataType::Utf8,
719            true,
720        )));
721
722        match base_objects {
723            Some(objects) => {
724                for obj in objects {
725                    list_builder.values().append_value(obj);
726                }
727                list_builder.append(true);
728            }
729            None => {
730                list_builder.append_null();
731            }
732        }
733
734        let base_objects_array = list_builder.finish();
735
736        // Create arrays with optional values
737        let location_array = match location {
738            Some(loc) => Arc::new(StringArray::from(vec![Some(loc)])),
739            None => Arc::new(StringArray::from(vec![None::<String>])),
740        };
741
742        let metadata_array = match metadata {
743            Some(meta) => Arc::new(StringArray::from(vec![Some(meta)])),
744            None => Arc::new(StringArray::from(vec![None::<String>])),
745        };
746
747        let batch = RecordBatch::try_new(
748            schema.clone(),
749            vec![
750                Arc::new(StringArray::from(vec![object_id.as_str()])),
751                Arc::new(StringArray::from(vec![object_type.as_str()])),
752                location_array,
753                metadata_array,
754                Arc::new(base_objects_array),
755            ],
756        )
757        .map_err(|e| {
758            Error::io(
759                format!("Failed to create manifest entry: {}", e),
760                location!(),
761            )
762        })?;
763
764        let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
765
766        // Use MergeInsert to ensure uniqueness on object_id
767        let dataset_guard = self.manifest_dataset.get().await?;
768        let dataset_arc = Arc::new(dataset_guard.clone());
769        drop(dataset_guard); // Drop read guard before merge insert
770
771        let mut merge_builder =
772            lance::dataset::MergeInsertBuilder::try_new(dataset_arc, vec!["object_id".to_string()])
773                .map_err(|e| Error::IO {
774                    source: box_error(std::io::Error::other(format!(
775                        "Failed to create merge builder: {}",
776                        e
777                    ))),
778                    location: location!(),
779                })?;
780
781        merge_builder.when_matched(lance::dataset::WhenMatched::Fail);
782        merge_builder.when_not_matched(lance::dataset::WhenNotMatched::InsertAll);
783
784        let (new_dataset_arc, _merge_stats) = merge_builder
785            .try_build()
786            .map_err(|e| Error::IO {
787                source: box_error(std::io::Error::other(format!(
788                    "Failed to build merge: {}",
789                    e
790                ))),
791                location: location!(),
792            })?
793            .execute_reader(Box::new(reader))
794            .await
795            .map_err(|e| {
796                // Check if this is a "matched row" error from WhenMatched::Fail
797                let error_msg = e.to_string();
798                if error_msg.contains("matched")
799                    || error_msg.contains("duplicate")
800                    || error_msg.contains("already exists")
801                {
802                    Error::io(
803                        format!("Object with id '{}' already exists in manifest", object_id),
804                        location!(),
805                    )
806                } else {
807                    Error::IO {
808                        source: box_error(std::io::Error::other(format!(
809                            "Failed to execute merge: {}",
810                            e
811                        ))),
812                        location: location!(),
813                    }
814                }
815            })?;
816
817        let new_dataset = Arc::try_unwrap(new_dataset_arc).unwrap_or_else(|arc| (*arc).clone());
818        self.manifest_dataset.set_latest(new_dataset).await;
819
820        // Run inline optimization after write
821        if let Err(e) = self.run_inline_optimization().await {
822            log::warn!(
823                "Unexpected failure when running inline optimization: {:?}",
824                e
825            );
826        }
827
828        Ok(())
829    }
830
831    /// Delete an entry from the manifest table
832    pub async fn delete_from_manifest(&self, object_id: &str) -> Result<()> {
833        {
834            let predicate = format!("object_id = '{}'", object_id);
835            let mut dataset_guard = self.manifest_dataset.get_mut().await?;
836            dataset_guard
837                .delete(&predicate)
838                .await
839                .map_err(|e| Error::IO {
840                    source: box_error(std::io::Error::other(format!("Failed to delete: {}", e))),
841                    location: location!(),
842                })?;
843        } // Drop the guard here
844
845        self.manifest_dataset.reload().await?;
846
847        // Run inline optimization after delete
848        if let Err(e) = self.run_inline_optimization().await {
849            log::warn!(
850                "Unexpected failure when running inline optimization: {:?}",
851                e
852            );
853        }
854
855        Ok(())
856    }
857
858    /// Register a table in the manifest without creating the physical table (internal helper for migration)
859    pub async fn register_table(&self, name: &str, location: String) -> Result<()> {
860        let object_id = Self::build_object_id(&[], name);
861        if self.manifest_contains_object(&object_id).await? {
862            return Err(Error::io(
863                format!("Table '{}' already exists", name),
864                location!(),
865            ));
866        }
867
868        self.insert_into_manifest(object_id, ObjectType::Table, Some(location))
869            .await
870    }
871
872    /// Validate that all levels of a namespace path exist
873    async fn validate_namespace_levels_exist(&self, namespace_path: &[String]) -> Result<()> {
874        for i in 1..=namespace_path.len() {
875            let partial_path = &namespace_path[..i];
876            let object_id = partial_path.join(DELIMITER);
877            if !self.manifest_contains_object(&object_id).await? {
878                return Err(Error::Namespace {
879                    source: format!("Parent namespace '{}' does not exist", object_id).into(),
880                    location: location!(),
881                });
882            }
883        }
884        Ok(())
885    }
886
887    /// Query the manifest for a namespace with the given object ID
888    async fn query_manifest_for_namespace(&self, object_id: &str) -> Result<Option<NamespaceInfo>> {
889        let filter = format!("object_id = '{}' AND object_type = 'namespace'", object_id);
890        let mut scanner = self.manifest_scanner().await?;
891        scanner.filter(&filter).map_err(|e| Error::IO {
892            source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
893            location: location!(),
894        })?;
895        scanner
896            .project(&["object_id", "metadata"])
897            .map_err(|e| Error::IO {
898                source: box_error(std::io::Error::other(format!("Failed to project: {}", e))),
899                location: location!(),
900            })?;
901        let batches = Self::execute_scanner(scanner).await?;
902
903        let mut found_result: Option<NamespaceInfo> = None;
904        let mut total_rows = 0;
905
906        for batch in batches {
907            if batch.num_rows() == 0 {
908                continue;
909            }
910
911            total_rows += batch.num_rows();
912            if total_rows > 1 {
913                return Err(Error::io(
914                    format!(
915                        "Expected exactly 1 namespace with id '{}', found {}",
916                        object_id, total_rows
917                    ),
918                    location!(),
919                ));
920            }
921
922            let object_id_array = Self::get_string_column(&batch, "object_id")?;
923            let metadata_array = Self::get_string_column(&batch, "metadata")?;
924
925            let object_id_str = object_id_array.value(0);
926            let metadata = if !metadata_array.is_null(0) {
927                let metadata_str = metadata_array.value(0);
928                match serde_json::from_str::<HashMap<String, String>>(metadata_str) {
929                    Ok(map) => Some(map),
930                    Err(e) => {
931                        return Err(Error::io(
932                            format!(
933                                "Failed to deserialize metadata for namespace '{}': {}",
934                                object_id, e
935                            ),
936                            location!(),
937                        ));
938                    }
939                }
940            } else {
941                None
942            };
943
944            let (namespace, name) = Self::parse_object_id(object_id_str);
945            found_result = Some(NamespaceInfo {
946                namespace,
947                name,
948                metadata,
949            });
950        }
951
952        Ok(found_result)
953    }
954
955    /// Create or get the manifest dataset
956    async fn create_or_get_manifest(
957        root: &str,
958        storage_options: &Option<HashMap<String, String>>,
959        session: Option<Arc<Session>>,
960    ) -> Result<DatasetConsistencyWrapper> {
961        let manifest_path = format!("{}/{}", root, MANIFEST_TABLE_NAME);
962        log::debug!("Attempting to load manifest from {}", manifest_path);
963        let mut builder = DatasetBuilder::from_uri(&manifest_path);
964
965        if let Some(sess) = session.clone() {
966            builder = builder.with_session(sess);
967        }
968
969        if let Some(opts) = storage_options {
970            builder = builder.with_storage_options(opts.clone());
971        }
972
973        let dataset_result = builder.load().await;
974        if let Ok(dataset) = dataset_result {
975            Ok(DatasetConsistencyWrapper::new(dataset))
976        } else {
977            log::info!("Creating new manifest table at {}", manifest_path);
978            let schema = Self::manifest_schema();
979            let empty_batch = RecordBatch::new_empty(schema.clone());
980            let reader = RecordBatchIterator::new(vec![Ok(empty_batch)], schema.clone());
981
982            let write_params = WriteParams {
983                session,
984                store_params: storage_options.as_ref().map(|opts| ObjectStoreParams {
985                    storage_options_accessor: Some(Arc::new(
986                        lance_io::object_store::StorageOptionsAccessor::with_static_options(
987                            opts.clone(),
988                        ),
989                    )),
990                    ..Default::default()
991                }),
992                ..Default::default()
993            };
994
995            let dataset = Dataset::write(Box::new(reader), &manifest_path, Some(write_params))
996                .await
997                .map_err(|e| Error::IO {
998                    source: box_error(std::io::Error::other(format!(
999                        "Failed to create manifest dataset: {}",
1000                        e
1001                    ))),
1002                    location: location!(),
1003                })?;
1004
1005            log::info!(
1006                "Successfully created manifest table at {}, version={}, uri={}",
1007                manifest_path,
1008                dataset.version().version,
1009                dataset.uri()
1010            );
1011            Ok(DatasetConsistencyWrapper::new(dataset))
1012        }
1013    }
1014}
1015
1016#[async_trait]
1017impl LanceNamespace for ManifestNamespace {
1018    fn namespace_id(&self) -> String {
1019        self.root.clone()
1020    }
1021
1022    async fn list_tables(&self, request: ListTablesRequest) -> Result<ListTablesResponse> {
1023        let namespace_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1024            source: "Namespace ID is required".into(),
1025            location: location!(),
1026        })?;
1027
1028        // Build filter to find tables in this namespace
1029        let filter = if namespace_id.is_empty() {
1030            // Root namespace: find tables without a namespace prefix
1031            "object_type = 'table' AND NOT contains(object_id, '$')".to_string()
1032        } else {
1033            // Namespaced: find tables that start with namespace$ but have no additional $
1034            let prefix = namespace_id.join(DELIMITER);
1035            format!(
1036                "object_type = 'table' AND starts_with(object_id, '{}{}') AND NOT contains(substring(object_id, {}), '$')",
1037                prefix, DELIMITER, prefix.len() + 2
1038            )
1039        };
1040
1041        let mut scanner = self.manifest_scanner().await?;
1042        scanner.filter(&filter).map_err(|e| Error::IO {
1043            source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
1044            location: location!(),
1045        })?;
1046        scanner.project(&["object_id"]).map_err(|e| Error::IO {
1047            source: box_error(std::io::Error::other(format!("Failed to project: {}", e))),
1048            location: location!(),
1049        })?;
1050
1051        let batches = Self::execute_scanner(scanner).await?;
1052
1053        let mut tables = Vec::new();
1054        for batch in batches {
1055            if batch.num_rows() == 0 {
1056                continue;
1057            }
1058
1059            let object_id_array = Self::get_string_column(&batch, "object_id")?;
1060            for i in 0..batch.num_rows() {
1061                let object_id = object_id_array.value(i);
1062                let (_namespace, name) = Self::parse_object_id(object_id);
1063                tables.push(name);
1064            }
1065        }
1066
1067        Ok(ListTablesResponse::new(tables))
1068    }
1069
1070    async fn describe_table(&self, request: DescribeTableRequest) -> Result<DescribeTableResponse> {
1071        let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1072            source: "Table ID is required".into(),
1073            location: location!(),
1074        })?;
1075
1076        if table_id.is_empty() {
1077            return Err(Error::InvalidInput {
1078                source: "Table ID cannot be empty".into(),
1079                location: location!(),
1080            });
1081        }
1082
1083        let object_id = Self::str_object_id(table_id);
1084        let table_info = self.query_manifest_for_table(&object_id).boxed().await?;
1085
1086        // Extract table name and namespace from table_id
1087        let table_name = table_id.last().cloned().unwrap_or_default();
1088        let namespace_id: Vec<String> = if table_id.len() > 1 {
1089            table_id[..table_id.len() - 1].to_vec()
1090        } else {
1091            vec![]
1092        };
1093
1094        let load_detailed_metadata = request.load_detailed_metadata.unwrap_or(false);
1095        // For backwards compatibility, only skip vending credentials when explicitly set to false
1096        let vend_credentials = request.vend_credentials.unwrap_or(true);
1097
1098        match table_info {
1099            Some(info) => {
1100                // Construct full URI from relative location
1101                let table_uri = Self::construct_full_uri(&self.root, &info.location)?;
1102
1103                let storage_options = if vend_credentials {
1104                    self.storage_options.clone()
1105                } else {
1106                    None
1107                };
1108
1109                // If not loading detailed metadata, return minimal response with just location
1110                if !load_detailed_metadata {
1111                    return Ok(DescribeTableResponse {
1112                        table: Some(table_name),
1113                        namespace: Some(namespace_id),
1114                        location: Some(table_uri.clone()),
1115                        table_uri: Some(table_uri),
1116                        storage_options,
1117                        ..Default::default()
1118                    });
1119                }
1120
1121                // Try to open the dataset to get version and schema
1122                match Dataset::open(&table_uri).await {
1123                    Ok(mut dataset) => {
1124                        // If a specific version is requested, checkout that version
1125                        if let Some(requested_version) = request.version {
1126                            dataset = dataset.checkout_version(requested_version as u64).await?;
1127                        }
1128
1129                        let version = dataset.version().version;
1130                        let lance_schema = dataset.schema();
1131                        let arrow_schema: arrow_schema::Schema = lance_schema.into();
1132                        let json_schema = arrow_schema_to_json(&arrow_schema)?;
1133
1134                        Ok(DescribeTableResponse {
1135                            table: Some(table_name.clone()),
1136                            namespace: Some(namespace_id.clone()),
1137                            version: Some(version as i64),
1138                            location: Some(table_uri.clone()),
1139                            table_uri: Some(table_uri),
1140                            schema: Some(Box::new(json_schema)),
1141                            storage_options,
1142                            ..Default::default()
1143                        })
1144                    }
1145                    Err(_) => {
1146                        // If dataset can't be opened (e.g., empty table), return minimal info
1147                        Ok(DescribeTableResponse {
1148                            table: Some(table_name),
1149                            namespace: Some(namespace_id),
1150                            location: Some(table_uri.clone()),
1151                            table_uri: Some(table_uri),
1152                            storage_options,
1153                            ..Default::default()
1154                        })
1155                    }
1156                }
1157            }
1158            None => Err(Error::Namespace {
1159                source: format!("Table '{}' not found", object_id).into(),
1160                location: location!(),
1161            }),
1162        }
1163    }
1164
1165    async fn table_exists(&self, request: TableExistsRequest) -> Result<()> {
1166        let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1167            source: "Table ID is required".into(),
1168            location: location!(),
1169        })?;
1170
1171        if table_id.is_empty() {
1172            return Err(Error::InvalidInput {
1173                source: "Table ID cannot be empty".into(),
1174                location: location!(),
1175            });
1176        }
1177
1178        let (namespace, table_name) = Self::split_object_id(table_id);
1179        let object_id = Self::build_object_id(&namespace, &table_name);
1180        let exists = self.manifest_contains_object(&object_id).await?;
1181        if exists {
1182            Ok(())
1183        } else {
1184            Err(Error::Namespace {
1185                source: format!("Table '{}' not found", table_name).into(),
1186                location: location!(),
1187            })
1188        }
1189    }
1190
1191    async fn create_table(
1192        &self,
1193        request: CreateTableRequest,
1194        data: Bytes,
1195    ) -> Result<CreateTableResponse> {
1196        let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1197            source: "Table ID is required".into(),
1198            location: location!(),
1199        })?;
1200
1201        if table_id.is_empty() {
1202            return Err(Error::InvalidInput {
1203                source: "Table ID cannot be empty".into(),
1204                location: location!(),
1205            });
1206        }
1207
1208        let (namespace, table_name) = Self::split_object_id(table_id);
1209        let object_id = Self::build_object_id(&namespace, &table_name);
1210
1211        // Check if table already exists in manifest
1212        if self.manifest_contains_object(&object_id).await? {
1213            return Err(Error::io(
1214                format!("Table '{}' already exists", table_name),
1215                location!(),
1216            ));
1217        }
1218
1219        // Create the physical table location with hash-based naming
1220        // When dir_listing_enabled is true and it's a root table, use directory-style naming: {table_name}.lance
1221        // Otherwise, use hash-based naming: {hash}_{object_id}
1222        let dir_name = if namespace.is_empty() && self.dir_listing_enabled {
1223            // Root table with directory listing enabled: use {table_name}.lance
1224            format!("{}.lance", table_name)
1225        } else {
1226            // Child namespace table or dir listing disabled: use hash-based naming
1227            Self::generate_dir_name(&object_id)
1228        };
1229        let table_uri = Self::construct_full_uri(&self.root, &dir_name)?;
1230
1231        // Validate that request_data is provided
1232        if data.is_empty() {
1233            return Err(Error::Namespace {
1234                source: "Request data (Arrow IPC stream) is required for create_table".into(),
1235                location: location!(),
1236            });
1237        }
1238
1239        // Write the data using Lance Dataset
1240        let cursor = Cursor::new(data.to_vec());
1241        let stream_reader = StreamReader::try_new(cursor, None)
1242            .map_err(|e| Error::io(format!("Failed to read IPC stream: {}", e), location!()))?;
1243
1244        let batches: Vec<RecordBatch> =
1245            stream_reader
1246                .collect::<std::result::Result<Vec<_>, _>>()
1247                .map_err(|e| Error::io(format!("Failed to collect batches: {}", e), location!()))?;
1248
1249        if batches.is_empty() {
1250            return Err(Error::io(
1251                "No data provided for table creation",
1252                location!(),
1253            ));
1254        }
1255
1256        let schema = batches[0].schema();
1257        let batch_results: Vec<std::result::Result<RecordBatch, arrow_schema::ArrowError>> =
1258            batches.into_iter().map(Ok).collect();
1259        let reader = RecordBatchIterator::new(batch_results, schema);
1260
1261        let write_params = WriteParams::default();
1262        let _dataset = Dataset::write(Box::new(reader), &table_uri, Some(write_params))
1263            .await
1264            .map_err(|e| Error::IO {
1265                source: box_error(std::io::Error::other(format!(
1266                    "Failed to write dataset: {}",
1267                    e
1268                ))),
1269                location: location!(),
1270            })?;
1271
1272        // Register in manifest (store dir_name, not full URI)
1273        self.insert_into_manifest(object_id, ObjectType::Table, Some(dir_name))
1274            .await?;
1275
1276        Ok(CreateTableResponse {
1277            version: Some(1),
1278            location: Some(table_uri),
1279            storage_options: self.storage_options.clone(),
1280            ..Default::default()
1281        })
1282    }
1283
1284    async fn drop_table(&self, request: DropTableRequest) -> Result<DropTableResponse> {
1285        let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1286            source: "Table ID is required".into(),
1287            location: location!(),
1288        })?;
1289
1290        if table_id.is_empty() {
1291            return Err(Error::InvalidInput {
1292                source: "Table ID cannot be empty".into(),
1293                location: location!(),
1294            });
1295        }
1296
1297        let (namespace, table_name) = Self::split_object_id(table_id);
1298        let object_id = Self::build_object_id(&namespace, &table_name);
1299
1300        // Query manifest for table location
1301        let table_info = self.query_manifest_for_table(&object_id).boxed().await?;
1302
1303        match table_info {
1304            Some(info) => {
1305                // Delete from manifest first
1306                self.delete_from_manifest(&object_id).boxed().await?;
1307
1308                // Delete physical data directory using the dir_name from manifest
1309                let table_path = self.base_path.child(info.location.as_str());
1310                let table_uri = Self::construct_full_uri(&self.root, &info.location)?;
1311
1312                // Remove the table directory
1313                self.object_store
1314                    .remove_dir_all(table_path)
1315                    .boxed()
1316                    .await
1317                    .map_err(|e| Error::Namespace {
1318                        source: format!("Failed to delete table directory: {}", e).into(),
1319                        location: location!(),
1320                    })?;
1321
1322                Ok(DropTableResponse {
1323                    id: request.id.clone(),
1324                    location: Some(table_uri),
1325                    ..Default::default()
1326                })
1327            }
1328            None => Err(Error::Namespace {
1329                source: format!("Table '{}' not found", table_name).into(),
1330                location: location!(),
1331            }),
1332        }
1333    }
1334
1335    async fn list_namespaces(
1336        &self,
1337        request: ListNamespacesRequest,
1338    ) -> Result<ListNamespacesResponse> {
1339        let parent_namespace = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1340            source: "Namespace ID is required".into(),
1341            location: location!(),
1342        })?;
1343
1344        // Build filter to find direct child namespaces
1345        let filter = if parent_namespace.is_empty() {
1346            // Root namespace: find all namespaces without a parent
1347            "object_type = 'namespace' AND NOT contains(object_id, '$')".to_string()
1348        } else {
1349            // Non-root: find namespaces that start with parent$ but have no additional $
1350            let prefix = parent_namespace.join(DELIMITER);
1351            format!(
1352                "object_type = 'namespace' AND starts_with(object_id, '{}{}') AND NOT contains(substring(object_id, {}), '$')",
1353                prefix, DELIMITER, prefix.len() + 2
1354            )
1355        };
1356
1357        let mut scanner = self.manifest_scanner().await?;
1358        scanner.filter(&filter).map_err(|e| Error::IO {
1359            source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
1360            location: location!(),
1361        })?;
1362        scanner.project(&["object_id"]).map_err(|e| Error::IO {
1363            source: box_error(std::io::Error::other(format!("Failed to project: {}", e))),
1364            location: location!(),
1365        })?;
1366
1367        let batches = Self::execute_scanner(scanner).await?;
1368        let mut namespaces = Vec::new();
1369
1370        for batch in batches {
1371            if batch.num_rows() == 0 {
1372                continue;
1373            }
1374
1375            let object_id_array = Self::get_string_column(&batch, "object_id")?;
1376            for i in 0..batch.num_rows() {
1377                let object_id = object_id_array.value(i);
1378                let (_namespace, name) = Self::parse_object_id(object_id);
1379                namespaces.push(name);
1380            }
1381        }
1382
1383        Ok(ListNamespacesResponse::new(namespaces))
1384    }
1385
1386    async fn describe_namespace(
1387        &self,
1388        request: DescribeNamespaceRequest,
1389    ) -> Result<DescribeNamespaceResponse> {
1390        let namespace_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1391            source: "Namespace ID is required".into(),
1392            location: location!(),
1393        })?;
1394
1395        // Root namespace always exists
1396        if namespace_id.is_empty() {
1397            #[allow(clippy::needless_update)]
1398            return Ok(DescribeNamespaceResponse {
1399                properties: Some(HashMap::new()),
1400                ..Default::default()
1401            });
1402        }
1403
1404        // Check if namespace exists in manifest
1405        let object_id = namespace_id.join(DELIMITER);
1406        let namespace_info = self.query_manifest_for_namespace(&object_id).await?;
1407
1408        match namespace_info {
1409            #[allow(clippy::needless_update)]
1410            Some(info) => Ok(DescribeNamespaceResponse {
1411                properties: info.metadata,
1412                ..Default::default()
1413            }),
1414            None => Err(Error::Namespace {
1415                source: format!("Namespace '{}' not found", object_id).into(),
1416                location: location!(),
1417            }),
1418        }
1419    }
1420
1421    async fn create_namespace(
1422        &self,
1423        request: CreateNamespaceRequest,
1424    ) -> Result<CreateNamespaceResponse> {
1425        let namespace_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1426            source: "Namespace ID is required".into(),
1427            location: location!(),
1428        })?;
1429
1430        // Root namespace always exists and cannot be created
1431        if namespace_id.is_empty() {
1432            return Err(Error::Namespace {
1433                source: "Root namespace already exists and cannot be created".into(),
1434                location: location!(),
1435            });
1436        }
1437
1438        // Validate parent namespaces exist (but not the namespace being created)
1439        if namespace_id.len() > 1 {
1440            self.validate_namespace_levels_exist(&namespace_id[..namespace_id.len() - 1])
1441                .await?;
1442        }
1443
1444        let object_id = namespace_id.join(DELIMITER);
1445        if self.manifest_contains_object(&object_id).await? {
1446            return Err(Error::Namespace {
1447                source: format!("Namespace '{}' already exists", object_id).into(),
1448                location: location!(),
1449            });
1450        }
1451
1452        // Serialize properties if provided
1453        let metadata = request.properties.as_ref().and_then(|props| {
1454            if props.is_empty() {
1455                None
1456            } else {
1457                Some(serde_json::to_string(props).ok()?)
1458            }
1459        });
1460
1461        self.insert_into_manifest_with_metadata(
1462            object_id,
1463            ObjectType::Namespace,
1464            None,
1465            metadata,
1466            None,
1467        )
1468        .await?;
1469
1470        Ok(CreateNamespaceResponse {
1471            properties: request.properties,
1472            ..Default::default()
1473        })
1474    }
1475
1476    async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<DropNamespaceResponse> {
1477        let namespace_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1478            source: "Namespace ID is required".into(),
1479            location: location!(),
1480        })?;
1481
1482        // Root namespace always exists and cannot be dropped
1483        if namespace_id.is_empty() {
1484            return Err(Error::Namespace {
1485                source: "Root namespace cannot be dropped".into(),
1486                location: location!(),
1487            });
1488        }
1489
1490        let object_id = namespace_id.join(DELIMITER);
1491
1492        // Check if namespace exists
1493        if !self.manifest_contains_object(&object_id).boxed().await? {
1494            return Err(Error::Namespace {
1495                source: format!("Namespace '{}' not found", object_id).into(),
1496                location: location!(),
1497            });
1498        }
1499
1500        // Check for child namespaces
1501        let prefix = format!("{}{}", object_id, DELIMITER);
1502        let filter = format!("starts_with(object_id, '{}')", prefix);
1503        let mut scanner = self.manifest_scanner().boxed().await?;
1504        scanner.filter(&filter).map_err(|e| Error::IO {
1505            source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
1506            location: location!(),
1507        })?;
1508        scanner.project::<&str>(&[]).map_err(|e| Error::IO {
1509            source: box_error(std::io::Error::other(format!("Failed to project: {}", e))),
1510            location: location!(),
1511        })?;
1512        scanner.with_row_id();
1513        let count = scanner.count_rows().boxed().await.map_err(|e| Error::IO {
1514            source: box_error(std::io::Error::other(format!(
1515                "Failed to count rows: {}",
1516                e
1517            ))),
1518            location: location!(),
1519        })?;
1520
1521        if count > 0 {
1522            return Err(Error::Namespace {
1523                source: format!(
1524                    "Namespace '{}' is not empty (contains {} child objects)",
1525                    object_id, count
1526                )
1527                .into(),
1528                location: location!(),
1529            });
1530        }
1531
1532        self.delete_from_manifest(&object_id).boxed().await?;
1533
1534        Ok(DropNamespaceResponse::default())
1535    }
1536
1537    async fn namespace_exists(&self, request: NamespaceExistsRequest) -> Result<()> {
1538        let namespace_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1539            source: "Namespace ID is required".into(),
1540            location: location!(),
1541        })?;
1542
1543        // Root namespace always exists
1544        if namespace_id.is_empty() {
1545            return Ok(());
1546        }
1547
1548        let object_id = namespace_id.join(DELIMITER);
1549        if self.manifest_contains_object(&object_id).await? {
1550            Ok(())
1551        } else {
1552            Err(Error::Namespace {
1553                source: format!("Namespace '{}' not found", object_id).into(),
1554                location: location!(),
1555            })
1556        }
1557    }
1558
1559    async fn create_empty_table(
1560        &self,
1561        request: CreateEmptyTableRequest,
1562    ) -> Result<CreateEmptyTableResponse> {
1563        let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1564            source: "Table ID is required".into(),
1565            location: location!(),
1566        })?;
1567
1568        if table_id.is_empty() {
1569            return Err(Error::InvalidInput {
1570                source: "Table ID cannot be empty".into(),
1571                location: location!(),
1572            });
1573        }
1574
1575        let (namespace, table_name) = Self::split_object_id(table_id);
1576        let object_id = Self::build_object_id(&namespace, &table_name);
1577
1578        // Check if table already exists in manifest
1579        let existing = self.query_manifest_for_table(&object_id).await?;
1580        if existing.is_some() {
1581            return Err(Error::Namespace {
1582                source: format!("Table '{}' already exists", table_name).into(),
1583                location: location!(),
1584            });
1585        }
1586
1587        // Create table location path with hash-based naming
1588        // When dir_listing_enabled is true and it's a root table, use directory-style naming: {table_name}.lance
1589        // Otherwise, use hash-based naming: {hash}_{object_id}
1590        let dir_name = if namespace.is_empty() && self.dir_listing_enabled {
1591            // Root table with directory listing enabled: use {table_name}.lance
1592            format!("{}.lance", table_name)
1593        } else {
1594            // Child namespace table or dir listing disabled: use hash-based naming
1595            Self::generate_dir_name(&object_id)
1596        };
1597        let table_path = self.base_path.child(dir_name.as_str());
1598        let table_uri = Self::construct_full_uri(&self.root, &dir_name)?;
1599
1600        // Validate location if provided
1601        if let Some(req_location) = &request.location {
1602            let req_location = req_location.trim_end_matches('/');
1603            if req_location != table_uri {
1604                return Err(Error::Namespace {
1605                    source: format!(
1606                        "Cannot create table {} at location {}, must be at location {}",
1607                        table_name, req_location, table_uri
1608                    )
1609                    .into(),
1610                    location: location!(),
1611                });
1612            }
1613        }
1614
1615        // Create the .lance-reserved file to mark the table as existing
1616        let reserved_file_path = table_path.child(".lance-reserved");
1617
1618        self.object_store
1619            .create(&reserved_file_path)
1620            .await
1621            .map_err(|e| Error::Namespace {
1622                source: format!(
1623                    "Failed to create .lance-reserved file for table {}: {}",
1624                    table_name, e
1625                )
1626                .into(),
1627                location: location!(),
1628            })?
1629            .shutdown()
1630            .await
1631            .map_err(|e| Error::Namespace {
1632                source: format!(
1633                    "Failed to finalize .lance-reserved file for table {}: {}",
1634                    table_name, e
1635                )
1636                .into(),
1637                location: location!(),
1638            })?;
1639
1640        // Add entry to manifest marking this as an empty table (store dir_name, not full path)
1641        self.insert_into_manifest(object_id, ObjectType::Table, Some(dir_name))
1642            .await?;
1643
1644        log::info!(
1645            "Created empty table '{}' in manifest at {}",
1646            table_name,
1647            table_uri
1648        );
1649
1650        // For backwards compatibility, only skip vending credentials when explicitly set to false
1651        let vend_credentials = request.vend_credentials.unwrap_or(true);
1652        let storage_options = if vend_credentials {
1653            self.storage_options.clone()
1654        } else {
1655            None
1656        };
1657
1658        Ok(CreateEmptyTableResponse {
1659            location: Some(table_uri),
1660            storage_options,
1661            ..Default::default()
1662        })
1663    }
1664
1665    async fn declare_table(&self, request: DeclareTableRequest) -> Result<DeclareTableResponse> {
1666        let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1667            source: "Table ID is required".into(),
1668            location: location!(),
1669        })?;
1670
1671        if table_id.is_empty() {
1672            return Err(Error::InvalidInput {
1673                source: "Table ID cannot be empty".into(),
1674                location: location!(),
1675            });
1676        }
1677
1678        let (namespace, table_name) = Self::split_object_id(table_id);
1679        let object_id = Self::build_object_id(&namespace, &table_name);
1680
1681        // Check if table already exists in manifest
1682        let existing = self.query_manifest_for_table(&object_id).await?;
1683        if existing.is_some() {
1684            return Err(Error::Namespace {
1685                source: format!("Table '{}' already exists", table_name).into(),
1686                location: location!(),
1687            });
1688        }
1689
1690        // Create table location path with hash-based naming
1691        // When dir_listing_enabled is true and it's a root table, use directory-style naming: {table_name}.lance
1692        // Otherwise, use hash-based naming: {hash}_{object_id}
1693        let dir_name = if namespace.is_empty() && self.dir_listing_enabled {
1694            // Root table with directory listing enabled: use {table_name}.lance
1695            format!("{}.lance", table_name)
1696        } else {
1697            // Child namespace table or dir listing disabled: use hash-based naming
1698            Self::generate_dir_name(&object_id)
1699        };
1700        let table_path = self.base_path.child(dir_name.as_str());
1701        let table_uri = Self::construct_full_uri(&self.root, &dir_name)?;
1702
1703        // Validate location if provided
1704        if let Some(req_location) = &request.location {
1705            let req_location = req_location.trim_end_matches('/');
1706            if req_location != table_uri {
1707                return Err(Error::Namespace {
1708                    source: format!(
1709                        "Cannot declare table {} at location {}, must be at location {}",
1710                        table_name, req_location, table_uri
1711                    )
1712                    .into(),
1713                    location: location!(),
1714                });
1715            }
1716        }
1717
1718        // Create the .lance-reserved file to mark the table as existing
1719        let reserved_file_path = table_path.child(".lance-reserved");
1720
1721        self.object_store
1722            .create(&reserved_file_path)
1723            .await
1724            .map_err(|e| Error::Namespace {
1725                source: format!(
1726                    "Failed to create .lance-reserved file for table {}: {}",
1727                    table_name, e
1728                )
1729                .into(),
1730                location: location!(),
1731            })?
1732            .shutdown()
1733            .await
1734            .map_err(|e| Error::Namespace {
1735                source: format!(
1736                    "Failed to finalize .lance-reserved file for table {}: {}",
1737                    table_name, e
1738                )
1739                .into(),
1740                location: location!(),
1741            })?;
1742
1743        // Add entry to manifest marking this as a declared table (store dir_name, not full path)
1744        self.insert_into_manifest(object_id, ObjectType::Table, Some(dir_name))
1745            .await?;
1746
1747        log::info!(
1748            "Declared table '{}' in manifest at {}",
1749            table_name,
1750            table_uri
1751        );
1752
1753        // For backwards compatibility, only skip vending credentials when explicitly set to false
1754        let vend_credentials = request.vend_credentials.unwrap_or(true);
1755        let storage_options = if vend_credentials {
1756            self.storage_options.clone()
1757        } else {
1758            None
1759        };
1760
1761        Ok(DeclareTableResponse {
1762            location: Some(table_uri),
1763            storage_options,
1764            ..Default::default()
1765        })
1766    }
1767
1768    async fn register_table(&self, request: RegisterTableRequest) -> Result<RegisterTableResponse> {
1769        let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1770            source: "Table ID is required".into(),
1771            location: location!(),
1772        })?;
1773
1774        if table_id.is_empty() {
1775            return Err(Error::InvalidInput {
1776                source: "Table ID cannot be empty".into(),
1777                location: location!(),
1778            });
1779        }
1780
1781        let location = request.location.clone();
1782
1783        // Validate that location is a relative path within the root directory
1784        // We don't allow absolute URIs or paths that escape the root
1785        if location.contains("://") {
1786            return Err(Error::InvalidInput {
1787                source: format!(
1788                    "Absolute URIs are not allowed for register_table. Location must be a relative path within the root directory: {}",
1789                    location
1790                ).into(),
1791                location: location!(),
1792            });
1793        }
1794
1795        if location.starts_with('/') {
1796            return Err(Error::InvalidInput {
1797                source: format!(
1798                    "Absolute paths are not allowed for register_table. Location must be a relative path within the root directory: {}",
1799                    location
1800                ).into(),
1801                location: location!(),
1802            });
1803        }
1804
1805        // Check for path traversal attempts
1806        if location.contains("..") {
1807            return Err(Error::InvalidInput {
1808                source: format!(
1809                    "Path traversal is not allowed. Location must be a relative path within the root directory: {}",
1810                    location
1811                ).into(),
1812                location: location!(),
1813            });
1814        }
1815
1816        let (namespace, table_name) = Self::split_object_id(table_id);
1817        let object_id = Self::build_object_id(&namespace, &table_name);
1818
1819        // Validate that parent namespaces exist (if not root)
1820        if !namespace.is_empty() {
1821            self.validate_namespace_levels_exist(&namespace).await?;
1822        }
1823
1824        // Check if table already exists
1825        if self.manifest_contains_object(&object_id).await? {
1826            return Err(Error::Namespace {
1827                source: format!("Table '{}' already exists", object_id).into(),
1828                location: location!(),
1829            });
1830        }
1831
1832        // Register the table with its location in the manifest
1833        self.insert_into_manifest(object_id, ObjectType::Table, Some(location.clone()))
1834            .await?;
1835
1836        Ok(RegisterTableResponse {
1837            location: Some(location),
1838            ..Default::default()
1839        })
1840    }
1841
1842    async fn deregister_table(
1843        &self,
1844        request: DeregisterTableRequest,
1845    ) -> Result<DeregisterTableResponse> {
1846        let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1847            source: "Table ID is required".into(),
1848            location: location!(),
1849        })?;
1850
1851        if table_id.is_empty() {
1852            return Err(Error::InvalidInput {
1853                source: "Table ID cannot be empty".into(),
1854                location: location!(),
1855            });
1856        }
1857
1858        let (namespace, table_name) = Self::split_object_id(table_id);
1859        let object_id = Self::build_object_id(&namespace, &table_name);
1860
1861        // Get table info before deleting
1862        let table_info = self.query_manifest_for_table(&object_id).await?;
1863
1864        let table_uri = match table_info {
1865            Some(info) => {
1866                // Delete from manifest only (leave physical data intact)
1867                self.delete_from_manifest(&object_id).boxed().await?;
1868                Self::construct_full_uri(&self.root, &info.location)?
1869            }
1870            None => {
1871                return Err(Error::Namespace {
1872                    source: format!("Table '{}' not found", object_id).into(),
1873                    location: location!(),
1874                });
1875            }
1876        };
1877
1878        Ok(DeregisterTableResponse {
1879            id: request.id.clone(),
1880            location: Some(table_uri),
1881            ..Default::default()
1882        })
1883    }
1884}
1885
1886#[cfg(test)]
1887mod tests {
1888    use crate::{DirectoryNamespaceBuilder, ManifestNamespace};
1889    use bytes::Bytes;
1890    use lance_core::utils::tempfile::TempStdDir;
1891    use lance_namespace::models::{
1892        CreateTableRequest, DescribeTableRequest, DropTableRequest, ListTablesRequest,
1893        TableExistsRequest,
1894    };
1895    use lance_namespace::LanceNamespace;
1896    use rstest::rstest;
1897
1898    fn create_test_ipc_data() -> Vec<u8> {
1899        use arrow::array::{Int32Array, StringArray};
1900        use arrow::datatypes::{DataType, Field, Schema};
1901        use arrow::ipc::writer::StreamWriter;
1902        use arrow::record_batch::RecordBatch;
1903        use std::sync::Arc;
1904
1905        let schema = Arc::new(Schema::new(vec![
1906            Field::new("id", DataType::Int32, false),
1907            Field::new("name", DataType::Utf8, false),
1908        ]));
1909
1910        let batch = RecordBatch::try_new(
1911            schema.clone(),
1912            vec![
1913                Arc::new(Int32Array::from(vec![1, 2, 3])),
1914                Arc::new(StringArray::from(vec!["a", "b", "c"])),
1915            ],
1916        )
1917        .unwrap();
1918
1919        let mut buffer = Vec::new();
1920        {
1921            let mut writer = StreamWriter::try_new(&mut buffer, &schema).unwrap();
1922            writer.write(&batch).unwrap();
1923            writer.finish().unwrap();
1924        }
1925        buffer
1926    }
1927
1928    #[rstest]
1929    #[case::with_optimization(true)]
1930    #[case::without_optimization(false)]
1931    #[tokio::test]
1932    async fn test_manifest_namespace_basic_create_and_list(#[case] inline_optimization: bool) {
1933        let temp_dir = TempStdDir::default();
1934        let temp_path = temp_dir.to_str().unwrap();
1935
1936        // Create a DirectoryNamespace with manifest enabled (default)
1937        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
1938            .inline_optimization_enabled(inline_optimization)
1939            .build()
1940            .await
1941            .unwrap();
1942
1943        // Verify we can list tables (should be empty)
1944        let mut request = ListTablesRequest::new();
1945        request.id = Some(vec![]);
1946        let response = dir_namespace.list_tables(request).await.unwrap();
1947        assert_eq!(response.tables.len(), 0);
1948
1949        // Create a test table
1950        let buffer = create_test_ipc_data();
1951        let mut create_request = CreateTableRequest::new();
1952        create_request.id = Some(vec!["test_table".to_string()]);
1953
1954        let _response = dir_namespace
1955            .create_table(create_request, Bytes::from(buffer))
1956            .await
1957            .unwrap();
1958
1959        // List tables again - should see our new table
1960        let mut request = ListTablesRequest::new();
1961        request.id = Some(vec![]);
1962        let response = dir_namespace.list_tables(request).await.unwrap();
1963        assert_eq!(response.tables.len(), 1);
1964        assert_eq!(response.tables[0], "test_table");
1965    }
1966
1967    #[rstest]
1968    #[case::with_optimization(true)]
1969    #[case::without_optimization(false)]
1970    #[tokio::test]
1971    async fn test_manifest_namespace_table_exists(#[case] inline_optimization: bool) {
1972        let temp_dir = TempStdDir::default();
1973        let temp_path = temp_dir.to_str().unwrap();
1974
1975        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
1976            .inline_optimization_enabled(inline_optimization)
1977            .build()
1978            .await
1979            .unwrap();
1980
1981        // Check non-existent table
1982        let mut request = TableExistsRequest::new();
1983        request.id = Some(vec!["nonexistent".to_string()]);
1984        let result = dir_namespace.table_exists(request).await;
1985        assert!(result.is_err());
1986
1987        // Create table
1988        let buffer = create_test_ipc_data();
1989        let mut create_request = CreateTableRequest::new();
1990        create_request.id = Some(vec!["test_table".to_string()]);
1991        dir_namespace
1992            .create_table(create_request, Bytes::from(buffer))
1993            .await
1994            .unwrap();
1995
1996        // Check existing table
1997        let mut request = TableExistsRequest::new();
1998        request.id = Some(vec!["test_table".to_string()]);
1999        let result = dir_namespace.table_exists(request).await;
2000        assert!(result.is_ok());
2001    }
2002
2003    #[rstest]
2004    #[case::with_optimization(true)]
2005    #[case::without_optimization(false)]
2006    #[tokio::test]
2007    async fn test_manifest_namespace_describe_table(#[case] inline_optimization: bool) {
2008        let temp_dir = TempStdDir::default();
2009        let temp_path = temp_dir.to_str().unwrap();
2010
2011        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2012            .inline_optimization_enabled(inline_optimization)
2013            .build()
2014            .await
2015            .unwrap();
2016
2017        // Describe non-existent table
2018        let mut request = DescribeTableRequest::new();
2019        request.id = Some(vec!["nonexistent".to_string()]);
2020        let result = dir_namespace.describe_table(request).await;
2021        assert!(result.is_err());
2022
2023        // Create table
2024        let buffer = create_test_ipc_data();
2025        let mut create_request = CreateTableRequest::new();
2026        create_request.id = Some(vec!["test_table".to_string()]);
2027        dir_namespace
2028            .create_table(create_request, Bytes::from(buffer))
2029            .await
2030            .unwrap();
2031
2032        // Describe existing table
2033        let mut request = DescribeTableRequest::new();
2034        request.id = Some(vec!["test_table".to_string()]);
2035        let response = dir_namespace.describe_table(request).await.unwrap();
2036        assert!(response.location.is_some());
2037        assert!(response.location.unwrap().contains("test_table"));
2038    }
2039
2040    #[rstest]
2041    #[case::with_optimization(true)]
2042    #[case::without_optimization(false)]
2043    #[tokio::test]
2044    async fn test_manifest_namespace_drop_table(#[case] inline_optimization: bool) {
2045        let temp_dir = TempStdDir::default();
2046        let temp_path = temp_dir.to_str().unwrap();
2047
2048        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2049            .inline_optimization_enabled(inline_optimization)
2050            .build()
2051            .await
2052            .unwrap();
2053
2054        // Create table
2055        let buffer = create_test_ipc_data();
2056        let mut create_request = CreateTableRequest::new();
2057        create_request.id = Some(vec!["test_table".to_string()]);
2058        dir_namespace
2059            .create_table(create_request, Bytes::from(buffer))
2060            .await
2061            .unwrap();
2062
2063        // Verify table exists
2064        let mut request = ListTablesRequest::new();
2065        request.id = Some(vec![]);
2066        let response = dir_namespace.list_tables(request).await.unwrap();
2067        assert_eq!(response.tables.len(), 1);
2068
2069        // Drop table
2070        let mut drop_request = DropTableRequest::new();
2071        drop_request.id = Some(vec!["test_table".to_string()]);
2072        let _response = dir_namespace.drop_table(drop_request).await.unwrap();
2073
2074        // Verify table is gone
2075        let mut request = ListTablesRequest::new();
2076        request.id = Some(vec![]);
2077        let response = dir_namespace.list_tables(request).await.unwrap();
2078        assert_eq!(response.tables.len(), 0);
2079    }
2080
2081    #[rstest]
2082    #[case::with_optimization(true)]
2083    #[case::without_optimization(false)]
2084    #[tokio::test]
2085    async fn test_manifest_namespace_multiple_tables(#[case] inline_optimization: bool) {
2086        let temp_dir = TempStdDir::default();
2087        let temp_path = temp_dir.to_str().unwrap();
2088
2089        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2090            .inline_optimization_enabled(inline_optimization)
2091            .build()
2092            .await
2093            .unwrap();
2094
2095        // Create multiple tables
2096        let buffer = create_test_ipc_data();
2097        for i in 1..=3 {
2098            let mut create_request = CreateTableRequest::new();
2099            create_request.id = Some(vec![format!("table{}", i)]);
2100            dir_namespace
2101                .create_table(create_request, Bytes::from(buffer.clone()))
2102                .await
2103                .unwrap();
2104        }
2105
2106        // List all tables
2107        let mut request = ListTablesRequest::new();
2108        request.id = Some(vec![]);
2109        let response = dir_namespace.list_tables(request).await.unwrap();
2110        assert_eq!(response.tables.len(), 3);
2111        assert!(response.tables.contains(&"table1".to_string()));
2112        assert!(response.tables.contains(&"table2".to_string()));
2113        assert!(response.tables.contains(&"table3".to_string()));
2114    }
2115
2116    #[rstest]
2117    #[case::with_optimization(true)]
2118    #[case::without_optimization(false)]
2119    #[tokio::test]
2120    async fn test_directory_only_mode(#[case] inline_optimization: bool) {
2121        let temp_dir = TempStdDir::default();
2122        let temp_path = temp_dir.to_str().unwrap();
2123
2124        // Create a DirectoryNamespace with manifest disabled
2125        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2126            .manifest_enabled(false)
2127            .inline_optimization_enabled(inline_optimization)
2128            .build()
2129            .await
2130            .unwrap();
2131
2132        // Verify we can list tables (should be empty)
2133        let mut request = ListTablesRequest::new();
2134        request.id = Some(vec![]);
2135        let response = dir_namespace.list_tables(request).await.unwrap();
2136        assert_eq!(response.tables.len(), 0);
2137
2138        // Create a test table
2139        let buffer = create_test_ipc_data();
2140        let mut create_request = CreateTableRequest::new();
2141        create_request.id = Some(vec!["test_table".to_string()]);
2142
2143        // Create table - this should use directory-only mode
2144        let _response = dir_namespace
2145            .create_table(create_request, Bytes::from(buffer))
2146            .await
2147            .unwrap();
2148
2149        // List tables - should see our new table
2150        let mut request = ListTablesRequest::new();
2151        request.id = Some(vec![]);
2152        let response = dir_namespace.list_tables(request).await.unwrap();
2153        assert_eq!(response.tables.len(), 1);
2154        assert_eq!(response.tables[0], "test_table");
2155    }
2156
2157    #[rstest]
2158    #[case::with_optimization(true)]
2159    #[case::without_optimization(false)]
2160    #[tokio::test]
2161    async fn test_dual_mode_merge(#[case] inline_optimization: bool) {
2162        let temp_dir = TempStdDir::default();
2163        let temp_path = temp_dir.to_str().unwrap();
2164
2165        // Create a DirectoryNamespace with both manifest and directory enabled
2166        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2167            .manifest_enabled(true)
2168            .dir_listing_enabled(true)
2169            .inline_optimization_enabled(inline_optimization)
2170            .build()
2171            .await
2172            .unwrap();
2173
2174        // Create tables through manifest
2175        let buffer = create_test_ipc_data();
2176        let mut create_request = CreateTableRequest::new();
2177        create_request.id = Some(vec!["table1".to_string()]);
2178        dir_namespace
2179            .create_table(create_request, Bytes::from(buffer))
2180            .await
2181            .unwrap();
2182
2183        // List tables - should see table from both manifest and directory
2184        let mut request = ListTablesRequest::new();
2185        request.id = Some(vec![]);
2186        let response = dir_namespace.list_tables(request).await.unwrap();
2187        assert_eq!(response.tables.len(), 1);
2188        assert_eq!(response.tables[0], "table1");
2189    }
2190
2191    #[rstest]
2192    #[case::with_optimization(true)]
2193    #[case::without_optimization(false)]
2194    #[tokio::test]
2195    async fn test_manifest_only_mode(#[case] inline_optimization: bool) {
2196        let temp_dir = TempStdDir::default();
2197        let temp_path = temp_dir.to_str().unwrap();
2198
2199        // Create a DirectoryNamespace with only manifest enabled
2200        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2201            .manifest_enabled(true)
2202            .dir_listing_enabled(false)
2203            .inline_optimization_enabled(inline_optimization)
2204            .build()
2205            .await
2206            .unwrap();
2207
2208        // Create table
2209        let buffer = create_test_ipc_data();
2210        let mut create_request = CreateTableRequest::new();
2211        create_request.id = Some(vec!["test_table".to_string()]);
2212        dir_namespace
2213            .create_table(create_request, Bytes::from(buffer))
2214            .await
2215            .unwrap();
2216
2217        // List tables - should only use manifest
2218        let mut request = ListTablesRequest::new();
2219        request.id = Some(vec![]);
2220        let response = dir_namespace.list_tables(request).await.unwrap();
2221        assert_eq!(response.tables.len(), 1);
2222        assert_eq!(response.tables[0], "test_table");
2223    }
2224
2225    #[rstest]
2226    #[case::with_optimization(true)]
2227    #[case::without_optimization(false)]
2228    #[tokio::test]
2229    async fn test_drop_nonexistent_table(#[case] inline_optimization: bool) {
2230        let temp_dir = TempStdDir::default();
2231        let temp_path = temp_dir.to_str().unwrap();
2232
2233        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2234            .inline_optimization_enabled(inline_optimization)
2235            .build()
2236            .await
2237            .unwrap();
2238
2239        // Try to drop non-existent table
2240        let mut drop_request = DropTableRequest::new();
2241        drop_request.id = Some(vec!["nonexistent".to_string()]);
2242        let result = dir_namespace.drop_table(drop_request).await;
2243        assert!(result.is_err());
2244    }
2245
2246    #[rstest]
2247    #[case::with_optimization(true)]
2248    #[case::without_optimization(false)]
2249    #[tokio::test]
2250    async fn test_create_duplicate_table_fails(#[case] inline_optimization: bool) {
2251        let temp_dir = TempStdDir::default();
2252        let temp_path = temp_dir.to_str().unwrap();
2253
2254        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2255            .inline_optimization_enabled(inline_optimization)
2256            .build()
2257            .await
2258            .unwrap();
2259
2260        // Create table
2261        let buffer = create_test_ipc_data();
2262        let mut create_request = CreateTableRequest::new();
2263        create_request.id = Some(vec!["test_table".to_string()]);
2264        dir_namespace
2265            .create_table(create_request, Bytes::from(buffer.clone()))
2266            .await
2267            .unwrap();
2268
2269        // Try to create table with same name - should fail
2270        let mut create_request = CreateTableRequest::new();
2271        create_request.id = Some(vec!["test_table".to_string()]);
2272        let result = dir_namespace
2273            .create_table(create_request, Bytes::from(buffer))
2274            .await;
2275        assert!(result.is_err());
2276    }
2277
2278    #[rstest]
2279    #[case::with_optimization(true)]
2280    #[case::without_optimization(false)]
2281    #[tokio::test]
2282    async fn test_create_child_namespace(#[case] inline_optimization: bool) {
2283        use lance_namespace::models::{
2284            CreateNamespaceRequest, ListNamespacesRequest, NamespaceExistsRequest,
2285        };
2286
2287        let temp_dir = TempStdDir::default();
2288        let temp_path = temp_dir.to_str().unwrap();
2289
2290        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2291            .inline_optimization_enabled(inline_optimization)
2292            .build()
2293            .await
2294            .unwrap();
2295
2296        // Create a child namespace
2297        let mut create_req = CreateNamespaceRequest::new();
2298        create_req.id = Some(vec!["ns1".to_string()]);
2299        let result = dir_namespace.create_namespace(create_req).await;
2300        assert!(
2301            result.is_ok(),
2302            "Failed to create child namespace: {:?}",
2303            result.err()
2304        );
2305
2306        // Verify namespace exists
2307        let exists_req = NamespaceExistsRequest {
2308            id: Some(vec!["ns1".to_string()]),
2309            ..Default::default()
2310        };
2311        let result = dir_namespace.namespace_exists(exists_req).await;
2312        assert!(result.is_ok(), "Namespace should exist");
2313
2314        // List child namespaces of root
2315        let list_req = ListNamespacesRequest {
2316            id: Some(vec![]),
2317            page_token: None,
2318            limit: None,
2319            ..Default::default()
2320        };
2321        let result = dir_namespace.list_namespaces(list_req).await;
2322        assert!(result.is_ok());
2323        let namespaces = result.unwrap();
2324        assert_eq!(namespaces.namespaces.len(), 1);
2325        assert_eq!(namespaces.namespaces[0], "ns1");
2326    }
2327
2328    #[rstest]
2329    #[case::with_optimization(true)]
2330    #[case::without_optimization(false)]
2331    #[tokio::test]
2332    async fn test_create_nested_namespace(#[case] inline_optimization: bool) {
2333        use lance_namespace::models::{
2334            CreateNamespaceRequest, ListNamespacesRequest, NamespaceExistsRequest,
2335        };
2336
2337        let temp_dir = TempStdDir::default();
2338        let temp_path = temp_dir.to_str().unwrap();
2339
2340        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2341            .inline_optimization_enabled(inline_optimization)
2342            .build()
2343            .await
2344            .unwrap();
2345
2346        // Create parent namespace
2347        let mut create_req = CreateNamespaceRequest::new();
2348        create_req.id = Some(vec!["parent".to_string()]);
2349        dir_namespace.create_namespace(create_req).await.unwrap();
2350
2351        // Create nested child namespace
2352        let mut create_req = CreateNamespaceRequest::new();
2353        create_req.id = Some(vec!["parent".to_string(), "child".to_string()]);
2354        let result = dir_namespace.create_namespace(create_req).await;
2355        assert!(
2356            result.is_ok(),
2357            "Failed to create nested namespace: {:?}",
2358            result.err()
2359        );
2360
2361        // Verify nested namespace exists
2362        let exists_req = NamespaceExistsRequest {
2363            id: Some(vec!["parent".to_string(), "child".to_string()]),
2364            ..Default::default()
2365        };
2366        let result = dir_namespace.namespace_exists(exists_req).await;
2367        assert!(result.is_ok(), "Nested namespace should exist");
2368
2369        // List child namespaces of parent
2370        let list_req = ListNamespacesRequest {
2371            id: Some(vec!["parent".to_string()]),
2372            page_token: None,
2373            limit: None,
2374            ..Default::default()
2375        };
2376        let result = dir_namespace.list_namespaces(list_req).await;
2377        assert!(result.is_ok());
2378        let namespaces = result.unwrap();
2379        assert_eq!(namespaces.namespaces.len(), 1);
2380        assert_eq!(namespaces.namespaces[0], "child");
2381    }
2382
2383    #[rstest]
2384    #[case::with_optimization(true)]
2385    #[case::without_optimization(false)]
2386    #[tokio::test]
2387    async fn test_create_namespace_without_parent_fails(#[case] inline_optimization: bool) {
2388        use lance_namespace::models::CreateNamespaceRequest;
2389
2390        let temp_dir = TempStdDir::default();
2391        let temp_path = temp_dir.to_str().unwrap();
2392
2393        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2394            .inline_optimization_enabled(inline_optimization)
2395            .build()
2396            .await
2397            .unwrap();
2398
2399        // Try to create nested namespace without parent
2400        let mut create_req = CreateNamespaceRequest::new();
2401        create_req.id = Some(vec!["nonexistent_parent".to_string(), "child".to_string()]);
2402        let result = dir_namespace.create_namespace(create_req).await;
2403        assert!(result.is_err(), "Should fail when parent doesn't exist");
2404    }
2405
2406    #[rstest]
2407    #[case::with_optimization(true)]
2408    #[case::without_optimization(false)]
2409    #[tokio::test]
2410    async fn test_drop_child_namespace(#[case] inline_optimization: bool) {
2411        use lance_namespace::models::{
2412            CreateNamespaceRequest, DropNamespaceRequest, NamespaceExistsRequest,
2413        };
2414
2415        let temp_dir = TempStdDir::default();
2416        let temp_path = temp_dir.to_str().unwrap();
2417
2418        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2419            .inline_optimization_enabled(inline_optimization)
2420            .build()
2421            .await
2422            .unwrap();
2423
2424        // Create a child namespace
2425        let mut create_req = CreateNamespaceRequest::new();
2426        create_req.id = Some(vec!["ns1".to_string()]);
2427        dir_namespace.create_namespace(create_req).await.unwrap();
2428
2429        // Drop the namespace
2430        let mut drop_req = DropNamespaceRequest::new();
2431        drop_req.id = Some(vec!["ns1".to_string()]);
2432        let result = dir_namespace.drop_namespace(drop_req).await;
2433        assert!(
2434            result.is_ok(),
2435            "Failed to drop namespace: {:?}",
2436            result.err()
2437        );
2438
2439        // Verify namespace no longer exists
2440        let exists_req = NamespaceExistsRequest {
2441            id: Some(vec!["ns1".to_string()]),
2442            ..Default::default()
2443        };
2444        let result = dir_namespace.namespace_exists(exists_req).await;
2445        assert!(result.is_err(), "Namespace should not exist after drop");
2446    }
2447
2448    #[rstest]
2449    #[case::with_optimization(true)]
2450    #[case::without_optimization(false)]
2451    #[tokio::test]
2452    async fn test_drop_namespace_with_children_fails(#[case] inline_optimization: bool) {
2453        use lance_namespace::models::{CreateNamespaceRequest, DropNamespaceRequest};
2454
2455        let temp_dir = TempStdDir::default();
2456        let temp_path = temp_dir.to_str().unwrap();
2457
2458        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2459            .inline_optimization_enabled(inline_optimization)
2460            .build()
2461            .await
2462            .unwrap();
2463
2464        // Create parent and child namespaces
2465        let mut create_req = CreateNamespaceRequest::new();
2466        create_req.id = Some(vec!["parent".to_string()]);
2467        dir_namespace.create_namespace(create_req).await.unwrap();
2468
2469        let mut create_req = CreateNamespaceRequest::new();
2470        create_req.id = Some(vec!["parent".to_string(), "child".to_string()]);
2471        dir_namespace.create_namespace(create_req).await.unwrap();
2472
2473        // Try to drop parent namespace - should fail because it has children
2474        let mut drop_req = DropNamespaceRequest::new();
2475        drop_req.id = Some(vec!["parent".to_string()]);
2476        let result = dir_namespace.drop_namespace(drop_req).await;
2477        assert!(result.is_err(), "Should fail when namespace has children");
2478    }
2479
2480    #[rstest]
2481    #[case::with_optimization(true)]
2482    #[case::without_optimization(false)]
2483    #[tokio::test]
2484    async fn test_create_table_in_child_namespace(#[case] inline_optimization: bool) {
2485        use lance_namespace::models::{
2486            CreateNamespaceRequest, CreateTableRequest, ListTablesRequest,
2487        };
2488
2489        let temp_dir = TempStdDir::default();
2490        let temp_path = temp_dir.to_str().unwrap();
2491
2492        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2493            .inline_optimization_enabled(inline_optimization)
2494            .build()
2495            .await
2496            .unwrap();
2497
2498        // Create a child namespace
2499        let mut create_ns_req = CreateNamespaceRequest::new();
2500        create_ns_req.id = Some(vec!["ns1".to_string()]);
2501        dir_namespace.create_namespace(create_ns_req).await.unwrap();
2502
2503        // Create a table in the child namespace
2504        let buffer = create_test_ipc_data();
2505        let mut create_table_req = CreateTableRequest::new();
2506        create_table_req.id = Some(vec!["ns1".to_string(), "table1".to_string()]);
2507        let result = dir_namespace
2508            .create_table(create_table_req, Bytes::from(buffer))
2509            .await;
2510        assert!(
2511            result.is_ok(),
2512            "Failed to create table in child namespace: {:?}",
2513            result.err()
2514        );
2515
2516        // List tables in the namespace
2517        let list_req = ListTablesRequest {
2518            id: Some(vec!["ns1".to_string()]),
2519            page_token: None,
2520            limit: None,
2521            ..Default::default()
2522        };
2523        let result = dir_namespace.list_tables(list_req).await;
2524        assert!(result.is_ok());
2525        let tables = result.unwrap();
2526        assert_eq!(tables.tables.len(), 1);
2527        assert_eq!(tables.tables[0], "table1");
2528    }
2529
2530    #[rstest]
2531    #[case::with_optimization(true)]
2532    #[case::without_optimization(false)]
2533    #[tokio::test]
2534    async fn test_describe_child_namespace(#[case] inline_optimization: bool) {
2535        use lance_namespace::models::{CreateNamespaceRequest, DescribeNamespaceRequest};
2536
2537        let temp_dir = TempStdDir::default();
2538        let temp_path = temp_dir.to_str().unwrap();
2539
2540        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2541            .inline_optimization_enabled(inline_optimization)
2542            .build()
2543            .await
2544            .unwrap();
2545
2546        // Create a child namespace with properties
2547        let mut properties = std::collections::HashMap::new();
2548        properties.insert("key1".to_string(), "value1".to_string());
2549
2550        let mut create_req = CreateNamespaceRequest::new();
2551        create_req.id = Some(vec!["ns1".to_string()]);
2552        create_req.properties = Some(properties.clone());
2553        dir_namespace.create_namespace(create_req).await.unwrap();
2554
2555        // Describe the namespace
2556        let describe_req = DescribeNamespaceRequest {
2557            id: Some(vec!["ns1".to_string()]),
2558            ..Default::default()
2559        };
2560        let result = dir_namespace.describe_namespace(describe_req).await;
2561        assert!(
2562            result.is_ok(),
2563            "Failed to describe namespace: {:?}",
2564            result.err()
2565        );
2566        let response = result.unwrap();
2567        assert!(response.properties.is_some());
2568        assert_eq!(
2569            response.properties.unwrap().get("key1"),
2570            Some(&"value1".to_string())
2571        );
2572    }
2573
2574    #[test]
2575    fn test_construct_full_uri_with_cloud_urls() {
2576        // Test S3-style URL with nested path (no trailing slash)
2577        let s3_result =
2578            ManifestNamespace::construct_full_uri("s3://bucket/path/subdir", "table.lance")
2579                .unwrap();
2580        assert_eq!(
2581            s3_result, "s3://bucket/path/subdir/table.lance",
2582            "S3 URL should correctly append table name to nested path"
2583        );
2584
2585        // Test Azure-style URL with nested path (no trailing slash)
2586        let az_result =
2587            ManifestNamespace::construct_full_uri("az://container/path/subdir", "table.lance")
2588                .unwrap();
2589        assert_eq!(
2590            az_result, "az://container/path/subdir/table.lance",
2591            "Azure URL should correctly append table name to nested path"
2592        );
2593
2594        // Test GCS-style URL with nested path (no trailing slash)
2595        let gs_result =
2596            ManifestNamespace::construct_full_uri("gs://bucket/path/subdir", "table.lance")
2597                .unwrap();
2598        assert_eq!(
2599            gs_result, "gs://bucket/path/subdir/table.lance",
2600            "GCS URL should correctly append table name to nested path"
2601        );
2602
2603        // Test with deeper nesting
2604        let deep_result =
2605            ManifestNamespace::construct_full_uri("s3://bucket/a/b/c/d", "my_table.lance").unwrap();
2606        assert_eq!(
2607            deep_result, "s3://bucket/a/b/c/d/my_table.lance",
2608            "Deeply nested path should work correctly"
2609        );
2610
2611        // Test with root-level path (single segment after bucket)
2612        let shallow_result =
2613            ManifestNamespace::construct_full_uri("s3://bucket", "table.lance").unwrap();
2614        assert_eq!(
2615            shallow_result, "s3://bucket/table.lance",
2616            "Single-level nested path should work correctly"
2617        );
2618
2619        // Test that URLs with trailing slash already work (no regression)
2620        let trailing_slash_result =
2621            ManifestNamespace::construct_full_uri("s3://bucket/path/subdir/", "table.lance")
2622                .unwrap();
2623        assert_eq!(
2624            trailing_slash_result, "s3://bucket/path/subdir/table.lance",
2625            "URL with existing trailing slash should still work"
2626        );
2627    }
2628}