Skip to main content

lance_namespace_impls/dir/
manifest.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Manifest-based namespace implementation
5//!
6//! This module provides a namespace implementation that uses a manifest table
7//! to track tables and nested namespaces.
8
9use arrow::array::{Array, RecordBatch, RecordBatchIterator, StringArray};
10use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
11use arrow_ipc::reader::StreamReader;
12use async_trait::async_trait;
13use bytes::Bytes;
14use futures::stream::StreamExt;
15use lance::dataset::optimize::{compact_files, CompactionOptions};
16use lance::dataset::{builder::DatasetBuilder, WriteParams};
17use lance::session::Session;
18use lance::{dataset::scanner::Scanner, Dataset};
19use lance_core::{box_error, Error, Result};
20use lance_index::optimize::OptimizeOptions;
21use lance_index::scalar::{BuiltinIndexType, ScalarIndexParams};
22use lance_index::traits::DatasetIndexExt;
23use lance_index::IndexType;
24use lance_io::object_store::{ObjectStore, ObjectStoreParams};
25use lance_namespace::models::{
26    CreateEmptyTableRequest, CreateEmptyTableResponse, CreateNamespaceRequest,
27    CreateNamespaceResponse, CreateTableRequest, CreateTableResponse, DeclareTableRequest,
28    DeclareTableResponse, DeregisterTableRequest, DeregisterTableResponse,
29    DescribeNamespaceRequest, DescribeNamespaceResponse, DescribeTableRequest,
30    DescribeTableResponse, DropNamespaceRequest, DropNamespaceResponse, DropTableRequest,
31    DropTableResponse, ListNamespacesRequest, ListNamespacesResponse, ListTablesRequest,
32    ListTablesResponse, NamespaceExistsRequest, RegisterTableRequest, RegisterTableResponse,
33    TableExistsRequest,
34};
35use lance_namespace::schema::arrow_schema_to_json;
36use lance_namespace::LanceNamespace;
37use object_store::path::Path;
38use snafu::location;
39use std::io::Cursor;
40use std::{
41    collections::HashMap,
42    hash::{DefaultHasher, Hash, Hasher},
43    ops::{Deref, DerefMut},
44    sync::Arc,
45};
46use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
47
/// Name of the special Lance table that holds the manifest entries.
const MANIFEST_TABLE_NAME: &str = "__manifest";
/// Separator inserted between namespace levels and the object name when an
/// object ID is flattened into a single string (e.g. `ns1$ns2$table`).
const DELIMITER: &str = "$";

