lance_namespace_impls/dir/
manifest.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Manifest-based namespace implementation
5//!
6//! This module provides a namespace implementation that uses a manifest table
7//! to track tables and nested namespaces.
8
9use arrow::array::{Array, RecordBatch, RecordBatchIterator, StringArray};
10use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
11use arrow_ipc::reader::StreamReader;
12use async_trait::async_trait;
13use bytes::Bytes;
14use futures::stream::StreamExt;
15use lance::dataset::optimize::{compact_files, CompactionOptions};
16use lance::dataset::{builder::DatasetBuilder, WriteParams};
17use lance::session::Session;
18use lance::{dataset::scanner::Scanner, Dataset};
19use lance_core::{box_error, Error, Result};
20use lance_index::optimize::OptimizeOptions;
21use lance_index::scalar::{BuiltinIndexType, ScalarIndexParams};
22use lance_index::traits::DatasetIndexExt;
23use lance_index::IndexType;
24use lance_io::object_store::{ObjectStore, ObjectStoreParams};
25use lance_namespace::models::{
26    CreateEmptyTableRequest, CreateEmptyTableResponse, CreateNamespaceRequest,
27    CreateNamespaceResponse, CreateTableRequest, CreateTableResponse, DeregisterTableRequest,
28    DeregisterTableResponse, DescribeNamespaceRequest, DescribeNamespaceResponse,
29    DescribeTableRequest, DescribeTableResponse, DropNamespaceRequest, DropNamespaceResponse,
30    DropTableRequest, DropTableResponse, ListNamespacesRequest, ListNamespacesResponse,
31    ListTablesRequest, ListTablesResponse, NamespaceExistsRequest, RegisterTableRequest,
32    RegisterTableResponse, TableExistsRequest,
33};
34use lance_namespace::schema::arrow_schema_to_json;
35use lance_namespace::LanceNamespace;
36use object_store::path::Path;
37use snafu::location;
38use std::io::Cursor;
39use std::{
40    collections::HashMap,
41    hash::{DefaultHasher, Hash, Hasher},
42    ops::{Deref, DerefMut},
43    sync::Arc,
44};
45use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
46
/// Name of the Lance table that records the namespace manifest.
const MANIFEST_TABLE_NAME: &str = "__manifest";
/// Separator placed between path components of an object ID (e.g. `ns1$ns2$table`).
const DELIMITER: &str = "$";

// Names of the scalar indexes maintained on the `__manifest` table.

