Skip to main content

lance_namespace_impls/dir/
manifest.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Manifest-based namespace implementation
5//!
6//! This module provides a namespace implementation that uses a manifest table
7//! to track tables and nested namespaces.
8
9use arrow::array::{Array, RecordBatch, RecordBatchIterator, StringArray};
10use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
11use arrow_ipc::reader::StreamReader;
12use async_trait::async_trait;
13use bytes::Bytes;
14use futures::stream::StreamExt;
15use lance::dataset::optimize::{compact_files, CompactionOptions};
16use lance::dataset::{builder::DatasetBuilder, WriteParams};
17use lance::session::Session;
18use lance::{dataset::scanner::Scanner, Dataset};
19use lance_core::{box_error, Error, Result};
20use lance_index::optimize::OptimizeOptions;
21use lance_index::scalar::{BuiltinIndexType, ScalarIndexParams};
22use lance_index::traits::DatasetIndexExt;
23use lance_index::IndexType;
24use lance_io::object_store::{ObjectStore, ObjectStoreParams};
25use lance_namespace::models::{
26    CreateEmptyTableRequest, CreateEmptyTableResponse, CreateNamespaceRequest,
27    CreateNamespaceResponse, CreateTableRequest, CreateTableResponse, DeregisterTableRequest,
28    DeregisterTableResponse, DescribeNamespaceRequest, DescribeNamespaceResponse,
29    DescribeTableRequest, DescribeTableResponse, DropNamespaceRequest, DropNamespaceResponse,
30    DropTableRequest, DropTableResponse, ListNamespacesRequest, ListNamespacesResponse,
31    ListTablesRequest, ListTablesResponse, NamespaceExistsRequest, RegisterTableRequest,
32    RegisterTableResponse, TableExistsRequest,
33};
34use lance_namespace::schema::arrow_schema_to_json;
35use lance_namespace::LanceNamespace;
36use object_store::path::Path;
37use snafu::location;
38use std::io::Cursor;
39use std::{
40    collections::HashMap,
41    hash::{DefaultHasher, Hash, Hasher},
42    ops::{Deref, DerefMut},
43    sync::Arc,
44};
45use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
46
/// Name of the special Lance table that holds the manifest entries.
const MANIFEST_TABLE_NAME: &str = "__manifest";
/// Separator placed between namespace levels and the object name inside an object ID.
const DELIMITER: &str = "$";