// Index names for the __manifest table
/// BTREE index on the object_id column for fast lookups
const OBJECT_ID_INDEX_NAME: &str = "object_id_btree";
/// Bitmap index on the object_type column for filtering by type
const OBJECT_TYPE_INDEX_NAME: &str = "object_type_bitmap";
/// LabelList index on the base_objects column for view dependencies
const BASE_OBJECTS_INDEX_NAME: &str = "base_objects_label_list";
58
/// Object types that can be stored in the manifest
///
/// Each variant has a fixed lowercase string form (`"namespace"` / `"table"`)
/// that is written to the manifest's `object_type` column.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ObjectType {
    /// A (possibly nested) namespace entry.
    Namespace,
    /// A table entry.
    Table,
}
65
66impl ObjectType {
67    pub fn as_str(&self) -> &str {
68        match self {
69            Self::Namespace => "namespace",
70            Self::Table => "table",
71        }
72    }
73
74    pub fn parse(s: &str) -> Result<Self> {
75        match s {
76            "namespace" => Ok(Self::Namespace),
77            "table" => Ok(Self::Table),
78            _ => Err(Error::io(
79                format!("Invalid object type: {}", s),
80                location!(),
81            )),
82        }
83    }
84}
85
/// Information about a table stored in the manifest
#[derive(Debug, Clone)]
pub struct TableInfo {
    /// Namespace levels that contain the table; empty when the table's
    /// object ID has no delimiter.
    pub namespace: Vec<String>,
    /// Table name (the last component of the object ID).
    pub name: String,
    /// Value of the manifest's `location` column for this table.
    pub location: String,
}
93
/// Information about a namespace stored in the manifest
#[derive(Debug, Clone)]
pub struct NamespaceInfo {
    /// Parent namespace levels.
    /// NOTE(review): presumably empty for a top-level namespace — confirm against the code that builds this.
    pub namespace: Vec<String>,
    /// Name of the namespace itself.
    pub name: String,
    /// Optional key/value metadata; `None` when no metadata is available.
    pub metadata: Option<HashMap<String, String>>,
}
101
/// A wrapper around a Dataset that provides concurrent access.
///
/// This can be cloned cheaply. It supports concurrent reads or exclusive writes.
/// The manifest dataset is always kept strongly consistent by reloading on each read.
///
/// Internally this is an `Arc<RwLock<Dataset>>`, so every clone shares the
/// same lock and the same dataset handle.
#[derive(Debug, Clone)]
pub struct DatasetConsistencyWrapper(Arc<RwLock<Dataset>>);
108
109impl DatasetConsistencyWrapper {
110    /// Create a new wrapper with the given dataset.
111    pub fn new(dataset: Dataset) -> Self {
112        Self(Arc::new(RwLock::new(dataset)))
113    }
114
115    /// Get an immutable reference to the dataset.
116    /// Always reloads to ensure strong consistency.
117    pub async fn get(&self) -> Result<DatasetReadGuard<'_>> {
118        self.reload().await?;
119        Ok(DatasetReadGuard {
120            guard: self.0.read().await,
121        })
122    }
123
124    /// Get a mutable reference to the dataset.
125    /// Always reloads to ensure strong consistency.
126    pub async fn get_mut(&self) -> Result<DatasetWriteGuard<'_>> {
127        self.reload().await?;
128        Ok(DatasetWriteGuard {
129            guard: self.0.write().await,
130        })
131    }
132
133    /// Provide a known latest version of the dataset.
134    ///
135    /// This is usually done after some write operation, which inherently will
136    /// have the latest version.
137    pub async fn set_latest(&self, dataset: Dataset) {
138        let mut write_guard = self.0.write().await;
139        if dataset.manifest().version > write_guard.manifest().version {
140            *write_guard = dataset;
141        }
142    }
143
144    /// Reload the dataset to the latest version.
145    async fn reload(&self) -> Result<()> {
146        // First check if we need to reload (with read lock)
147        let read_guard = self.0.read().await;
148        let dataset_uri = read_guard.uri().to_string();
149        let current_version = read_guard.version().version;
150        log::debug!(
151            "Reload starting for uri={}, current_version={}",
152            dataset_uri,
153            current_version
154        );
155        let latest_version = read_guard
156            .latest_version_id()
157            .await
158            .map_err(|e| Error::IO {
159                source: box_error(std::io::Error::other(format!(
160                    "Failed to get latest version: {}",
161                    e
162                ))),
163                location: location!(),
164            })?;
165        log::debug!(
166            "Reload got latest_version={} for uri={}, current_version={}",
167            latest_version,
168            dataset_uri,
169            current_version
170        );
171        drop(read_guard);
172
173        // If already up-to-date, return early
174        if latest_version == current_version {
175            log::debug!("Already up-to-date for uri={}", dataset_uri);
176            return Ok(());
177        }
178
179        // Need to reload, acquire write lock
180        let mut write_guard = self.0.write().await;
181
182        // Double-check after acquiring write lock (someone else might have reloaded)
183        let latest_version = write_guard
184            .latest_version_id()
185            .await
186            .map_err(|e| Error::IO {
187                source: box_error(std::io::Error::other(format!(
188                    "Failed to get latest version: {}",
189                    e
190                ))),
191                location: location!(),
192            })?;
193
194        if latest_version != write_guard.version().version {
195            write_guard.checkout_latest().await.map_err(|e| Error::IO {
196                source: box_error(std::io::Error::other(format!(
197                    "Failed to checkout latest: {}",
198                    e
199                ))),
200                location: location!(),
201            })?;
202        }
203
204        Ok(())
205    }
206}
207
208pub struct DatasetReadGuard<'a> {
209    guard: RwLockReadGuard<'a, Dataset>,
210}
211
212impl Deref for DatasetReadGuard<'_> {
213    type Target = Dataset;
214
215    fn deref(&self) -> &Self::Target {
216        &self.guard
217    }
218}
219
220pub struct DatasetWriteGuard<'a> {
221    guard: RwLockWriteGuard<'a, Dataset>,
222}
223
224impl Deref for DatasetWriteGuard<'_> {
225    type Target = Dataset;
226
227    fn deref(&self) -> &Self::Target {
228        &self.guard
229    }
230}
231
232impl DerefMut for DatasetWriteGuard<'_> {
233    fn deref_mut(&mut self) -> &mut Self::Target {
234        &mut self.guard
235    }
236}
237
/// Manifest-based namespace implementation
///
/// Uses a special `__manifest` Lance table to track tables and nested namespaces.
#[derive(Debug)]
pub struct ManifestNamespace {
    /// Root location supplied to `from_directory`; also used there to open or
    /// create the manifest dataset.
    root: String,
    /// Optional storage options supplied to `from_directory`.
    storage_options: Option<HashMap<String, String>>,
    // NOTE(review): the three `#[allow(dead_code)]` fields below are stored by
    // `from_directory`; no reads are visible in this part of the file — confirm
    // whether they are still needed.
    #[allow(dead_code)]
    session: Option<Arc<Session>>,
    #[allow(dead_code)]
    object_store: Arc<ObjectStore>,
    #[allow(dead_code)]
    base_path: Path,
    /// Handle to the `__manifest` dataset; reloaded on every access.
    manifest_dataset: DatasetConsistencyWrapper,
    /// Whether directory listing is enabled in dual mode
    /// If true, root namespace tables use {table_name}.lance naming
    /// If false, they use namespace-prefixed names
    dir_listing_enabled: bool,
    /// Whether to perform inline optimization (compaction and indexing) on the __manifest table
    /// after every write. Defaults to true.
    inline_optimization_enabled: bool,
}
260
261impl ManifestNamespace {
    /// Create a new ManifestNamespace from an existing DirectoryNamespace
    ///
    /// The manifest dataset is obtained through `create_or_get_manifest`
    /// using `root`, `storage_options` and `session`; every argument is then
    /// stored unchanged on the returned value.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `create_or_get_manifest`.
    pub async fn from_directory(
        root: String,
        storage_options: Option<HashMap<String, String>>,
        session: Option<Arc<Session>>,
        object_store: Arc<ObjectStore>,
        base_path: Path,
        dir_listing_enabled: bool,
        inline_optimization_enabled: bool,
    ) -> Result<Self> {
        // The session is cloned because it is both passed on and stored below.
        let manifest_dataset =
            Self::create_or_get_manifest(&root, &storage_options, session.clone()).await?;

        Ok(Self {
            root,
            storage_options,
            session,
            object_store,
            base_path,
            manifest_dataset,
            dir_listing_enabled,
            inline_optimization_enabled,
        })
    }
286
287    /// Build object ID from namespace path and name
288    pub fn build_object_id(namespace: &[String], name: &str) -> String {
289        if namespace.is_empty() {
290            name.to_string()
291        } else {
292            let mut id = namespace.join(DELIMITER);
293            id.push_str(DELIMITER);
294            id.push_str(name);
295            id
296        }
297    }
298
299    /// Parse object ID into namespace path and name
300    pub fn parse_object_id(object_id: &str) -> (Vec<String>, String) {
301        let parts: Vec<&str> = object_id.split(DELIMITER).collect();
302        if parts.len() == 1 {
303            (Vec::new(), parts[0].to_string())
304        } else {
305            let namespace = parts[..parts.len() - 1]
306                .iter()
307                .map(|s| s.to_string())
308                .collect();
309            let name = parts[parts.len() - 1].to_string();
310            (namespace, name)
311        }
312    }
313
314    /// Split an object ID (table_id as vec of strings) into namespace and table name
315    fn split_object_id(table_id: &[String]) -> (Vec<String>, String) {
316        if table_id.len() == 1 {
317            (vec![], table_id[0].clone())
318        } else {
319            (
320                table_id[..table_id.len() - 1].to_vec(),
321                table_id[table_id.len() - 1].clone(),
322            )
323        }
324    }
325
    /// Convert a table ID (vec of strings) to an object_id string
    ///
    /// The components are joined with `DELIMITER`; an empty slice gives an
    /// empty string and a single component is returned unchanged.
    fn str_object_id(table_id: &[String]) -> String {
        table_id.join(DELIMITER)
    }
330
331    /// Generate a new directory name in format: <hash>_<object_id>
332    /// The hash is used to (1) optimize object store throughput,
333    /// (2) have high enough entropy in a short period of time to prevent issues like
334    /// failed table creation, delete and create new table of the same name, etc.
335    /// The object_id is added after the hash to ensure
336    /// dir name uniqueness and make debugging easier.
337    fn generate_dir_name(object_id: &str) -> String {
338        // Generate a random number for uniqueness
339        let random_num: u64 = rand::random();
340
341        // Create hash from random number + object_id
342        let mut hasher = DefaultHasher::new();
343        random_num.hash(&mut hasher);
344        object_id.hash(&mut hasher);
345        let hash = hasher.finish();
346
347        // Format as lowercase hex (8 characters - sufficient entropy for uniqueness)
348        format!("{:08x}_{}", (hash & 0xFFFFFFFF) as u32, object_id)
349    }
350
351    /// Construct a full URI from root and relative location
352    pub(crate) fn construct_full_uri(root: &str, relative_location: &str) -> Result<String> {
353        let mut base_url = lance_io::object_store::uri_to_url(root)?;
354
355        // Ensure the base URL has a trailing slash so that URL.join() appends
356        // rather than replaces the last path segment.
357        // Without this fix, "s3://bucket/path/subdir".join("table.lance")
358        // would incorrectly produce "s3://bucket/path/table.lance" (missing subdir).
359        if !base_url.path().ends_with('/') {
360            base_url.set_path(&format!("{}/", base_url.path()));
361        }
362
363        let full_url = base_url
364            .join(relative_location)
365            .map_err(|e| Error::InvalidInput {
366                source: format!(
367                    "Failed to join URI '{}' with '{}': {:?}",
368                    root, relative_location, e
369                )
370                .into(),
371                location: location!(),
372            })?;
373
374        Ok(full_url.to_string())
375    }
376
377    /// Perform inline optimization on the __manifest table.
378    ///
379    /// This method:
380    /// 1. Creates three indexes on the manifest table:
381    ///    - BTREE index on object_id for fast lookups
382    ///    - Bitmap index on object_type for filtering by type
383    ///    - LabelList index on base_objects for view dependencies
384    /// 2. Runs file compaction to merge small files
385    /// 3. Optimizes existing indices
386    ///
387    /// This is called automatically after writes when inline_optimization_enabled is true.
388    async fn run_inline_optimization(&self) -> Result<()> {
389        if !self.inline_optimization_enabled {
390            return Ok(());
391        }
392
393        // Get a mutable reference to the dataset to perform optimization
394        let mut dataset_guard = self.manifest_dataset.get_mut().await?;
395        let dataset: &mut Dataset = &mut dataset_guard;
396
397        // Step 1: Create indexes if they don't already exist
398        let indices = dataset.load_indices().await?;
399
400        // Check which indexes already exist
401        let has_object_id_index = indices.iter().any(|idx| idx.name == OBJECT_ID_INDEX_NAME);
402        let has_object_type_index = indices.iter().any(|idx| idx.name == OBJECT_TYPE_INDEX_NAME);
403        let has_base_objects_index = indices
404            .iter()
405            .any(|idx| idx.name == BASE_OBJECTS_INDEX_NAME);
406
407        // Create BTREE index on object_id
408        if !has_object_id_index {
409            log::debug!(
410                "Creating BTREE index '{}' on object_id for __manifest table",
411                OBJECT_ID_INDEX_NAME
412            );
413            let params = ScalarIndexParams::for_builtin(BuiltinIndexType::BTree);
414            if let Err(e) = dataset
415                .create_index(
416                    &["object_id"],
417                    IndexType::BTree,
418                    Some(OBJECT_ID_INDEX_NAME.to_string()),
419                    &params,
420                    true,
421                )
422                .await
423            {
424                log::warn!("Failed to create BTREE index on object_id for __manifest table: {:?}. Query performance may be impacted.", e);
425            } else {
426                log::info!(
427                    "Created BTREE index '{}' on object_id for __manifest table",
428                    OBJECT_ID_INDEX_NAME
429                );
430            }
431        }
432
433        // Create Bitmap index on object_type
434        if !has_object_type_index {
435            log::debug!(
436                "Creating Bitmap index '{}' on object_type for __manifest table",
437                OBJECT_TYPE_INDEX_NAME
438            );
439            let params = ScalarIndexParams::default();
440            if let Err(e) = dataset
441                .create_index(
442                    &["object_type"],
443                    IndexType::Bitmap,
444                    Some(OBJECT_TYPE_INDEX_NAME.to_string()),
445                    &params,
446                    true,
447                )
448                .await
449            {
450                log::warn!("Failed to create Bitmap index on object_type for __manifest table: {:?}. Query performance may be impacted.", e);
451            } else {
452                log::info!(
453                    "Created Bitmap index '{}' on object_type for __manifest table",
454                    OBJECT_TYPE_INDEX_NAME
455                );
456            }
457        }
458
459        // Create LabelList index on base_objects
460        if !has_base_objects_index {
461            log::debug!(
462                "Creating LabelList index '{}' on base_objects for __manifest table",
463                BASE_OBJECTS_INDEX_NAME
464            );
465            let params = ScalarIndexParams::default();
466            if let Err(e) = dataset
467                .create_index(
468                    &["base_objects"],
469                    IndexType::LabelList,
470                    Some(BASE_OBJECTS_INDEX_NAME.to_string()),
471                    &params,
472                    true,
473                )
474                .await
475            {
476                log::warn!("Failed to create LabelList index on base_objects for __manifest table: {:?}. Query performance may be impacted.", e);
477            } else {
478                log::info!(
479                    "Created LabelList index '{}' on base_objects for __manifest table",
480                    BASE_OBJECTS_INDEX_NAME
481                );
482            }
483        }
484
485        // Step 2: Run file compaction
486        log::debug!("Running file compaction on __manifest table");
487        match compact_files(dataset, CompactionOptions::default(), None).await {
488            Ok(compaction_metrics) => {
489                if compaction_metrics.fragments_removed > 0 {
490                    log::info!(
491                        "Compacted __manifest table: removed {} fragments, added {} fragments",
492                        compaction_metrics.fragments_removed,
493                        compaction_metrics.fragments_added
494                    );
495                }
496            }
497            Err(e) => {
498                log::warn!("Failed to compact files for __manifest table: {:?}. Continuing with optimization.", e);
499            }
500        }
501
502        // Step 3: Optimize indices
503        log::debug!("Optimizing indices on __manifest table");
504        match dataset.optimize_indices(&OptimizeOptions::default()).await {
505            Ok(_) => {
506                log::info!("Successfully optimized indices on __manifest table");
507            }
508            Err(e) => {
509                log::warn!(
510                    "Failed to optimize indices on __manifest table: {:?}. Continuing anyway.",
511                    e
512                );
513            }
514        }
515
516        Ok(())
517    }
518
519    /// Get the manifest schema
520    fn manifest_schema() -> Arc<ArrowSchema> {
521        Arc::new(ArrowSchema::new(vec![
522            Field::new("object_id", DataType::Utf8, false),
523            Field::new("object_type", DataType::Utf8, false),
524            Field::new("location", DataType::Utf8, true),
525            Field::new("metadata", DataType::Utf8, true),
526            Field::new(
527                "base_objects",
528                DataType::List(Arc::new(Field::new("object_id", DataType::Utf8, true))),
529                true,
530            ),
531        ]))
532    }
533
    /// Get a scanner for the manifest dataset
    ///
    /// The dataset is reloaded (via `DatasetConsistencyWrapper::get`) before
    /// the scanner is created. The read guard is released when this function
    /// returns.
    async fn manifest_scanner(&self) -> Result<Scanner> {
        let dataset_guard = self.manifest_dataset.get().await?;
        Ok(dataset_guard.scan())
    }
539
540    /// Helper to execute a scanner and collect results into a Vec
541    async fn execute_scanner(scanner: Scanner) -> Result<Vec<RecordBatch>> {
542        let mut stream = scanner.try_into_stream().await.map_err(|e| Error::IO {
543            source: box_error(std::io::Error::other(format!(
544                "Failed to create stream: {}",
545                e
546            ))),
547            location: location!(),
548        })?;
549
550        let mut batches = Vec::new();
551        while let Some(batch) = stream.next().await {
552            batches.push(batch.map_err(|e| Error::IO {
553                source: box_error(std::io::Error::other(format!(
554                    "Failed to read batch: {}",
555                    e
556                ))),
557                location: location!(),
558            })?);
559        }
560
561        Ok(batches)
562    }
563
564    /// Helper to get a string column from a record batch
565    fn get_string_column<'a>(batch: &'a RecordBatch, column_name: &str) -> Result<&'a StringArray> {
566        let column = batch
567            .column_by_name(column_name)
568            .ok_or_else(|| Error::io(format!("Column '{}' not found", column_name), location!()))?;
569        column
570            .as_any()
571            .downcast_ref::<StringArray>()
572            .ok_or_else(|| {
573                Error::io(
574                    format!("Column '{}' is not a string array", column_name),
575                    location!(),
576                )
577            })
578    }
579
580    /// Check if the manifest contains an object with the given ID
581    async fn manifest_contains_object(&self, object_id: &str) -> Result<bool> {
582        let filter = format!("object_id = '{}'", object_id);
583
584        let dataset_guard = self.manifest_dataset.get().await?;
585        let mut scanner = dataset_guard.scan();
586
587        scanner.filter(&filter).map_err(|e| Error::IO {
588            source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
589            location: location!(),
590        })?;
591
592        // Project no columns and enable row IDs for count_rows to work
593        scanner.project::<&str>(&[]).map_err(|e| Error::IO {
594            source: box_error(std::io::Error::other(format!("Failed to project: {}", e))),
595            location: location!(),
596        })?;
597
598        scanner.with_row_id();
599
600        let count = scanner.count_rows().await.map_err(|e| Error::IO {
601            source: box_error(std::io::Error::other(format!(
602                "Failed to count rows: {}",
603                e
604            ))),
605            location: location!(),
606        })?;
607
608        Ok(count > 0)
609    }
610
611    /// Query the manifest for a table with the given object ID
612    async fn query_manifest_for_table(&self, object_id: &str) -> Result<Option<TableInfo>> {
613        let filter = format!("object_id = '{}' AND object_type = 'table'", object_id);
614        let mut scanner = self.manifest_scanner().await?;
615        scanner.filter(&filter).map_err(|e| Error::IO {
616            source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
617            location: location!(),
618        })?;
619        scanner
620            .project(&["object_id", "location"])
621            .map_err(|e| Error::IO {
622                source: box_error(std::io::Error::other(format!("Failed to project: {}", e))),
623                location: location!(),
624            })?;
625        let batches = Self::execute_scanner(scanner).await?;
626
627        let mut found_result: Option<TableInfo> = None;
628        let mut total_rows = 0;
629
630        for batch in batches {
631            if batch.num_rows() == 0 {
632                continue;
633            }
634
635            total_rows += batch.num_rows();
636            if total_rows > 1 {
637                return Err(Error::io(
638                    format!(
639                        "Expected exactly 1 table with id '{}', found {}",
640                        object_id, total_rows
641                    ),
642                    location!(),
643                ));
644            }
645
646            let object_id_array = Self::get_string_column(&batch, "object_id")?;
647            let location_array = Self::get_string_column(&batch, "location")?;
648            let location = location_array.value(0).to_string();
649            let (namespace, name) = Self::parse_object_id(object_id_array.value(0));
650            found_result = Some(TableInfo {
651                namespace,
652                name,
653                location,
654            });
655        }
656
657        Ok(found_result)
658    }
659
660    /// List all table locations in the manifest (for root namespace only)
661    /// Returns a set of table locations (e.g., "table_name.lance")
662    pub async fn list_manifest_table_locations(&self) -> Result<std::collections::HashSet<String>> {
663        let filter = "object_type = 'table' AND NOT contains(object_id, '$')";
664        let mut scanner = self.manifest_scanner().await?;
665        scanner.filter(filter).map_err(|e| Error::IO {
666            source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
667            location: location!(),
668        })?;
669        scanner.project(&["location"]).map_err(|e| Error::IO {
670            source: box_error(std::io::Error::other(format!("Failed to project: {}", e))),
671            location: location!(),
672        })?;
673
674        let batches = Self::execute_scanner(scanner).await?;
675        let mut locations = std::collections::HashSet::new();
676
677        for batch in batches {
678            if batch.num_rows() == 0 {
679                continue;
680            }
681            let location_array = Self::get_string_column(&batch, "location")?;
682            for i in 0..location_array.len() {
683                locations.insert(location_array.value(i).to_string());
684            }
685        }
686
687        Ok(locations)
688    }
689
    /// Insert an entry into the manifest table
    ///
    /// Convenience wrapper around `insert_into_manifest_with_metadata` that
    /// passes `None` for both `metadata` and `base_objects`.
    async fn insert_into_manifest(
        &self,
        object_id: String,
        object_type: ObjectType,
        location: Option<String>,
    ) -> Result<()> {
        self.insert_into_manifest_with_metadata(object_id, object_type, location, None, None)
            .await
    }