/// BTREE index on the `object_id` column, used for fast lookups by ID.
const OBJECT_ID_INDEX_NAME: &str = "object_id_btree";
/// Bitmap index on the `object_type` column, used when filtering by entry type.
const OBJECT_TYPE_INDEX_NAME: &str = "object_type_bitmap";
/// LabelList index on the `base_objects` column, intended for view dependencies.
const BASE_OBJECTS_INDEX_NAME: &str = "base_objects_label_list";
57
/// Kind of entry that a manifest row describes.
///
/// The kind is persisted in the manifest's `object_type` column using its
/// string form.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ObjectType {
    /// A (possibly nested) namespace entry.
    Namespace,
    /// A table entry.
    Table,
}
64
65impl ObjectType {
66    pub fn as_str(&self) -> &str {
67        match self {
68            Self::Namespace => "namespace",
69            Self::Table => "table",
70        }
71    }
72
73    pub fn parse(s: &str) -> Result<Self> {
74        match s {
75            "namespace" => Ok(Self::Namespace),
76            "table" => Ok(Self::Table),
77            _ => Err(Error::io(
78                format!("Invalid object type: {}", s),
79                location!(),
80            )),
81        }
82    }
83}
84
/// A table entry as read back from the manifest.
#[derive(Debug, Clone)]
pub struct TableInfo {
    /// Namespace path that contains the table; empty for the root namespace.
    pub namespace: Vec<String>,
    /// Table name, i.e. the final component of the table's object ID.
    pub name: String,
    /// Table location exactly as stored in the manifest's `location` column.
    pub location: String,
}
92
/// A namespace entry as read back from the manifest.
#[derive(Debug, Clone)]
pub struct NamespaceInfo {
    /// Path of the parent namespace; empty when the namespace sits at the root.
    pub namespace: Vec<String>,
    /// Name of this namespace, i.e. the final component of its object ID.
    pub name: String,
    /// Optional key/value properties attached to the namespace.
    pub metadata: Option<HashMap<String, String>>,
}
100
101/// A wrapper around a Dataset that provides concurrent access.
102///
103/// This can be cloned cheaply. It supports concurrent reads or exclusive writes.
104/// The manifest dataset is always kept strongly consistent by reloading on each read.
105#[derive(Debug, Clone)]
106pub struct DatasetConsistencyWrapper(Arc<RwLock<Dataset>>);
107
108impl DatasetConsistencyWrapper {
109    /// Create a new wrapper with the given dataset.
110    pub fn new(dataset: Dataset) -> Self {
111        Self(Arc::new(RwLock::new(dataset)))
112    }
113
114    /// Get an immutable reference to the dataset.
115    /// Always reloads to ensure strong consistency.
116    pub async fn get(&self) -> Result<DatasetReadGuard<'_>> {
117        self.reload().await?;
118        Ok(DatasetReadGuard {
119            guard: self.0.read().await,
120        })
121    }
122
123    /// Get a mutable reference to the dataset.
124    /// Always reloads to ensure strong consistency.
125    pub async fn get_mut(&self) -> Result<DatasetWriteGuard<'_>> {
126        self.reload().await?;
127        Ok(DatasetWriteGuard {
128            guard: self.0.write().await,
129        })
130    }
131
132    /// Provide a known latest version of the dataset.
133    ///
134    /// This is usually done after some write operation, which inherently will
135    /// have the latest version.
136    pub async fn set_latest(&self, dataset: Dataset) {
137        let mut write_guard = self.0.write().await;
138        if dataset.manifest().version > write_guard.manifest().version {
139            *write_guard = dataset;
140        }
141    }
142
143    /// Reload the dataset to the latest version.
144    async fn reload(&self) -> Result<()> {
145        // First check if we need to reload (with read lock)
146        let read_guard = self.0.read().await;
147        let dataset_uri = read_guard.uri().to_string();
148        let current_version = read_guard.version().version;
149        log::debug!(
150            "Reload starting for uri={}, current_version={}",
151            dataset_uri,
152            current_version
153        );
154        let latest_version = read_guard
155            .latest_version_id()
156            .await
157            .map_err(|e| Error::IO {
158                source: box_error(std::io::Error::other(format!(
159                    "Failed to get latest version: {}",
160                    e
161                ))),
162                location: location!(),
163            })?;
164        log::debug!(
165            "Reload got latest_version={} for uri={}, current_version={}",
166            latest_version,
167            dataset_uri,
168            current_version
169        );
170        drop(read_guard);
171
172        // If already up-to-date, return early
173        if latest_version == current_version {
174            log::debug!("Already up-to-date for uri={}", dataset_uri);
175            return Ok(());
176        }
177
178        // Need to reload, acquire write lock
179        let mut write_guard = self.0.write().await;
180
181        // Double-check after acquiring write lock (someone else might have reloaded)
182        let latest_version = write_guard
183            .latest_version_id()
184            .await
185            .map_err(|e| Error::IO {
186                source: box_error(std::io::Error::other(format!(
187                    "Failed to get latest version: {}",
188                    e
189                ))),
190                location: location!(),
191            })?;
192
193        if latest_version != write_guard.version().version {
194            write_guard.checkout_latest().await.map_err(|e| Error::IO {
195                source: box_error(std::io::Error::other(format!(
196                    "Failed to checkout latest: {}",
197                    e
198                ))),
199                location: location!(),
200            })?;
201        }
202
203        Ok(())
204    }
205}
206
207pub struct DatasetReadGuard<'a> {
208    guard: RwLockReadGuard<'a, Dataset>,
209}
210
211impl Deref for DatasetReadGuard<'_> {
212    type Target = Dataset;
213
214    fn deref(&self) -> &Self::Target {
215        &self.guard
216    }
217}
218
219pub struct DatasetWriteGuard<'a> {
220    guard: RwLockWriteGuard<'a, Dataset>,
221}
222
223impl Deref for DatasetWriteGuard<'_> {
224    type Target = Dataset;
225
226    fn deref(&self) -> &Self::Target {
227        &self.guard
228    }
229}
230
231impl DerefMut for DatasetWriteGuard<'_> {
232    fn deref_mut(&mut self) -> &mut Self::Target {
233        &mut self.guard
234    }
235}
236
237/// Manifest-based namespace implementation
238///
239/// Uses a special `__manifest` Lance table to track tables and nested namespaces.
240#[derive(Debug)]
241pub struct ManifestNamespace {
242    root: String,
243    storage_options: Option<HashMap<String, String>>,
244    #[allow(dead_code)]
245    session: Option<Arc<Session>>,
246    #[allow(dead_code)]
247    object_store: Arc<ObjectStore>,
248    #[allow(dead_code)]
249    base_path: Path,
250    manifest_dataset: DatasetConsistencyWrapper,
251    /// Whether directory listing is enabled in dual mode
252    /// If true, root namespace tables use {table_name}.lance naming
253    /// If false, they use namespace-prefixed names
254    dir_listing_enabled: bool,
255    /// Whether to perform inline optimization (compaction and indexing) on the __manifest table
256    /// after every write. Defaults to true.
257    inline_optimization_enabled: bool,
258}
259
260impl ManifestNamespace {
261    /// Create a new ManifestNamespace from an existing DirectoryNamespace
262    pub async fn from_directory(
263        root: String,
264        storage_options: Option<HashMap<String, String>>,
265        session: Option<Arc<Session>>,
266        object_store: Arc<ObjectStore>,
267        base_path: Path,
268        dir_listing_enabled: bool,
269        inline_optimization_enabled: bool,
270    ) -> Result<Self> {
271        let manifest_dataset =
272            Self::create_or_get_manifest(&root, &storage_options, session.clone()).await?;
273
274        Ok(Self {
275            root,
276            storage_options,
277            session,
278            object_store,
279            base_path,
280            manifest_dataset,
281            dir_listing_enabled,
282            inline_optimization_enabled,
283        })
284    }
285
286    /// Build object ID from namespace path and name
287    pub fn build_object_id(namespace: &[String], name: &str) -> String {
288        if namespace.is_empty() {
289            name.to_string()
290        } else {
291            let mut id = namespace.join(DELIMITER);
292            id.push_str(DELIMITER);
293            id.push_str(name);
294            id
295        }
296    }
297
298    /// Parse object ID into namespace path and name
299    pub fn parse_object_id(object_id: &str) -> (Vec<String>, String) {
300        let parts: Vec<&str> = object_id.split(DELIMITER).collect();
301        if parts.len() == 1 {
302            (Vec::new(), parts[0].to_string())
303        } else {
304            let namespace = parts[..parts.len() - 1]
305                .iter()
306                .map(|s| s.to_string())
307                .collect();
308            let name = parts[parts.len() - 1].to_string();
309            (namespace, name)
310        }
311    }
312
313    /// Split an object ID (table_id as vec of strings) into namespace and table name
314    fn split_object_id(table_id: &[String]) -> (Vec<String>, String) {
315        if table_id.len() == 1 {
316            (vec![], table_id[0].clone())
317        } else {
318            (
319                table_id[..table_id.len() - 1].to_vec(),
320                table_id[table_id.len() - 1].clone(),
321            )
322        }
323    }
324
325    /// Convert a table ID (vec of strings) to an object_id string
326    fn str_object_id(table_id: &[String]) -> String {
327        table_id.join(DELIMITER)
328    }
329
330    /// Generate a new directory name in format: <hash>_<object_id>
331    /// The hash is used to (1) optimize object store throughput,
332    /// (2) have high enough entropy in a short period of time to prevent issues like
333    /// failed table creation, delete and create new table of the same name, etc.
334    /// The object_id is added after the hash to ensure
335    /// dir name uniqueness and make debugging easier.
336    fn generate_dir_name(object_id: &str) -> String {
337        // Generate a random number for uniqueness
338        let random_num: u64 = rand::random();
339
340        // Create hash from random number + object_id
341        let mut hasher = DefaultHasher::new();
342        random_num.hash(&mut hasher);
343        object_id.hash(&mut hasher);
344        let hash = hasher.finish();
345
346        // Format as lowercase hex (8 characters - sufficient entropy for uniqueness)
347        format!("{:08x}_{}", (hash & 0xFFFFFFFF) as u32, object_id)
348    }
349
350    /// Construct a full URI from root and relative location
351    fn construct_full_uri(&self, relative_location: &str) -> Result<String> {
352        let base_url = lance_io::object_store::uri_to_url(&self.root)?;
353        let full_url = base_url
354            .join(relative_location)
355            .map_err(|e| Error::InvalidInput {
356                source: format!(
357                    "Failed to join URI '{}' with '{}': {}",
358                    self.root, relative_location, e
359                )
360                .into(),
361                location: location!(),
362            })?;
363
364        Ok(full_url.to_string())
365    }
366
367    /// Perform inline optimization on the __manifest table.
368    ///
369    /// This method:
370    /// 1. Creates three indexes on the manifest table:
371    ///    - BTREE index on object_id for fast lookups
372    ///    - Bitmap index on object_type for filtering by type
373    ///    - LabelList index on base_objects for view dependencies
374    /// 2. Runs file compaction to merge small files
375    /// 3. Optimizes existing indices
376    ///
377    /// This is called automatically after writes when inline_optimization_enabled is true.
378    async fn run_inline_optimization(&self) -> Result<()> {
379        if !self.inline_optimization_enabled {
380            return Ok(());
381        }
382
383        // Get a mutable reference to the dataset to perform optimization
384        let mut dataset_guard = self.manifest_dataset.get_mut().await?;
385        let dataset: &mut Dataset = &mut dataset_guard;
386
387        // Step 1: Create indexes if they don't already exist
388        let indices = dataset.load_indices().await?;
389
390        // Check which indexes already exist
391        let has_object_id_index = indices.iter().any(|idx| idx.name == OBJECT_ID_INDEX_NAME);
392        let has_object_type_index = indices.iter().any(|idx| idx.name == OBJECT_TYPE_INDEX_NAME);
393        let has_base_objects_index = indices
394            .iter()
395            .any(|idx| idx.name == BASE_OBJECTS_INDEX_NAME);
396
397        // Create BTREE index on object_id
398        if !has_object_id_index {
399            log::debug!(
400                "Creating BTREE index '{}' on object_id for __manifest table",
401                OBJECT_ID_INDEX_NAME
402            );
403            let params = ScalarIndexParams::for_builtin(BuiltinIndexType::BTree);
404            if let Err(e) = dataset
405                .create_index(
406                    &["object_id"],
407                    IndexType::BTree,
408                    Some(OBJECT_ID_INDEX_NAME.to_string()),
409                    &params,
410                    true,
411                )
412                .await
413            {
414                log::warn!("Failed to create BTREE index on object_id for __manifest table: {:?}. Query performance may be impacted.", e);
415            } else {
416                log::info!(
417                    "Created BTREE index '{}' on object_id for __manifest table",
418                    OBJECT_ID_INDEX_NAME
419                );
420            }
421        }
422
423        // Create Bitmap index on object_type
424        if !has_object_type_index {
425            log::debug!(
426                "Creating Bitmap index '{}' on object_type for __manifest table",
427                OBJECT_TYPE_INDEX_NAME
428            );
429            let params = ScalarIndexParams::default();
430            if let Err(e) = dataset
431                .create_index(
432                    &["object_type"],
433                    IndexType::Bitmap,
434                    Some(OBJECT_TYPE_INDEX_NAME.to_string()),
435                    &params,
436                    true,
437                )
438                .await
439            {
440                log::warn!("Failed to create Bitmap index on object_type for __manifest table: {:?}. Query performance may be impacted.", e);
441            } else {
442                log::info!(
443                    "Created Bitmap index '{}' on object_type for __manifest table",
444                    OBJECT_TYPE_INDEX_NAME
445                );
446            }
447        }
448
449        // Create LabelList index on base_objects
450        if !has_base_objects_index {
451            log::debug!(
452                "Creating LabelList index '{}' on base_objects for __manifest table",
453                BASE_OBJECTS_INDEX_NAME
454            );
455            let params = ScalarIndexParams::default();
456            if let Err(e) = dataset
457                .create_index(
458                    &["base_objects"],
459                    IndexType::LabelList,
460                    Some(BASE_OBJECTS_INDEX_NAME.to_string()),
461                    &params,
462                    true,
463                )
464                .await
465            {
466                log::warn!("Failed to create LabelList index on base_objects for __manifest table: {:?}. Query performance may be impacted.", e);
467            } else {
468                log::info!(
469                    "Created LabelList index '{}' on base_objects for __manifest table",
470                    BASE_OBJECTS_INDEX_NAME
471                );
472            }
473        }
474
475        // Step 2: Run file compaction
476        log::debug!("Running file compaction on __manifest table");
477        match compact_files(dataset, CompactionOptions::default(), None).await {
478            Ok(compaction_metrics) => {
479                if compaction_metrics.fragments_removed > 0 {
480                    log::info!(
481                        "Compacted __manifest table: removed {} fragments, added {} fragments",
482                        compaction_metrics.fragments_removed,
483                        compaction_metrics.fragments_added
484                    );
485                }
486            }
487            Err(e) => {
488                log::warn!("Failed to compact files for __manifest table: {:?}. Continuing with optimization.", e);
489            }
490        }
491
492        // Step 3: Optimize indices
493        log::debug!("Optimizing indices on __manifest table");
494        match dataset.optimize_indices(&OptimizeOptions::default()).await {
495            Ok(_) => {
496                log::info!("Successfully optimized indices on __manifest table");
497            }
498            Err(e) => {
499                log::warn!(
500                    "Failed to optimize indices on __manifest table: {:?}. Continuing anyway.",
501                    e
502                );
503            }
504        }
505
506        Ok(())
507    }
508
509    /// Get the manifest schema
510    fn manifest_schema() -> Arc<ArrowSchema> {
511        Arc::new(ArrowSchema::new(vec![
512            Field::new("object_id", DataType::Utf8, false),
513            Field::new("object_type", DataType::Utf8, false),
514            Field::new("location", DataType::Utf8, true),
515            Field::new("metadata", DataType::Utf8, true),
516            Field::new(
517                "base_objects",
518                DataType::List(Arc::new(Field::new("object_id", DataType::Utf8, true))),
519                true,
520            ),
521        ]))
522    }
523
524    /// Get a scanner for the manifest dataset
525    async fn manifest_scanner(&self) -> Result<Scanner> {
526        let dataset_guard = self.manifest_dataset.get().await?;
527        Ok(dataset_guard.scan())
528    }
529
530    /// Helper to execute a scanner and collect results into a Vec
531    async fn execute_scanner(scanner: Scanner) -> Result<Vec<RecordBatch>> {
532        let mut stream = scanner.try_into_stream().await.map_err(|e| Error::IO {
533            source: box_error(std::io::Error::other(format!(
534                "Failed to create stream: {}",
535                e
536            ))),
537            location: location!(),
538        })?;
539
540        let mut batches = Vec::new();
541        while let Some(batch) = stream.next().await {
542            batches.push(batch.map_err(|e| Error::IO {
543                source: box_error(std::io::Error::other(format!(
544                    "Failed to read batch: {}",
545                    e
546                ))),
547                location: location!(),
548            })?);
549        }
550
551        Ok(batches)
552    }
553
554    /// Helper to get a string column from a record batch
555    fn get_string_column<'a>(batch: &'a RecordBatch, column_name: &str) -> Result<&'a StringArray> {
556        let column = batch
557            .column_by_name(column_name)
558            .ok_or_else(|| Error::io(format!("Column '{}' not found", column_name), location!()))?;
559        column
560            .as_any()
561            .downcast_ref::<StringArray>()
562            .ok_or_else(|| {
563                Error::io(
564                    format!("Column '{}' is not a string array", column_name),
565                    location!(),
566                )
567            })
568    }
569
570    /// Check if the manifest contains an object with the given ID
571    async fn manifest_contains_object(&self, object_id: &str) -> Result<bool> {
572        let filter = format!("object_id = '{}'", object_id);
573
574        let dataset_guard = self.manifest_dataset.get().await?;
575        let mut scanner = dataset_guard.scan();
576
577        scanner.filter(&filter).map_err(|e| Error::IO {
578            source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
579            location: location!(),
580        })?;
581
582        // Project no columns and enable row IDs for count_rows to work
583        scanner.project::<&str>(&[]).map_err(|e| Error::IO {
584            source: box_error(std::io::Error::other(format!("Failed to project: {}", e))),
585            location: location!(),
586        })?;
587
588        scanner.with_row_id();
589
590        let count = scanner.count_rows().await.map_err(|e| Error::IO {
591            source: box_error(std::io::Error::other(format!(
592                "Failed to count rows: {}",
593                e
594            ))),
595            location: location!(),
596        })?;
597
598        Ok(count > 0)
599    }
600
601    /// Query the manifest for a table with the given object ID
602    async fn query_manifest_for_table(&self, object_id: &str) -> Result<Option<TableInfo>> {
603        let filter = format!("object_id = '{}' AND object_type = 'table'", object_id);
604        let mut scanner = self.manifest_scanner().await?;
605        scanner.filter(&filter).map_err(|e| Error::IO {
606            source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
607            location: location!(),
608        })?;
609        scanner
610            .project(&["object_id", "location"])
611            .map_err(|e| Error::IO {
612                source: box_error(std::io::Error::other(format!("Failed to project: {}", e))),
613                location: location!(),
614            })?;
615        let batches = Self::execute_scanner(scanner).await?;
616
617        let mut found_result: Option<TableInfo> = None;
618        let mut total_rows = 0;
619
620        for batch in batches {
621            if batch.num_rows() == 0 {
622                continue;
623            }
624
625            total_rows += batch.num_rows();
626            if total_rows > 1 {
627                return Err(Error::io(
628                    format!(
629                        "Expected exactly 1 table with id '{}', found {}",
630                        object_id, total_rows
631                    ),
632                    location!(),
633                ));
634            }
635
636            let object_id_array = Self::get_string_column(&batch, "object_id")?;
637            let location_array = Self::get_string_column(&batch, "location")?;
638            let location = location_array.value(0).to_string();
639            let (namespace, name) = Self::parse_object_id(object_id_array.value(0));
640            found_result = Some(TableInfo {
641                namespace,
642                name,
643                location,
644            });
645        }
646
647        Ok(found_result)
648    }
649
650    /// List all table locations in the manifest (for root namespace only)
651    /// Returns a set of table locations (e.g., "table_name.lance")
652    pub async fn list_manifest_table_locations(&self) -> Result<std::collections::HashSet<String>> {
653        let filter = "object_type = 'table' AND NOT contains(object_id, '$')";
654        let mut scanner = self.manifest_scanner().await?;
655        scanner.filter(filter).map_err(|e| Error::IO {
656            source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
657            location: location!(),
658        })?;
659        scanner.project(&["location"]).map_err(|e| Error::IO {
660            source: box_error(std::io::Error::other(format!("Failed to project: {}", e))),
661            location: location!(),
662        })?;
663
664        let batches = Self::execute_scanner(scanner).await?;
665        let mut locations = std::collections::HashSet::new();
666
667        for batch in batches {
668            if batch.num_rows() == 0 {
669                continue;
670            }
671            let location_array = Self::get_string_column(&batch, "location")?;
672            for i in 0..location_array.len() {
673                locations.insert(location_array.value(i).to_string());
674            }
675        }
676
677        Ok(locations)
678    }
679
680    /// Insert an entry into the manifest table
681    async fn insert_into_manifest(
682        &self,
683        object_id: String,
684        object_type: ObjectType,
685        location: Option<String>,
686    ) -> Result<()> {
687        self.insert_into_manifest_with_metadata(object_id, object_type, location, None, None)
688            .await
689    }
690
691    /// Insert an entry into the manifest table with metadata and base_objects
692    async fn insert_into_manifest_with_metadata(
693        &self,
694        object_id: String,
695        object_type: ObjectType,
696        location: Option<String>,
697        metadata: Option<String>,
698        base_objects: Option<Vec<String>>,
699    ) -> Result<()> {
700        use arrow::array::builder::{ListBuilder, StringBuilder};
701
702        let schema = Self::manifest_schema();
703
704        // Create base_objects array from the provided list
705        let string_builder = StringBuilder::new();
706        let mut list_builder = ListBuilder::new(string_builder).with_field(Arc::new(Field::new(
707            "object_id",
708            DataType::Utf8,
709            true,
710        )));
711
712        match base_objects {
713            Some(objects) => {
714                for obj in objects {
715                    list_builder.values().append_value(obj);
716                }
717                list_builder.append(true);
718            }
719            None => {
720                list_builder.append_null();
721            }
722        }
723
724        let base_objects_array = list_builder.finish();
725
726        // Create arrays with optional values
727        let location_array = match location {
728            Some(loc) => Arc::new(StringArray::from(vec![Some(loc)])),
729            None => Arc::new(StringArray::from(vec![None::<String>])),
730        };
731
732        let metadata_array = match metadata {
733            Some(meta) => Arc::new(StringArray::from(vec![Some(meta)])),
734            None => Arc::new(StringArray::from(vec![None::<String>])),
735        };
736
737        let batch = RecordBatch::try_new(
738            schema.clone(),
739            vec![
740                Arc::new(StringArray::from(vec![object_id.as_str()])),
741                Arc::new(StringArray::from(vec![object_type.as_str()])),
742                location_array,
743                metadata_array,
744                Arc::new(base_objects_array),
745            ],
746        )
747        .map_err(|e| {
748            Error::io(
749                format!("Failed to create manifest entry: {}", e),
750                location!(),
751            )
752        })?;
753
754        let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
755
756        // Use MergeInsert to ensure uniqueness on object_id
757        let dataset_guard = self.manifest_dataset.get().await?;
758        let dataset_arc = Arc::new(dataset_guard.clone());
759        drop(dataset_guard); // Drop read guard before merge insert
760
761        let mut merge_builder =
762            lance::dataset::MergeInsertBuilder::try_new(dataset_arc, vec!["object_id".to_string()])
763                .map_err(|e| Error::IO {
764                    source: box_error(std::io::Error::other(format!(
765                        "Failed to create merge builder: {}",
766                        e
767                    ))),
768                    location: location!(),
769                })?;
770
771        merge_builder.when_matched(lance::dataset::WhenMatched::Fail);
772        merge_builder.when_not_matched(lance::dataset::WhenNotMatched::InsertAll);
773
774        let (new_dataset_arc, _merge_stats) = merge_builder
775            .try_build()
776            .map_err(|e| Error::IO {
777                source: box_error(std::io::Error::other(format!(
778                    "Failed to build merge: {}",
779                    e
780                ))),
781                location: location!(),
782            })?
783            .execute_reader(Box::new(reader))
784            .await
785            .map_err(|e| {
786                // Check if this is a "matched row" error from WhenMatched::Fail
787                let error_msg = e.to_string();
788                if error_msg.contains("matched")
789                    || error_msg.contains("duplicate")
790                    || error_msg.contains("already exists")
791                {
792                    Error::io(
793                        format!("Object with id '{}' already exists in manifest", object_id),
794                        location!(),
795                    )
796                } else {
797                    Error::IO {
798                        source: box_error(std::io::Error::other(format!(
799                            "Failed to execute merge: {}",
800                            e
801                        ))),
802                        location: location!(),
803                    }
804                }
805            })?;
806
807        let new_dataset = Arc::try_unwrap(new_dataset_arc).unwrap_or_else(|arc| (*arc).clone());
808        self.manifest_dataset.set_latest(new_dataset).await;
809
810        // Run inline optimization after write
811        if let Err(e) = self.run_inline_optimization().await {
812            log::warn!(
813                "Unexpected failure when running inline optimization: {:?}",
814                e
815            );
816        }
817
818        Ok(())
819    }
820
821    /// Delete an entry from the manifest table
822    pub async fn delete_from_manifest(&self, object_id: &str) -> Result<()> {
823        {
824            let predicate = format!("object_id = '{}'", object_id);
825            let mut dataset_guard = self.manifest_dataset.get_mut().await?;
826            dataset_guard
827                .delete(&predicate)
828                .await
829                .map_err(|e| Error::IO {
830                    source: box_error(std::io::Error::other(format!("Failed to delete: {}", e))),
831                    location: location!(),
832                })?;
833        } // Drop the guard here
834
835        self.manifest_dataset.reload().await?;
836
837        // Run inline optimization after delete
838        if let Err(e) = self.run_inline_optimization().await {
839            log::warn!(
840                "Unexpected failure when running inline optimization: {:?}",
841                e
842            );
843        }
844
845        Ok(())
846    }
847
848    /// Register a table in the manifest without creating the physical table (internal helper for migration)
849    pub async fn register_table(&self, name: &str, location: String) -> Result<()> {
850        let object_id = Self::build_object_id(&[], name);
851        if self.manifest_contains_object(&object_id).await? {
852            return Err(Error::io(
853                format!("Table '{}' already exists", name),
854                location!(),
855            ));
856        }
857
858        self.insert_into_manifest(object_id, ObjectType::Table, Some(location))
859            .await
860    }
861
862    /// Validate that all levels of a namespace path exist
863    async fn validate_namespace_levels_exist(&self, namespace_path: &[String]) -> Result<()> {
864        for i in 1..=namespace_path.len() {
865            let partial_path = &namespace_path[..i];
866            let object_id = partial_path.join(DELIMITER);
867            if !self.manifest_contains_object(&object_id).await? {
868                return Err(Error::Namespace {
869                    source: format!("Parent namespace '{}' does not exist", object_id).into(),
870                    location: location!(),
871                });
872            }
873        }
874        Ok(())
875    }
876
877    /// Query the manifest for a namespace with the given object ID
878    async fn query_manifest_for_namespace(&self, object_id: &str) -> Result<Option<NamespaceInfo>> {
879        let filter = format!("object_id = '{}' AND object_type = 'namespace'", object_id);
880        let mut scanner = self.manifest_scanner().await?;
881        scanner.filter(&filter).map_err(|e| Error::IO {
882            source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
883            location: location!(),
884        })?;
885        scanner
886            .project(&["object_id", "metadata"])
887            .map_err(|e| Error::IO {
888                source: box_error(std::io::Error::other(format!("Failed to project: {}", e))),
889                location: location!(),
890            })?;
891        let batches = Self::execute_scanner(scanner).await?;
892
893        let mut found_result: Option<NamespaceInfo> = None;
894        let mut total_rows = 0;
895
896        for batch in batches {
897            if batch.num_rows() == 0 {
898                continue;
899            }
900
901            total_rows += batch.num_rows();
902            if total_rows > 1 {
903                return Err(Error::io(
904                    format!(
905                        "Expected exactly 1 namespace with id '{}', found {}",
906                        object_id, total_rows
907                    ),
908                    location!(),
909                ));
910            }
911
912            let object_id_array = Self::get_string_column(&batch, "object_id")?;
913            let metadata_array = Self::get_string_column(&batch, "metadata")?;
914
915            let object_id_str = object_id_array.value(0);
916            let metadata = if !metadata_array.is_null(0) {
917                let metadata_str = metadata_array.value(0);
918                match serde_json::from_str::<HashMap<String, String>>(metadata_str) {
919                    Ok(map) => Some(map),
920                    Err(e) => {
921                        return Err(Error::io(
922                            format!(
923                                "Failed to deserialize metadata for namespace '{}': {}",
924                                object_id, e
925                            ),
926                            location!(),
927                        ));
928                    }
929                }
930            } else {
931                None
932            };
933
934            let (namespace, name) = Self::parse_object_id(object_id_str);
935            found_result = Some(NamespaceInfo {
936                namespace,
937                name,
938                metadata,
939            });
940        }
941
942        Ok(found_result)
943    }
944
945    /// Create or get the manifest dataset
946    async fn create_or_get_manifest(
947        root: &str,
948        storage_options: &Option<HashMap<String, String>>,
949        session: Option<Arc<Session>>,
950    ) -> Result<DatasetConsistencyWrapper> {
951        let manifest_path = format!("{}/{}", root, MANIFEST_TABLE_NAME);
952        log::debug!("Attempting to load manifest from {}", manifest_path);
953        let mut builder = DatasetBuilder::from_uri(&manifest_path);
954
955        if let Some(sess) = session.clone() {
956            builder = builder.with_session(sess);
957        }
958
959        if let Some(opts) = storage_options {
960            builder = builder.with_storage_options(opts.clone());
961        }
962
963        let dataset_result = builder.load().await;
964        if let Ok(dataset) = dataset_result {
965            Ok(DatasetConsistencyWrapper::new(dataset))
966        } else {
967            log::info!("Creating new manifest table at {}", manifest_path);
968            let schema = Self::manifest_schema();
969            let empty_batch = RecordBatch::new_empty(schema.clone());
970            let reader = RecordBatchIterator::new(vec![Ok(empty_batch)], schema.clone());
971
972            let write_params = WriteParams {
973                session,
974                store_params: storage_options.as_ref().map(|opts| ObjectStoreParams {
975                    storage_options: Some(opts.clone()),
976                    ..Default::default()
977                }),
978                ..Default::default()
979            };
980
981            let dataset = Dataset::write(Box::new(reader), &manifest_path, Some(write_params))
982                .await
983                .map_err(|e| Error::IO {
984                    source: box_error(std::io::Error::other(format!(
985                        "Failed to create manifest dataset: {}",
986                        e
987                    ))),
988                    location: location!(),
989                })?;
990
991            log::info!(
992                "Successfully created manifest table at {}, version={}, uri={}",
993                manifest_path,
994                dataset.version().version,
995                dataset.uri()
996            );
997            Ok(DatasetConsistencyWrapper::new(dataset))
998        }
999    }
1000}
1001
1002#[async_trait]
1003impl LanceNamespace for ManifestNamespace {
1004    fn namespace_id(&self) -> String {
1005        self.root.clone()
1006    }
1007
1008    async fn list_tables(&self, request: ListTablesRequest) -> Result<ListTablesResponse> {
1009        let namespace_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1010            source: "Namespace ID is required".into(),
1011            location: location!(),
1012        })?;
1013
1014        // Build filter to find tables in this namespace
1015        let filter = if namespace_id.is_empty() {
1016            // Root namespace: find tables without a namespace prefix
1017            "object_type = 'table' AND NOT contains(object_id, '$')".to_string()
1018        } else {
1019            // Namespaced: find tables that start with namespace$ but have no additional $
1020            let prefix = namespace_id.join(DELIMITER);
1021            format!(
1022                "object_type = 'table' AND starts_with(object_id, '{}{}') AND NOT contains(substring(object_id, {}), '$')",
1023                prefix, DELIMITER, prefix.len() + 2
1024            )
1025        };
1026
1027        let mut scanner = self.manifest_scanner().await?;
1028        scanner.filter(&filter).map_err(|e| Error::IO {
1029            source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
1030            location: location!(),
1031        })?;
1032        scanner.project(&["object_id"]).map_err(|e| Error::IO {
1033            source: box_error(std::io::Error::other(format!("Failed to project: {}", e))),
1034            location: location!(),
1035        })?;
1036
1037        let batches = Self::execute_scanner(scanner).await?;
1038
1039        let mut tables = Vec::new();
1040        for batch in batches {
1041            if batch.num_rows() == 0 {
1042                continue;
1043            }
1044
1045            let object_id_array = Self::get_string_column(&batch, "object_id")?;
1046            for i in 0..batch.num_rows() {
1047                let object_id = object_id_array.value(i);
1048                let (_namespace, name) = Self::parse_object_id(object_id);
1049                tables.push(name);
1050            }
1051        }
1052
1053        Ok(ListTablesResponse::new(tables))
1054    }
1055
1056    async fn describe_table(&self, request: DescribeTableRequest) -> Result<DescribeTableResponse> {
1057        let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1058            source: "Table ID is required".into(),
1059            location: location!(),
1060        })?;
1061
1062        if table_id.is_empty() {
1063            return Err(Error::InvalidInput {
1064                source: "Table ID cannot be empty".into(),
1065                location: location!(),
1066            });
1067        }
1068
1069        let object_id = Self::str_object_id(table_id);
1070        let table_info = self.query_manifest_for_table(&object_id).await?;
1071
1072        match table_info {
1073            Some(info) => {
1074                // Construct full URI from relative location
1075                let table_uri = self.construct_full_uri(&info.location)?;
1076
1077                // Try to open the dataset to get version and schema
1078                match Dataset::open(&table_uri).await {
1079                    Ok(mut dataset) => {
1080                        // If a specific version is requested, checkout that version
1081                        if let Some(requested_version) = request.version {
1082                            dataset = dataset.checkout_version(requested_version as u64).await?;
1083                        }
1084
1085                        let version = dataset.version().version;
1086                        let lance_schema = dataset.schema();
1087                        let arrow_schema: arrow_schema::Schema = lance_schema.into();
1088                        let json_schema = arrow_schema_to_json(&arrow_schema)?;
1089
1090                        Ok(DescribeTableResponse {
1091                            version: Some(version as i64),
1092                            location: Some(table_uri),
1093                            schema: Some(Box::new(json_schema)),
1094                            properties: None,
1095                            storage_options: self.storage_options.clone(),
1096                        })
1097                    }
1098                    Err(_) => {
1099                        // If dataset can't be opened (e.g., empty table), return minimal info
1100                        Ok(DescribeTableResponse {
1101                            version: None,
1102                            location: Some(table_uri),
1103                            schema: None,
1104                            properties: None,
1105                            storage_options: self.storage_options.clone(),
1106                        })
1107                    }
1108                }
1109            }
1110            None => Err(Error::Namespace {
1111                source: format!("Table '{}' not found", object_id).into(),
1112                location: location!(),
1113            }),
1114        }
1115    }
1116
1117    async fn table_exists(&self, request: TableExistsRequest) -> Result<()> {
1118        let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1119            source: "Table ID is required".into(),
1120            location: location!(),
1121        })?;
1122
1123        if table_id.is_empty() {
1124            return Err(Error::InvalidInput {
1125                source: "Table ID cannot be empty".into(),
1126                location: location!(),
1127            });
1128        }
1129
1130        let (namespace, table_name) = Self::split_object_id(table_id);
1131        let object_id = Self::build_object_id(&namespace, &table_name);
1132        let exists = self.manifest_contains_object(&object_id).await?;
1133        if exists {
1134            Ok(())
1135        } else {
1136            Err(Error::Namespace {
1137                source: format!("Table '{}' not found", table_name).into(),
1138                location: location!(),
1139            })
1140        }
1141    }
1142
1143    async fn create_table(
1144        &self,
1145        request: CreateTableRequest,
1146        data: Bytes,
1147    ) -> Result<CreateTableResponse> {
1148        let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1149            source: "Table ID is required".into(),
1150            location: location!(),
1151        })?;
1152
1153        if table_id.is_empty() {
1154            return Err(Error::InvalidInput {
1155                source: "Table ID cannot be empty".into(),
1156                location: location!(),
1157            });
1158        }
1159
1160        let (namespace, table_name) = Self::split_object_id(table_id);
1161        let object_id = Self::build_object_id(&namespace, &table_name);
1162
1163        // Check if table already exists in manifest
1164        if self.manifest_contains_object(&object_id).await? {
1165            return Err(Error::io(
1166                format!("Table '{}' already exists", table_name),
1167                location!(),
1168            ));
1169        }
1170
1171        // Create the physical table location with hash-based naming
1172        // When dir_listing_enabled is true and it's a root table, use directory-style naming: {table_name}.lance
1173        // Otherwise, use hash-based naming: {hash}_{object_id}
1174        let dir_name = if namespace.is_empty() && self.dir_listing_enabled {
1175            // Root table with directory listing enabled: use {table_name}.lance
1176            format!("{}.lance", table_name)
1177        } else {
1178            // Child namespace table or dir listing disabled: use hash-based naming
1179            Self::generate_dir_name(&object_id)
1180        };
1181        let table_uri = self.construct_full_uri(&dir_name)?;
1182
1183        // Validate that request_data is provided
1184        if data.is_empty() {
1185            return Err(Error::Namespace {
1186                source: "Request data (Arrow IPC stream) is required for create_table".into(),
1187                location: location!(),
1188            });
1189        }
1190
1191        // Validate location if provided
1192        if let Some(location) = &request.location {
1193            let location = location.trim_end_matches('/');
1194            if location != table_uri {
1195                return Err(Error::Namespace {
1196                    source: format!(
1197                        "Cannot create table {} at location {}, must be at location {}",
1198                        table_name, location, table_uri
1199                    )
1200                    .into(),
1201                    location: location!(),
1202                });
1203            }
1204        }
1205
1206        // Write the data using Lance Dataset
1207        let cursor = Cursor::new(data.to_vec());
1208        let stream_reader = StreamReader::try_new(cursor, None)
1209            .map_err(|e| Error::io(format!("Failed to read IPC stream: {}", e), location!()))?;
1210
1211        let batches: Vec<RecordBatch> =
1212            stream_reader
1213                .collect::<std::result::Result<Vec<_>, _>>()
1214                .map_err(|e| Error::io(format!("Failed to collect batches: {}", e), location!()))?;
1215
1216        if batches.is_empty() {
1217            return Err(Error::io(
1218                "No data provided for table creation",
1219                location!(),
1220            ));
1221        }
1222
1223        let schema = batches[0].schema();
1224        let batch_results: Vec<std::result::Result<RecordBatch, arrow_schema::ArrowError>> =
1225            batches.into_iter().map(Ok).collect();
1226        let reader = RecordBatchIterator::new(batch_results, schema);
1227
1228        let write_params = WriteParams::default();
1229        let _dataset = Dataset::write(Box::new(reader), &table_uri, Some(write_params))
1230            .await
1231            .map_err(|e| Error::IO {
1232                source: box_error(std::io::Error::other(format!(
1233                    "Failed to write dataset: {}",
1234                    e
1235                ))),
1236                location: location!(),
1237            })?;
1238
1239        // Register in manifest (store dir_name, not full URI)
1240        self.insert_into_manifest(object_id, ObjectType::Table, Some(dir_name))
1241            .await?;
1242
1243        Ok(CreateTableResponse {
1244            version: Some(1),
1245            location: Some(table_uri),
1246            properties: None,
1247            storage_options: self.storage_options.clone(),
1248        })
1249    }
1250
1251    async fn drop_table(&self, request: DropTableRequest) -> Result<DropTableResponse> {
1252        let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1253            source: "Table ID is required".into(),
1254            location: location!(),
1255        })?;
1256
1257        if table_id.is_empty() {
1258            return Err(Error::InvalidInput {
1259                source: "Table ID cannot be empty".into(),
1260                location: location!(),
1261            });
1262        }
1263
1264        let (namespace, table_name) = Self::split_object_id(table_id);
1265        let object_id = Self::build_object_id(&namespace, &table_name);
1266
1267        // Query manifest for table location
1268        let table_info = self.query_manifest_for_table(&object_id).await?;
1269
1270        match table_info {
1271            Some(info) => {
1272                // Delete from manifest first
1273                self.delete_from_manifest(&object_id).await?;
1274
1275                // Delete physical data directory using the dir_name from manifest
1276                let table_path = self.base_path.child(info.location.as_str());
1277                let table_uri = self.construct_full_uri(&info.location)?;
1278
1279                // Remove the table directory
1280                self.object_store
1281                    .remove_dir_all(table_path)
1282                    .await
1283                    .map_err(|e| Error::Namespace {
1284                        source: format!("Failed to delete table directory: {}", e).into(),
1285                        location: location!(),
1286                    })?;
1287
1288                Ok(DropTableResponse {
1289                    id: request.id.clone(),
1290                    location: Some(table_uri),
1291                    properties: None,
1292                    transaction_id: None,
1293                })
1294            }
1295            None => Err(Error::Namespace {
1296                source: format!("Table '{}' not found", table_name).into(),
1297                location: location!(),
1298            }),
1299        }
1300    }
1301
1302    async fn list_namespaces(
1303        &self,
1304        request: ListNamespacesRequest,
1305    ) -> Result<ListNamespacesResponse> {
1306        let parent_namespace = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1307            source: "Namespace ID is required".into(),
1308            location: location!(),
1309        })?;
1310
1311        // Build filter to find direct child namespaces
1312        let filter = if parent_namespace.is_empty() {
1313            // Root namespace: find all namespaces without a parent
1314            "object_type = 'namespace' AND NOT contains(object_id, '$')".to_string()
1315        } else {
1316            // Non-root: find namespaces that start with parent$ but have no additional $
1317            let prefix = parent_namespace.join(DELIMITER);
1318            format!(
1319                "object_type = 'namespace' AND starts_with(object_id, '{}{}') AND NOT contains(substring(object_id, {}), '$')",
1320                prefix, DELIMITER, prefix.len() + 2
1321            )
1322        };
1323
1324        let mut scanner = self.manifest_scanner().await?;
1325        scanner.filter(&filter).map_err(|e| Error::IO {
1326            source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
1327            location: location!(),
1328        })?;
1329        scanner.project(&["object_id"]).map_err(|e| Error::IO {
1330            source: box_error(std::io::Error::other(format!("Failed to project: {}", e))),
1331            location: location!(),
1332        })?;
1333
1334        let batches = Self::execute_scanner(scanner).await?;
1335        let mut namespaces = Vec::new();
1336
1337        for batch in batches {
1338            if batch.num_rows() == 0 {
1339                continue;
1340            }
1341
1342            let object_id_array = Self::get_string_column(&batch, "object_id")?;
1343            for i in 0..batch.num_rows() {
1344                let object_id = object_id_array.value(i);
1345                let (_namespace, name) = Self::parse_object_id(object_id);
1346                namespaces.push(name);
1347            }
1348        }
1349
1350        Ok(ListNamespacesResponse::new(namespaces))
1351    }
1352
1353    async fn describe_namespace(
1354        &self,
1355        request: DescribeNamespaceRequest,
1356    ) -> Result<DescribeNamespaceResponse> {
1357        let namespace_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1358            source: "Namespace ID is required".into(),
1359            location: location!(),
1360        })?;
1361
1362        // Root namespace always exists
1363        if namespace_id.is_empty() {
1364            return Ok(DescribeNamespaceResponse {
1365                properties: Some(HashMap::new()),
1366            });
1367        }
1368
1369        // Check if namespace exists in manifest
1370        let object_id = namespace_id.join(DELIMITER);
1371        let namespace_info = self.query_manifest_for_namespace(&object_id).await?;
1372
1373        match namespace_info {
1374            Some(info) => Ok(DescribeNamespaceResponse {
1375                properties: info.metadata,
1376            }),
1377            None => Err(Error::Namespace {
1378                source: format!("Namespace '{}' not found", object_id).into(),
1379                location: location!(),
1380            }),
1381        }
1382    }
1383
1384    async fn create_namespace(
1385        &self,
1386        request: CreateNamespaceRequest,
1387    ) -> Result<CreateNamespaceResponse> {
1388        let namespace_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1389            source: "Namespace ID is required".into(),
1390            location: location!(),
1391        })?;
1392
1393        // Root namespace always exists and cannot be created
1394        if namespace_id.is_empty() {
1395            return Err(Error::Namespace {
1396                source: "Root namespace already exists and cannot be created".into(),
1397                location: location!(),
1398            });
1399        }
1400
1401        // Validate parent namespaces exist (but not the namespace being created)
1402        if namespace_id.len() > 1 {
1403            self.validate_namespace_levels_exist(&namespace_id[..namespace_id.len() - 1])
1404                .await?;
1405        }
1406
1407        let object_id = namespace_id.join(DELIMITER);
1408        if self.manifest_contains_object(&object_id).await? {
1409            return Err(Error::Namespace {
1410                source: format!("Namespace '{}' already exists", object_id).into(),
1411                location: location!(),
1412            });
1413        }
1414
1415        // Serialize properties if provided
1416        let metadata = request.properties.as_ref().and_then(|props| {
1417            if props.is_empty() {
1418                None
1419            } else {
1420                Some(serde_json::to_string(props).ok()?)
1421            }
1422        });
1423
1424        self.insert_into_manifest_with_metadata(
1425            object_id,
1426            ObjectType::Namespace,
1427            None,
1428            metadata,
1429            None,
1430        )
1431        .await?;
1432
1433        Ok(CreateNamespaceResponse {
1434            properties: request.properties,
1435        })
1436    }
1437
1438    async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<DropNamespaceResponse> {
1439        let namespace_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1440            source: "Namespace ID is required".into(),
1441            location: location!(),
1442        })?;
1443
1444        // Root namespace always exists and cannot be dropped
1445        if namespace_id.is_empty() {
1446            return Err(Error::Namespace {
1447                source: "Root namespace cannot be dropped".into(),
1448                location: location!(),
1449            });
1450        }
1451
1452        let object_id = namespace_id.join(DELIMITER);
1453
1454        // Check if namespace exists
1455        if !self.manifest_contains_object(&object_id).await? {
1456            return Err(Error::Namespace {
1457                source: format!("Namespace '{}' not found", object_id).into(),
1458                location: location!(),
1459            });
1460        }
1461
1462        // Check for child namespaces
1463        let prefix = format!("{}{}", object_id, DELIMITER);
1464        let filter = format!("starts_with(object_id, '{}')", prefix);
1465        let mut scanner = self.manifest_scanner().await?;
1466        scanner.filter(&filter).map_err(|e| Error::IO {
1467            source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
1468            location: location!(),
1469        })?;
1470        scanner.project::<&str>(&[]).map_err(|e| Error::IO {
1471            source: box_error(std::io::Error::other(format!("Failed to project: {}", e))),
1472            location: location!(),
1473        })?;
1474        scanner.with_row_id();
1475        let count = scanner.count_rows().await.map_err(|e| Error::IO {
1476            source: box_error(std::io::Error::other(format!(
1477                "Failed to count rows: {}",
1478                e
1479            ))),
1480            location: location!(),
1481        })?;
1482
1483        if count > 0 {
1484            return Err(Error::Namespace {
1485                source: format!(
1486                    "Namespace '{}' is not empty (contains {} child objects)",
1487                    object_id, count
1488                )
1489                .into(),
1490                location: location!(),
1491            });
1492        }
1493
1494        self.delete_from_manifest(&object_id).await?;
1495
1496        Ok(DropNamespaceResponse {
1497            properties: None,
1498            transaction_id: None,
1499        })
1500    }
1501
1502    async fn namespace_exists(&self, request: NamespaceExistsRequest) -> Result<()> {
1503        let namespace_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1504            source: "Namespace ID is required".into(),
1505            location: location!(),
1506        })?;
1507
1508        // Root namespace always exists
1509        if namespace_id.is_empty() {
1510            return Ok(());
1511        }
1512
1513        let object_id = namespace_id.join(DELIMITER);
1514        if self.manifest_contains_object(&object_id).await? {
1515            Ok(())
1516        } else {
1517            Err(Error::Namespace {
1518                source: format!("Namespace '{}' not found", object_id).into(),
1519                location: location!(),
1520            })
1521        }
1522    }
1523
1524    async fn create_empty_table(
1525        &self,
1526        request: CreateEmptyTableRequest,
1527    ) -> Result<CreateEmptyTableResponse> {
1528        let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1529            source: "Table ID is required".into(),
1530            location: location!(),
1531        })?;
1532
1533        if table_id.is_empty() {
1534            return Err(Error::InvalidInput {
1535                source: "Table ID cannot be empty".into(),
1536                location: location!(),
1537            });
1538        }
1539
1540        let (namespace, table_name) = Self::split_object_id(table_id);
1541        let object_id = Self::build_object_id(&namespace, &table_name);
1542
1543        // Check if table already exists in manifest
1544        let existing = self.query_manifest_for_table(&object_id).await?;
1545        if existing.is_some() {
1546            return Err(Error::Namespace {
1547                source: format!("Table '{}' already exists", table_name).into(),
1548                location: location!(),
1549            });
1550        }
1551
1552        // Create table location path with hash-based naming
1553        // When dir_listing_enabled is true and it's a root table, use directory-style naming: {table_name}.lance
1554        // Otherwise, use hash-based naming: {hash}_{object_id}
1555        let dir_name = if namespace.is_empty() && self.dir_listing_enabled {
1556            // Root table with directory listing enabled: use {table_name}.lance
1557            format!("{}.lance", table_name)
1558        } else {
1559            // Child namespace table or dir listing disabled: use hash-based naming
1560            Self::generate_dir_name(&object_id)
1561        };
1562        let table_path = self.base_path.child(dir_name.as_str());
1563        let table_uri = self.construct_full_uri(&dir_name)?;
1564
1565        // Validate location if provided
1566        if let Some(req_location) = &request.location {
1567            let req_location = req_location.trim_end_matches('/');
1568            if req_location != table_uri {
1569                return Err(Error::Namespace {
1570                    source: format!(
1571                        "Cannot create table {} at location {}, must be at location {}",
1572                        table_name, req_location, table_uri
1573                    )
1574                    .into(),
1575                    location: location!(),
1576                });
1577            }
1578        }
1579
1580        // Create the .lance-reserved file to mark the table as existing
1581        let reserved_file_path = table_path.child(".lance-reserved");
1582
1583        self.object_store
1584            .create(&reserved_file_path)
1585            .await
1586            .map_err(|e| Error::Namespace {
1587                source: format!(
1588                    "Failed to create .lance-reserved file for table {}: {}",
1589                    table_name, e
1590                )
1591                .into(),
1592                location: location!(),
1593            })?
1594            .shutdown()
1595            .await
1596            .map_err(|e| Error::Namespace {
1597                source: format!(
1598                    "Failed to finalize .lance-reserved file for table {}: {}",
1599                    table_name, e
1600                )
1601                .into(),
1602                location: location!(),
1603            })?;
1604
1605        // Add entry to manifest marking this as an empty table (store dir_name, not full path)
1606        self.insert_into_manifest(object_id, ObjectType::Table, Some(dir_name))
1607            .await?;
1608
1609        log::info!(
1610            "Created empty table '{}' in manifest at {}",
1611            table_name,
1612            table_uri
1613        );
1614
1615        Ok(CreateEmptyTableResponse {
1616            location: Some(table_uri),
1617            properties: None,
1618            storage_options: self.storage_options.clone(),
1619        })
1620    }
1621
1622    async fn register_table(&self, request: RegisterTableRequest) -> Result<RegisterTableResponse> {
1623        let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1624            source: "Table ID is required".into(),
1625            location: location!(),
1626        })?;
1627
1628        if table_id.is_empty() {
1629            return Err(Error::InvalidInput {
1630                source: "Table ID cannot be empty".into(),
1631                location: location!(),
1632            });
1633        }
1634
1635        let location = request.location.clone();
1636
1637        // Validate that location is a relative path within the root directory
1638        // We don't allow absolute URIs or paths that escape the root
1639        if location.contains("://") {
1640            return Err(Error::InvalidInput {
1641                source: format!(
1642                    "Absolute URIs are not allowed for register_table. Location must be a relative path within the root directory: {}",
1643                    location
1644                ).into(),
1645                location: location!(),
1646            });
1647        }
1648
1649        if location.starts_with('/') {
1650            return Err(Error::InvalidInput {
1651                source: format!(
1652                    "Absolute paths are not allowed for register_table. Location must be a relative path within the root directory: {}",
1653                    location
1654                ).into(),
1655                location: location!(),
1656            });
1657        }
1658
1659        // Check for path traversal attempts
1660        if location.contains("..") {
1661            return Err(Error::InvalidInput {
1662                source: format!(
1663                    "Path traversal is not allowed. Location must be a relative path within the root directory: {}",
1664                    location
1665                ).into(),
1666                location: location!(),
1667            });
1668        }
1669
1670        let (namespace, table_name) = Self::split_object_id(table_id);
1671        let object_id = Self::build_object_id(&namespace, &table_name);
1672
1673        // Validate that parent namespaces exist (if not root)
1674        if !namespace.is_empty() {
1675            self.validate_namespace_levels_exist(&namespace).await?;
1676        }
1677
1678        // Check if table already exists
1679        if self.manifest_contains_object(&object_id).await? {
1680            return Err(Error::Namespace {
1681                source: format!("Table '{}' already exists", object_id).into(),
1682                location: location!(),
1683            });
1684        }
1685
1686        // Register the table with its location in the manifest
1687        self.insert_into_manifest(object_id, ObjectType::Table, Some(location.clone()))
1688            .await?;
1689
1690        Ok(RegisterTableResponse {
1691            location,
1692            properties: None,
1693        })
1694    }
1695
1696    async fn deregister_table(
1697        &self,
1698        request: DeregisterTableRequest,
1699    ) -> Result<DeregisterTableResponse> {
1700        let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1701            source: "Table ID is required".into(),
1702            location: location!(),
1703        })?;
1704
1705        if table_id.is_empty() {
1706            return Err(Error::InvalidInput {
1707                source: "Table ID cannot be empty".into(),
1708                location: location!(),
1709            });
1710        }
1711
1712        let (namespace, table_name) = Self::split_object_id(table_id);
1713        let object_id = Self::build_object_id(&namespace, &table_name);
1714
1715        // Get table info before deleting
1716        let table_info = self.query_manifest_for_table(&object_id).await?;
1717
1718        let table_uri = match table_info {
1719            Some(info) => {
1720                // Delete from manifest only (leave physical data intact)
1721                self.delete_from_manifest(&object_id).await?;
1722
1723                // Construct the full URI using helper function
1724                self.construct_full_uri(&info.location)?
1725            }
1726            None => {
1727                return Err(Error::Namespace {
1728                    source: format!("Table '{}' not found", object_id).into(),
1729                    location: location!(),
1730                });
1731            }
1732        };
1733
1734        Ok(DeregisterTableResponse {
1735            id: request.id.clone(),
1736            location: Some(table_uri),
1737            properties: None,
1738        })
1739    }
1740}
1741
#[cfg(test)]
mod tests {
    // Tests for the manifest-backed directory namespace. rstest runs every test
    // twice: once with inline optimization enabled and once with it disabled.