// Index names for the __manifest table
/// BTREE index on the object_id column for fast lookups
const OBJECT_ID_INDEX_NAME: &str = "object_id_btree";
/// Bitmap index on the object_type column for filtering by type
const OBJECT_TYPE_INDEX_NAME: &str = "object_type_bitmap";
/// LabelList index on the base_objects column for view dependencies
const BASE_OBJECTS_INDEX_NAME: &str = "base_objects_label_list";
57
58/// Object types that can be stored in the manifest
59#[derive(Debug, Clone, Copy, PartialEq, Eq)]
60pub enum ObjectType {
61    Namespace,
62    Table,
63}
64
65impl ObjectType {
66    pub fn as_str(&self) -> &str {
67        match self {
68            Self::Namespace => "namespace",
69            Self::Table => "table",
70        }
71    }
72
73    pub fn parse(s: &str) -> Result<Self> {
74        match s {
75            "namespace" => Ok(Self::Namespace),
76            "table" => Ok(Self::Table),
77            _ => Err(Error::io(
78                format!("Invalid object type: {}", s),
79                location!(),
80            )),
81        }
82    }
83}
84
/// Information about a table stored in the manifest
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TableInfo {
    /// Namespace path that contains the table; empty when the object ID has no delimiter.
    pub namespace: Vec<String>,
    /// Table name, i.e. the last component of the object ID.
    pub name: String,
    /// Value of the manifest's `location` column for this table.
    pub location: String,
}
92
/// Information about a namespace stored in the manifest
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NamespaceInfo {
    /// Path of the parent namespace levels.
    pub namespace: Vec<String>,
    /// Name of this namespace (the last path component).
    pub name: String,
    /// Optional key/value metadata attached to the namespace.
    pub metadata: Option<HashMap<String, String>>,
}
100
101/// A wrapper around a Dataset that provides concurrent access.
102///
103/// This can be cloned cheaply. It supports concurrent reads or exclusive writes.
104/// The manifest dataset is always kept strongly consistent by reloading on each read.
105#[derive(Debug, Clone)]
106pub struct DatasetConsistencyWrapper(Arc<RwLock<Dataset>>);
107
108impl DatasetConsistencyWrapper {
109    /// Create a new wrapper with the given dataset.
110    pub fn new(dataset: Dataset) -> Self {
111        Self(Arc::new(RwLock::new(dataset)))
112    }
113
114    /// Get an immutable reference to the dataset.
115    /// Always reloads to ensure strong consistency.
116    pub async fn get(&self) -> Result<DatasetReadGuard<'_>> {
117        self.reload().await?;
118        Ok(DatasetReadGuard {
119            guard: self.0.read().await,
120        })
121    }
122
123    /// Get a mutable reference to the dataset.
124    /// Always reloads to ensure strong consistency.
125    pub async fn get_mut(&self) -> Result<DatasetWriteGuard<'_>> {
126        self.reload().await?;
127        Ok(DatasetWriteGuard {
128            guard: self.0.write().await,
129        })
130    }
131
132    /// Provide a known latest version of the dataset.
133    ///
134    /// This is usually done after some write operation, which inherently will
135    /// have the latest version.
136    pub async fn set_latest(&self, dataset: Dataset) {
137        let mut write_guard = self.0.write().await;
138        if dataset.manifest().version > write_guard.manifest().version {
139            *write_guard = dataset;
140        }
141    }
142
143    /// Reload the dataset to the latest version.
144    async fn reload(&self) -> Result<()> {
145        // First check if we need to reload (with read lock)
146        let read_guard = self.0.read().await;
147        let dataset_uri = read_guard.uri().to_string();
148        let current_version = read_guard.version().version;
149        log::debug!(
150            "Reload starting for uri={}, current_version={}",
151            dataset_uri,
152            current_version
153        );
154        let latest_version = read_guard
155            .latest_version_id()
156            .await
157            .map_err(|e| Error::IO {
158                source: box_error(std::io::Error::other(format!(
159                    "Failed to get latest version: {}",
160                    e
161                ))),
162                location: location!(),
163            })?;
164        log::debug!(
165            "Reload got latest_version={} for uri={}, current_version={}",
166            latest_version,
167            dataset_uri,
168            current_version
169        );
170        drop(read_guard);
171
172        // If already up-to-date, return early
173        if latest_version == current_version {
174            log::debug!("Already up-to-date for uri={}", dataset_uri);
175            return Ok(());
176        }
177
178        // Need to reload, acquire write lock
179        let mut write_guard = self.0.write().await;
180
181        // Double-check after acquiring write lock (someone else might have reloaded)
182        let latest_version = write_guard
183            .latest_version_id()
184            .await
185            .map_err(|e| Error::IO {
186                source: box_error(std::io::Error::other(format!(
187                    "Failed to get latest version: {}",
188                    e
189                ))),
190                location: location!(),
191            })?;
192
193        if latest_version != write_guard.version().version {
194            write_guard.checkout_latest().await.map_err(|e| Error::IO {
195                source: box_error(std::io::Error::other(format!(
196                    "Failed to checkout latest: {}",
197                    e
198                ))),
199                location: location!(),
200            })?;
201        }
202
203        Ok(())
204    }
205}
206
207pub struct DatasetReadGuard<'a> {
208    guard: RwLockReadGuard<'a, Dataset>,
209}
210
211impl Deref for DatasetReadGuard<'_> {
212    type Target = Dataset;
213
214    fn deref(&self) -> &Self::Target {
215        &self.guard
216    }
217}
218
219pub struct DatasetWriteGuard<'a> {
220    guard: RwLockWriteGuard<'a, Dataset>,
221}
222
223impl Deref for DatasetWriteGuard<'_> {
224    type Target = Dataset;
225
226    fn deref(&self) -> &Self::Target {
227        &self.guard
228    }
229}
230
231impl DerefMut for DatasetWriteGuard<'_> {
232    fn deref_mut(&mut self) -> &mut Self::Target {
233        &mut self.guard
234    }
235}
236
/// Manifest-based namespace implementation
///
/// Uses a special `__manifest` Lance table to track tables and nested namespaces.
#[derive(Debug)]
pub struct ManifestNamespace {
    /// Root URI given at construction; also handed to `create_or_get_manifest`.
    root: String,
    /// Optional storage options given at construction; also handed to `create_or_get_manifest`.
    storage_options: Option<HashMap<String, String>>,
    // Stored at construction; NOTE(review): not read in the visible part of this file.
    #[allow(dead_code)]
    session: Option<Arc<Session>>,
    // Stored at construction; NOTE(review): not read in the visible part of this file.
    #[allow(dead_code)]
    object_store: Arc<ObjectStore>,
    // Stored at construction; NOTE(review): not read in the visible part of this file.
    #[allow(dead_code)]
    base_path: Path,
    /// Handle to the `__manifest` dataset; reloads before every read or write access.
    manifest_dataset: DatasetConsistencyWrapper,
    /// Whether directory listing is enabled in dual mode
    /// If true, root namespace tables use {table_name}.lance naming
    /// If false, they use namespace-prefixed names
    dir_listing_enabled: bool,
    /// Whether to perform inline optimization (compaction and indexing) on the __manifest table
    /// after every write. Defaults to true.
    inline_optimization_enabled: bool,
}
259
260impl ManifestNamespace {
261    /// Create a new ManifestNamespace from an existing DirectoryNamespace
262    pub async fn from_directory(
263        root: String,
264        storage_options: Option<HashMap<String, String>>,
265        session: Option<Arc<Session>>,
266        object_store: Arc<ObjectStore>,
267        base_path: Path,
268        dir_listing_enabled: bool,
269        inline_optimization_enabled: bool,
270    ) -> Result<Self> {
271        let manifest_dataset =
272            Self::create_or_get_manifest(&root, &storage_options, session.clone()).await?;
273
274        Ok(Self {
275            root,
276            storage_options,
277            session,
278            object_store,
279            base_path,
280            manifest_dataset,
281            dir_listing_enabled,
282            inline_optimization_enabled,
283        })
284    }
285
286    /// Build object ID from namespace path and name
287    pub fn build_object_id(namespace: &[String], name: &str) -> String {
288        if namespace.is_empty() {
289            name.to_string()
290        } else {
291            let mut id = namespace.join(DELIMITER);
292            id.push_str(DELIMITER);
293            id.push_str(name);
294            id
295        }
296    }
297
298    /// Parse object ID into namespace path and name
299    pub fn parse_object_id(object_id: &str) -> (Vec<String>, String) {
300        let parts: Vec<&str> = object_id.split(DELIMITER).collect();
301        if parts.len() == 1 {
302            (Vec::new(), parts[0].to_string())
303        } else {
304            let namespace = parts[..parts.len() - 1]
305                .iter()
306                .map(|s| s.to_string())
307                .collect();
308            let name = parts[parts.len() - 1].to_string();
309            (namespace, name)
310        }
311    }
312
313    /// Split an object ID (table_id as vec of strings) into namespace and table name
314    fn split_object_id(table_id: &[String]) -> (Vec<String>, String) {
315        if table_id.len() == 1 {
316            (vec![], table_id[0].clone())
317        } else {
318            (
319                table_id[..table_id.len() - 1].to_vec(),
320                table_id[table_id.len() - 1].clone(),
321            )
322        }
323    }
324
    /// Convert a table ID (vec of strings) to an object_id string.
    ///
    /// The components are joined with `DELIMITER`; an empty slice gives an empty string.
    fn str_object_id(table_id: &[String]) -> String {
        table_id.join(DELIMITER)
    }
329
330    /// Generate a new directory name in format: <hash>_<object_id>
331    /// The hash is used to (1) optimize object store throughput,
332    /// (2) have high enough entropy in a short period of time to prevent issues like
333    /// failed table creation, delete and create new table of the same name, etc.
334    /// The object_id is added after the hash to ensure
335    /// dir name uniqueness and make debugging easier.
336    fn generate_dir_name(object_id: &str) -> String {
337        // Generate a random number for uniqueness
338        let random_num: u64 = rand::random();
339
340        // Create hash from random number + object_id
341        let mut hasher = DefaultHasher::new();
342        random_num.hash(&mut hasher);
343        object_id.hash(&mut hasher);
344        let hash = hasher.finish();
345
346        // Format as lowercase hex (8 characters - sufficient entropy for uniqueness)
347        format!("{:08x}_{}", (hash & 0xFFFFFFFF) as u32, object_id)
348    }
349
350    /// Construct a full URI from root and relative location
351    pub(crate) fn construct_full_uri(root: &str, relative_location: &str) -> Result<String> {
352        let mut base_url = lance_io::object_store::uri_to_url(root)?;
353
354        // Ensure the base URL has a trailing slash so that URL.join() appends
355        // rather than replaces the last path segment.
356        // Without this fix, "s3://bucket/path/subdir".join("table.lance")
357        // would incorrectly produce "s3://bucket/path/table.lance" (missing subdir).
358        if !base_url.path().ends_with('/') {
359            base_url.set_path(&format!("{}/", base_url.path()));
360        }
361
362        let full_url = base_url
363            .join(relative_location)
364            .map_err(|e| Error::InvalidInput {
365                source: format!(
366                    "Failed to join URI '{}' with '{}': {:?}",
367                    root, relative_location, e
368                )
369                .into(),
370                location: location!(),
371            })?;
372
373        Ok(full_url.to_string())
374    }
375
376    /// Perform inline optimization on the __manifest table.
377    ///
378    /// This method:
379    /// 1. Creates three indexes on the manifest table:
380    ///    - BTREE index on object_id for fast lookups
381    ///    - Bitmap index on object_type for filtering by type
382    ///    - LabelList index on base_objects for view dependencies
383    /// 2. Runs file compaction to merge small files
384    /// 3. Optimizes existing indices
385    ///
386    /// This is called automatically after writes when inline_optimization_enabled is true.
387    async fn run_inline_optimization(&self) -> Result<()> {
388        if !self.inline_optimization_enabled {
389            return Ok(());
390        }
391
392        // Get a mutable reference to the dataset to perform optimization
393        let mut dataset_guard = self.manifest_dataset.get_mut().await?;
394        let dataset: &mut Dataset = &mut dataset_guard;
395
396        // Step 1: Create indexes if they don't already exist
397        let indices = dataset.load_indices().await?;
398
399        // Check which indexes already exist
400        let has_object_id_index = indices.iter().any(|idx| idx.name == OBJECT_ID_INDEX_NAME);
401        let has_object_type_index = indices.iter().any(|idx| idx.name == OBJECT_TYPE_INDEX_NAME);
402        let has_base_objects_index = indices
403            .iter()
404            .any(|idx| idx.name == BASE_OBJECTS_INDEX_NAME);
405
406        // Create BTREE index on object_id
407        if !has_object_id_index {
408            log::debug!(
409                "Creating BTREE index '{}' on object_id for __manifest table",
410                OBJECT_ID_INDEX_NAME
411            );
412            let params = ScalarIndexParams::for_builtin(BuiltinIndexType::BTree);
413            if let Err(e) = dataset
414                .create_index(
415                    &["object_id"],
416                    IndexType::BTree,
417                    Some(OBJECT_ID_INDEX_NAME.to_string()),
418                    &params,
419                    true,
420                )
421                .await
422            {
423                log::warn!("Failed to create BTREE index on object_id for __manifest table: {:?}. Query performance may be impacted.", e);
424            } else {
425                log::info!(
426                    "Created BTREE index '{}' on object_id for __manifest table",
427                    OBJECT_ID_INDEX_NAME
428                );
429            }
430        }
431
432        // Create Bitmap index on object_type
433        if !has_object_type_index {
434            log::debug!(
435                "Creating Bitmap index '{}' on object_type for __manifest table",
436                OBJECT_TYPE_INDEX_NAME
437            );
438            let params = ScalarIndexParams::default();
439            if let Err(e) = dataset
440                .create_index(
441                    &["object_type"],
442                    IndexType::Bitmap,
443                    Some(OBJECT_TYPE_INDEX_NAME.to_string()),
444                    &params,
445                    true,
446                )
447                .await
448            {
449                log::warn!("Failed to create Bitmap index on object_type for __manifest table: {:?}. Query performance may be impacted.", e);
450            } else {
451                log::info!(
452                    "Created Bitmap index '{}' on object_type for __manifest table",
453                    OBJECT_TYPE_INDEX_NAME
454                );
455            }
456        }
457
458        // Create LabelList index on base_objects
459        if !has_base_objects_index {
460            log::debug!(
461                "Creating LabelList index '{}' on base_objects for __manifest table",
462                BASE_OBJECTS_INDEX_NAME
463            );
464            let params = ScalarIndexParams::default();
465            if let Err(e) = dataset
466                .create_index(
467                    &["base_objects"],
468                    IndexType::LabelList,
469                    Some(BASE_OBJECTS_INDEX_NAME.to_string()),
470                    &params,
471                    true,
472                )
473                .await
474            {
475                log::warn!("Failed to create LabelList index on base_objects for __manifest table: {:?}. Query performance may be impacted.", e);
476            } else {
477                log::info!(
478                    "Created LabelList index '{}' on base_objects for __manifest table",
479                    BASE_OBJECTS_INDEX_NAME
480                );
481            }
482        }
483
484        // Step 2: Run file compaction
485        log::debug!("Running file compaction on __manifest table");
486        match compact_files(dataset, CompactionOptions::default(), None).await {
487            Ok(compaction_metrics) => {
488                if compaction_metrics.fragments_removed > 0 {
489                    log::info!(
490                        "Compacted __manifest table: removed {} fragments, added {} fragments",
491                        compaction_metrics.fragments_removed,
492                        compaction_metrics.fragments_added
493                    );
494                }
495            }
496            Err(e) => {
497                log::warn!("Failed to compact files for __manifest table: {:?}. Continuing with optimization.", e);
498            }
499        }
500
501        // Step 3: Optimize indices
502        log::debug!("Optimizing indices on __manifest table");
503        match dataset.optimize_indices(&OptimizeOptions::default()).await {
504            Ok(_) => {
505                log::info!("Successfully optimized indices on __manifest table");
506            }
507            Err(e) => {
508                log::warn!(
509                    "Failed to optimize indices on __manifest table: {:?}. Continuing anyway.",
510                    e
511                );
512            }
513        }
514
515        Ok(())
516    }
517
518    /// Get the manifest schema
519    fn manifest_schema() -> Arc<ArrowSchema> {
520        Arc::new(ArrowSchema::new(vec![
521            Field::new("object_id", DataType::Utf8, false),
522            Field::new("object_type", DataType::Utf8, false),
523            Field::new("location", DataType::Utf8, true),
524            Field::new("metadata", DataType::Utf8, true),
525            Field::new(
526                "base_objects",
527                DataType::List(Arc::new(Field::new("object_id", DataType::Utf8, true))),
528                true,
529            ),
530        ]))
531    }
532
533    /// Get a scanner for the manifest dataset
534    async fn manifest_scanner(&self) -> Result<Scanner> {
535        let dataset_guard = self.manifest_dataset.get().await?;
536        Ok(dataset_guard.scan())
537    }
538
539    /// Helper to execute a scanner and collect results into a Vec
540    async fn execute_scanner(scanner: Scanner) -> Result<Vec<RecordBatch>> {
541        let mut stream = scanner.try_into_stream().await.map_err(|e| Error::IO {
542            source: box_error(std::io::Error::other(format!(
543                "Failed to create stream: {}",
544                e
545            ))),
546            location: location!(),
547        })?;
548
549        let mut batches = Vec::new();
550        while let Some(batch) = stream.next().await {
551            batches.push(batch.map_err(|e| Error::IO {
552                source: box_error(std::io::Error::other(format!(
553                    "Failed to read batch: {}",
554                    e
555                ))),
556                location: location!(),
557            })?);
558        }
559
560        Ok(batches)
561    }
562
563    /// Helper to get a string column from a record batch
564    fn get_string_column<'a>(batch: &'a RecordBatch, column_name: &str) -> Result<&'a StringArray> {
565        let column = batch
566            .column_by_name(column_name)
567            .ok_or_else(|| Error::io(format!("Column '{}' not found", column_name), location!()))?;
568        column
569            .as_any()
570            .downcast_ref::<StringArray>()
571            .ok_or_else(|| {
572                Error::io(
573                    format!("Column '{}' is not a string array", column_name),
574                    location!(),
575                )
576            })
577    }
578
579    /// Check if the manifest contains an object with the given ID
580    async fn manifest_contains_object(&self, object_id: &str) -> Result<bool> {
581        let filter = format!("object_id = '{}'", object_id);
582
583        let dataset_guard = self.manifest_dataset.get().await?;
584        let mut scanner = dataset_guard.scan();
585
586        scanner.filter(&filter).map_err(|e| Error::IO {
587            source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
588            location: location!(),
589        })?;
590
591        // Project no columns and enable row IDs for count_rows to work
592        scanner.project::<&str>(&[]).map_err(|e| Error::IO {
593            source: box_error(std::io::Error::other(format!("Failed to project: {}", e))),
594            location: location!(),
595        })?;
596
597        scanner.with_row_id();
598
599        let count = scanner.count_rows().await.map_err(|e| Error::IO {
600            source: box_error(std::io::Error::other(format!(
601                "Failed to count rows: {}",
602                e
603            ))),
604            location: location!(),
605        })?;
606
607        Ok(count > 0)
608    }
609
610    /// Query the manifest for a table with the given object ID
611    async fn query_manifest_for_table(&self, object_id: &str) -> Result<Option<TableInfo>> {
612        let filter = format!("object_id = '{}' AND object_type = 'table'", object_id);
613        let mut scanner = self.manifest_scanner().await?;
614        scanner.filter(&filter).map_err(|e| Error::IO {
615            source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
616            location: location!(),
617        })?;
618        scanner
619            .project(&["object_id", "location"])
620            .map_err(|e| Error::IO {
621                source: box_error(std::io::Error::other(format!("Failed to project: {}", e))),
622                location: location!(),
623            })?;
624        let batches = Self::execute_scanner(scanner).await?;
625
626        let mut found_result: Option<TableInfo> = None;
627        let mut total_rows = 0;
628
629        for batch in batches {
630            if batch.num_rows() == 0 {
631                continue;
632            }
633
634            total_rows += batch.num_rows();
635            if total_rows > 1 {
636                return Err(Error::io(
637                    format!(
638                        "Expected exactly 1 table with id '{}', found {}",
639                        object_id, total_rows
640                    ),
641                    location!(),
642                ));
643            }
644
645            let object_id_array = Self::get_string_column(&batch, "object_id")?;
646            let location_array = Self::get_string_column(&batch, "location")?;
647            let location = location_array.value(0).to_string();
648            let (namespace, name) = Self::parse_object_id(object_id_array.value(0));
649            found_result = Some(TableInfo {
650                namespace,
651                name,
652                location,
653            });
654        }
655
656        Ok(found_result)
657    }
658
659    /// List all table locations in the manifest (for root namespace only)
660    /// Returns a set of table locations (e.g., "table_name.lance")
661    pub async fn list_manifest_table_locations(&self) -> Result<std::collections::HashSet<String>> {
662        let filter = "object_type = 'table' AND NOT contains(object_id, '$')";
663        let mut scanner = self.manifest_scanner().await?;
664        scanner.filter(filter).map_err(|e| Error::IO {
665            source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
666            location: location!(),
667        })?;
668        scanner.project(&["location"]).map_err(|e| Error::IO {
669            source: box_error(std::io::Error::other(format!("Failed to project: {}", e))),
670            location: location!(),
671        })?;
672
673        let batches = Self::execute_scanner(scanner).await?;
674        let mut locations = std::collections::HashSet::new();
675
676        for batch in batches {
677            if batch.num_rows() == 0 {
678                continue;
679            }
680            let location_array = Self::get_string_column(&batch, "location")?;
681            for i in 0..location_array.len() {
682                locations.insert(location_array.value(i).to_string());
683            }
684        }
685
686        Ok(locations)
687    }
688
689    /// Insert an entry into the manifest table
690    async fn insert_into_manifest(
691        &self,
692        object_id: String,
693        object_type: ObjectType,
694        location: Option<String>,
695    ) -> Result<()> {
696        self.insert_into_manifest_with_metadata(object_id, object_type, location, None, None)
697            .await
698    }
699
700    /// Insert an entry into the manifest table with metadata and base_objects
701    async fn insert_into_manifest_with_metadata(
702        &self,
703        object_id: String,
704        object_type: ObjectType,
705        location: Option<String>,
706        metadata: Option<String>,
707        base_objects: Option<Vec<String>>,
708    ) -> Result<()> {
709        use arrow::array::builder::{ListBuilder, StringBuilder};
710
711        let schema = Self::manifest_schema();
712
713        // Create base_objects array from the provided list
714        let string_builder = StringBuilder::new();
715        let mut list_builder = ListBuilder::new(string_builder).with_field(Arc::new(Field::new(
716            "object_id",
717            DataType::Utf8,
718            true,
719        )));
720
721        match base_objects {
722            Some(objects) => {
723                for obj in objects {
724                    list_builder.values().append_value(obj);
725                }
726                list_builder.append(true);
727            }
728            None => {
729                list_builder.append_null();
730            }
731        }
732
733        let base_objects_array = list_builder.finish();
734
735        // Create arrays with optional values
736        let location_array = match location {
737            Some(loc) => Arc::new(StringArray::from(vec![Some(loc)])),
738            None => Arc::new(StringArray::from(vec![None::<String>])),
739        };
740
741        let metadata_array = match metadata {
742            Some(meta) => Arc::new(StringArray::from(vec![Some(meta)])),
743            None => Arc::new(StringArray::from(vec![None::<String>])),
744        };
745
746        let batch = RecordBatch::try_new(
747            schema.clone(),
748            vec![
749                Arc::new(StringArray::from(vec![object_id.as_str()])),
750                Arc::new(StringArray::from(vec![object_type.as_str()])),
751                location_array,
752                metadata_array,
753                Arc::new(base_objects_array),
754            ],
755        )
756        .map_err(|e| {
757            Error::io(
758                format!("Failed to create manifest entry: {}", e),
759                location!(),
760            )
761        })?;
762
763        let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
764
765        // Use MergeInsert to ensure uniqueness on object_id
766        let dataset_guard = self.manifest_dataset.get().await?;
767        let dataset_arc = Arc::new(dataset_guard.clone());
768        drop(dataset_guard); // Drop read guard before merge insert
769
770        let mut merge_builder =
771            lance::dataset::MergeInsertBuilder::try_new(dataset_arc, vec!["object_id".to_string()])
772                .map_err(|e| Error::IO {
773                    source: box_error(std::io::Error::other(format!(
774                        "Failed to create merge builder: {}",
775                        e
776                    ))),
777                    location: location!(),
778                })?;
779
780        merge_builder.when_matched(lance::dataset::WhenMatched::Fail);
781        merge_builder.when_not_matched(lance::dataset::WhenNotMatched::InsertAll);
782
783        let (new_dataset_arc, _merge_stats) = merge_builder
784            .try_build()
785            .map_err(|e| Error::IO {
786                source: box_error(std::io::Error::other(format!(
787                    "Failed to build merge: {}",
788                    e
789                ))),
790                location: location!(),
791            })?
792            .execute_reader(Box::new(reader))
793            .await
794            .map_err(|e| {
795                // Check if this is a "matched row" error from WhenMatched::Fail
796                let error_msg = e.to_string();
797                if error_msg.contains("matched")
798                    || error_msg.contains("duplicate")
799                    || error_msg.contains("already exists")
800                {
801                    Error::io(
802                        format!("Object with id '{}' already exists in manifest", object_id),
803                        location!(),
804                    )
805                } else {
806                    Error::IO {
807                        source: box_error(std::io::Error::other(format!(
808                            "Failed to execute merge: {}",
809                            e
810                        ))),
811                        location: location!(),
812                    }
813                }
814            })?;
815
816        let new_dataset = Arc::try_unwrap(new_dataset_arc).unwrap_or_else(|arc| (*arc).clone());
817        self.manifest_dataset.set_latest(new_dataset).await;
818
819        // Run inline optimization after write
820        if let Err(e) = self.run_inline_optimization().await {
821            log::warn!(
822                "Unexpected failure when running inline optimization: {:?}",
823                e
824            );
825        }
826
827        Ok(())
828    }
829
830    /// Delete an entry from the manifest table
831    pub async fn delete_from_manifest(&self, object_id: &str) -> Result<()> {
832        {
833            let predicate = format!("object_id = '{}'", object_id);
834            let mut dataset_guard = self.manifest_dataset.get_mut().await?;
835            dataset_guard
836                .delete(&predicate)
837                .await
838                .map_err(|e| Error::IO {
839                    source: box_error(std::io::Error::other(format!("Failed to delete: {}", e))),
840                    location: location!(),
841                })?;
842        } // Drop the guard here
843
844        self.manifest_dataset.reload().await?;
845
846        // Run inline optimization after delete
847        if let Err(e) = self.run_inline_optimization().await {
848            log::warn!(
849                "Unexpected failure when running inline optimization: {:?}",
850                e
851            );
852        }
853
854        Ok(())
855    }
856
857    /// Register a table in the manifest without creating the physical table (internal helper for migration)
858    pub async fn register_table(&self, name: &str, location: String) -> Result<()> {
859        let object_id = Self::build_object_id(&[], name);
860        if self.manifest_contains_object(&object_id).await? {
861            return Err(Error::io(
862                format!("Table '{}' already exists", name),
863                location!(),
864            ));
865        }
866
867        self.insert_into_manifest(object_id, ObjectType::Table, Some(location))
868            .await
869    }
870
871    /// Validate that all levels of a namespace path exist
872    async fn validate_namespace_levels_exist(&self, namespace_path: &[String]) -> Result<()> {
873        for i in 1..=namespace_path.len() {
874            let partial_path = &namespace_path[..i];
875            let object_id = partial_path.join(DELIMITER);
876            if !self.manifest_contains_object(&object_id).await? {
877                return Err(Error::Namespace {
878                    source: format!("Parent namespace '{}' does not exist", object_id).into(),
879                    location: location!(),
880                });
881            }
882        }
883        Ok(())
884    }
885
886    /// Query the manifest for a namespace with the given object ID
887    async fn query_manifest_for_namespace(&self, object_id: &str) -> Result<Option<NamespaceInfo>> {
888        let filter = format!("object_id = '{}' AND object_type = 'namespace'", object_id);
889        let mut scanner = self.manifest_scanner().await?;
890        scanner.filter(&filter).map_err(|e| Error::IO {
891            source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
892            location: location!(),
893        })?;
894        scanner
895            .project(&["object_id", "metadata"])
896            .map_err(|e| Error::IO {
897                source: box_error(std::io::Error::other(format!("Failed to project: {}", e))),
898                location: location!(),
899            })?;
900        let batches = Self::execute_scanner(scanner).await?;
901
902        let mut found_result: Option<NamespaceInfo> = None;
903        let mut total_rows = 0;
904
905        for batch in batches {
906            if batch.num_rows() == 0 {
907                continue;
908            }
909
910            total_rows += batch.num_rows();
911            if total_rows > 1 {
912                return Err(Error::io(
913                    format!(
914                        "Expected exactly 1 namespace with id '{}', found {}",
915                        object_id, total_rows
916                    ),
917                    location!(),
918                ));
919            }
920
921            let object_id_array = Self::get_string_column(&batch, "object_id")?;
922            let metadata_array = Self::get_string_column(&batch, "metadata")?;
923
924            let object_id_str = object_id_array.value(0);
925            let metadata = if !metadata_array.is_null(0) {
926                let metadata_str = metadata_array.value(0);
927                match serde_json::from_str::<HashMap<String, String>>(metadata_str) {
928                    Ok(map) => Some(map),
929                    Err(e) => {
930                        return Err(Error::io(
931                            format!(
932                                "Failed to deserialize metadata for namespace '{}': {}",
933                                object_id, e
934                            ),
935                            location!(),
936                        ));
937                    }
938                }
939            } else {
940                None
941            };
942
943            let (namespace, name) = Self::parse_object_id(object_id_str);
944            found_result = Some(NamespaceInfo {
945                namespace,
946                name,
947                metadata,
948            });
949        }
950
951        Ok(found_result)
952    }
953
954    /// Create or get the manifest dataset
955    async fn create_or_get_manifest(
956        root: &str,
957        storage_options: &Option<HashMap<String, String>>,
958        session: Option<Arc<Session>>,
959    ) -> Result<DatasetConsistencyWrapper> {
960        let manifest_path = format!("{}/{}", root, MANIFEST_TABLE_NAME);
961        log::debug!("Attempting to load manifest from {}", manifest_path);
962        let mut builder = DatasetBuilder::from_uri(&manifest_path);
963
964        if let Some(sess) = session.clone() {
965            builder = builder.with_session(sess);
966        }
967
968        if let Some(opts) = storage_options {
969            builder = builder.with_storage_options(opts.clone());
970        }
971
972        let dataset_result = builder.load().await;
973        if let Ok(dataset) = dataset_result {
974            Ok(DatasetConsistencyWrapper::new(dataset))
975        } else {
976            log::info!("Creating new manifest table at {}", manifest_path);
977            let schema = Self::manifest_schema();
978            let empty_batch = RecordBatch::new_empty(schema.clone());
979            let reader = RecordBatchIterator::new(vec![Ok(empty_batch)], schema.clone());
980
981            let write_params = WriteParams {
982                session,
983                store_params: storage_options.as_ref().map(|opts| ObjectStoreParams {
984                    storage_options: Some(opts.clone()),
985                    ..Default::default()
986                }),
987                ..Default::default()
988            };
989
990            let dataset = Dataset::write(Box::new(reader), &manifest_path, Some(write_params))
991                .await
992                .map_err(|e| Error::IO {
993                    source: box_error(std::io::Error::other(format!(
994                        "Failed to create manifest dataset: {}",
995                        e
996                    ))),
997                    location: location!(),
998                })?;
999
1000            log::info!(
1001                "Successfully created manifest table at {}, version={}, uri={}",
1002                manifest_path,
1003                dataset.version().version,
1004                dataset.uri()
1005            );
1006            Ok(DatasetConsistencyWrapper::new(dataset))
1007        }
1008    }
1009}
1010
1011#[async_trait]
1012impl LanceNamespace for ManifestNamespace {
1013    fn namespace_id(&self) -> String {
1014        self.root.clone()
1015    }
1016
1017    async fn list_tables(&self, request: ListTablesRequest) -> Result<ListTablesResponse> {
1018        let namespace_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1019            source: "Namespace ID is required".into(),
1020            location: location!(),
1021        })?;
1022
1023        // Build filter to find tables in this namespace
1024        let filter = if namespace_id.is_empty() {
1025            // Root namespace: find tables without a namespace prefix
1026            "object_type = 'table' AND NOT contains(object_id, '$')".to_string()
1027        } else {
1028            // Namespaced: find tables that start with namespace$ but have no additional $
1029            let prefix = namespace_id.join(DELIMITER);
1030            format!(
1031                "object_type = 'table' AND starts_with(object_id, '{}{}') AND NOT contains(substring(object_id, {}), '$')",
1032                prefix, DELIMITER, prefix.len() + 2
1033            )
1034        };
1035
1036        let mut scanner = self.manifest_scanner().await?;
1037        scanner.filter(&filter).map_err(|e| Error::IO {
1038            source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
1039            location: location!(),
1040        })?;
1041        scanner.project(&["object_id"]).map_err(|e| Error::IO {
1042            source: box_error(std::io::Error::other(format!("Failed to project: {}", e))),
1043            location: location!(),
1044        })?;
1045
1046        let batches = Self::execute_scanner(scanner).await?;
1047
1048        let mut tables = Vec::new();
1049        for batch in batches {
1050            if batch.num_rows() == 0 {
1051                continue;
1052            }
1053
1054            let object_id_array = Self::get_string_column(&batch, "object_id")?;
1055            for i in 0..batch.num_rows() {
1056                let object_id = object_id_array.value(i);
1057                let (_namespace, name) = Self::parse_object_id(object_id);
1058                tables.push(name);
1059            }
1060        }
1061
1062        Ok(ListTablesResponse::new(tables))
1063    }
1064
1065    async fn describe_table(&self, request: DescribeTableRequest) -> Result<DescribeTableResponse> {
1066        let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1067            source: "Table ID is required".into(),
1068            location: location!(),
1069        })?;
1070
1071        if table_id.is_empty() {
1072            return Err(Error::InvalidInput {
1073                source: "Table ID cannot be empty".into(),
1074                location: location!(),
1075            });
1076        }
1077
1078        let object_id = Self::str_object_id(table_id);
1079        let table_info = self.query_manifest_for_table(&object_id).await?;
1080
1081        match table_info {
1082            Some(info) => {
1083                // Construct full URI from relative location
1084                let table_uri = Self::construct_full_uri(&self.root, &info.location)?;
1085
1086                // Try to open the dataset to get version and schema
1087                match Dataset::open(&table_uri).await {
1088                    Ok(mut dataset) => {
1089                        // If a specific version is requested, checkout that version
1090                        if let Some(requested_version) = request.version {
1091                            dataset = dataset.checkout_version(requested_version as u64).await?;
1092                        }
1093
1094                        let version = dataset.version().version;
1095                        let lance_schema = dataset.schema();
1096                        let arrow_schema: arrow_schema::Schema = lance_schema.into();
1097                        let json_schema = arrow_schema_to_json(&arrow_schema)?;
1098
1099                        Ok(DescribeTableResponse {
1100                            version: Some(version as i64),
1101                            location: Some(table_uri),
1102                            schema: Some(Box::new(json_schema)),
1103                            properties: None,
1104                            storage_options: self.storage_options.clone(),
1105                        })
1106                    }
1107                    Err(_) => {
1108                        // If dataset can't be opened (e.g., empty table), return minimal info
1109                        Ok(DescribeTableResponse {
1110                            version: None,
1111                            location: Some(table_uri),
1112                            schema: None,
1113                            properties: None,
1114                            storage_options: self.storage_options.clone(),
1115                        })
1116                    }
1117                }
1118            }
1119            None => Err(Error::Namespace {
1120                source: format!("Table '{}' not found", object_id).into(),
1121                location: location!(),
1122            }),
1123        }
1124    }
1125
1126    async fn table_exists(&self, request: TableExistsRequest) -> Result<()> {
1127        let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1128            source: "Table ID is required".into(),
1129            location: location!(),
1130        })?;
1131
1132        if table_id.is_empty() {
1133            return Err(Error::InvalidInput {
1134                source: "Table ID cannot be empty".into(),
1135                location: location!(),
1136            });
1137        }
1138
1139        let (namespace, table_name) = Self::split_object_id(table_id);
1140        let object_id = Self::build_object_id(&namespace, &table_name);
1141        let exists = self.manifest_contains_object(&object_id).await?;
1142        if exists {
1143            Ok(())
1144        } else {
1145            Err(Error::Namespace {
1146                source: format!("Table '{}' not found", table_name).into(),
1147                location: location!(),
1148            })
1149        }
1150    }
1151
1152    async fn create_table(
1153        &self,
1154        request: CreateTableRequest,
1155        data: Bytes,
1156    ) -> Result<CreateTableResponse> {
1157        let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1158            source: "Table ID is required".into(),
1159            location: location!(),
1160        })?;
1161
1162        if table_id.is_empty() {
1163            return Err(Error::InvalidInput {
1164                source: "Table ID cannot be empty".into(),
1165                location: location!(),
1166            });
1167        }
1168
1169        let (namespace, table_name) = Self::split_object_id(table_id);
1170        let object_id = Self::build_object_id(&namespace, &table_name);
1171
1172        // Check if table already exists in manifest
1173        if self.manifest_contains_object(&object_id).await? {
1174            return Err(Error::io(
1175                format!("Table '{}' already exists", table_name),
1176                location!(),
1177            ));
1178        }
1179
1180        // Create the physical table location with hash-based naming
1181        // When dir_listing_enabled is true and it's a root table, use directory-style naming: {table_name}.lance
1182        // Otherwise, use hash-based naming: {hash}_{object_id}
1183        let dir_name = if namespace.is_empty() && self.dir_listing_enabled {
1184            // Root table with directory listing enabled: use {table_name}.lance
1185            format!("{}.lance", table_name)
1186        } else {
1187            // Child namespace table or dir listing disabled: use hash-based naming
1188            Self::generate_dir_name(&object_id)
1189        };
1190        let table_uri = Self::construct_full_uri(&self.root, &dir_name)?;
1191
1192        // Validate that request_data is provided
1193        if data.is_empty() {
1194            return Err(Error::Namespace {
1195                source: "Request data (Arrow IPC stream) is required for create_table".into(),
1196                location: location!(),
1197            });
1198        }
1199
1200        // Validate location if provided
1201        if let Some(location) = &request.location {
1202            let location = location.trim_end_matches('/');
1203            if location != table_uri {
1204                return Err(Error::Namespace {
1205                    source: format!(
1206                        "Cannot create table {} at location {}, must be at location {}",
1207                        table_name, location, table_uri
1208                    )
1209                    .into(),
1210                    location: location!(),
1211                });
1212            }
1213        }
1214
1215        // Write the data using Lance Dataset
1216        let cursor = Cursor::new(data.to_vec());
1217        let stream_reader = StreamReader::try_new(cursor, None)
1218            .map_err(|e| Error::io(format!("Failed to read IPC stream: {}", e), location!()))?;
1219
1220        let batches: Vec<RecordBatch> =
1221            stream_reader
1222                .collect::<std::result::Result<Vec<_>, _>>()
1223                .map_err(|e| Error::io(format!("Failed to collect batches: {}", e), location!()))?;
1224
1225        if batches.is_empty() {
1226            return Err(Error::io(
1227                "No data provided for table creation",
1228                location!(),
1229            ));
1230        }
1231
1232        let schema = batches[0].schema();
1233        let batch_results: Vec<std::result::Result<RecordBatch, arrow_schema::ArrowError>> =
1234            batches.into_iter().map(Ok).collect();
1235        let reader = RecordBatchIterator::new(batch_results, schema);
1236
1237        let write_params = WriteParams::default();
1238        let _dataset = Dataset::write(Box::new(reader), &table_uri, Some(write_params))
1239            .await
1240            .map_err(|e| Error::IO {
1241                source: box_error(std::io::Error::other(format!(
1242                    "Failed to write dataset: {}",
1243                    e
1244                ))),
1245                location: location!(),
1246            })?;
1247
1248        // Register in manifest (store dir_name, not full URI)
1249        self.insert_into_manifest(object_id, ObjectType::Table, Some(dir_name))
1250            .await?;
1251
1252        Ok(CreateTableResponse {
1253            version: Some(1),
1254            location: Some(table_uri),
1255            properties: None,
1256            storage_options: self.storage_options.clone(),
1257        })
1258    }
1259
1260    async fn drop_table(&self, request: DropTableRequest) -> Result<DropTableResponse> {
1261        let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1262            source: "Table ID is required".into(),
1263            location: location!(),
1264        })?;
1265
1266        if table_id.is_empty() {
1267            return Err(Error::InvalidInput {
1268                source: "Table ID cannot be empty".into(),
1269                location: location!(),
1270            });
1271        }
1272
1273        let (namespace, table_name) = Self::split_object_id(table_id);
1274        let object_id = Self::build_object_id(&namespace, &table_name);
1275
1276        // Query manifest for table location
1277        let table_info = self.query_manifest_for_table(&object_id).await?;
1278
1279        match table_info {
1280            Some(info) => {
1281                // Delete from manifest first
1282                self.delete_from_manifest(&object_id).await?;
1283
1284                // Delete physical data directory using the dir_name from manifest
1285                let table_path = self.base_path.child(info.location.as_str());
1286                let table_uri = Self::construct_full_uri(&self.root, &info.location)?;
1287
1288                // Remove the table directory
1289                self.object_store
1290                    .remove_dir_all(table_path)
1291                    .await
1292                    .map_err(|e| Error::Namespace {
1293                        source: format!("Failed to delete table directory: {}", e).into(),
1294                        location: location!(),
1295                    })?;
1296
1297                Ok(DropTableResponse {
1298                    id: request.id.clone(),
1299                    location: Some(table_uri),
1300                    properties: None,
1301                    transaction_id: None,
1302                })
1303            }
1304            None => Err(Error::Namespace {
1305                source: format!("Table '{}' not found", table_name).into(),
1306                location: location!(),
1307            }),
1308        }
1309    }
1310
1311    async fn list_namespaces(
1312        &self,
1313        request: ListNamespacesRequest,
1314    ) -> Result<ListNamespacesResponse> {
1315        let parent_namespace = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1316            source: "Namespace ID is required".into(),
1317            location: location!(),
1318        })?;
1319
1320        // Build filter to find direct child namespaces
1321        let filter = if parent_namespace.is_empty() {
1322            // Root namespace: find all namespaces without a parent
1323            "object_type = 'namespace' AND NOT contains(object_id, '$')".to_string()
1324        } else {
1325            // Non-root: find namespaces that start with parent$ but have no additional $
1326            let prefix = parent_namespace.join(DELIMITER);
1327            format!(
1328                "object_type = 'namespace' AND starts_with(object_id, '{}{}') AND NOT contains(substring(object_id, {}), '$')",
1329                prefix, DELIMITER, prefix.len() + 2
1330            )
1331        };
1332
1333        let mut scanner = self.manifest_scanner().await?;
1334        scanner.filter(&filter).map_err(|e| Error::IO {
1335            source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
1336            location: location!(),
1337        })?;
1338        scanner.project(&["object_id"]).map_err(|e| Error::IO {
1339            source: box_error(std::io::Error::other(format!("Failed to project: {}", e))),
1340            location: location!(),
1341        })?;
1342
1343        let batches = Self::execute_scanner(scanner).await?;
1344        let mut namespaces = Vec::new();
1345
1346        for batch in batches {
1347            if batch.num_rows() == 0 {
1348                continue;
1349            }
1350
1351            let object_id_array = Self::get_string_column(&batch, "object_id")?;
1352            for i in 0..batch.num_rows() {
1353                let object_id = object_id_array.value(i);
1354                let (_namespace, name) = Self::parse_object_id(object_id);
1355                namespaces.push(name);
1356            }
1357        }
1358
1359        Ok(ListNamespacesResponse::new(namespaces))
1360    }
1361
1362    async fn describe_namespace(
1363        &self,
1364        request: DescribeNamespaceRequest,
1365    ) -> Result<DescribeNamespaceResponse> {
1366        let namespace_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1367            source: "Namespace ID is required".into(),
1368            location: location!(),
1369        })?;
1370
1371        // Root namespace always exists
1372        if namespace_id.is_empty() {
1373            return Ok(DescribeNamespaceResponse {
1374                properties: Some(HashMap::new()),
1375            });
1376        }
1377
1378        // Check if namespace exists in manifest
1379        let object_id = namespace_id.join(DELIMITER);
1380        let namespace_info = self.query_manifest_for_namespace(&object_id).await?;
1381
1382        match namespace_info {
1383            Some(info) => Ok(DescribeNamespaceResponse {
1384                properties: info.metadata,
1385            }),
1386            None => Err(Error::Namespace {
1387                source: format!("Namespace '{}' not found", object_id).into(),
1388                location: location!(),
1389            }),
1390        }
1391    }
1392
1393    async fn create_namespace(
1394        &self,
1395        request: CreateNamespaceRequest,
1396    ) -> Result<CreateNamespaceResponse> {
1397        let namespace_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1398            source: "Namespace ID is required".into(),
1399            location: location!(),
1400        })?;
1401
1402        // Root namespace always exists and cannot be created
1403        if namespace_id.is_empty() {
1404            return Err(Error::Namespace {
1405                source: "Root namespace already exists and cannot be created".into(),
1406                location: location!(),
1407            });
1408        }
1409
1410        // Validate parent namespaces exist (but not the namespace being created)
1411        if namespace_id.len() > 1 {
1412            self.validate_namespace_levels_exist(&namespace_id[..namespace_id.len() - 1])
1413                .await?;
1414        }
1415
1416        let object_id = namespace_id.join(DELIMITER);
1417        if self.manifest_contains_object(&object_id).await? {
1418            return Err(Error::Namespace {
1419                source: format!("Namespace '{}' already exists", object_id).into(),
1420                location: location!(),
1421            });
1422        }
1423
1424        // Serialize properties if provided
1425        let metadata = request.properties.as_ref().and_then(|props| {
1426            if props.is_empty() {
1427                None
1428            } else {
1429                Some(serde_json::to_string(props).ok()?)
1430            }
1431        });
1432
1433        self.insert_into_manifest_with_metadata(
1434            object_id,
1435            ObjectType::Namespace,
1436            None,
1437            metadata,
1438            None,
1439        )
1440        .await?;
1441
1442        Ok(CreateNamespaceResponse {
1443            properties: request.properties,
1444        })
1445    }
1446
1447    async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<DropNamespaceResponse> {
1448        let namespace_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1449            source: "Namespace ID is required".into(),
1450            location: location!(),
1451        })?;
1452
1453        // Root namespace always exists and cannot be dropped
1454        if namespace_id.is_empty() {
1455            return Err(Error::Namespace {
1456                source: "Root namespace cannot be dropped".into(),
1457                location: location!(),
1458            });
1459        }
1460
1461        let object_id = namespace_id.join(DELIMITER);
1462
1463        // Check if namespace exists
1464        if !self.manifest_contains_object(&object_id).await? {
1465            return Err(Error::Namespace {
1466                source: format!("Namespace '{}' not found", object_id).into(),
1467                location: location!(),
1468            });
1469        }
1470
1471        // Check for child namespaces
1472        let prefix = format!("{}{}", object_id, DELIMITER);
1473        let filter = format!("starts_with(object_id, '{}')", prefix);
1474        let mut scanner = self.manifest_scanner().await?;
1475        scanner.filter(&filter).map_err(|e| Error::IO {
1476            source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
1477            location: location!(),
1478        })?;
1479        scanner.project::<&str>(&[]).map_err(|e| Error::IO {
1480            source: box_error(std::io::Error::other(format!("Failed to project: {}", e))),
1481            location: location!(),
1482        })?;
1483        scanner.with_row_id();
1484        let count = scanner.count_rows().await.map_err(|e| Error::IO {
1485            source: box_error(std::io::Error::other(format!(
1486                "Failed to count rows: {}",
1487                e
1488            ))),
1489            location: location!(),
1490        })?;
1491
1492        if count > 0 {
1493            return Err(Error::Namespace {
1494                source: format!(
1495                    "Namespace '{}' is not empty (contains {} child objects)",
1496                    object_id, count
1497                )
1498                .into(),
1499                location: location!(),
1500            });
1501        }
1502
1503        self.delete_from_manifest(&object_id).await?;
1504
1505        Ok(DropNamespaceResponse {
1506            properties: None,
1507            transaction_id: None,
1508        })
1509    }
1510
1511    async fn namespace_exists(&self, request: NamespaceExistsRequest) -> Result<()> {
1512        let namespace_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1513            source: "Namespace ID is required".into(),
1514            location: location!(),
1515        })?;
1516
1517        // Root namespace always exists
1518        if namespace_id.is_empty() {
1519            return Ok(());
1520        }
1521
1522        let object_id = namespace_id.join(DELIMITER);
1523        if self.manifest_contains_object(&object_id).await? {
1524            Ok(())
1525        } else {
1526            Err(Error::Namespace {
1527                source: format!("Namespace '{}' not found", object_id).into(),
1528                location: location!(),
1529            })
1530        }
1531    }
1532
1533    async fn create_empty_table(
1534        &self,
1535        request: CreateEmptyTableRequest,
1536    ) -> Result<CreateEmptyTableResponse> {
1537        let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1538            source: "Table ID is required".into(),
1539            location: location!(),
1540        })?;
1541
1542        if table_id.is_empty() {
1543            return Err(Error::InvalidInput {
1544                source: "Table ID cannot be empty".into(),
1545                location: location!(),
1546            });
1547        }
1548
1549        let (namespace, table_name) = Self::split_object_id(table_id);
1550        let object_id = Self::build_object_id(&namespace, &table_name);
1551
1552        // Check if table already exists in manifest
1553        let existing = self.query_manifest_for_table(&object_id).await?;
1554        if existing.is_some() {
1555            return Err(Error::Namespace {
1556                source: format!("Table '{}' already exists", table_name).into(),
1557                location: location!(),
1558            });
1559        }
1560
1561        // Create table location path with hash-based naming
1562        // When dir_listing_enabled is true and it's a root table, use directory-style naming: {table_name}.lance
1563        // Otherwise, use hash-based naming: {hash}_{object_id}
1564        let dir_name = if namespace.is_empty() && self.dir_listing_enabled {
1565            // Root table with directory listing enabled: use {table_name}.lance
1566            format!("{}.lance", table_name)
1567        } else {
1568            // Child namespace table or dir listing disabled: use hash-based naming
1569            Self::generate_dir_name(&object_id)
1570        };
1571        let table_path = self.base_path.child(dir_name.as_str());
1572        let table_uri = Self::construct_full_uri(&self.root, &dir_name)?;
1573
1574        // Validate location if provided
1575        if let Some(req_location) = &request.location {
1576            let req_location = req_location.trim_end_matches('/');
1577            if req_location != table_uri {
1578                return Err(Error::Namespace {
1579                    source: format!(
1580                        "Cannot create table {} at location {}, must be at location {}",
1581                        table_name, req_location, table_uri
1582                    )
1583                    .into(),
1584                    location: location!(),
1585                });
1586            }
1587        }
1588
1589        // Create the .lance-reserved file to mark the table as existing
1590        let reserved_file_path = table_path.child(".lance-reserved");
1591
1592        self.object_store
1593            .create(&reserved_file_path)
1594            .await
1595            .map_err(|e| Error::Namespace {
1596                source: format!(
1597                    "Failed to create .lance-reserved file for table {}: {}",
1598                    table_name, e
1599                )
1600                .into(),
1601                location: location!(),
1602            })?
1603            .shutdown()
1604            .await
1605            .map_err(|e| Error::Namespace {
1606                source: format!(
1607                    "Failed to finalize .lance-reserved file for table {}: {}",
1608                    table_name, e
1609                )
1610                .into(),
1611                location: location!(),
1612            })?;
1613
1614        // Add entry to manifest marking this as an empty table (store dir_name, not full path)
1615        self.insert_into_manifest(object_id, ObjectType::Table, Some(dir_name))
1616            .await?;
1617
1618        log::info!(
1619            "Created empty table '{}' in manifest at {}",
1620            table_name,
1621            table_uri
1622        );
1623
1624        Ok(CreateEmptyTableResponse {
1625            location: Some(table_uri),
1626            properties: None,
1627            storage_options: self.storage_options.clone(),
1628        })
1629    }
1630
1631    async fn register_table(&self, request: RegisterTableRequest) -> Result<RegisterTableResponse> {
1632        let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1633            source: "Table ID is required".into(),
1634            location: location!(),
1635        })?;
1636
1637        if table_id.is_empty() {
1638            return Err(Error::InvalidInput {
1639                source: "Table ID cannot be empty".into(),
1640                location: location!(),
1641            });
1642        }
1643
1644        let location = request.location.clone();
1645
1646        // Validate that location is a relative path within the root directory
1647        // We don't allow absolute URIs or paths that escape the root
1648        if location.contains("://") {
1649            return Err(Error::InvalidInput {
1650                source: format!(
1651                    "Absolute URIs are not allowed for register_table. Location must be a relative path within the root directory: {}",
1652                    location
1653                ).into(),
1654                location: location!(),
1655            });
1656        }
1657
1658        if location.starts_with('/') {
1659            return Err(Error::InvalidInput {
1660                source: format!(
1661                    "Absolute paths are not allowed for register_table. Location must be a relative path within the root directory: {}",
1662                    location
1663                ).into(),
1664                location: location!(),
1665            });
1666        }
1667
1668        // Check for path traversal attempts
1669        if location.contains("..") {
1670            return Err(Error::InvalidInput {
1671                source: format!(
1672                    "Path traversal is not allowed. Location must be a relative path within the root directory: {}",
1673                    location
1674                ).into(),
1675                location: location!(),
1676            });
1677        }
1678
1679        let (namespace, table_name) = Self::split_object_id(table_id);
1680        let object_id = Self::build_object_id(&namespace, &table_name);
1681
1682        // Validate that parent namespaces exist (if not root)
1683        if !namespace.is_empty() {
1684            self.validate_namespace_levels_exist(&namespace).await?;
1685        }
1686
1687        // Check if table already exists
1688        if self.manifest_contains_object(&object_id).await? {
1689            return Err(Error::Namespace {
1690                source: format!("Table '{}' already exists", object_id).into(),
1691                location: location!(),
1692            });
1693        }
1694
1695        // Register the table with its location in the manifest
1696        self.insert_into_manifest(object_id, ObjectType::Table, Some(location.clone()))
1697            .await?;
1698
1699        Ok(RegisterTableResponse {
1700            location,
1701            properties: None,
1702        })
1703    }
1704
1705    async fn deregister_table(
1706        &self,
1707        request: DeregisterTableRequest,
1708    ) -> Result<DeregisterTableResponse> {
1709        let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1710            source: "Table ID is required".into(),
1711            location: location!(),
1712        })?;
1713
1714        if table_id.is_empty() {
1715            return Err(Error::InvalidInput {
1716                source: "Table ID cannot be empty".into(),
1717                location: location!(),
1718            });
1719        }
1720
1721        let (namespace, table_name) = Self::split_object_id(table_id);
1722        let object_id = Self::build_object_id(&namespace, &table_name);
1723
1724        // Get table info before deleting
1725        let table_info = self.query_manifest_for_table(&object_id).await?;
1726
1727        let table_uri = match table_info {
1728            Some(info) => {
1729                // Delete from manifest only (leave physical data intact)
1730                self.delete_from_manifest(&object_id).await?;
1731                Self::construct_full_uri(&self.root, &info.location)?
1732            }
1733            None => {
1734                return Err(Error::Namespace {
1735                    source: format!("Table '{}' not found", object_id).into(),
1736                    location: location!(),
1737                });
1738            }
1739        };
1740
1741        Ok(DeregisterTableResponse {
1742            id: request.id.clone(),
1743            location: Some(table_uri),
1744            properties: None,
1745        })
1746    }
1747}
1748
1749#[cfg(test)]
1750mod tests {
1751    use crate::{DirectoryNamespaceBuilder, ManifestNamespace};
1752    use bytes::Bytes;
1753    use lance_core::utils::tempfile::TempStdDir;
1754    use lance_namespace::models::{
1755        CreateTableRequest, DescribeTableRequest, DropTableRequest, ListTablesRequest,
1756        TableExistsRequest,
1757    };
1758    use lance_namespace::LanceNamespace;
1759    use rstest::rstest;
1760
1761    fn create_test_ipc_data() -> Vec<u8> {
1762        use arrow::array::{Int32Array, StringArray};
1763        use arrow::datatypes::{DataType, Field, Schema};
1764        use arrow::ipc::writer::StreamWriter;
1765        use arrow::record_batch::RecordBatch;
1766        use std::sync::Arc;
1767
1768        let schema = Arc::new(Schema::new(vec![
1769            Field::new("id", DataType::Int32, false),
1770            Field::new("name", DataType::Utf8, false),
1771        ]));
1772
1773        let batch = RecordBatch::try_new(
1774            schema.clone(),
1775            vec![
1776                Arc::new(Int32Array::from(vec![1, 2, 3])),
1777                Arc::new(StringArray::from(vec!["a", "b", "c"])),
1778            ],
1779        )
1780        .unwrap();
1781
1782        let mut buffer = Vec::new();
1783        {
1784            let mut writer = StreamWriter::try_new(&mut buffer, &schema).unwrap();
1785            writer.write(&batch).unwrap();
1786            writer.finish().unwrap();
1787        }
1788        buffer
1789    }
1790
1791    #[rstest]
1792    #[case::with_optimization(true)]
1793    #[case::without_optimization(false)]
1794    #[tokio::test]
1795    async fn test_manifest_namespace_basic_create_and_list(#[case] inline_optimization: bool) {
1796        let temp_dir = TempStdDir::default();
1797        let temp_path = temp_dir.to_str().unwrap();
1798
1799        // Create a DirectoryNamespace with manifest enabled (default)
1800        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
1801            .inline_optimization_enabled(inline_optimization)
1802            .build()
1803            .await
1804            .unwrap();
1805
1806        // Verify we can list tables (should be empty)
1807        let mut request = ListTablesRequest::new();
1808        request.id = Some(vec![]);
1809        let response = dir_namespace.list_tables(request).await.unwrap();
1810        assert_eq!(response.tables.len(), 0);
1811
1812        // Create a test table
1813        let buffer = create_test_ipc_data();
1814        let mut create_request = CreateTableRequest::new();
1815        create_request.id = Some(vec!["test_table".to_string()]);
1816
1817        let _response = dir_namespace
1818            .create_table(create_request, Bytes::from(buffer))
1819            .await
1820            .unwrap();
1821
1822        // List tables again - should see our new table
1823        let mut request = ListTablesRequest::new();
1824        request.id = Some(vec![]);
1825        let response = dir_namespace.list_tables(request).await.unwrap();
1826        assert_eq!(response.tables.len(), 1);
1827        assert_eq!(response.tables[0], "test_table");
1828    }
1829
1830    #[rstest]
1831    #[case::with_optimization(true)]
1832    #[case::without_optimization(false)]
1833    #[tokio::test]
1834    async fn test_manifest_namespace_table_exists(#[case] inline_optimization: bool) {
1835        let temp_dir = TempStdDir::default();
1836        let temp_path = temp_dir.to_str().unwrap();
1837
1838        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
1839            .inline_optimization_enabled(inline_optimization)
1840            .build()
1841            .await
1842            .unwrap();
1843
1844        // Check non-existent table
1845        let mut request = TableExistsRequest::new();
1846        request.id = Some(vec!["nonexistent".to_string()]);
1847        let result = dir_namespace.table_exists(request).await;
1848        assert!(result.is_err());
1849
1850        // Create table
1851        let buffer = create_test_ipc_data();
1852        let mut create_request = CreateTableRequest::new();
1853        create_request.id = Some(vec!["test_table".to_string()]);
1854        dir_namespace
1855            .create_table(create_request, Bytes::from(buffer))
1856            .await
1857            .unwrap();
1858
1859        // Check existing table
1860        let mut request = TableExistsRequest::new();
1861        request.id = Some(vec!["test_table".to_string()]);
1862        let result = dir_namespace.table_exists(request).await;
1863        assert!(result.is_ok());
1864    }
1865
1866    #[rstest]
1867    #[case::with_optimization(true)]
1868    #[case::without_optimization(false)]
1869    #[tokio::test]
1870    async fn test_manifest_namespace_describe_table(#[case] inline_optimization: bool) {
1871        let temp_dir = TempStdDir::default();
1872        let temp_path = temp_dir.to_str().unwrap();
1873
1874        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
1875            .inline_optimization_enabled(inline_optimization)
1876            .build()
1877            .await
1878            .unwrap();
1879
1880        // Describe non-existent table
1881        let mut request = DescribeTableRequest::new();
1882        request.id = Some(vec!["nonexistent".to_string()]);
1883        let result = dir_namespace.describe_table(request).await;
1884        assert!(result.is_err());
1885
1886        // Create table
1887        let buffer = create_test_ipc_data();
1888        let mut create_request = CreateTableRequest::new();
1889        create_request.id = Some(vec!["test_table".to_string()]);
1890        dir_namespace
1891            .create_table(create_request, Bytes::from(buffer))
1892            .await
1893            .unwrap();
1894
1895        // Describe existing table
1896        let mut request = DescribeTableRequest::new();
1897        request.id = Some(vec!["test_table".to_string()]);
1898        let response = dir_namespace.describe_table(request).await.unwrap();
1899        assert!(response.location.is_some());
1900        assert!(response.location.unwrap().contains("test_table"));
1901    }
1902
1903    #[rstest]
1904    #[case::with_optimization(true)]
1905    #[case::without_optimization(false)]
1906    #[tokio::test]
1907    async fn test_manifest_namespace_drop_table(#[case] inline_optimization: bool) {
1908        let temp_dir = TempStdDir::default();
1909        let temp_path = temp_dir.to_str().unwrap();
1910
1911        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
1912            .inline_optimization_enabled(inline_optimization)
1913            .build()
1914            .await
1915            .unwrap();
1916
1917        // Create table
1918        let buffer = create_test_ipc_data();
1919        let mut create_request = CreateTableRequest::new();
1920        create_request.id = Some(vec!["test_table".to_string()]);
1921        dir_namespace
1922            .create_table(create_request, Bytes::from(buffer))
1923            .await
1924            .unwrap();
1925
1926        // Verify table exists
1927        let mut request = ListTablesRequest::new();
1928        request.id = Some(vec![]);
1929        let response = dir_namespace.list_tables(request).await.unwrap();
1930        assert_eq!(response.tables.len(), 1);
1931
1932        // Drop table
1933        let mut drop_request = DropTableRequest::new();
1934        drop_request.id = Some(vec!["test_table".to_string()]);
1935        let _response = dir_namespace.drop_table(drop_request).await.unwrap();
1936
1937        // Verify table is gone
1938        let mut request = ListTablesRequest::new();
1939        request.id = Some(vec![]);
1940        let response = dir_namespace.list_tables(request).await.unwrap();
1941        assert_eq!(response.tables.len(), 0);
1942    }
1943
1944    #[rstest]
1945    #[case::with_optimization(true)]
1946    #[case::without_optimization(false)]
1947    #[tokio::test]
1948    async fn test_manifest_namespace_multiple_tables(#[case] inline_optimization: bool) {
1949        let temp_dir = TempStdDir::default();
1950        let temp_path = temp_dir.to_str().unwrap();
1951
1952        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
1953            .inline_optimization_enabled(inline_optimization)
1954            .build()
1955            .await
1956            .unwrap();
1957
1958        // Create multiple tables
1959        let buffer = create_test_ipc_data();
1960        for i in 1..=3 {
1961            let mut create_request = CreateTableRequest::new();
1962            create_request.id = Some(vec![format!("table{}", i)]);
1963            dir_namespace
1964                .create_table(create_request, Bytes::from(buffer.clone()))
1965                .await
1966                .unwrap();
1967        }
1968
1969        // List all tables
1970        let mut request = ListTablesRequest::new();
1971        request.id = Some(vec![]);
1972        let response = dir_namespace.list_tables(request).await.unwrap();
1973        assert_eq!(response.tables.len(), 3);
1974        assert!(response.tables.contains(&"table1".to_string()));
1975        assert!(response.tables.contains(&"table2".to_string()));
1976        assert!(response.tables.contains(&"table3".to_string()));
1977    }
1978
1979    #[rstest]
1980    #[case::with_optimization(true)]
1981    #[case::without_optimization(false)]
1982    #[tokio::test]
1983    async fn test_directory_only_mode(#[case] inline_optimization: bool) {
1984        let temp_dir = TempStdDir::default();
1985        let temp_path = temp_dir.to_str().unwrap();
1986
1987        // Create a DirectoryNamespace with manifest disabled
1988        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
1989            .manifest_enabled(false)
1990            .inline_optimization_enabled(inline_optimization)
1991            .build()
1992            .await
1993            .unwrap();
1994
1995        // Verify we can list tables (should be empty)
1996        let mut request = ListTablesRequest::new();
1997        request.id = Some(vec![]);
1998        let response = dir_namespace.list_tables(request).await.unwrap();
1999        assert_eq!(response.tables.len(), 0);
2000
2001        // Create a test table
2002        let buffer = create_test_ipc_data();
2003        let mut create_request = CreateTableRequest::new();
2004        create_request.id = Some(vec!["test_table".to_string()]);
2005
2006        // Create table - this should use directory-only mode
2007        let _response = dir_namespace
2008            .create_table(create_request, Bytes::from(buffer))
2009            .await
2010            .unwrap();
2011
2012        // List tables - should see our new table
2013        let mut request = ListTablesRequest::new();
2014        request.id = Some(vec![]);
2015        let response = dir_namespace.list_tables(request).await.unwrap();
2016        assert_eq!(response.tables.len(), 1);
2017        assert_eq!(response.tables[0], "test_table");
2018    }
2019
2020    #[rstest]
2021    #[case::with_optimization(true)]
2022    #[case::without_optimization(false)]
2023    #[tokio::test]
2024    async fn test_dual_mode_merge(#[case] inline_optimization: bool) {
2025        let temp_dir = TempStdDir::default();
2026        let temp_path = temp_dir.to_str().unwrap();
2027
2028        // Create a DirectoryNamespace with both manifest and directory enabled
2029        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2030            .manifest_enabled(true)
2031            .dir_listing_enabled(true)
2032            .inline_optimization_enabled(inline_optimization)
2033            .build()
2034            .await
2035            .unwrap();
2036
2037        // Create tables through manifest
2038        let buffer = create_test_ipc_data();
2039        let mut create_request = CreateTableRequest::new();
2040        create_request.id = Some(vec!["table1".to_string()]);
2041        dir_namespace
2042            .create_table(create_request, Bytes::from(buffer))
2043            .await
2044            .unwrap();
2045
2046        // List tables - should see table from both manifest and directory
2047        let mut request = ListTablesRequest::new();
2048        request.id = Some(vec![]);
2049        let response = dir_namespace.list_tables(request).await.unwrap();
2050        assert_eq!(response.tables.len(), 1);
2051        assert_eq!(response.tables[0], "table1");
2052    }
2053
2054    #[rstest]
2055    #[case::with_optimization(true)]
2056    #[case::without_optimization(false)]
2057    #[tokio::test]
2058    async fn test_manifest_only_mode(#[case] inline_optimization: bool) {
2059        let temp_dir = TempStdDir::default();
2060        let temp_path = temp_dir.to_str().unwrap();
2061
2062        // Create a DirectoryNamespace with only manifest enabled
2063        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2064            .manifest_enabled(true)
2065            .dir_listing_enabled(false)
2066            .inline_optimization_enabled(inline_optimization)
2067            .build()
2068            .await
2069            .unwrap();
2070
2071        // Create table
2072        let buffer = create_test_ipc_data();
2073        let mut create_request = CreateTableRequest::new();
2074        create_request.id = Some(vec!["test_table".to_string()]);
2075        dir_namespace
2076            .create_table(create_request, Bytes::from(buffer))
2077            .await
2078            .unwrap();
2079
2080        // List tables - should only use manifest
2081        let mut request = ListTablesRequest::new();
2082        request.id = Some(vec![]);
2083        let response = dir_namespace.list_tables(request).await.unwrap();
2084        assert_eq!(response.tables.len(), 1);
2085        assert_eq!(response.tables[0], "test_table");
2086    }
2087
2088    #[rstest]
2089    #[case::with_optimization(true)]
2090    #[case::without_optimization(false)]
2091    #[tokio::test]
2092    async fn test_drop_nonexistent_table(#[case] inline_optimization: bool) {
2093        let temp_dir = TempStdDir::default();
2094        let temp_path = temp_dir.to_str().unwrap();
2095
2096        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2097            .inline_optimization_enabled(inline_optimization)
2098            .build()
2099            .await
2100            .unwrap();
2101
2102        // Try to drop non-existent table
2103        let mut drop_request = DropTableRequest::new();
2104        drop_request.id = Some(vec!["nonexistent".to_string()]);
2105        let result = dir_namespace.drop_table(drop_request).await;
2106        assert!(result.is_err());
2107    }
2108
2109    #[rstest]
2110    #[case::with_optimization(true)]
2111    #[case::without_optimization(false)]
2112    #[tokio::test]
2113    async fn test_create_duplicate_table_fails(#[case] inline_optimization: bool) {
2114        let temp_dir = TempStdDir::default();
2115        let temp_path = temp_dir.to_str().unwrap();
2116
2117        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2118            .inline_optimization_enabled(inline_optimization)
2119            .build()
2120            .await
2121            .unwrap();
2122
2123        // Create table
2124        let buffer = create_test_ipc_data();
2125        let mut create_request = CreateTableRequest::new();
2126        create_request.id = Some(vec!["test_table".to_string()]);
2127        dir_namespace
2128            .create_table(create_request, Bytes::from(buffer.clone()))
2129            .await
2130            .unwrap();
2131
2132        // Try to create table with same name - should fail
2133        let mut create_request = CreateTableRequest::new();
2134        create_request.id = Some(vec!["test_table".to_string()]);
2135        let result = dir_namespace
2136            .create_table(create_request, Bytes::from(buffer))
2137            .await;
2138        assert!(result.is_err());
2139    }
2140
2141    #[rstest]
2142    #[case::with_optimization(true)]
2143    #[case::without_optimization(false)]
2144    #[tokio::test]
2145    async fn test_create_child_namespace(#[case] inline_optimization: bool) {
2146        use lance_namespace::models::{
2147            CreateNamespaceRequest, ListNamespacesRequest, NamespaceExistsRequest,
2148        };
2149
2150        let temp_dir = TempStdDir::default();
2151        let temp_path = temp_dir.to_str().unwrap();
2152
2153        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2154            .inline_optimization_enabled(inline_optimization)
2155            .build()
2156            .await
2157            .unwrap();
2158
2159        // Create a child namespace
2160        let mut create_req = CreateNamespaceRequest::new();
2161        create_req.id = Some(vec!["ns1".to_string()]);
2162        let result = dir_namespace.create_namespace(create_req).await;
2163        assert!(
2164            result.is_ok(),
2165            "Failed to create child namespace: {:?}",
2166            result.err()
2167        );
2168
2169        // Verify namespace exists
2170        let exists_req = NamespaceExistsRequest {
2171            id: Some(vec!["ns1".to_string()]),
2172        };
2173        let result = dir_namespace.namespace_exists(exists_req).await;
2174        assert!(result.is_ok(), "Namespace should exist");
2175
2176        // List child namespaces of root
2177        let list_req = ListNamespacesRequest {
2178            id: Some(vec![]),
2179            page_token: None,
2180            limit: None,
2181        };
2182        let result = dir_namespace.list_namespaces(list_req).await;
2183        assert!(result.is_ok());
2184        let namespaces = result.unwrap();
2185        assert_eq!(namespaces.namespaces.len(), 1);
2186        assert_eq!(namespaces.namespaces[0], "ns1");
2187    }
2188
2189    #[rstest]
2190    #[case::with_optimization(true)]
2191    #[case::without_optimization(false)]
2192    #[tokio::test]
2193    async fn test_create_nested_namespace(#[case] inline_optimization: bool) {
2194        use lance_namespace::models::{
2195            CreateNamespaceRequest, ListNamespacesRequest, NamespaceExistsRequest,
2196        };
2197
2198        let temp_dir = TempStdDir::default();
2199        let temp_path = temp_dir.to_str().unwrap();
2200
2201        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2202            .inline_optimization_enabled(inline_optimization)
2203            .build()
2204            .await
2205            .unwrap();
2206
2207        // Create parent namespace
2208        let mut create_req = CreateNamespaceRequest::new();
2209        create_req.id = Some(vec!["parent".to_string()]);
2210        dir_namespace.create_namespace(create_req).await.unwrap();
2211
2212        // Create nested child namespace
2213        let mut create_req = CreateNamespaceRequest::new();
2214        create_req.id = Some(vec!["parent".to_string(), "child".to_string()]);
2215        let result = dir_namespace.create_namespace(create_req).await;
2216        assert!(
2217            result.is_ok(),
2218            "Failed to create nested namespace: {:?}",
2219            result.err()
2220        );
2221
2222        // Verify nested namespace exists
2223        let exists_req = NamespaceExistsRequest {
2224            id: Some(vec!["parent".to_string(), "child".to_string()]),
2225        };
2226        let result = dir_namespace.namespace_exists(exists_req).await;
2227        assert!(result.is_ok(), "Nested namespace should exist");
2228
2229        // List child namespaces of parent
2230        let list_req = ListNamespacesRequest {
2231            id: Some(vec!["parent".to_string()]),
2232            page_token: None,
2233            limit: None,
2234        };
2235        let result = dir_namespace.list_namespaces(list_req).await;
2236        assert!(result.is_ok());
2237        let namespaces = result.unwrap();
2238        assert_eq!(namespaces.namespaces.len(), 1);
2239        assert_eq!(namespaces.namespaces[0], "child");
2240    }
2241
2242    #[rstest]
2243    #[case::with_optimization(true)]
2244    #[case::without_optimization(false)]
2245    #[tokio::test]
2246    async fn test_create_namespace_without_parent_fails(#[case] inline_optimization: bool) {
2247        use lance_namespace::models::CreateNamespaceRequest;
2248
2249        let temp_dir = TempStdDir::default();
2250        let temp_path = temp_dir.to_str().unwrap();
2251
2252        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2253            .inline_optimization_enabled(inline_optimization)
2254            .build()
2255            .await
2256            .unwrap();
2257
2258        // Try to create nested namespace without parent
2259        let mut create_req = CreateNamespaceRequest::new();
2260        create_req.id = Some(vec!["nonexistent_parent".to_string(), "child".to_string()]);
2261        let result = dir_namespace.create_namespace(create_req).await;
2262        assert!(result.is_err(), "Should fail when parent doesn't exist");
2263    }
2264
2265    #[rstest]
2266    #[case::with_optimization(true)]
2267    #[case::without_optimization(false)]
2268    #[tokio::test]
2269    async fn test_drop_child_namespace(#[case] inline_optimization: bool) {
2270        use lance_namespace::models::{
2271            CreateNamespaceRequest, DropNamespaceRequest, NamespaceExistsRequest,
2272        };
2273
2274        let temp_dir = TempStdDir::default();
2275        let temp_path = temp_dir.to_str().unwrap();
2276
2277        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2278            .inline_optimization_enabled(inline_optimization)
2279            .build()
2280            .await
2281            .unwrap();
2282
2283        // Create a child namespace
2284        let mut create_req = CreateNamespaceRequest::new();
2285        create_req.id = Some(vec!["ns1".to_string()]);
2286        dir_namespace.create_namespace(create_req).await.unwrap();
2287
2288        // Drop the namespace
2289        let mut drop_req = DropNamespaceRequest::new();
2290        drop_req.id = Some(vec!["ns1".to_string()]);
2291        let result = dir_namespace.drop_namespace(drop_req).await;
2292        assert!(
2293            result.is_ok(),
2294            "Failed to drop namespace: {:?}",
2295            result.err()
2296        );
2297
2298        // Verify namespace no longer exists
2299        let exists_req = NamespaceExistsRequest {
2300            id: Some(vec!["ns1".to_string()]),
2301        };
2302        let result = dir_namespace.namespace_exists(exists_req).await;
2303        assert!(result.is_err(), "Namespace should not exist after drop");
2304    }
2305
2306    #[rstest]
2307    #[case::with_optimization(true)]
2308    #[case::without_optimization(false)]
2309    #[tokio::test]
2310    async fn test_drop_namespace_with_children_fails(#[case] inline_optimization: bool) {
2311        use lance_namespace::models::{CreateNamespaceRequest, DropNamespaceRequest};
2312
2313        let temp_dir = TempStdDir::default();
2314        let temp_path = temp_dir.to_str().unwrap();
2315
2316        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2317            .inline_optimization_enabled(inline_optimization)
2318            .build()
2319            .await
2320            .unwrap();
2321
2322        // Create parent and child namespaces
2323        let mut create_req = CreateNamespaceRequest::new();
2324        create_req.id = Some(vec!["parent".to_string()]);
2325        dir_namespace.create_namespace(create_req).await.unwrap();
2326
2327        let mut create_req = CreateNamespaceRequest::new();
2328        create_req.id = Some(vec!["parent".to_string(), "child".to_string()]);
2329        dir_namespace.create_namespace(create_req).await.unwrap();
2330
2331        // Try to drop parent namespace - should fail because it has children
2332        let mut drop_req = DropNamespaceRequest::new();
2333        drop_req.id = Some(vec!["parent".to_string()]);
2334        let result = dir_namespace.drop_namespace(drop_req).await;
2335        assert!(result.is_err(), "Should fail when namespace has children");
2336    }
2337
2338    #[rstest]
2339    #[case::with_optimization(true)]
2340    #[case::without_optimization(false)]
2341    #[tokio::test]
2342    async fn test_create_table_in_child_namespace(#[case] inline_optimization: bool) {
2343        use lance_namespace::models::{
2344            CreateNamespaceRequest, CreateTableRequest, ListTablesRequest,
2345        };
2346
2347        let temp_dir = TempStdDir::default();
2348        let temp_path = temp_dir.to_str().unwrap();
2349
2350        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2351            .inline_optimization_enabled(inline_optimization)
2352            .build()
2353            .await
2354            .unwrap();
2355
2356        // Create a child namespace
2357        let mut create_ns_req = CreateNamespaceRequest::new();
2358        create_ns_req.id = Some(vec!["ns1".to_string()]);
2359        dir_namespace.create_namespace(create_ns_req).await.unwrap();
2360
2361        // Create a table in the child namespace
2362        let buffer = create_test_ipc_data();
2363        let mut create_table_req = CreateTableRequest::new();
2364        create_table_req.id = Some(vec!["ns1".to_string(), "table1".to_string()]);
2365        let result = dir_namespace
2366            .create_table(create_table_req, Bytes::from(buffer))
2367            .await;
2368        assert!(
2369            result.is_ok(),
2370            "Failed to create table in child namespace: {:?}",
2371            result.err()
2372        );
2373
2374        // List tables in the namespace
2375        let list_req = ListTablesRequest {
2376            id: Some(vec!["ns1".to_string()]),
2377            page_token: None,
2378            limit: None,
2379        };
2380        let result = dir_namespace.list_tables(list_req).await;
2381        assert!(result.is_ok());
2382        let tables = result.unwrap();
2383        assert_eq!(tables.tables.len(), 1);
2384        assert_eq!(tables.tables[0], "table1");
2385    }
2386
2387    #[rstest]
2388    #[case::with_optimization(true)]
2389    #[case::without_optimization(false)]
2390    #[tokio::test]
2391    async fn test_describe_child_namespace(#[case] inline_optimization: bool) {
2392        use lance_namespace::models::{CreateNamespaceRequest, DescribeNamespaceRequest};
2393
2394        let temp_dir = TempStdDir::default();
2395        let temp_path = temp_dir.to_str().unwrap();
2396
2397        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2398            .inline_optimization_enabled(inline_optimization)
2399            .build()
2400            .await
2401            .unwrap();
2402
2403        // Create a child namespace with properties
2404        let mut properties = std::collections::HashMap::new();
2405        properties.insert("key1".to_string(), "value1".to_string());
2406
2407        let mut create_req = CreateNamespaceRequest::new();
2408        create_req.id = Some(vec!["ns1".to_string()]);
2409        create_req.properties = Some(properties.clone());
2410        dir_namespace.create_namespace(create_req).await.unwrap();
2411
2412        // Describe the namespace
2413        let describe_req = DescribeNamespaceRequest {
2414            id: Some(vec!["ns1".to_string()]),
2415        };
2416        let result = dir_namespace.describe_namespace(describe_req).await;
2417        assert!(
2418            result.is_ok(),
2419            "Failed to describe namespace: {:?}",
2420            result.err()
2421        );
2422        let response = result.unwrap();
2423        assert!(response.properties.is_some());
2424        assert_eq!(
2425            response.properties.unwrap().get("key1"),
2426            Some(&"value1".to_string())
2427        );
2428    }
2429
2430    #[test]
2431    fn test_construct_full_uri_with_cloud_urls() {
2432        // Test S3-style URL with nested path (no trailing slash)
2433        let s3_result =
2434            ManifestNamespace::construct_full_uri("s3://bucket/path/subdir", "table.lance")
2435                .unwrap();
2436        assert_eq!(
2437            s3_result, "s3://bucket/path/subdir/table.lance",
2438            "S3 URL should correctly append table name to nested path"
2439        );
2440
2441        // Test Azure-style URL with nested path (no trailing slash)
2442        let az_result =
2443            ManifestNamespace::construct_full_uri("az://container/path/subdir", "table.lance")
2444                .unwrap();
2445        assert_eq!(
2446            az_result, "az://container/path/subdir/table.lance",
2447            "Azure URL should correctly append table name to nested path"
2448        );
2449
2450        // Test GCS-style URL with nested path (no trailing slash)
2451        let gs_result =
2452            ManifestNamespace::construct_full_uri("gs://bucket/path/subdir", "table.lance")
2453                .unwrap();
2454        assert_eq!(
2455            gs_result, "gs://bucket/path/subdir/table.lance",
2456            "GCS URL should correctly append table name to nested path"
2457        );
2458
2459        // Test with deeper nesting
2460        let deep_result =
2461            ManifestNamespace::construct_full_uri("s3://bucket/a/b/c/d", "my_table.lance").unwrap();
2462        assert_eq!(
2463            deep_result, "s3://bucket/a/b/c/d/my_table.lance",
2464            "Deeply nested path should work correctly"
2465        );
2466
2467        // Test with root-level path (single segment after bucket)
2468        let shallow_result =
2469            ManifestNamespace::construct_full_uri("s3://bucket", "table.lance").unwrap();
2470        assert_eq!(
2471            shallow_result, "s3://bucket/table.lance",
2472            "Single-level nested path should work correctly"
2473        );
2474
2475        // Test that URLs with trailing slash already work (no regression)
2476        let trailing_slash_result =
2477            ManifestNamespace::construct_full_uri("s3://bucket/path/subdir/", "table.lance")
2478                .unwrap();
2479        assert_eq!(
2480            trailing_slash_result, "s3://bucket/path/subdir/table.lance",
2481            "URL with existing trailing slash should still work"
2482        );
2483    }
2484}