700
701    /// Insert an entry into the manifest table with metadata and base_objects
702    async fn insert_into_manifest_with_metadata(
703        &self,
704        object_id: String,
705        object_type: ObjectType,
706        location: Option<String>,
707        metadata: Option<String>,
708        base_objects: Option<Vec<String>>,
709    ) -> Result<()> {
710        use arrow::array::builder::{ListBuilder, StringBuilder};
711
712        let schema = Self::manifest_schema();
713
714        // Create base_objects array from the provided list
715        let string_builder = StringBuilder::new();
716        let mut list_builder = ListBuilder::new(string_builder).with_field(Arc::new(Field::new(
717            "object_id",
718            DataType::Utf8,
719            true,
720        )));
721
722        match base_objects {
723            Some(objects) => {
724                for obj in objects {
725                    list_builder.values().append_value(obj);
726                }
727                list_builder.append(true);
728            }
729            None => {
730                list_builder.append_null();
731            }
732        }
733
734        let base_objects_array = list_builder.finish();
735
736        // Create arrays with optional values
737        let location_array = match location {
738            Some(loc) => Arc::new(StringArray::from(vec![Some(loc)])),
739            None => Arc::new(StringArray::from(vec![None::<String>])),
740        };
741
742        let metadata_array = match metadata {
743            Some(meta) => Arc::new(StringArray::from(vec![Some(meta)])),
744            None => Arc::new(StringArray::from(vec![None::<String>])),
745        };
746
747        let batch = RecordBatch::try_new(
748            schema.clone(),
749            vec![
750                Arc::new(StringArray::from(vec![object_id.as_str()])),
751                Arc::new(StringArray::from(vec![object_type.as_str()])),
752                location_array,
753                metadata_array,
754                Arc::new(base_objects_array),
755            ],
756        )
757        .map_err(|e| {
758            Error::io(
759                format!("Failed to create manifest entry: {}", e),
760                location!(),
761            )
762        })?;
763
764        let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
765
766        // Use MergeInsert to ensure uniqueness on object_id
767        let dataset_guard = self.manifest_dataset.get().await?;
768        let dataset_arc = Arc::new(dataset_guard.clone());
769        drop(dataset_guard); // Drop read guard before merge insert
770
771        let mut merge_builder =
772            lance::dataset::MergeInsertBuilder::try_new(dataset_arc, vec!["object_id".to_string()])
773                .map_err(|e| Error::IO {
774                    source: box_error(std::io::Error::other(format!(
775                        "Failed to create merge builder: {}",
776                        e
777                    ))),
778                    location: location!(),
779                })?;
780
781        merge_builder.when_matched(lance::dataset::WhenMatched::Fail);
782        merge_builder.when_not_matched(lance::dataset::WhenNotMatched::InsertAll);
783
784        let (new_dataset_arc, _merge_stats) = merge_builder
785            .try_build()
786            .map_err(|e| Error::IO {
787                source: box_error(std::io::Error::other(format!(
788                    "Failed to build merge: {}",
789                    e
790                ))),
791                location: location!(),
792            })?
793            .execute_reader(Box::new(reader))
794            .await
795            .map_err(|e| {
796                // Check if this is a "matched row" error from WhenMatched::Fail
797                let error_msg = e.to_string();
798                if error_msg.contains("matched")
799                    || error_msg.contains("duplicate")
800                    || error_msg.contains("already exists")
801                {
802                    Error::io(
803                        format!("Object with id '{}' already exists in manifest", object_id),
804                        location!(),
805                    )
806                } else {
807                    Error::IO {
808                        source: box_error(std::io::Error::other(format!(
809                            "Failed to execute merge: {}",
810                            e
811                        ))),
812                        location: location!(),
813                    }
814                }
815            })?;
816
817        let new_dataset = Arc::try_unwrap(new_dataset_arc).unwrap_or_else(|arc| (*arc).clone());
818        self.manifest_dataset.set_latest(new_dataset).await;
819
820        // Run inline optimization after write
821        if let Err(e) = self.run_inline_optimization().await {
822            log::warn!(
823                "Unexpected failure when running inline optimization: {:?}",
824                e
825            );
826        }
827
828        Ok(())
829    }
830
831    /// Delete an entry from the manifest table
832    pub async fn delete_from_manifest(&self, object_id: &str) -> Result<()> {
833        {
834            let predicate = format!("object_id = '{}'", object_id);
835            let mut dataset_guard = self.manifest_dataset.get_mut().await?;
836            dataset_guard
837                .delete(&predicate)
838                .await
839                .map_err(|e| Error::IO {
840                    source: box_error(std::io::Error::other(format!("Failed to delete: {}", e))),
841                    location: location!(),
842                })?;
843        } // Drop the guard here
844
845        self.manifest_dataset.reload().await?;
846
847        // Run inline optimization after delete
848        if let Err(e) = self.run_inline_optimization().await {
849            log::warn!(
850                "Unexpected failure when running inline optimization: {:?}",
851                e
852            );
853        }
854
855        Ok(())
856    }
857
858    /// Register a table in the manifest without creating the physical table (internal helper for migration)
859    pub async fn register_table(&self, name: &str, location: String) -> Result<()> {
860        let object_id = Self::build_object_id(&[], name);
861        if self.manifest_contains_object(&object_id).await? {
862            return Err(Error::io(
863                format!("Table '{}' already exists", name),
864                location!(),
865            ));
866        }
867
868        self.insert_into_manifest(object_id, ObjectType::Table, Some(location))
869            .await
870    }
871
872    /// Validate that all levels of a namespace path exist
873    async fn validate_namespace_levels_exist(&self, namespace_path: &[String]) -> Result<()> {
874        for i in 1..=namespace_path.len() {
875            let partial_path = &namespace_path[..i];
876            let object_id = partial_path.join(DELIMITER);
877            if !self.manifest_contains_object(&object_id).await? {
878                return Err(Error::Namespace {
879                    source: format!("Parent namespace '{}' does not exist", object_id).into(),
880                    location: location!(),
881                });
882            }
883        }
884        Ok(())
885    }
886
887    /// Query the manifest for a namespace with the given object ID
888    async fn query_manifest_for_namespace(&self, object_id: &str) -> Result<Option<NamespaceInfo>> {
889        let filter = format!("object_id = '{}' AND object_type = 'namespace'", object_id);
890        let mut scanner = self.manifest_scanner().await?;
891        scanner.filter(&filter).map_err(|e| Error::IO {
892            source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
893            location: location!(),
894        })?;
895        scanner
896            .project(&["object_id", "metadata"])
897            .map_err(|e| Error::IO {
898                source: box_error(std::io::Error::other(format!("Failed to project: {}", e))),
899                location: location!(),
900            })?;
901        let batches = Self::execute_scanner(scanner).await?;
902
903        let mut found_result: Option<NamespaceInfo> = None;
904        let mut total_rows = 0;
905
906        for batch in batches {
907            if batch.num_rows() == 0 {
908                continue;
909            }
910
911            total_rows += batch.num_rows();
912            if total_rows > 1 {
913                return Err(Error::io(
914                    format!(
915                        "Expected exactly 1 namespace with id '{}', found {}",
916                        object_id, total_rows
917                    ),
918                    location!(),
919                ));
920            }
921
922            let object_id_array = Self::get_string_column(&batch, "object_id")?;
923            let metadata_array = Self::get_string_column(&batch, "metadata")?;
924
925            let object_id_str = object_id_array.value(0);
926            let metadata = if !metadata_array.is_null(0) {
927                let metadata_str = metadata_array.value(0);
928                match serde_json::from_str::<HashMap<String, String>>(metadata_str) {
929                    Ok(map) => Some(map),
930                    Err(e) => {
931                        return Err(Error::io(
932                            format!(
933                                "Failed to deserialize metadata for namespace '{}': {}",
934                                object_id, e
935                            ),
936                            location!(),
937                        ));
938                    }
939                }
940            } else {
941                None
942            };
943
944            let (namespace, name) = Self::parse_object_id(object_id_str);
945            found_result = Some(NamespaceInfo {
946                namespace,
947                name,
948                metadata,
949            });
950        }
951
952        Ok(found_result)
953    }
954
955    /// Create or get the manifest dataset
956    async fn create_or_get_manifest(
957        root: &str,
958        storage_options: &Option<HashMap<String, String>>,
959        session: Option<Arc<Session>>,
960    ) -> Result<DatasetConsistencyWrapper> {
961        let manifest_path = format!("{}/{}", root, MANIFEST_TABLE_NAME);
962        log::debug!("Attempting to load manifest from {}", manifest_path);
963        let mut builder = DatasetBuilder::from_uri(&manifest_path);
964
965        if let Some(sess) = session.clone() {
966            builder = builder.with_session(sess);
967        }
968
969        if let Some(opts) = storage_options {
970            builder = builder.with_storage_options(opts.clone());
971        }
972
973        let dataset_result = builder.load().await;
974        if let Ok(dataset) = dataset_result {
975            Ok(DatasetConsistencyWrapper::new(dataset))
976        } else {
977            log::info!("Creating new manifest table at {}", manifest_path);
978            let schema = Self::manifest_schema();
979            let empty_batch = RecordBatch::new_empty(schema.clone());
980            let reader = RecordBatchIterator::new(vec![Ok(empty_batch)], schema.clone());
981
982            let write_params = WriteParams {
983                session,
984                store_params: storage_options.as_ref().map(|opts| ObjectStoreParams {
985                    storage_options_accessor: Some(Arc::new(
986                        lance_io::object_store::StorageOptionsAccessor::with_static_options(
987                            opts.clone(),
988                        ),
989                    )),
990                    ..Default::default()
991                }),
992                ..Default::default()
993            };
994
995            let dataset = Dataset::write(Box::new(reader), &manifest_path, Some(write_params))
996                .await
997                .map_err(|e| Error::IO {
998                    source: box_error(std::io::Error::other(format!(
999                        "Failed to create manifest dataset: {}",
1000                        e
1001                    ))),
1002                    location: location!(),
1003                })?;
1004
1005            log::info!(
1006                "Successfully created manifest table at {}, version={}, uri={}",
1007                manifest_path,
1008                dataset.version().version,
1009                dataset.uri()
1010            );
1011            Ok(DatasetConsistencyWrapper::new(dataset))
1012        }
1013    }
1014}
1015
1016#[async_trait]
1017impl LanceNamespace for ManifestNamespace {
1018    fn namespace_id(&self) -> String {
1019        self.root.clone()
1020    }
1021
1022    async fn list_tables(&self, request: ListTablesRequest) -> Result<ListTablesResponse> {
1023        let namespace_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1024            source: "Namespace ID is required".into(),
1025            location: location!(),
1026        })?;
1027
1028        // Build filter to find tables in this namespace
1029        let filter = if namespace_id.is_empty() {
1030            // Root namespace: find tables without a namespace prefix
1031            "object_type = 'table' AND NOT contains(object_id, '$')".to_string()
1032        } else {
1033            // Namespaced: find tables that start with namespace$ but have no additional $
1034            let prefix = namespace_id.join(DELIMITER);
1035            format!(
1036                "object_type = 'table' AND starts_with(object_id, '{}{}') AND NOT contains(substring(object_id, {}), '$')",
1037                prefix, DELIMITER, prefix.len() + 2
1038            )
1039        };
1040
1041        let mut scanner = self.manifest_scanner().await?;
1042        scanner.filter(&filter).map_err(|e| Error::IO {
1043            source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
1044            location: location!(),
1045        })?;
1046        scanner.project(&["object_id"]).map_err(|e| Error::IO {
1047            source: box_error(std::io::Error::other(format!("Failed to project: {}", e))),
1048            location: location!(),
1049        })?;
1050
1051        let batches = Self::execute_scanner(scanner).await?;
1052
1053        let mut tables = Vec::new();
1054        for batch in batches {
1055            if batch.num_rows() == 0 {
1056                continue;
1057            }
1058
1059            let object_id_array = Self::get_string_column(&batch, "object_id")?;
1060            for i in 0..batch.num_rows() {
1061                let object_id = object_id_array.value(i);
1062                let (_namespace, name) = Self::parse_object_id(object_id);
1063                tables.push(name);
1064            }
1065        }
1066
1067        Ok(ListTablesResponse::new(tables))
1068    }
1069
1070    async fn describe_table(&self, request: DescribeTableRequest) -> Result<DescribeTableResponse> {
1071        let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1072            source: "Table ID is required".into(),
1073            location: location!(),
1074        })?;
1075
1076        if table_id.is_empty() {
1077            return Err(Error::InvalidInput {
1078                source: "Table ID cannot be empty".into(),
1079                location: location!(),
1080            });
1081        }
1082
1083        let object_id = Self::str_object_id(table_id);
1084        let table_info = self.query_manifest_for_table(&object_id).await?;
1085
1086        // Extract table name and namespace from table_id
1087        let table_name = table_id.last().cloned().unwrap_or_default();
1088        let namespace_id: Vec<String> = if table_id.len() > 1 {
1089            table_id[..table_id.len() - 1].to_vec()
1090        } else {
1091            vec![]
1092        };
1093
1094        let load_detailed_metadata = request.load_detailed_metadata.unwrap_or(false);
1095        // For backwards compatibility, only skip vending credentials when explicitly set to false
1096        let vend_credentials = request.vend_credentials.unwrap_or(true);
1097
1098        match table_info {
1099            Some(info) => {
1100                // Construct full URI from relative location
1101                let table_uri = Self::construct_full_uri(&self.root, &info.location)?;
1102
1103                let storage_options = if vend_credentials {
1104                    self.storage_options.clone()
1105                } else {
1106                    None
1107                };
1108
1109                // If not loading detailed metadata, return minimal response with just location
1110                if !load_detailed_metadata {
1111                    return Ok(DescribeTableResponse {
1112                        table: Some(table_name),
1113                        namespace: Some(namespace_id),
1114                        location: Some(table_uri.clone()),
1115                        table_uri: Some(table_uri),
1116                        storage_options,
1117                        ..Default::default()
1118                    });
1119                }
1120
1121                // Try to open the dataset to get version and schema
1122                match Dataset::open(&table_uri).await {
1123                    Ok(mut dataset) => {
1124                        // If a specific version is requested, checkout that version
1125                        if let Some(requested_version) = request.version {
1126                            dataset = dataset.checkout_version(requested_version as u64).await?;
1127                        }
1128
1129                        let version = dataset.version().version;
1130                        let lance_schema = dataset.schema();
1131                        let arrow_schema: arrow_schema::Schema = lance_schema.into();
1132                        let json_schema = arrow_schema_to_json(&arrow_schema)?;
1133
1134                        Ok(DescribeTableResponse {
1135                            table: Some(table_name.clone()),
1136                            namespace: Some(namespace_id.clone()),
1137                            version: Some(version as i64),
1138                            location: Some(table_uri.clone()),
1139                            table_uri: Some(table_uri),
1140                            schema: Some(Box::new(json_schema)),
1141                            storage_options,
1142                            ..Default::default()
1143                        })
1144                    }
1145                    Err(_) => {
1146                        // If dataset can't be opened (e.g., empty table), return minimal info
1147                        Ok(DescribeTableResponse {
1148                            table: Some(table_name),
1149                            namespace: Some(namespace_id),
1150                            location: Some(table_uri.clone()),
1151                            table_uri: Some(table_uri),
1152                            storage_options,
1153                            ..Default::default()
1154                        })
1155                    }
1156                }
1157            }
1158            None => Err(Error::Namespace {
1159                source: format!("Table '{}' not found", object_id).into(),
1160                location: location!(),
1161            }),
1162        }
1163    }
1164
1165    async fn table_exists(&self, request: TableExistsRequest) -> Result<()> {
1166        let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1167            source: "Table ID is required".into(),
1168            location: location!(),
1169        })?;
1170
1171        if table_id.is_empty() {
1172            return Err(Error::InvalidInput {
1173                source: "Table ID cannot be empty".into(),
1174                location: location!(),
1175            });
1176        }
1177
1178        let (namespace, table_name) = Self::split_object_id(table_id);
1179        let object_id = Self::build_object_id(&namespace, &table_name);
1180        let exists = self.manifest_contains_object(&object_id).await?;
1181        if exists {
1182            Ok(())
1183        } else {
1184            Err(Error::Namespace {
1185                source: format!("Table '{}' not found", table_name).into(),
1186                location: location!(),
1187            })
1188        }
1189    }
1190
1191    async fn create_table(
1192        &self,
1193        request: CreateTableRequest,
1194        data: Bytes,
1195    ) -> Result<CreateTableResponse> {
1196        let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1197            source: "Table ID is required".into(),
1198            location: location!(),
1199        })?;
1200
1201        if table_id.is_empty() {
1202            return Err(Error::InvalidInput {
1203                source: "Table ID cannot be empty".into(),
1204                location: location!(),
1205            });
1206        }
1207
1208        let (namespace, table_name) = Self::split_object_id(table_id);
1209        let object_id = Self::build_object_id(&namespace, &table_name);
1210
1211        // Check if table already exists in manifest
1212        if self.manifest_contains_object(&object_id).await? {
1213            return Err(Error::io(
1214                format!("Table '{}' already exists", table_name),
1215                location!(),
1216            ));
1217        }
1218
1219        // Create the physical table location with hash-based naming
1220        // When dir_listing_enabled is true and it's a root table, use directory-style naming: {table_name}.lance
1221        // Otherwise, use hash-based naming: {hash}_{object_id}
1222        let dir_name = if namespace.is_empty() && self.dir_listing_enabled {
1223            // Root table with directory listing enabled: use {table_name}.lance
1224            format!("{}.lance", table_name)
1225        } else {
1226            // Child namespace table or dir listing disabled: use hash-based naming
1227            Self::generate_dir_name(&object_id)
1228        };
1229        let table_uri = Self::construct_full_uri(&self.root, &dir_name)?;
1230
1231        // Validate that request_data is provided
1232        if data.is_empty() {
1233            return Err(Error::Namespace {
1234                source: "Request data (Arrow IPC stream) is required for create_table".into(),
1235                location: location!(),
1236            });
1237        }
1238
1239        // Write the data using Lance Dataset
1240        let cursor = Cursor::new(data.to_vec());
1241        let stream_reader = StreamReader::try_new(cursor, None)
1242            .map_err(|e| Error::io(format!("Failed to read IPC stream: {}", e), location!()))?;
1243
1244        let batches: Vec<RecordBatch> =
1245            stream_reader
1246                .collect::<std::result::Result<Vec<_>, _>>()
1247                .map_err(|e| Error::io(format!("Failed to collect batches: {}", e), location!()))?;
1248
1249        if batches.is_empty() {
1250            return Err(Error::io(
1251                "No data provided for table creation",
1252                location!(),
1253            ));
1254        }
1255
1256        let schema = batches[0].schema();
1257        let batch_results: Vec<std::result::Result<RecordBatch, arrow_schema::ArrowError>> =
1258            batches.into_iter().map(Ok).collect();
1259        let reader = RecordBatchIterator::new(batch_results, schema);
1260
1261        let write_params = WriteParams::default();
1262        let _dataset = Dataset::write(Box::new(reader), &table_uri, Some(write_params))
1263            .await
1264            .map_err(|e| Error::IO {
1265                source: box_error(std::io::Error::other(format!(
1266                    "Failed to write dataset: {}",
1267                    e
1268                ))),
1269                location: location!(),
1270            })?;
1271
1272        // Register in manifest (store dir_name, not full URI)
1273        self.insert_into_manifest(object_id, ObjectType::Table, Some(dir_name))
1274            .await?;
1275
1276        Ok(CreateTableResponse {
1277            version: Some(1),
1278            location: Some(table_uri),
1279            storage_options: self.storage_options.clone(),
1280            ..Default::default()
1281        })
1282    }
1283
1284    async fn drop_table(&self, request: DropTableRequest) -> Result<DropTableResponse> {
1285        let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1286            source: "Table ID is required".into(),
1287            location: location!(),
1288        })?;
1289
1290        if table_id.is_empty() {
1291            return Err(Error::InvalidInput {
1292                source: "Table ID cannot be empty".into(),
1293                location: location!(),
1294            });
1295        }
1296
1297        let (namespace, table_name) = Self::split_object_id(table_id);
1298        let object_id = Self::build_object_id(&namespace, &table_name);
1299
1300        // Query manifest for table location
1301        let table_info = self.query_manifest_for_table(&object_id).await?;
1302
1303        match table_info {
1304            Some(info) => {
1305                // Delete from manifest first
1306                self.delete_from_manifest(&object_id).await?;
1307
1308                // Delete physical data directory using the dir_name from manifest
1309                let table_path = self.base_path.child(info.location.as_str());
1310                let table_uri = Self::construct_full_uri(&self.root, &info.location)?;
1311
1312                // Remove the table directory
1313                self.object_store
1314                    .remove_dir_all(table_path)
1315                    .await
1316                    .map_err(|e| Error::Namespace {
1317                        source: format!("Failed to delete table directory: {}", e).into(),
1318                        location: location!(),
1319                    })?;
1320
1321                Ok(DropTableResponse {
1322                    id: request.id.clone(),
1323                    location: Some(table_uri),
1324                    ..Default::default()
1325                })
1326            }
1327            None => Err(Error::Namespace {
1328                source: format!("Table '{}' not found", table_name).into(),
1329                location: location!(),
1330            }),
1331        }
1332    }
1333
1334    async fn list_namespaces(
1335        &self,
1336        request: ListNamespacesRequest,
1337    ) -> Result<ListNamespacesResponse> {
1338        let parent_namespace = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1339            source: "Namespace ID is required".into(),
1340            location: location!(),
1341        })?;
1342
1343        // Build filter to find direct child namespaces
1344        let filter = if parent_namespace.is_empty() {
1345            // Root namespace: find all namespaces without a parent
1346            "object_type = 'namespace' AND NOT contains(object_id, '$')".to_string()
1347        } else {
1348            // Non-root: find namespaces that start with parent$ but have no additional $
1349            let prefix = parent_namespace.join(DELIMITER);
1350            format!(
1351                "object_type = 'namespace' AND starts_with(object_id, '{}{}') AND NOT contains(substring(object_id, {}), '$')",
1352                prefix, DELIMITER, prefix.len() + 2
1353            )
1354        };
1355
1356        let mut scanner = self.manifest_scanner().await?;
1357        scanner.filter(&filter).map_err(|e| Error::IO {
1358            source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
1359            location: location!(),
1360        })?;
1361        scanner.project(&["object_id"]).map_err(|e| Error::IO {
1362            source: box_error(std::io::Error::other(format!("Failed to project: {}", e))),
1363            location: location!(),
1364        })?;
1365
1366        let batches = Self::execute_scanner(scanner).await?;
1367        let mut namespaces = Vec::new();
1368
1369        for batch in batches {
1370            if batch.num_rows() == 0 {
1371                continue;
1372            }
1373
1374            let object_id_array = Self::get_string_column(&batch, "object_id")?;
1375            for i in 0..batch.num_rows() {
1376                let object_id = object_id_array.value(i);
1377                let (_namespace, name) = Self::parse_object_id(object_id);
1378                namespaces.push(name);
1379            }
1380        }
1381
1382        Ok(ListNamespacesResponse::new(namespaces))
1383    }
1384
1385    async fn describe_namespace(
1386        &self,
1387        request: DescribeNamespaceRequest,
1388    ) -> Result<DescribeNamespaceResponse> {
1389        let namespace_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1390            source: "Namespace ID is required".into(),
1391            location: location!(),
1392        })?;
1393
1394        // Root namespace always exists
1395        if namespace_id.is_empty() {
1396            #[allow(clippy::needless_update)]
1397            return Ok(DescribeNamespaceResponse {
1398                properties: Some(HashMap::new()),
1399                ..Default::default()
1400            });
1401        }
1402
1403        // Check if namespace exists in manifest
1404        let object_id = namespace_id.join(DELIMITER);
1405        let namespace_info = self.query_manifest_for_namespace(&object_id).await?;
1406
1407        match namespace_info {
1408            #[allow(clippy::needless_update)]
1409            Some(info) => Ok(DescribeNamespaceResponse {
1410                properties: info.metadata,
1411                ..Default::default()
1412            }),
1413            None => Err(Error::Namespace {
1414                source: format!("Namespace '{}' not found", object_id).into(),
1415                location: location!(),
1416            }),
1417        }
1418    }
1419
1420    async fn create_namespace(
1421        &self,
1422        request: CreateNamespaceRequest,
1423    ) -> Result<CreateNamespaceResponse> {
1424        let namespace_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1425            source: "Namespace ID is required".into(),
1426            location: location!(),
1427        })?;
1428
1429        // Root namespace always exists and cannot be created
1430        if namespace_id.is_empty() {
1431            return Err(Error::Namespace {
1432                source: "Root namespace already exists and cannot be created".into(),
1433                location: location!(),
1434            });
1435        }
1436
1437        // Validate parent namespaces exist (but not the namespace being created)
1438        if namespace_id.len() > 1 {
1439            self.validate_namespace_levels_exist(&namespace_id[..namespace_id.len() - 1])
1440                .await?;
1441        }
1442
1443        let object_id = namespace_id.join(DELIMITER);
1444        if self.manifest_contains_object(&object_id).await? {
1445            return Err(Error::Namespace {
1446                source: format!("Namespace '{}' already exists", object_id).into(),
1447                location: location!(),
1448            });
1449        }
1450
1451        // Serialize properties if provided
1452        let metadata = request.properties.as_ref().and_then(|props| {
1453            if props.is_empty() {
1454                None
1455            } else {
1456                Some(serde_json::to_string(props).ok()?)
1457            }
1458        });
1459
1460        self.insert_into_manifest_with_metadata(
1461            object_id,
1462            ObjectType::Namespace,
1463            None,
1464            metadata,
1465            None,
1466        )
1467        .await?;
1468
1469        Ok(CreateNamespaceResponse {
1470            properties: request.properties,
1471            ..Default::default()
1472        })
1473    }
1474
1475    async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<DropNamespaceResponse> {
1476        let namespace_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1477            source: "Namespace ID is required".into(),
1478            location: location!(),
1479        })?;
1480
1481        // Root namespace always exists and cannot be dropped
1482        if namespace_id.is_empty() {
1483            return Err(Error::Namespace {
1484                source: "Root namespace cannot be dropped".into(),
1485                location: location!(),
1486            });
1487        }
1488
1489        let object_id = namespace_id.join(DELIMITER);
1490
1491        // Check if namespace exists
1492        if !self.manifest_contains_object(&object_id).await? {
1493            return Err(Error::Namespace {
1494                source: format!("Namespace '{}' not found", object_id).into(),
1495                location: location!(),
1496            });
1497        }
1498
1499        // Check for child namespaces
1500        let prefix = format!("{}{}", object_id, DELIMITER);
1501        let filter = format!("starts_with(object_id, '{}')", prefix);
1502        let mut scanner = self.manifest_scanner().await?;
1503        scanner.filter(&filter).map_err(|e| Error::IO {
1504            source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
1505            location: location!(),
1506        })?;
1507        scanner.project::<&str>(&[]).map_err(|e| Error::IO {
1508            source: box_error(std::io::Error::other(format!("Failed to project: {}", e))),
1509            location: location!(),
1510        })?;
1511        scanner.with_row_id();
1512        let count = scanner.count_rows().await.map_err(|e| Error::IO {
1513            source: box_error(std::io::Error::other(format!(
1514                "Failed to count rows: {}",
1515                e
1516            ))),
1517            location: location!(),
1518        })?;
1519
1520        if count > 0 {
1521            return Err(Error::Namespace {
1522                source: format!(
1523                    "Namespace '{}' is not empty (contains {} child objects)",
1524                    object_id, count
1525                )
1526                .into(),
1527                location: location!(),
1528            });
1529        }
1530
1531        self.delete_from_manifest(&object_id).await?;
1532
1533        Ok(DropNamespaceResponse::default())
1534    }
1535
1536    async fn namespace_exists(&self, request: NamespaceExistsRequest) -> Result<()> {
1537        let namespace_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1538            source: "Namespace ID is required".into(),
1539            location: location!(),
1540        })?;
1541
1542        // Root namespace always exists
1543        if namespace_id.is_empty() {
1544            return Ok(());
1545        }
1546
1547        let object_id = namespace_id.join(DELIMITER);
1548        if self.manifest_contains_object(&object_id).await? {
1549            Ok(())
1550        } else {
1551            Err(Error::Namespace {
1552                source: format!("Namespace '{}' not found", object_id).into(),
1553                location: location!(),
1554            })
1555        }
1556    }
1557
1558    async fn create_empty_table(
1559        &self,
1560        request: CreateEmptyTableRequest,
1561    ) -> Result<CreateEmptyTableResponse> {
1562        let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1563            source: "Table ID is required".into(),
1564            location: location!(),
1565        })?;
1566
1567        if table_id.is_empty() {
1568            return Err(Error::InvalidInput {
1569                source: "Table ID cannot be empty".into(),
1570                location: location!(),
1571            });
1572        }
1573
1574        let (namespace, table_name) = Self::split_object_id(table_id);
1575        let object_id = Self::build_object_id(&namespace, &table_name);
1576
1577        // Check if table already exists in manifest
1578        let existing = self.query_manifest_for_table(&object_id).await?;
1579        if existing.is_some() {
1580            return Err(Error::Namespace {
1581                source: format!("Table '{}' already exists", table_name).into(),
1582                location: location!(),
1583            });
1584        }
1585
1586        // Create table location path with hash-based naming
1587        // When dir_listing_enabled is true and it's a root table, use directory-style naming: {table_name}.lance
1588        // Otherwise, use hash-based naming: {hash}_{object_id}
1589        let dir_name = if namespace.is_empty() && self.dir_listing_enabled {
1590            // Root table with directory listing enabled: use {table_name}.lance
1591            format!("{}.lance", table_name)
1592        } else {
1593            // Child namespace table or dir listing disabled: use hash-based naming
1594            Self::generate_dir_name(&object_id)
1595        };
1596        let table_path = self.base_path.child(dir_name.as_str());
1597        let table_uri = Self::construct_full_uri(&self.root, &dir_name)?;
1598
1599        // Validate location if provided
1600        if let Some(req_location) = &request.location {
1601            let req_location = req_location.trim_end_matches('/');
1602            if req_location != table_uri {
1603                return Err(Error::Namespace {
1604                    source: format!(
1605                        "Cannot create table {} at location {}, must be at location {}",
1606                        table_name, req_location, table_uri
1607                    )
1608                    .into(),
1609                    location: location!(),
1610                });
1611            }
1612        }
1613
1614        // Create the .lance-reserved file to mark the table as existing
1615        let reserved_file_path = table_path.child(".lance-reserved");
1616
1617        self.object_store
1618            .create(&reserved_file_path)
1619            .await
1620            .map_err(|e| Error::Namespace {
1621                source: format!(
1622                    "Failed to create .lance-reserved file for table {}: {}",
1623                    table_name, e
1624                )
1625                .into(),
1626                location: location!(),
1627            })?
1628            .shutdown()
1629            .await
1630            .map_err(|e| Error::Namespace {
1631                source: format!(
1632                    "Failed to finalize .lance-reserved file for table {}: {}",
1633                    table_name, e
1634                )
1635                .into(),
1636                location: location!(),
1637            })?;
1638
1639        // Add entry to manifest marking this as an empty table (store dir_name, not full path)
1640        self.insert_into_manifest(object_id, ObjectType::Table, Some(dir_name))
1641            .await?;
1642
1643        log::info!(
1644            "Created empty table '{}' in manifest at {}",
1645            table_name,
1646            table_uri
1647        );
1648
1649        // For backwards compatibility, only skip vending credentials when explicitly set to false
1650        let vend_credentials = request.vend_credentials.unwrap_or(true);
1651        let storage_options = if vend_credentials {
1652            self.storage_options.clone()
1653        } else {
1654            None
1655        };
1656
1657        Ok(CreateEmptyTableResponse {
1658            location: Some(table_uri),
1659            storage_options,
1660            ..Default::default()
1661        })
1662    }
1663
1664    async fn declare_table(&self, request: DeclareTableRequest) -> Result<DeclareTableResponse> {
1665        let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1666            source: "Table ID is required".into(),
1667            location: location!(),
1668        })?;
1669
1670        if table_id.is_empty() {
1671            return Err(Error::InvalidInput {
1672                source: "Table ID cannot be empty".into(),
1673                location: location!(),
1674            });
1675        }
1676
1677        let (namespace, table_name) = Self::split_object_id(table_id);
1678        let object_id = Self::build_object_id(&namespace, &table_name);
1679
1680        // Check if table already exists in manifest
1681        let existing = self.query_manifest_for_table(&object_id).await?;
1682        if existing.is_some() {
1683            return Err(Error::Namespace {
1684                source: format!("Table '{}' already exists", table_name).into(),
1685                location: location!(),
1686            });
1687        }
1688
1689        // Create table location path with hash-based naming
1690        // When dir_listing_enabled is true and it's a root table, use directory-style naming: {table_name}.lance
1691        // Otherwise, use hash-based naming: {hash}_{object_id}
1692        let dir_name = if namespace.is_empty() && self.dir_listing_enabled {
1693            // Root table with directory listing enabled: use {table_name}.lance
1694            format!("{}.lance", table_name)
1695        } else {
1696            // Child namespace table or dir listing disabled: use hash-based naming
1697            Self::generate_dir_name(&object_id)
1698        };
1699        let table_path = self.base_path.child(dir_name.as_str());
1700        let table_uri = Self::construct_full_uri(&self.root, &dir_name)?;
1701
1702        // Validate location if provided
1703        if let Some(req_location) = &request.location {
1704            let req_location = req_location.trim_end_matches('/');
1705            if req_location != table_uri {
1706                return Err(Error::Namespace {
1707                    source: format!(
1708                        "Cannot declare table {} at location {}, must be at location {}",
1709                        table_name, req_location, table_uri
1710                    )
1711                    .into(),
1712                    location: location!(),
1713                });
1714            }
1715        }
1716
1717        // Create the .lance-reserved file to mark the table as existing
1718        let reserved_file_path = table_path.child(".lance-reserved");
1719
1720        self.object_store
1721            .create(&reserved_file_path)
1722            .await
1723            .map_err(|e| Error::Namespace {
1724                source: format!(
1725                    "Failed to create .lance-reserved file for table {}: {}",
1726                    table_name, e
1727                )
1728                .into(),
1729                location: location!(),
1730            })?
1731            .shutdown()
1732            .await
1733            .map_err(|e| Error::Namespace {
1734                source: format!(
1735                    "Failed to finalize .lance-reserved file for table {}: {}",
1736                    table_name, e
1737                )
1738                .into(),
1739                location: location!(),
1740            })?;
1741
1742        // Add entry to manifest marking this as a declared table (store dir_name, not full path)
1743        self.insert_into_manifest(object_id, ObjectType::Table, Some(dir_name))
1744            .await?;
1745
1746        log::info!(
1747            "Declared table '{}' in manifest at {}",
1748            table_name,
1749            table_uri
1750        );
1751
1752        // For backwards compatibility, only skip vending credentials when explicitly set to false
1753        let vend_credentials = request.vend_credentials.unwrap_or(true);
1754        let storage_options = if vend_credentials {
1755            self.storage_options.clone()
1756        } else {
1757            None
1758        };
1759
1760        Ok(DeclareTableResponse {
1761            location: Some(table_uri),
1762            storage_options,
1763            ..Default::default()
1764        })
1765    }
1766
1767    async fn register_table(&self, request: RegisterTableRequest) -> Result<RegisterTableResponse> {
1768        let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1769            source: "Table ID is required".into(),
1770            location: location!(),
1771        })?;
1772
1773        if table_id.is_empty() {
1774            return Err(Error::InvalidInput {
1775                source: "Table ID cannot be empty".into(),
1776                location: location!(),
1777            });
1778        }
1779
1780        let location = request.location.clone();
1781
1782        // Validate that location is a relative path within the root directory
1783        // We don't allow absolute URIs or paths that escape the root
1784        if location.contains("://") {
1785            return Err(Error::InvalidInput {
1786                source: format!(
1787                    "Absolute URIs are not allowed for register_table. Location must be a relative path within the root directory: {}",
1788                    location
1789                ).into(),
1790                location: location!(),
1791            });
1792        }
1793
1794        if location.starts_with('/') {
1795            return Err(Error::InvalidInput {
1796                source: format!(
1797                    "Absolute paths are not allowed for register_table. Location must be a relative path within the root directory: {}",
1798                    location
1799                ).into(),
1800                location: location!(),
1801            });
1802        }
1803
1804        // Check for path traversal attempts
1805        if location.contains("..") {
1806            return Err(Error::InvalidInput {
1807                source: format!(
1808                    "Path traversal is not allowed. Location must be a relative path within the root directory: {}",
1809                    location
1810                ).into(),
1811                location: location!(),
1812            });
1813        }
1814
1815        let (namespace, table_name) = Self::split_object_id(table_id);
1816        let object_id = Self::build_object_id(&namespace, &table_name);
1817
1818        // Validate that parent namespaces exist (if not root)
1819        if !namespace.is_empty() {
1820            self.validate_namespace_levels_exist(&namespace).await?;
1821        }
1822
1823        // Check if table already exists
1824        if self.manifest_contains_object(&object_id).await? {
1825            return Err(Error::Namespace {
1826                source: format!("Table '{}' already exists", object_id).into(),
1827                location: location!(),
1828            });
1829        }
1830
1831        // Register the table with its location in the manifest
1832        self.insert_into_manifest(object_id, ObjectType::Table, Some(location.clone()))
1833            .await?;
1834
1835        Ok(RegisterTableResponse {
1836            location: Some(location),
1837            ..Default::default()
1838        })
1839    }
1840
1841    async fn deregister_table(
1842        &self,
1843        request: DeregisterTableRequest,
1844    ) -> Result<DeregisterTableResponse> {
1845        let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1846            source: "Table ID is required".into(),
1847            location: location!(),
1848        })?;
1849
1850        if table_id.is_empty() {
1851            return Err(Error::InvalidInput {
1852                source: "Table ID cannot be empty".into(),
1853                location: location!(),
1854            });
1855        }
1856
1857        let (namespace, table_name) = Self::split_object_id(table_id);
1858        let object_id = Self::build_object_id(&namespace, &table_name);
1859
1860        // Get table info before deleting
1861        let table_info = self.query_manifest_for_table(&object_id).await?;
1862
1863        let table_uri = match table_info {
1864            Some(info) => {
1865                // Delete from manifest only (leave physical data intact)
1866                self.delete_from_manifest(&object_id).await?;
1867                Self::construct_full_uri(&self.root, &info.location)?
1868            }
1869            None => {
1870                return Err(Error::Namespace {
1871                    source: format!("Table '{}' not found", object_id).into(),
1872                    location: location!(),
1873                });
1874            }
1875        };
1876
1877        Ok(DeregisterTableResponse {
1878            id: request.id.clone(),
1879            location: Some(table_uri),
1880            ..Default::default()
1881        })
1882    }
1883}
1884
1885#[cfg(test)]
1886mod tests {
1887    use crate::{DirectoryNamespaceBuilder, ManifestNamespace};
1888    use bytes::Bytes;
1889    use lance_core::utils::tempfile::TempStdDir;
1890    use lance_namespace::models::{
1891        CreateTableRequest, DescribeTableRequest, DropTableRequest, ListTablesRequest,
1892        TableExistsRequest,
1893    };
1894    use lance_namespace::LanceNamespace;
1895    use rstest::rstest;
1896
1897    fn create_test_ipc_data() -> Vec<u8> {
1898        use arrow::array::{Int32Array, StringArray};
1899        use arrow::datatypes::{DataType, Field, Schema};
1900        use arrow::ipc::writer::StreamWriter;
1901        use arrow::record_batch::RecordBatch;
1902        use std::sync::Arc;
1903
1904        let schema = Arc::new(Schema::new(vec![
1905            Field::new("id", DataType::Int32, false),
1906            Field::new("name", DataType::Utf8, false),
1907        ]));
1908
1909        let batch = RecordBatch::try_new(
1910            schema.clone(),
1911            vec![
1912                Arc::new(Int32Array::from(vec![1, 2, 3])),
1913                Arc::new(StringArray::from(vec!["a", "b", "c"])),
1914            ],
1915        )
1916        .unwrap();
1917
1918        let mut buffer = Vec::new();
1919        {
1920            let mut writer = StreamWriter::try_new(&mut buffer, &schema).unwrap();
1921            writer.write(&batch).unwrap();
1922            writer.finish().unwrap();
1923        }
1924        buffer
1925    }
1926
1927    #[rstest]
1928    #[case::with_optimization(true)]
1929    #[case::without_optimization(false)]
1930    #[tokio::test]
1931    async fn test_manifest_namespace_basic_create_and_list(#[case] inline_optimization: bool) {
1932        let temp_dir = TempStdDir::default();
1933        let temp_path = temp_dir.to_str().unwrap();
1934
1935        // Create a DirectoryNamespace with manifest enabled (default)
1936        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
1937            .inline_optimization_enabled(inline_optimization)
1938            .build()
1939            .await
1940            .unwrap();
1941
1942        // Verify we can list tables (should be empty)
1943        let mut request = ListTablesRequest::new();
1944        request.id = Some(vec![]);
1945        let response = dir_namespace.list_tables(request).await.unwrap();
1946        assert_eq!(response.tables.len(), 0);
1947
1948        // Create a test table
1949        let buffer = create_test_ipc_data();
1950        let mut create_request = CreateTableRequest::new();
1951        create_request.id = Some(vec!["test_table".to_string()]);
1952
1953        let _response = dir_namespace
1954            .create_table(create_request, Bytes::from(buffer))
1955            .await
1956            .unwrap();
1957
1958        // List tables again - should see our new table
1959        let mut request = ListTablesRequest::new();
1960        request.id = Some(vec![]);
1961        let response = dir_namespace.list_tables(request).await.unwrap();
1962        assert_eq!(response.tables.len(), 1);
1963        assert_eq!(response.tables[0], "test_table");
1964    }
1965
1966    #[rstest]
1967    #[case::with_optimization(true)]
1968    #[case::without_optimization(false)]
1969    #[tokio::test]
1970    async fn test_manifest_namespace_table_exists(#[case] inline_optimization: bool) {
1971        let temp_dir = TempStdDir::default();
1972        let temp_path = temp_dir.to_str().unwrap();
1973
1974        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
1975            .inline_optimization_enabled(inline_optimization)
1976            .build()
1977            .await
1978            .unwrap();
1979
1980        // Check non-existent table
1981        let mut request = TableExistsRequest::new();
1982        request.id = Some(vec!["nonexistent".to_string()]);
1983        let result = dir_namespace.table_exists(request).await;
1984        assert!(result.is_err());
1985
1986        // Create table
1987        let buffer = create_test_ipc_data();
1988        let mut create_request = CreateTableRequest::new();
1989        create_request.id = Some(vec!["test_table".to_string()]);
1990        dir_namespace
1991            .create_table(create_request, Bytes::from(buffer))
1992            .await
1993            .unwrap();
1994
1995        // Check existing table
1996        let mut request = TableExistsRequest::new();
1997        request.id = Some(vec!["test_table".to_string()]);
1998        let result = dir_namespace.table_exists(request).await;
1999        assert!(result.is_ok());
2000    }
2001
2002    #[rstest]
2003    #[case::with_optimization(true)]
2004    #[case::without_optimization(false)]
2005    #[tokio::test]
2006    async fn test_manifest_namespace_describe_table(#[case] inline_optimization: bool) {
2007        let temp_dir = TempStdDir::default();
2008        let temp_path = temp_dir.to_str().unwrap();
2009
2010        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2011            .inline_optimization_enabled(inline_optimization)
2012            .build()
2013            .await
2014            .unwrap();
2015
2016        // Describe non-existent table
2017        let mut request = DescribeTableRequest::new();
2018        request.id = Some(vec!["nonexistent".to_string()]);
2019        let result = dir_namespace.describe_table(request).await;
2020        assert!(result.is_err());
2021
2022        // Create table
2023        let buffer = create_test_ipc_data();
2024        let mut create_request = CreateTableRequest::new();
2025        create_request.id = Some(vec!["test_table".to_string()]);
2026        dir_namespace
2027            .create_table(create_request, Bytes::from(buffer))
2028            .await
2029            .unwrap();
2030
2031        // Describe existing table
2032        let mut request = DescribeTableRequest::new();
2033        request.id = Some(vec!["test_table".to_string()]);
2034        let response = dir_namespace.describe_table(request).await.unwrap();
2035        assert!(response.location.is_some());
2036        assert!(response.location.unwrap().contains("test_table"));
2037    }
2038
2039    #[rstest]
2040    #[case::with_optimization(true)]
2041    #[case::without_optimization(false)]
2042    #[tokio::test]
2043    async fn test_manifest_namespace_drop_table(#[case] inline_optimization: bool) {
2044        let temp_dir = TempStdDir::default();
2045        let temp_path = temp_dir.to_str().unwrap();
2046
2047        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2048            .inline_optimization_enabled(inline_optimization)
2049            .build()
2050            .await
2051            .unwrap();
2052
2053        // Create table
2054        let buffer = create_test_ipc_data();
2055        let mut create_request = CreateTableRequest::new();
2056        create_request.id = Some(vec!["test_table".to_string()]);
2057        dir_namespace
2058            .create_table(create_request, Bytes::from(buffer))
2059            .await
2060            .unwrap();
2061
2062        // Verify table exists
2063        let mut request = ListTablesRequest::new();
2064        request.id = Some(vec![]);
2065        let response = dir_namespace.list_tables(request).await.unwrap();
2066        assert_eq!(response.tables.len(), 1);
2067
2068        // Drop table
2069        let mut drop_request = DropTableRequest::new();
2070        drop_request.id = Some(vec!["test_table".to_string()]);
2071        let _response = dir_namespace.drop_table(drop_request).await.unwrap();
2072
2073        // Verify table is gone
2074        let mut request = ListTablesRequest::new();
2075        request.id = Some(vec![]);
2076        let response = dir_namespace.list_tables(request).await.unwrap();
2077        assert_eq!(response.tables.len(), 0);
2078    }
2079
2080    #[rstest]
2081    #[case::with_optimization(true)]
2082    #[case::without_optimization(false)]
2083    #[tokio::test]
2084    async fn test_manifest_namespace_multiple_tables(#[case] inline_optimization: bool) {
2085        let temp_dir = TempStdDir::default();
2086        let temp_path = temp_dir.to_str().unwrap();
2087
2088        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2089            .inline_optimization_enabled(inline_optimization)
2090            .build()
2091            .await
2092            .unwrap();
2093
2094        // Create multiple tables
2095        let buffer = create_test_ipc_data();
2096        for i in 1..=3 {
2097            let mut create_request = CreateTableRequest::new();
2098            create_request.id = Some(vec![format!("table{}", i)]);
2099            dir_namespace
2100                .create_table(create_request, Bytes::from(buffer.clone()))
2101                .await
2102                .unwrap();
2103        }
2104
2105        // List all tables
2106        let mut request = ListTablesRequest::new();
2107        request.id = Some(vec![]);
2108        let response = dir_namespace.list_tables(request).await.unwrap();
2109        assert_eq!(response.tables.len(), 3);
2110        assert!(response.tables.contains(&"table1".to_string()));
2111        assert!(response.tables.contains(&"table2".to_string()));
2112        assert!(response.tables.contains(&"table3".to_string()));
2113    }
2114
2115    #[rstest]
2116    #[case::with_optimization(true)]
2117    #[case::without_optimization(false)]
2118    #[tokio::test]
2119    async fn test_directory_only_mode(#[case] inline_optimization: bool) {
2120        let temp_dir = TempStdDir::default();
2121        let temp_path = temp_dir.to_str().unwrap();
2122
2123        // Create a DirectoryNamespace with manifest disabled
2124        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2125            .manifest_enabled(false)
2126            .inline_optimization_enabled(inline_optimization)
2127            .build()
2128            .await
2129            .unwrap();
2130
2131        // Verify we can list tables (should be empty)
2132        let mut request = ListTablesRequest::new();
2133        request.id = Some(vec![]);
2134        let response = dir_namespace.list_tables(request).await.unwrap();
2135        assert_eq!(response.tables.len(), 0);
2136
2137        // Create a test table
2138        let buffer = create_test_ipc_data();
2139        let mut create_request = CreateTableRequest::new();
2140        create_request.id = Some(vec!["test_table".to_string()]);
2141
2142        // Create table - this should use directory-only mode
2143        let _response = dir_namespace
2144            .create_table(create_request, Bytes::from(buffer))
2145            .await
2146            .unwrap();
2147
2148        // List tables - should see our new table
2149        let mut request = ListTablesRequest::new();
2150        request.id = Some(vec![]);
2151        let response = dir_namespace.list_tables(request).await.unwrap();
2152        assert_eq!(response.tables.len(), 1);
2153        assert_eq!(response.tables[0], "test_table");
2154    }
2155
2156    #[rstest]
2157    #[case::with_optimization(true)]
2158    #[case::without_optimization(false)]
2159    #[tokio::test]
2160    async fn test_dual_mode_merge(#[case] inline_optimization: bool) {
2161        let temp_dir = TempStdDir::default();
2162        let temp_path = temp_dir.to_str().unwrap();
2163
2164        // Create a DirectoryNamespace with both manifest and directory enabled
2165        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2166            .manifest_enabled(true)
2167            .dir_listing_enabled(true)
2168            .inline_optimization_enabled(inline_optimization)
2169            .build()
2170            .await
2171            .unwrap();
2172
2173        // Create tables through manifest
2174        let buffer = create_test_ipc_data();
2175        let mut create_request = CreateTableRequest::new();
2176        create_request.id = Some(vec!["table1".to_string()]);
2177        dir_namespace
2178            .create_table(create_request, Bytes::from(buffer))
2179            .await
2180            .unwrap();
2181
2182        // List tables - should see table from both manifest and directory
2183        let mut request = ListTablesRequest::new();
2184        request.id = Some(vec![]);
2185        let response = dir_namespace.list_tables(request).await.unwrap();
2186        assert_eq!(response.tables.len(), 1);
2187        assert_eq!(response.tables[0], "table1");
2188    }
2189
2190    #[rstest]
2191    #[case::with_optimization(true)]
2192    #[case::without_optimization(false)]
2193    #[tokio::test]
2194    async fn test_manifest_only_mode(#[case] inline_optimization: bool) {
2195        let temp_dir = TempStdDir::default();
2196        let temp_path = temp_dir.to_str().unwrap();
2197
2198        // Create a DirectoryNamespace with only manifest enabled
2199        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2200            .manifest_enabled(true)
2201            .dir_listing_enabled(false)
2202            .inline_optimization_enabled(inline_optimization)
2203            .build()
2204            .await
2205            .unwrap();
2206
2207        // Create table
2208        let buffer = create_test_ipc_data();
2209        let mut create_request = CreateTableRequest::new();
2210        create_request.id = Some(vec!["test_table".to_string()]);
2211        dir_namespace
2212            .create_table(create_request, Bytes::from(buffer))
2213            .await
2214            .unwrap();
2215
2216        // List tables - should only use manifest
2217        let mut request = ListTablesRequest::new();
2218        request.id = Some(vec![]);
2219        let response = dir_namespace.list_tables(request).await.unwrap();
2220        assert_eq!(response.tables.len(), 1);
2221        assert_eq!(response.tables[0], "test_table");
2222    }
2223
2224    #[rstest]
2225    #[case::with_optimization(true)]
2226    #[case::without_optimization(false)]
2227    #[tokio::test]
2228    async fn test_drop_nonexistent_table(#[case] inline_optimization: bool) {
2229        let temp_dir = TempStdDir::default();
2230        let temp_path = temp_dir.to_str().unwrap();
2231
2232        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2233            .inline_optimization_enabled(inline_optimization)
2234            .build()
2235            .await
2236            .unwrap();
2237
2238        // Try to drop non-existent table
2239        let mut drop_request = DropTableRequest::new();
2240        drop_request.id = Some(vec!["nonexistent".to_string()]);
2241        let result = dir_namespace.drop_table(drop_request).await;
2242        assert!(result.is_err());
2243    }
2244
2245    #[rstest]
2246    #[case::with_optimization(true)]
2247    #[case::without_optimization(false)]
2248    #[tokio::test]
2249    async fn test_create_duplicate_table_fails(#[case] inline_optimization: bool) {
2250        let temp_dir = TempStdDir::default();
2251        let temp_path = temp_dir.to_str().unwrap();
2252
2253        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2254            .inline_optimization_enabled(inline_optimization)
2255            .build()
2256            .await
2257            .unwrap();
2258
2259        // Create table
2260        let buffer = create_test_ipc_data();
2261        let mut create_request = CreateTableRequest::new();
2262        create_request.id = Some(vec!["test_table".to_string()]);
2263        dir_namespace
2264            .create_table(create_request, Bytes::from(buffer.clone()))
2265            .await
2266            .unwrap();
2267
2268        // Try to create table with same name - should fail
2269        let mut create_request = CreateTableRequest::new();
2270        create_request.id = Some(vec!["test_table".to_string()]);
2271        let result = dir_namespace
2272            .create_table(create_request, Bytes::from(buffer))
2273            .await;
2274        assert!(result.is_err());
2275    }
2276
2277    #[rstest]
2278    #[case::with_optimization(true)]
2279    #[case::without_optimization(false)]
2280    #[tokio::test]
2281    async fn test_create_child_namespace(#[case] inline_optimization: bool) {
2282        use lance_namespace::models::{
2283            CreateNamespaceRequest, ListNamespacesRequest, NamespaceExistsRequest,
2284        };
2285
2286        let temp_dir = TempStdDir::default();
2287        let temp_path = temp_dir.to_str().unwrap();
2288
2289        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2290            .inline_optimization_enabled(inline_optimization)
2291            .build()
2292            .await
2293            .unwrap();
2294
2295        // Create a child namespace
2296        let mut create_req = CreateNamespaceRequest::new();
2297        create_req.id = Some(vec!["ns1".to_string()]);
2298        let result = dir_namespace.create_namespace(create_req).await;
2299        assert!(
2300            result.is_ok(),
2301            "Failed to create child namespace: {:?}",
2302            result.err()
2303        );
2304
2305        // Verify namespace exists
2306        let exists_req = NamespaceExistsRequest {
2307            id: Some(vec!["ns1".to_string()]),
2308            ..Default::default()
2309        };
2310        let result = dir_namespace.namespace_exists(exists_req).await;
2311        assert!(result.is_ok(), "Namespace should exist");
2312
2313        // List child namespaces of root
2314        let list_req = ListNamespacesRequest {
2315            id: Some(vec![]),
2316            page_token: None,
2317            limit: None,
2318            ..Default::default()
2319        };
2320        let result = dir_namespace.list_namespaces(list_req).await;
2321        assert!(result.is_ok());
2322        let namespaces = result.unwrap();
2323        assert_eq!(namespaces.namespaces.len(), 1);
2324        assert_eq!(namespaces.namespaces[0], "ns1");
2325    }
2326
2327    #[rstest]
2328    #[case::with_optimization(true)]
2329    #[case::without_optimization(false)]
2330    #[tokio::test]
2331    async fn test_create_nested_namespace(#[case] inline_optimization: bool) {
2332        use lance_namespace::models::{
2333            CreateNamespaceRequest, ListNamespacesRequest, NamespaceExistsRequest,
2334        };
2335
2336        let temp_dir = TempStdDir::default();
2337        let temp_path = temp_dir.to_str().unwrap();
2338
2339        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2340            .inline_optimization_enabled(inline_optimization)
2341            .build()
2342            .await
2343            .unwrap();
2344
2345        // Create parent namespace
2346        let mut create_req = CreateNamespaceRequest::new();
2347        create_req.id = Some(vec!["parent".to_string()]);
2348        dir_namespace.create_namespace(create_req).await.unwrap();
2349
2350        // Create nested child namespace
2351        let mut create_req = CreateNamespaceRequest::new();
2352        create_req.id = Some(vec!["parent".to_string(), "child".to_string()]);
2353        let result = dir_namespace.create_namespace(create_req).await;
2354        assert!(
2355            result.is_ok(),
2356            "Failed to create nested namespace: {:?}",
2357            result.err()
2358        );
2359
2360        // Verify nested namespace exists
2361        let exists_req = NamespaceExistsRequest {
2362            id: Some(vec!["parent".to_string(), "child".to_string()]),
2363            ..Default::default()
2364        };
2365        let result = dir_namespace.namespace_exists(exists_req).await;
2366        assert!(result.is_ok(), "Nested namespace should exist");
2367
2368        // List child namespaces of parent
2369        let list_req = ListNamespacesRequest {
2370            id: Some(vec!["parent".to_string()]),
2371            page_token: None,
2372            limit: None,
2373            ..Default::default()
2374        };
2375        let result = dir_namespace.list_namespaces(list_req).await;
2376        assert!(result.is_ok());
2377        let namespaces = result.unwrap();
2378        assert_eq!(namespaces.namespaces.len(), 1);
2379        assert_eq!(namespaces.namespaces[0], "child");
2380    }
2381
2382    #[rstest]
2383    #[case::with_optimization(true)]
2384    #[case::without_optimization(false)]
2385    #[tokio::test]
2386    async fn test_create_namespace_without_parent_fails(#[case] inline_optimization: bool) {
2387        use lance_namespace::models::CreateNamespaceRequest;
2388
2389        let temp_dir = TempStdDir::default();
2390        let temp_path = temp_dir.to_str().unwrap();
2391
2392        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2393            .inline_optimization_enabled(inline_optimization)
2394            .build()
2395            .await
2396            .unwrap();
2397
2398        // Try to create nested namespace without parent
2399        let mut create_req = CreateNamespaceRequest::new();
2400        create_req.id = Some(vec!["nonexistent_parent".to_string(), "child".to_string()]);
2401        let result = dir_namespace.create_namespace(create_req).await;
2402        assert!(result.is_err(), "Should fail when parent doesn't exist");
2403    }
2404
2405    #[rstest]
2406    #[case::with_optimization(true)]
2407    #[case::without_optimization(false)]
2408    #[tokio::test]
2409    async fn test_drop_child_namespace(#[case] inline_optimization: bool) {
2410        use lance_namespace::models::{
2411            CreateNamespaceRequest, DropNamespaceRequest, NamespaceExistsRequest,
2412        };
2413
2414        let temp_dir = TempStdDir::default();
2415        let temp_path = temp_dir.to_str().unwrap();
2416
2417        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2418            .inline_optimization_enabled(inline_optimization)
2419            .build()
2420            .await
2421            .unwrap();
2422
2423        // Create a child namespace
2424        let mut create_req = CreateNamespaceRequest::new();
2425        create_req.id = Some(vec!["ns1".to_string()]);
2426        dir_namespace.create_namespace(create_req).await.unwrap();
2427
2428        // Drop the namespace
2429        let mut drop_req = DropNamespaceRequest::new();
2430        drop_req.id = Some(vec!["ns1".to_string()]);
2431        let result = dir_namespace.drop_namespace(drop_req).await;
2432        assert!(
2433            result.is_ok(),
2434            "Failed to drop namespace: {:?}",
2435            result.err()
2436        );
2437
2438        // Verify namespace no longer exists
2439        let exists_req = NamespaceExistsRequest {
2440            id: Some(vec!["ns1".to_string()]),
2441            ..Default::default()
2442        };
2443        let result = dir_namespace.namespace_exists(exists_req).await;
2444        assert!(result.is_err(), "Namespace should not exist after drop");
2445    }
2446
2447    #[rstest]
2448    #[case::with_optimization(true)]
2449    #[case::without_optimization(false)]
2450    #[tokio::test]
2451    async fn test_drop_namespace_with_children_fails(#[case] inline_optimization: bool) {
2452        use lance_namespace::models::{CreateNamespaceRequest, DropNamespaceRequest};
2453
2454        let temp_dir = TempStdDir::default();
2455        let temp_path = temp_dir.to_str().unwrap();
2456
2457        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2458            .inline_optimization_enabled(inline_optimization)
2459            .build()
2460            .await
2461            .unwrap();
2462
2463        // Create parent and child namespaces
2464        let mut create_req = CreateNamespaceRequest::new();
2465        create_req.id = Some(vec!["parent".to_string()]);
2466        dir_namespace.create_namespace(create_req).await.unwrap();
2467
2468        let mut create_req = CreateNamespaceRequest::new();
2469        create_req.id = Some(vec!["parent".to_string(), "child".to_string()]);
2470        dir_namespace.create_namespace(create_req).await.unwrap();
2471
2472        // Try to drop parent namespace - should fail because it has children
2473        let mut drop_req = DropNamespaceRequest::new();
2474        drop_req.id = Some(vec!["parent".to_string()]);
2475        let result = dir_namespace.drop_namespace(drop_req).await;
2476        assert!(result.is_err(), "Should fail when namespace has children");
2477    }
2478
2479    #[rstest]
2480    #[case::with_optimization(true)]
2481    #[case::without_optimization(false)]
2482    #[tokio::test]
2483    async fn test_create_table_in_child_namespace(#[case] inline_optimization: bool) {
2484        use lance_namespace::models::{
2485            CreateNamespaceRequest, CreateTableRequest, ListTablesRequest,
2486        };
2487
2488        let temp_dir = TempStdDir::default();
2489        let temp_path = temp_dir.to_str().unwrap();
2490
2491        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2492            .inline_optimization_enabled(inline_optimization)
2493            .build()
2494            .await
2495            .unwrap();
2496
2497        // Create a child namespace
2498        let mut create_ns_req = CreateNamespaceRequest::new();
2499        create_ns_req.id = Some(vec!["ns1".to_string()]);
2500        dir_namespace.create_namespace(create_ns_req).await.unwrap();
2501
2502        // Create a table in the child namespace
2503        let buffer = create_test_ipc_data();
2504        let mut create_table_req = CreateTableRequest::new();
2505        create_table_req.id = Some(vec!["ns1".to_string(), "table1".to_string()]);
2506        let result = dir_namespace
2507            .create_table(create_table_req, Bytes::from(buffer))
2508            .await;
2509        assert!(
2510            result.is_ok(),
2511            "Failed to create table in child namespace: {:?}",
2512            result.err()
2513        );
2514
2515        // List tables in the namespace
2516        let list_req = ListTablesRequest {
2517            id: Some(vec!["ns1".to_string()]),
2518            page_token: None,
2519            limit: None,
2520            ..Default::default()
2521        };
2522        let result = dir_namespace.list_tables(list_req).await;
2523        assert!(result.is_ok());
2524        let tables = result.unwrap();
2525        assert_eq!(tables.tables.len(), 1);
2526        assert_eq!(tables.tables[0], "table1");
2527    }
2528
2529    #[rstest]
2530    #[case::with_optimization(true)]
2531    #[case::without_optimization(false)]
2532    #[tokio::test]
2533    async fn test_describe_child_namespace(#[case] inline_optimization: bool) {
2534        use lance_namespace::models::{CreateNamespaceRequest, DescribeNamespaceRequest};
2535
2536        let temp_dir = TempStdDir::default();
2537        let temp_path = temp_dir.to_str().unwrap();
2538
2539        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2540            .inline_optimization_enabled(inline_optimization)
2541            .build()
2542            .await
2543            .unwrap();
2544
2545        // Create a child namespace with properties
2546        let mut properties = std::collections::HashMap::new();
2547        properties.insert("key1".to_string(), "value1".to_string());
2548
2549        let mut create_req = CreateNamespaceRequest::new();
2550        create_req.id = Some(vec!["ns1".to_string()]);
2551        create_req.properties = Some(properties.clone());
2552        dir_namespace.create_namespace(create_req).await.unwrap();
2553
2554        // Describe the namespace
2555        let describe_req = DescribeNamespaceRequest {
2556            id: Some(vec!["ns1".to_string()]),
2557            ..Default::default()
2558        };
2559        let result = dir_namespace.describe_namespace(describe_req).await;
2560        assert!(
2561            result.is_ok(),
2562            "Failed to describe namespace: {:?}",
2563            result.err()
2564        );
2565        let response = result.unwrap();
2566        assert!(response.properties.is_some());
2567        assert_eq!(
2568            response.properties.unwrap().get("key1"),
2569            Some(&"value1".to_string())
2570        );
2571    }
2572
2573    #[test]
2574    fn test_construct_full_uri_with_cloud_urls() {
2575        // Test S3-style URL with nested path (no trailing slash)
2576        let s3_result =
2577            ManifestNamespace::construct_full_uri("s3://bucket/path/subdir", "table.lance")
2578                .unwrap();
2579        assert_eq!(
2580            s3_result, "s3://bucket/path/subdir/table.lance",
2581            "S3 URL should correctly append table name to nested path"
2582        );
2583
2584        // Test Azure-style URL with nested path (no trailing slash)
2585        let az_result =
2586            ManifestNamespace::construct_full_uri("az://container/path/subdir", "table.lance")
2587                .unwrap();
2588        assert_eq!(
2589            az_result, "az://container/path/subdir/table.lance",
2590            "Azure URL should correctly append table name to nested path"
2591        );
2592
2593        // Test GCS-style URL with nested path (no trailing slash)
2594        let gs_result =
2595            ManifestNamespace::construct_full_uri("gs://bucket/path/subdir", "table.lance")
2596                .unwrap();
2597        assert_eq!(
2598            gs_result, "gs://bucket/path/subdir/table.lance",
2599            "GCS URL should correctly append table name to nested path"
2600        );
2601
2602        // Test with deeper nesting
2603        let deep_result =
2604            ManifestNamespace::construct_full_uri("s3://bucket/a/b/c/d", "my_table.lance").unwrap();
2605        assert_eq!(
2606            deep_result, "s3://bucket/a/b/c/d/my_table.lance",
2607            "Deeply nested path should work correctly"
2608        );
2609
2610        // Test with root-level path (single segment after bucket)
2611        let shallow_result =
2612            ManifestNamespace::construct_full_uri("s3://bucket", "table.lance").unwrap();
2613        assert_eq!(
2614            shallow_result, "s3://bucket/table.lance",
2615            "Single-level nested path should work correctly"
2616        );
2617
2618        // Test that URLs with trailing slash already work (no regression)
2619        let trailing_slash_result =
2620            ManifestNamespace::construct_full_uri("s3://bucket/path/subdir/", "table.lance")
2621                .unwrap();
2622        assert_eq!(
2623            trailing_slash_result, "s3://bucket/path/subdir/table.lance",
2624            "URL with existing trailing slash should still work"
2625        );
2626    }
2627}