    use crate::DirectoryNamespaceBuilder;
    use bytes::Bytes;
    use lance_core::utils::tempfile::TempStdDir;
    use lance_namespace::models::{
        CreateNamespaceRequest, CreateTableRequest, CreateTableResponse, DescribeNamespaceRequest,
        DescribeTableRequest, DropNamespaceRequest, DropTableRequest, ListNamespacesRequest,
        ListTablesRequest, NamespaceExistsRequest, TableExistsRequest,
    };
    use lance_namespace::LanceNamespace;
    use rstest::rstest;

    /// Encodes a 3-row `(id: Int32, name: Utf8)` batch as an Arrow IPC stream.
    /// This is the payload these tests pass to `create_table`.
    fn create_test_ipc_data() -> Vec<u8> {
        use arrow::array::{Int32Array, StringArray};
        use arrow::datatypes::{DataType, Field, Schema};
        use arrow::ipc::writer::StreamWriter;
        use arrow::record_batch::RecordBatch;
        use std::sync::Arc;

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, false),
        ]));

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int32Array::from(vec![1, 2, 3])),
                Arc::new(StringArray::from(vec!["a", "b", "c"])),
            ],
        )
        .unwrap();

        let mut buffer = Vec::new();
        {
            let mut writer = StreamWriter::try_new(&mut buffer, &schema).unwrap();
            writer.write(&batch).unwrap();
            writer.finish().unwrap();
        }
        buffer
    }

    /// Wraps path segments in the `Some(Vec<String>)` shape used by request `id`
    /// fields. An empty slice addresses the root namespace.
    fn id(parts: &[&str]) -> Option<Vec<String>> {
        Some(parts.iter().map(|part| part.to_string()).collect())
    }

    /// Builds a namespace rooted at `root` with the builder defaults (manifest
    /// enabled), setting only the inline-optimization flag.
    async fn build_namespace(root: &str, inline_optimization: bool) -> impl LanceNamespace {
        DirectoryNamespaceBuilder::new(root)
            .inline_optimization_enabled(inline_optimization)
            .build()
            .await
            .unwrap()
    }

    /// Attempts to create table `table_id`, populated with `create_test_ipc_data`.
    async fn try_create_test_table(
        namespace: &impl LanceNamespace,
        table_id: &[&str],
    ) -> lance_core::Result<CreateTableResponse> {
        let mut request = CreateTableRequest::new();
        request.id = id(table_id);
        namespace
            .create_table(request, Bytes::from(create_test_ipc_data()))
            .await
    }

    /// Creates table `table_id` and panics if creation fails.
    async fn create_test_table(namespace: &impl LanceNamespace, table_id: &[&str]) {
        try_create_test_table(namespace, table_id).await.unwrap();
    }

    /// Lists the table names directly under `namespace_id` (`&[]` for the root).
    async fn list_tables_in(namespace: &impl LanceNamespace, namespace_id: &[&str]) -> Vec<String> {
        let mut request = ListTablesRequest::new();
        request.id = id(namespace_id);
        namespace.list_tables(request).await.unwrap().tables
    }

    #[rstest]
    #[case::with_optimization(true)]
    #[case::without_optimization(false)]
    #[tokio::test]
    async fn test_manifest_namespace_basic_create_and_list(#[case] inline_optimization: bool) {
        let temp_dir = TempStdDir::default();
        let temp_path = temp_dir.to_str().unwrap();

        // Manifest mode is the builder default.
        let dir_namespace = build_namespace(temp_path, inline_optimization).await;

        // A fresh namespace has no tables.
        assert!(list_tables_in(&dir_namespace, &[]).await.is_empty());

        create_test_table(&dir_namespace, &["test_table"]).await;

        // The new table is now listed at the root.
        assert_eq!(list_tables_in(&dir_namespace, &[]).await, vec!["test_table"]);
    }

    #[rstest]
    #[case::with_optimization(true)]
    #[case::without_optimization(false)]
    #[tokio::test]
    async fn test_manifest_namespace_table_exists(#[case] inline_optimization: bool) {
        let temp_dir = TempStdDir::default();
        let temp_path = temp_dir.to_str().unwrap();
        let dir_namespace = build_namespace(temp_path, inline_optimization).await;

        // A missing table is reported as an error.
        let mut request = TableExistsRequest::new();
        request.id = id(&["nonexistent"]);
        assert!(dir_namespace.table_exists(request).await.is_err());

        create_test_table(&dir_namespace, &["test_table"]).await;

        let mut request = TableExistsRequest::new();
        request.id = id(&["test_table"]);
        assert!(dir_namespace.table_exists(request).await.is_ok());
    }

    #[rstest]
    #[case::with_optimization(true)]
    #[case::without_optimization(false)]
    #[tokio::test]
    async fn test_manifest_namespace_describe_table(#[case] inline_optimization: bool) {
        let temp_dir = TempStdDir::default();
        let temp_path = temp_dir.to_str().unwrap();
        let dir_namespace = build_namespace(temp_path, inline_optimization).await;

        // Describing a missing table fails.
        let mut request = DescribeTableRequest::new();
        request.id = id(&["nonexistent"]);
        assert!(dir_namespace.describe_table(request).await.is_err());

        create_test_table(&dir_namespace, &["test_table"]).await;

        // An existing table reports a location that mentions its name.
        let mut request = DescribeTableRequest::new();
        request.id = id(&["test_table"]);
        let response = dir_namespace.describe_table(request).await.unwrap();
        assert!(response.location.is_some());
        assert!(response.location.unwrap().contains("test_table"));
    }

    #[rstest]
    #[case::with_optimization(true)]
    #[case::without_optimization(false)]
    #[tokio::test]
    async fn test_manifest_namespace_drop_table(#[case] inline_optimization: bool) {
        let temp_dir = TempStdDir::default();
        let temp_path = temp_dir.to_str().unwrap();
        let dir_namespace = build_namespace(temp_path, inline_optimization).await;

        create_test_table(&dir_namespace, &["test_table"]).await;
        assert_eq!(list_tables_in(&dir_namespace, &[]).await.len(), 1);

        let mut drop_request = DropTableRequest::new();
        drop_request.id = id(&["test_table"]);
        dir_namespace.drop_table(drop_request).await.unwrap();

        // The dropped table is no longer listed.
        assert!(list_tables_in(&dir_namespace, &[]).await.is_empty());
    }

    #[rstest]
    #[case::with_optimization(true)]
    #[case::without_optimization(false)]
    #[tokio::test]
    async fn test_manifest_namespace_multiple_tables(#[case] inline_optimization: bool) {
        let temp_dir = TempStdDir::default();
        let temp_path = temp_dir.to_str().unwrap();
        let dir_namespace = build_namespace(temp_path, inline_optimization).await;

        for i in 1..=3 {
            let table_name = format!("table{}", i);
            create_test_table(&dir_namespace, &[table_name.as_str()]).await;
        }

        let tables = list_tables_in(&dir_namespace, &[]).await;
        assert_eq!(tables.len(), 3);
        for expected in ["table1", "table2", "table3"].iter() {
            assert!(tables.iter().any(|name| name == expected));
        }
    }

    #[rstest]
    #[case::with_optimization(true)]
    #[case::without_optimization(false)]
    #[tokio::test]
    async fn test_directory_only_mode(#[case] inline_optimization: bool) {
        let temp_dir = TempStdDir::default();
        let temp_path = temp_dir.to_str().unwrap();

        // Manifest disabled: tables are tracked by directory listing only.
        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
            .manifest_enabled(false)
            .inline_optimization_enabled(inline_optimization)
            .build()
            .await
            .unwrap();

        assert!(list_tables_in(&dir_namespace, &[]).await.is_empty());

        create_test_table(&dir_namespace, &["test_table"]).await;

        assert_eq!(list_tables_in(&dir_namespace, &[]).await, vec!["test_table"]);
    }

    #[rstest]
    #[case::with_optimization(true)]
    #[case::without_optimization(false)]
    #[tokio::test]
    async fn test_dual_mode_merge(#[case] inline_optimization: bool) {
        let temp_dir = TempStdDir::default();
        let temp_path = temp_dir.to_str().unwrap();

        // Both manifest and directory listing enabled.
        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
            .manifest_enabled(true)
            .dir_listing_enabled(true)
            .inline_optimization_enabled(inline_optimization)
            .build()
            .await
            .unwrap();

        create_test_table(&dir_namespace, &["table1"]).await;

        // The table may be visible to both the manifest and the directory listing,
        // but it must be reported exactly once.
        assert_eq!(list_tables_in(&dir_namespace, &[]).await, vec!["table1"]);
    }

    #[rstest]
    #[case::with_optimization(true)]
    #[case::without_optimization(false)]
    #[tokio::test]
    async fn test_manifest_only_mode(#[case] inline_optimization: bool) {
        let temp_dir = TempStdDir::default();
        let temp_path = temp_dir.to_str().unwrap();

        // Manifest enabled, directory listing disabled.
        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
            .manifest_enabled(true)
            .dir_listing_enabled(false)
            .inline_optimization_enabled(inline_optimization)
            .build()
            .await
            .unwrap();

        create_test_table(&dir_namespace, &["test_table"]).await;

        assert_eq!(list_tables_in(&dir_namespace, &[]).await, vec!["test_table"]);
    }

    #[rstest]
    #[case::with_optimization(true)]
    #[case::without_optimization(false)]
    #[tokio::test]
    async fn test_drop_nonexistent_table(#[case] inline_optimization: bool) {
        let temp_dir = TempStdDir::default();
        let temp_path = temp_dir.to_str().unwrap();
        let dir_namespace = build_namespace(temp_path, inline_optimization).await;

        let mut drop_request = DropTableRequest::new();
        drop_request.id = id(&["nonexistent"]);
        assert!(dir_namespace.drop_table(drop_request).await.is_err());
    }

    #[rstest]
    #[case::with_optimization(true)]
    #[case::without_optimization(false)]
    #[tokio::test]
    async fn test_create_duplicate_table_fails(#[case] inline_optimization: bool) {
        let temp_dir = TempStdDir::default();
        let temp_path = temp_dir.to_str().unwrap();
        let dir_namespace = build_namespace(temp_path, inline_optimization).await;

        create_test_table(&dir_namespace, &["test_table"]).await;

        // A second table with the same id must be rejected.
        assert!(try_create_test_table(&dir_namespace, &["test_table"])
            .await
            .is_err());
    }

    #[rstest]
    #[case::with_optimization(true)]
    #[case::without_optimization(false)]
    #[tokio::test]
    async fn test_create_child_namespace(#[case] inline_optimization: bool) {
        let temp_dir = TempStdDir::default();
        let temp_path = temp_dir.to_str().unwrap();
        let dir_namespace = build_namespace(temp_path, inline_optimization).await;

        let mut create_req = CreateNamespaceRequest::new();
        create_req.id = id(&["ns1"]);
        let result = dir_namespace.create_namespace(create_req).await;
        assert!(
            result.is_ok(),
            "Failed to create child namespace: {:?}",
            result.err()
        );

        let exists_req = NamespaceExistsRequest { id: id(&["ns1"]) };
        assert!(
            dir_namespace.namespace_exists(exists_req).await.is_ok(),
            "Namespace should exist"
        );

        // The root lists exactly the new child.
        let list_req = ListNamespacesRequest {
            id: id(&[]),
            page_token: None,
            limit: None,
        };
        let namespaces = dir_namespace.list_namespaces(list_req).await.unwrap();
        assert_eq!(namespaces.namespaces.len(), 1);
        assert_eq!(namespaces.namespaces[0], "ns1");
    }

    #[rstest]
    #[case::with_optimization(true)]
    #[case::without_optimization(false)]
    #[tokio::test]
    async fn test_create_nested_namespace(#[case] inline_optimization: bool) {
        let temp_dir = TempStdDir::default();
        let temp_path = temp_dir.to_str().unwrap();
        let dir_namespace = build_namespace(temp_path, inline_optimization).await;

        let mut create_req = CreateNamespaceRequest::new();
        create_req.id = id(&["parent"]);
        dir_namespace.create_namespace(create_req).await.unwrap();

        let mut create_req = CreateNamespaceRequest::new();
        create_req.id = id(&["parent", "child"]);
        let result = dir_namespace.create_namespace(create_req).await;
        assert!(
            result.is_ok(),
            "Failed to create nested namespace: {:?}",
            result.err()
        );

        let exists_req = NamespaceExistsRequest {
            id: id(&["parent", "child"]),
        };
        assert!(
            dir_namespace.namespace_exists(exists_req).await.is_ok(),
            "Nested namespace should exist"
        );

        // The parent lists exactly the nested child.
        let list_req = ListNamespacesRequest {
            id: id(&["parent"]),
            page_token: None,
            limit: None,
        };
        let namespaces = dir_namespace.list_namespaces(list_req).await.unwrap();
        assert_eq!(namespaces.namespaces.len(), 1);
        assert_eq!(namespaces.namespaces[0], "child");
    }

    #[rstest]
    #[case::with_optimization(true)]
    #[case::without_optimization(false)]
    #[tokio::test]
    async fn test_create_namespace_without_parent_fails(#[case] inline_optimization: bool) {
        let temp_dir = TempStdDir::default();
        let temp_path = temp_dir.to_str().unwrap();
        let dir_namespace = build_namespace(temp_path, inline_optimization).await;

        let mut create_req = CreateNamespaceRequest::new();
        create_req.id = id(&["nonexistent_parent", "child"]);
        let result = dir_namespace.create_namespace(create_req).await;
        assert!(result.is_err(), "Should fail when parent doesn't exist");
    }

    #[rstest]
    #[case::with_optimization(true)]
    #[case::without_optimization(false)]
    #[tokio::test]
    async fn test_drop_child_namespace(#[case] inline_optimization: bool) {
        let temp_dir = TempStdDir::default();
        let temp_path = temp_dir.to_str().unwrap();
        let dir_namespace = build_namespace(temp_path, inline_optimization).await;

        let mut create_req = CreateNamespaceRequest::new();
        create_req.id = id(&["ns1"]);
        dir_namespace.create_namespace(create_req).await.unwrap();

        let mut drop_req = DropNamespaceRequest::new();
        drop_req.id = id(&["ns1"]);
        let result = dir_namespace.drop_namespace(drop_req).await;
        assert!(
            result.is_ok(),
            "Failed to drop namespace: {:?}",
            result.err()
        );

        let exists_req = NamespaceExistsRequest { id: id(&["ns1"]) };
        assert!(
            dir_namespace.namespace_exists(exists_req).await.is_err(),
            "Namespace should not exist after drop"
        );
    }

    #[rstest]
    #[case::with_optimization(true)]
    #[case::without_optimization(false)]
    #[tokio::test]
    async fn test_drop_namespace_with_children_fails(#[case] inline_optimization: bool) {
        let temp_dir = TempStdDir::default();
        let temp_path = temp_dir.to_str().unwrap();
        let dir_namespace = build_namespace(temp_path, inline_optimization).await;

        let mut create_req = CreateNamespaceRequest::new();
        create_req.id = id(&["parent"]);
        dir_namespace.create_namespace(create_req).await.unwrap();

        let mut create_req = CreateNamespaceRequest::new();
        create_req.id = id(&["parent", "child"]);
        dir_namespace.create_namespace(create_req).await.unwrap();

        // Dropping a namespace that still has children must fail.
        let mut drop_req = DropNamespaceRequest::new();
        drop_req.id = id(&["parent"]);
        let result = dir_namespace.drop_namespace(drop_req).await;
        assert!(result.is_err(), "Should fail when namespace has children");
    }

    #[rstest]
    #[case::with_optimization(true)]
    #[case::without_optimization(false)]
    #[tokio::test]
    async fn test_create_table_in_child_namespace(#[case] inline_optimization: bool) {
        let temp_dir = TempStdDir::default();
        let temp_path = temp_dir.to_str().unwrap();
        let dir_namespace = build_namespace(temp_path, inline_optimization).await;

        let mut create_ns_req = CreateNamespaceRequest::new();
        create_ns_req.id = id(&["ns1"]);
        dir_namespace.create_namespace(create_ns_req).await.unwrap();

        let result = try_create_test_table(&dir_namespace, &["ns1", "table1"]).await;
        assert!(
            result.is_ok(),
            "Failed to create table in child namespace: {:?}",
            result.err()
        );

        // The table is listed under its namespace.
        assert_eq!(list_tables_in(&dir_namespace, &["ns1"]).await, vec!["table1"]);
    }

    #[rstest]
    #[case::with_optimization(true)]
    #[case::without_optimization(false)]
    #[tokio::test]
    async fn test_describe_child_namespace(#[case] inline_optimization: bool) {
        let temp_dir = TempStdDir::default();
        let temp_path = temp_dir.to_str().unwrap();
        let dir_namespace = build_namespace(temp_path, inline_optimization).await;

        // Create a child namespace carrying one property.
        let mut properties = std::collections::HashMap::new();
        properties.insert("key1".to_string(), "value1".to_string());

        let mut create_req = CreateNamespaceRequest::new();
        create_req.id = id(&["ns1"]);
        create_req.properties = Some(properties);
        dir_namespace.create_namespace(create_req).await.unwrap();

        // Describing it returns the stored property.
        let describe_req = DescribeNamespaceRequest { id: id(&["ns1"]) };
        let result = dir_namespace.describe_namespace(describe_req).await;
        assert!(
            result.is_ok(),
            "Failed to describe namespace: {:?}",
            result.err()
        );
        let response = result.unwrap();
        assert!(response.properties.is_some());
        assert_eq!(
            response.properties.unwrap().get("key1"),
            Some(&"value1".to_string())
        );
    }
}