1use arrow::array::{Array, RecordBatch, RecordBatchIterator, StringArray};
10use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
11use arrow_ipc::reader::StreamReader;
12use async_trait::async_trait;
13use bytes::Bytes;
14use futures::stream::StreamExt;
15use lance::dataset::optimize::{compact_files, CompactionOptions};
16use lance::dataset::{builder::DatasetBuilder, WriteParams};
17use lance::session::Session;
18use lance::{dataset::scanner::Scanner, Dataset};
19use lance_core::{box_error, Error, Result};
20use lance_index::optimize::OptimizeOptions;
21use lance_index::scalar::{BuiltinIndexType, ScalarIndexParams};
22use lance_index::traits::DatasetIndexExt;
23use lance_index::IndexType;
24use lance_io::object_store::{ObjectStore, ObjectStoreParams};
25use lance_namespace::models::{
26 CreateEmptyTableRequest, CreateEmptyTableResponse, CreateNamespaceRequest,
27 CreateNamespaceResponse, CreateTableRequest, CreateTableResponse, DeregisterTableRequest,
28 DeregisterTableResponse, DescribeNamespaceRequest, DescribeNamespaceResponse,
29 DescribeTableRequest, DescribeTableResponse, DropNamespaceRequest, DropNamespaceResponse,
30 DropTableRequest, DropTableResponse, ListNamespacesRequest, ListNamespacesResponse,
31 ListTablesRequest, ListTablesResponse, NamespaceExistsRequest, RegisterTableRequest,
32 RegisterTableResponse, TableExistsRequest,
33};
34use lance_namespace::schema::arrow_schema_to_json;
35use lance_namespace::LanceNamespace;
36use object_store::path::Path;
37use snafu::location;
38use std::io::Cursor;
39use std::{
40 collections::HashMap,
41 hash::{DefaultHasher, Hash, Hasher},
42 ops::{Deref, DerefMut},
43 sync::Arc,
44};
45use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
46
/// Name of the Lance table, stored directly under the namespace root, that records every
/// namespace and table known to this namespace implementation.
const MANIFEST_TABLE_NAME: &str = "__manifest";
/// Separator used to join the levels of a hierarchical object id (e.g. `ns1$ns2$table`).
const DELIMITER: &str = "$";

/// Name given to the BTree index built on the manifest's `object_id` column.
const OBJECT_ID_INDEX_NAME: &str = "object_id_btree";
/// Name given to the Bitmap index built on the manifest's `object_type` column.
const OBJECT_TYPE_INDEX_NAME: &str = "object_type_bitmap";
/// Name given to the LabelList index built on the manifest's `base_objects` column.
const BASE_OBJECTS_INDEX_NAME: &str = "base_objects_label_list";
57
/// Kind of object recorded in a manifest row (the `object_type` column).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ObjectType {
    /// A (possibly nested) namespace entry.
    Namespace,
    /// A table entry.
    Table,
}
64
65impl ObjectType {
66 pub fn as_str(&self) -> &str {
67 match self {
68 Self::Namespace => "namespace",
69 Self::Table => "table",
70 }
71 }
72
73 pub fn parse(s: &str) -> Result<Self> {
74 match s {
75 "namespace" => Ok(Self::Namespace),
76 "table" => Ok(Self::Table),
77 _ => Err(Error::io(
78 format!("Invalid object type: {}", s),
79 location!(),
80 )),
81 }
82 }
83}
84
/// A table entry read back from the manifest.
#[derive(Clone, Debug)]
pub struct TableInfo {
    /// Namespace levels preceding the table name in its object id (empty at the root).
    pub namespace: Vec<String>,
    /// Final segment of the object id.
    pub name: String,
    /// Value of the manifest `location` column (relative to the namespace root).
    pub location: String,
}
92
/// A namespace entry read back from the manifest.
#[derive(Clone, Debug)]
pub struct NamespaceInfo {
    /// Parent namespace levels preceding `name` in the object id (empty at the root).
    pub namespace: Vec<String>,
    /// Final segment of the object id.
    pub name: String,
    /// Properties decoded from the JSON `metadata` column, if that column was non-null.
    pub metadata: Option<HashMap<String, String>>,
}
100
101#[derive(Debug, Clone)]
106pub struct DatasetConsistencyWrapper(Arc<RwLock<Dataset>>);
107
108impl DatasetConsistencyWrapper {
109 pub fn new(dataset: Dataset) -> Self {
111 Self(Arc::new(RwLock::new(dataset)))
112 }
113
114 pub async fn get(&self) -> Result<DatasetReadGuard<'_>> {
117 self.reload().await?;
118 Ok(DatasetReadGuard {
119 guard: self.0.read().await,
120 })
121 }
122
123 pub async fn get_mut(&self) -> Result<DatasetWriteGuard<'_>> {
126 self.reload().await?;
127 Ok(DatasetWriteGuard {
128 guard: self.0.write().await,
129 })
130 }
131
132 pub async fn set_latest(&self, dataset: Dataset) {
137 let mut write_guard = self.0.write().await;
138 if dataset.manifest().version > write_guard.manifest().version {
139 *write_guard = dataset;
140 }
141 }
142
143 async fn reload(&self) -> Result<()> {
145 let read_guard = self.0.read().await;
147 let dataset_uri = read_guard.uri().to_string();
148 let current_version = read_guard.version().version;
149 log::debug!(
150 "Reload starting for uri={}, current_version={}",
151 dataset_uri,
152 current_version
153 );
154 let latest_version = read_guard
155 .latest_version_id()
156 .await
157 .map_err(|e| Error::IO {
158 source: box_error(std::io::Error::other(format!(
159 "Failed to get latest version: {}",
160 e
161 ))),
162 location: location!(),
163 })?;
164 log::debug!(
165 "Reload got latest_version={} for uri={}, current_version={}",
166 latest_version,
167 dataset_uri,
168 current_version
169 );
170 drop(read_guard);
171
172 if latest_version == current_version {
174 log::debug!("Already up-to-date for uri={}", dataset_uri);
175 return Ok(());
176 }
177
178 let mut write_guard = self.0.write().await;
180
181 let latest_version = write_guard
183 .latest_version_id()
184 .await
185 .map_err(|e| Error::IO {
186 source: box_error(std::io::Error::other(format!(
187 "Failed to get latest version: {}",
188 e
189 ))),
190 location: location!(),
191 })?;
192
193 if latest_version != write_guard.version().version {
194 write_guard.checkout_latest().await.map_err(|e| Error::IO {
195 source: box_error(std::io::Error::other(format!(
196 "Failed to checkout latest: {}",
197 e
198 ))),
199 location: location!(),
200 })?;
201 }
202
203 Ok(())
204 }
205}
206
207pub struct DatasetReadGuard<'a> {
208 guard: RwLockReadGuard<'a, Dataset>,
209}
210
211impl Deref for DatasetReadGuard<'_> {
212 type Target = Dataset;
213
214 fn deref(&self) -> &Self::Target {
215 &self.guard
216 }
217}
218
219pub struct DatasetWriteGuard<'a> {
220 guard: RwLockWriteGuard<'a, Dataset>,
221}
222
223impl Deref for DatasetWriteGuard<'_> {
224 type Target = Dataset;
225
226 fn deref(&self) -> &Self::Target {
227 &self.guard
228 }
229}
230
231impl DerefMut for DatasetWriteGuard<'_> {
232 fn deref_mut(&mut self) -> &mut Self::Target {
233 &mut self.guard
234 }
235}
236
237#[derive(Debug)]
241pub struct ManifestNamespace {
242 root: String,
243 storage_options: Option<HashMap<String, String>>,
244 #[allow(dead_code)]
245 session: Option<Arc<Session>>,
246 #[allow(dead_code)]
247 object_store: Arc<ObjectStore>,
248 #[allow(dead_code)]
249 base_path: Path,
250 manifest_dataset: DatasetConsistencyWrapper,
251 dir_listing_enabled: bool,
255 inline_optimization_enabled: bool,
258}
259
260impl ManifestNamespace {
261 pub async fn from_directory(
263 root: String,
264 storage_options: Option<HashMap<String, String>>,
265 session: Option<Arc<Session>>,
266 object_store: Arc<ObjectStore>,
267 base_path: Path,
268 dir_listing_enabled: bool,
269 inline_optimization_enabled: bool,
270 ) -> Result<Self> {
271 let manifest_dataset =
272 Self::create_or_get_manifest(&root, &storage_options, session.clone()).await?;
273
274 Ok(Self {
275 root,
276 storage_options,
277 session,
278 object_store,
279 base_path,
280 manifest_dataset,
281 dir_listing_enabled,
282 inline_optimization_enabled,
283 })
284 }
285
286 pub fn build_object_id(namespace: &[String], name: &str) -> String {
288 if namespace.is_empty() {
289 name.to_string()
290 } else {
291 let mut id = namespace.join(DELIMITER);
292 id.push_str(DELIMITER);
293 id.push_str(name);
294 id
295 }
296 }
297
298 pub fn parse_object_id(object_id: &str) -> (Vec<String>, String) {
300 let parts: Vec<&str> = object_id.split(DELIMITER).collect();
301 if parts.len() == 1 {
302 (Vec::new(), parts[0].to_string())
303 } else {
304 let namespace = parts[..parts.len() - 1]
305 .iter()
306 .map(|s| s.to_string())
307 .collect();
308 let name = parts[parts.len() - 1].to_string();
309 (namespace, name)
310 }
311 }
312
313 fn split_object_id(table_id: &[String]) -> (Vec<String>, String) {
315 if table_id.len() == 1 {
316 (vec![], table_id[0].clone())
317 } else {
318 (
319 table_id[..table_id.len() - 1].to_vec(),
320 table_id[table_id.len() - 1].clone(),
321 )
322 }
323 }
324
325 fn str_object_id(table_id: &[String]) -> String {
327 table_id.join(DELIMITER)
328 }
329
330 fn generate_dir_name(object_id: &str) -> String {
337 let random_num: u64 = rand::random();
339
340 let mut hasher = DefaultHasher::new();
342 random_num.hash(&mut hasher);
343 object_id.hash(&mut hasher);
344 let hash = hasher.finish();
345
346 format!("{:08x}_{}", (hash & 0xFFFFFFFF) as u32, object_id)
348 }
349
350 pub(crate) fn construct_full_uri(root: &str, relative_location: &str) -> Result<String> {
352 let mut base_url = lance_io::object_store::uri_to_url(root)?;
353
354 if !base_url.path().ends_with('/') {
359 base_url.set_path(&format!("{}/", base_url.path()));
360 }
361
362 let full_url = base_url
363 .join(relative_location)
364 .map_err(|e| Error::InvalidInput {
365 source: format!(
366 "Failed to join URI '{}' with '{}': {:?}",
367 root, relative_location, e
368 )
369 .into(),
370 location: location!(),
371 })?;
372
373 Ok(full_url.to_string())
374 }
375
376 async fn run_inline_optimization(&self) -> Result<()> {
388 if !self.inline_optimization_enabled {
389 return Ok(());
390 }
391
392 let mut dataset_guard = self.manifest_dataset.get_mut().await?;
394 let dataset: &mut Dataset = &mut dataset_guard;
395
396 let indices = dataset.load_indices().await?;
398
399 let has_object_id_index = indices.iter().any(|idx| idx.name == OBJECT_ID_INDEX_NAME);
401 let has_object_type_index = indices.iter().any(|idx| idx.name == OBJECT_TYPE_INDEX_NAME);
402 let has_base_objects_index = indices
403 .iter()
404 .any(|idx| idx.name == BASE_OBJECTS_INDEX_NAME);
405
406 if !has_object_id_index {
408 log::debug!(
409 "Creating BTREE index '{}' on object_id for __manifest table",
410 OBJECT_ID_INDEX_NAME
411 );
412 let params = ScalarIndexParams::for_builtin(BuiltinIndexType::BTree);
413 if let Err(e) = dataset
414 .create_index(
415 &["object_id"],
416 IndexType::BTree,
417 Some(OBJECT_ID_INDEX_NAME.to_string()),
418 ¶ms,
419 true,
420 )
421 .await
422 {
423 log::warn!("Failed to create BTREE index on object_id for __manifest table: {:?}. Query performance may be impacted.", e);
424 } else {
425 log::info!(
426 "Created BTREE index '{}' on object_id for __manifest table",
427 OBJECT_ID_INDEX_NAME
428 );
429 }
430 }
431
432 if !has_object_type_index {
434 log::debug!(
435 "Creating Bitmap index '{}' on object_type for __manifest table",
436 OBJECT_TYPE_INDEX_NAME
437 );
438 let params = ScalarIndexParams::default();
439 if let Err(e) = dataset
440 .create_index(
441 &["object_type"],
442 IndexType::Bitmap,
443 Some(OBJECT_TYPE_INDEX_NAME.to_string()),
444 ¶ms,
445 true,
446 )
447 .await
448 {
449 log::warn!("Failed to create Bitmap index on object_type for __manifest table: {:?}. Query performance may be impacted.", e);
450 } else {
451 log::info!(
452 "Created Bitmap index '{}' on object_type for __manifest table",
453 OBJECT_TYPE_INDEX_NAME
454 );
455 }
456 }
457
458 if !has_base_objects_index {
460 log::debug!(
461 "Creating LabelList index '{}' on base_objects for __manifest table",
462 BASE_OBJECTS_INDEX_NAME
463 );
464 let params = ScalarIndexParams::default();
465 if let Err(e) = dataset
466 .create_index(
467 &["base_objects"],
468 IndexType::LabelList,
469 Some(BASE_OBJECTS_INDEX_NAME.to_string()),
470 ¶ms,
471 true,
472 )
473 .await
474 {
475 log::warn!("Failed to create LabelList index on base_objects for __manifest table: {:?}. Query performance may be impacted.", e);
476 } else {
477 log::info!(
478 "Created LabelList index '{}' on base_objects for __manifest table",
479 BASE_OBJECTS_INDEX_NAME
480 );
481 }
482 }
483
484 log::debug!("Running file compaction on __manifest table");
486 match compact_files(dataset, CompactionOptions::default(), None).await {
487 Ok(compaction_metrics) => {
488 if compaction_metrics.fragments_removed > 0 {
489 log::info!(
490 "Compacted __manifest table: removed {} fragments, added {} fragments",
491 compaction_metrics.fragments_removed,
492 compaction_metrics.fragments_added
493 );
494 }
495 }
496 Err(e) => {
497 log::warn!("Failed to compact files for __manifest table: {:?}. Continuing with optimization.", e);
498 }
499 }
500
501 log::debug!("Optimizing indices on __manifest table");
503 match dataset.optimize_indices(&OptimizeOptions::default()).await {
504 Ok(_) => {
505 log::info!("Successfully optimized indices on __manifest table");
506 }
507 Err(e) => {
508 log::warn!(
509 "Failed to optimize indices on __manifest table: {:?}. Continuing anyway.",
510 e
511 );
512 }
513 }
514
515 Ok(())
516 }
517
518 fn manifest_schema() -> Arc<ArrowSchema> {
520 Arc::new(ArrowSchema::new(vec![
521 Field::new("object_id", DataType::Utf8, false),
522 Field::new("object_type", DataType::Utf8, false),
523 Field::new("location", DataType::Utf8, true),
524 Field::new("metadata", DataType::Utf8, true),
525 Field::new(
526 "base_objects",
527 DataType::List(Arc::new(Field::new("object_id", DataType::Utf8, true))),
528 true,
529 ),
530 ]))
531 }
532
533 async fn manifest_scanner(&self) -> Result<Scanner> {
535 let dataset_guard = self.manifest_dataset.get().await?;
536 Ok(dataset_guard.scan())
537 }
538
539 async fn execute_scanner(scanner: Scanner) -> Result<Vec<RecordBatch>> {
541 let mut stream = scanner.try_into_stream().await.map_err(|e| Error::IO {
542 source: box_error(std::io::Error::other(format!(
543 "Failed to create stream: {}",
544 e
545 ))),
546 location: location!(),
547 })?;
548
549 let mut batches = Vec::new();
550 while let Some(batch) = stream.next().await {
551 batches.push(batch.map_err(|e| Error::IO {
552 source: box_error(std::io::Error::other(format!(
553 "Failed to read batch: {}",
554 e
555 ))),
556 location: location!(),
557 })?);
558 }
559
560 Ok(batches)
561 }
562
563 fn get_string_column<'a>(batch: &'a RecordBatch, column_name: &str) -> Result<&'a StringArray> {
565 let column = batch
566 .column_by_name(column_name)
567 .ok_or_else(|| Error::io(format!("Column '{}' not found", column_name), location!()))?;
568 column
569 .as_any()
570 .downcast_ref::<StringArray>()
571 .ok_or_else(|| {
572 Error::io(
573 format!("Column '{}' is not a string array", column_name),
574 location!(),
575 )
576 })
577 }
578
579 async fn manifest_contains_object(&self, object_id: &str) -> Result<bool> {
581 let filter = format!("object_id = '{}'", object_id);
582
583 let dataset_guard = self.manifest_dataset.get().await?;
584 let mut scanner = dataset_guard.scan();
585
586 scanner.filter(&filter).map_err(|e| Error::IO {
587 source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
588 location: location!(),
589 })?;
590
591 scanner.project::<&str>(&[]).map_err(|e| Error::IO {
593 source: box_error(std::io::Error::other(format!("Failed to project: {}", e))),
594 location: location!(),
595 })?;
596
597 scanner.with_row_id();
598
599 let count = scanner.count_rows().await.map_err(|e| Error::IO {
600 source: box_error(std::io::Error::other(format!(
601 "Failed to count rows: {}",
602 e
603 ))),
604 location: location!(),
605 })?;
606
607 Ok(count > 0)
608 }
609
610 async fn query_manifest_for_table(&self, object_id: &str) -> Result<Option<TableInfo>> {
612 let filter = format!("object_id = '{}' AND object_type = 'table'", object_id);
613 let mut scanner = self.manifest_scanner().await?;
614 scanner.filter(&filter).map_err(|e| Error::IO {
615 source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
616 location: location!(),
617 })?;
618 scanner
619 .project(&["object_id", "location"])
620 .map_err(|e| Error::IO {
621 source: box_error(std::io::Error::other(format!("Failed to project: {}", e))),
622 location: location!(),
623 })?;
624 let batches = Self::execute_scanner(scanner).await?;
625
626 let mut found_result: Option<TableInfo> = None;
627 let mut total_rows = 0;
628
629 for batch in batches {
630 if batch.num_rows() == 0 {
631 continue;
632 }
633
634 total_rows += batch.num_rows();
635 if total_rows > 1 {
636 return Err(Error::io(
637 format!(
638 "Expected exactly 1 table with id '{}', found {}",
639 object_id, total_rows
640 ),
641 location!(),
642 ));
643 }
644
645 let object_id_array = Self::get_string_column(&batch, "object_id")?;
646 let location_array = Self::get_string_column(&batch, "location")?;
647 let location = location_array.value(0).to_string();
648 let (namespace, name) = Self::parse_object_id(object_id_array.value(0));
649 found_result = Some(TableInfo {
650 namespace,
651 name,
652 location,
653 });
654 }
655
656 Ok(found_result)
657 }
658
659 pub async fn list_manifest_table_locations(&self) -> Result<std::collections::HashSet<String>> {
662 let filter = "object_type = 'table' AND NOT contains(object_id, '$')";
663 let mut scanner = self.manifest_scanner().await?;
664 scanner.filter(filter).map_err(|e| Error::IO {
665 source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
666 location: location!(),
667 })?;
668 scanner.project(&["location"]).map_err(|e| Error::IO {
669 source: box_error(std::io::Error::other(format!("Failed to project: {}", e))),
670 location: location!(),
671 })?;
672
673 let batches = Self::execute_scanner(scanner).await?;
674 let mut locations = std::collections::HashSet::new();
675
676 for batch in batches {
677 if batch.num_rows() == 0 {
678 continue;
679 }
680 let location_array = Self::get_string_column(&batch, "location")?;
681 for i in 0..location_array.len() {
682 locations.insert(location_array.value(i).to_string());
683 }
684 }
685
686 Ok(locations)
687 }
688
689 async fn insert_into_manifest(
691 &self,
692 object_id: String,
693 object_type: ObjectType,
694 location: Option<String>,
695 ) -> Result<()> {
696 self.insert_into_manifest_with_metadata(object_id, object_type, location, None, None)
697 .await
698 }
699
700 async fn insert_into_manifest_with_metadata(
702 &self,
703 object_id: String,
704 object_type: ObjectType,
705 location: Option<String>,
706 metadata: Option<String>,
707 base_objects: Option<Vec<String>>,
708 ) -> Result<()> {
709 use arrow::array::builder::{ListBuilder, StringBuilder};
710
711 let schema = Self::manifest_schema();
712
713 let string_builder = StringBuilder::new();
715 let mut list_builder = ListBuilder::new(string_builder).with_field(Arc::new(Field::new(
716 "object_id",
717 DataType::Utf8,
718 true,
719 )));
720
721 match base_objects {
722 Some(objects) => {
723 for obj in objects {
724 list_builder.values().append_value(obj);
725 }
726 list_builder.append(true);
727 }
728 None => {
729 list_builder.append_null();
730 }
731 }
732
733 let base_objects_array = list_builder.finish();
734
735 let location_array = match location {
737 Some(loc) => Arc::new(StringArray::from(vec![Some(loc)])),
738 None => Arc::new(StringArray::from(vec![None::<String>])),
739 };
740
741 let metadata_array = match metadata {
742 Some(meta) => Arc::new(StringArray::from(vec![Some(meta)])),
743 None => Arc::new(StringArray::from(vec![None::<String>])),
744 };
745
746 let batch = RecordBatch::try_new(
747 schema.clone(),
748 vec![
749 Arc::new(StringArray::from(vec![object_id.as_str()])),
750 Arc::new(StringArray::from(vec![object_type.as_str()])),
751 location_array,
752 metadata_array,
753 Arc::new(base_objects_array),
754 ],
755 )
756 .map_err(|e| {
757 Error::io(
758 format!("Failed to create manifest entry: {}", e),
759 location!(),
760 )
761 })?;
762
763 let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
764
765 let dataset_guard = self.manifest_dataset.get().await?;
767 let dataset_arc = Arc::new(dataset_guard.clone());
768 drop(dataset_guard); let mut merge_builder =
771 lance::dataset::MergeInsertBuilder::try_new(dataset_arc, vec!["object_id".to_string()])
772 .map_err(|e| Error::IO {
773 source: box_error(std::io::Error::other(format!(
774 "Failed to create merge builder: {}",
775 e
776 ))),
777 location: location!(),
778 })?;
779
780 merge_builder.when_matched(lance::dataset::WhenMatched::Fail);
781 merge_builder.when_not_matched(lance::dataset::WhenNotMatched::InsertAll);
782
783 let (new_dataset_arc, _merge_stats) = merge_builder
784 .try_build()
785 .map_err(|e| Error::IO {
786 source: box_error(std::io::Error::other(format!(
787 "Failed to build merge: {}",
788 e
789 ))),
790 location: location!(),
791 })?
792 .execute_reader(Box::new(reader))
793 .await
794 .map_err(|e| {
795 let error_msg = e.to_string();
797 if error_msg.contains("matched")
798 || error_msg.contains("duplicate")
799 || error_msg.contains("already exists")
800 {
801 Error::io(
802 format!("Object with id '{}' already exists in manifest", object_id),
803 location!(),
804 )
805 } else {
806 Error::IO {
807 source: box_error(std::io::Error::other(format!(
808 "Failed to execute merge: {}",
809 e
810 ))),
811 location: location!(),
812 }
813 }
814 })?;
815
816 let new_dataset = Arc::try_unwrap(new_dataset_arc).unwrap_or_else(|arc| (*arc).clone());
817 self.manifest_dataset.set_latest(new_dataset).await;
818
819 if let Err(e) = self.run_inline_optimization().await {
821 log::warn!(
822 "Unexpected failure when running inline optimization: {:?}",
823 e
824 );
825 }
826
827 Ok(())
828 }
829
830 pub async fn delete_from_manifest(&self, object_id: &str) -> Result<()> {
832 {
833 let predicate = format!("object_id = '{}'", object_id);
834 let mut dataset_guard = self.manifest_dataset.get_mut().await?;
835 dataset_guard
836 .delete(&predicate)
837 .await
838 .map_err(|e| Error::IO {
839 source: box_error(std::io::Error::other(format!("Failed to delete: {}", e))),
840 location: location!(),
841 })?;
842 } self.manifest_dataset.reload().await?;
845
846 if let Err(e) = self.run_inline_optimization().await {
848 log::warn!(
849 "Unexpected failure when running inline optimization: {:?}",
850 e
851 );
852 }
853
854 Ok(())
855 }
856
857 pub async fn register_table(&self, name: &str, location: String) -> Result<()> {
859 let object_id = Self::build_object_id(&[], name);
860 if self.manifest_contains_object(&object_id).await? {
861 return Err(Error::io(
862 format!("Table '{}' already exists", name),
863 location!(),
864 ));
865 }
866
867 self.insert_into_manifest(object_id, ObjectType::Table, Some(location))
868 .await
869 }
870
871 async fn validate_namespace_levels_exist(&self, namespace_path: &[String]) -> Result<()> {
873 for i in 1..=namespace_path.len() {
874 let partial_path = &namespace_path[..i];
875 let object_id = partial_path.join(DELIMITER);
876 if !self.manifest_contains_object(&object_id).await? {
877 return Err(Error::Namespace {
878 source: format!("Parent namespace '{}' does not exist", object_id).into(),
879 location: location!(),
880 });
881 }
882 }
883 Ok(())
884 }
885
886 async fn query_manifest_for_namespace(&self, object_id: &str) -> Result<Option<NamespaceInfo>> {
888 let filter = format!("object_id = '{}' AND object_type = 'namespace'", object_id);
889 let mut scanner = self.manifest_scanner().await?;
890 scanner.filter(&filter).map_err(|e| Error::IO {
891 source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
892 location: location!(),
893 })?;
894 scanner
895 .project(&["object_id", "metadata"])
896 .map_err(|e| Error::IO {
897 source: box_error(std::io::Error::other(format!("Failed to project: {}", e))),
898 location: location!(),
899 })?;
900 let batches = Self::execute_scanner(scanner).await?;
901
902 let mut found_result: Option<NamespaceInfo> = None;
903 let mut total_rows = 0;
904
905 for batch in batches {
906 if batch.num_rows() == 0 {
907 continue;
908 }
909
910 total_rows += batch.num_rows();
911 if total_rows > 1 {
912 return Err(Error::io(
913 format!(
914 "Expected exactly 1 namespace with id '{}', found {}",
915 object_id, total_rows
916 ),
917 location!(),
918 ));
919 }
920
921 let object_id_array = Self::get_string_column(&batch, "object_id")?;
922 let metadata_array = Self::get_string_column(&batch, "metadata")?;
923
924 let object_id_str = object_id_array.value(0);
925 let metadata = if !metadata_array.is_null(0) {
926 let metadata_str = metadata_array.value(0);
927 match serde_json::from_str::<HashMap<String, String>>(metadata_str) {
928 Ok(map) => Some(map),
929 Err(e) => {
930 return Err(Error::io(
931 format!(
932 "Failed to deserialize metadata for namespace '{}': {}",
933 object_id, e
934 ),
935 location!(),
936 ));
937 }
938 }
939 } else {
940 None
941 };
942
943 let (namespace, name) = Self::parse_object_id(object_id_str);
944 found_result = Some(NamespaceInfo {
945 namespace,
946 name,
947 metadata,
948 });
949 }
950
951 Ok(found_result)
952 }
953
954 async fn create_or_get_manifest(
956 root: &str,
957 storage_options: &Option<HashMap<String, String>>,
958 session: Option<Arc<Session>>,
959 ) -> Result<DatasetConsistencyWrapper> {
960 let manifest_path = format!("{}/{}", root, MANIFEST_TABLE_NAME);
961 log::debug!("Attempting to load manifest from {}", manifest_path);
962 let mut builder = DatasetBuilder::from_uri(&manifest_path);
963
964 if let Some(sess) = session.clone() {
965 builder = builder.with_session(sess);
966 }
967
968 if let Some(opts) = storage_options {
969 builder = builder.with_storage_options(opts.clone());
970 }
971
972 let dataset_result = builder.load().await;
973 if let Ok(dataset) = dataset_result {
974 Ok(DatasetConsistencyWrapper::new(dataset))
975 } else {
976 log::info!("Creating new manifest table at {}", manifest_path);
977 let schema = Self::manifest_schema();
978 let empty_batch = RecordBatch::new_empty(schema.clone());
979 let reader = RecordBatchIterator::new(vec![Ok(empty_batch)], schema.clone());
980
981 let write_params = WriteParams {
982 session,
983 store_params: storage_options.as_ref().map(|opts| ObjectStoreParams {
984 storage_options: Some(opts.clone()),
985 ..Default::default()
986 }),
987 ..Default::default()
988 };
989
990 let dataset = Dataset::write(Box::new(reader), &manifest_path, Some(write_params))
991 .await
992 .map_err(|e| Error::IO {
993 source: box_error(std::io::Error::other(format!(
994 "Failed to create manifest dataset: {}",
995 e
996 ))),
997 location: location!(),
998 })?;
999
1000 log::info!(
1001 "Successfully created manifest table at {}, version={}, uri={}",
1002 manifest_path,
1003 dataset.version().version,
1004 dataset.uri()
1005 );
1006 Ok(DatasetConsistencyWrapper::new(dataset))
1007 }
1008 }
1009}
1010
1011#[async_trait]
1012impl LanceNamespace for ManifestNamespace {
1013 fn namespace_id(&self) -> String {
1014 self.root.clone()
1015 }
1016
1017 async fn list_tables(&self, request: ListTablesRequest) -> Result<ListTablesResponse> {
1018 let namespace_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1019 source: "Namespace ID is required".into(),
1020 location: location!(),
1021 })?;
1022
1023 let filter = if namespace_id.is_empty() {
1025 "object_type = 'table' AND NOT contains(object_id, '$')".to_string()
1027 } else {
1028 let prefix = namespace_id.join(DELIMITER);
1030 format!(
1031 "object_type = 'table' AND starts_with(object_id, '{}{}') AND NOT contains(substring(object_id, {}), '$')",
1032 prefix, DELIMITER, prefix.len() + 2
1033 )
1034 };
1035
1036 let mut scanner = self.manifest_scanner().await?;
1037 scanner.filter(&filter).map_err(|e| Error::IO {
1038 source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
1039 location: location!(),
1040 })?;
1041 scanner.project(&["object_id"]).map_err(|e| Error::IO {
1042 source: box_error(std::io::Error::other(format!("Failed to project: {}", e))),
1043 location: location!(),
1044 })?;
1045
1046 let batches = Self::execute_scanner(scanner).await?;
1047
1048 let mut tables = Vec::new();
1049 for batch in batches {
1050 if batch.num_rows() == 0 {
1051 continue;
1052 }
1053
1054 let object_id_array = Self::get_string_column(&batch, "object_id")?;
1055 for i in 0..batch.num_rows() {
1056 let object_id = object_id_array.value(i);
1057 let (_namespace, name) = Self::parse_object_id(object_id);
1058 tables.push(name);
1059 }
1060 }
1061
1062 Ok(ListTablesResponse::new(tables))
1063 }
1064
1065 async fn describe_table(&self, request: DescribeTableRequest) -> Result<DescribeTableResponse> {
1066 let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1067 source: "Table ID is required".into(),
1068 location: location!(),
1069 })?;
1070
1071 if table_id.is_empty() {
1072 return Err(Error::InvalidInput {
1073 source: "Table ID cannot be empty".into(),
1074 location: location!(),
1075 });
1076 }
1077
1078 let object_id = Self::str_object_id(table_id);
1079 let table_info = self.query_manifest_for_table(&object_id).await?;
1080
1081 match table_info {
1082 Some(info) => {
1083 let table_uri = Self::construct_full_uri(&self.root, &info.location)?;
1085
1086 match Dataset::open(&table_uri).await {
1088 Ok(mut dataset) => {
1089 if let Some(requested_version) = request.version {
1091 dataset = dataset.checkout_version(requested_version as u64).await?;
1092 }
1093
1094 let version = dataset.version().version;
1095 let lance_schema = dataset.schema();
1096 let arrow_schema: arrow_schema::Schema = lance_schema.into();
1097 let json_schema = arrow_schema_to_json(&arrow_schema)?;
1098
1099 Ok(DescribeTableResponse {
1100 version: Some(version as i64),
1101 location: Some(table_uri),
1102 schema: Some(Box::new(json_schema)),
1103 properties: None,
1104 storage_options: self.storage_options.clone(),
1105 })
1106 }
1107 Err(_) => {
1108 Ok(DescribeTableResponse {
1110 version: None,
1111 location: Some(table_uri),
1112 schema: None,
1113 properties: None,
1114 storage_options: self.storage_options.clone(),
1115 })
1116 }
1117 }
1118 }
1119 None => Err(Error::Namespace {
1120 source: format!("Table '{}' not found", object_id).into(),
1121 location: location!(),
1122 }),
1123 }
1124 }
1125
1126 async fn table_exists(&self, request: TableExistsRequest) -> Result<()> {
1127 let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1128 source: "Table ID is required".into(),
1129 location: location!(),
1130 })?;
1131
1132 if table_id.is_empty() {
1133 return Err(Error::InvalidInput {
1134 source: "Table ID cannot be empty".into(),
1135 location: location!(),
1136 });
1137 }
1138
1139 let (namespace, table_name) = Self::split_object_id(table_id);
1140 let object_id = Self::build_object_id(&namespace, &table_name);
1141 let exists = self.manifest_contains_object(&object_id).await?;
1142 if exists {
1143 Ok(())
1144 } else {
1145 Err(Error::Namespace {
1146 source: format!("Table '{}' not found", table_name).into(),
1147 location: location!(),
1148 })
1149 }
1150 }
1151
1152 async fn create_table(
1153 &self,
1154 request: CreateTableRequest,
1155 data: Bytes,
1156 ) -> Result<CreateTableResponse> {
1157 let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1158 source: "Table ID is required".into(),
1159 location: location!(),
1160 })?;
1161
1162 if table_id.is_empty() {
1163 return Err(Error::InvalidInput {
1164 source: "Table ID cannot be empty".into(),
1165 location: location!(),
1166 });
1167 }
1168
1169 let (namespace, table_name) = Self::split_object_id(table_id);
1170 let object_id = Self::build_object_id(&namespace, &table_name);
1171
1172 if self.manifest_contains_object(&object_id).await? {
1174 return Err(Error::io(
1175 format!("Table '{}' already exists", table_name),
1176 location!(),
1177 ));
1178 }
1179
1180 let dir_name = if namespace.is_empty() && self.dir_listing_enabled {
1184 format!("{}.lance", table_name)
1186 } else {
1187 Self::generate_dir_name(&object_id)
1189 };
1190 let table_uri = Self::construct_full_uri(&self.root, &dir_name)?;
1191
1192 if data.is_empty() {
1194 return Err(Error::Namespace {
1195 source: "Request data (Arrow IPC stream) is required for create_table".into(),
1196 location: location!(),
1197 });
1198 }
1199
1200 if let Some(location) = &request.location {
1202 let location = location.trim_end_matches('/');
1203 if location != table_uri {
1204 return Err(Error::Namespace {
1205 source: format!(
1206 "Cannot create table {} at location {}, must be at location {}",
1207 table_name, location, table_uri
1208 )
1209 .into(),
1210 location: location!(),
1211 });
1212 }
1213 }
1214
1215 let cursor = Cursor::new(data.to_vec());
1217 let stream_reader = StreamReader::try_new(cursor, None)
1218 .map_err(|e| Error::io(format!("Failed to read IPC stream: {}", e), location!()))?;
1219
1220 let batches: Vec<RecordBatch> =
1221 stream_reader
1222 .collect::<std::result::Result<Vec<_>, _>>()
1223 .map_err(|e| Error::io(format!("Failed to collect batches: {}", e), location!()))?;
1224
1225 if batches.is_empty() {
1226 return Err(Error::io(
1227 "No data provided for table creation",
1228 location!(),
1229 ));
1230 }
1231
1232 let schema = batches[0].schema();
1233 let batch_results: Vec<std::result::Result<RecordBatch, arrow_schema::ArrowError>> =
1234 batches.into_iter().map(Ok).collect();
1235 let reader = RecordBatchIterator::new(batch_results, schema);
1236
1237 let write_params = WriteParams::default();
1238 let _dataset = Dataset::write(Box::new(reader), &table_uri, Some(write_params))
1239 .await
1240 .map_err(|e| Error::IO {
1241 source: box_error(std::io::Error::other(format!(
1242 "Failed to write dataset: {}",
1243 e
1244 ))),
1245 location: location!(),
1246 })?;
1247
1248 self.insert_into_manifest(object_id, ObjectType::Table, Some(dir_name))
1250 .await?;
1251
1252 Ok(CreateTableResponse {
1253 version: Some(1),
1254 location: Some(table_uri),
1255 properties: None,
1256 storage_options: self.storage_options.clone(),
1257 })
1258 }
1259
1260 async fn drop_table(&self, request: DropTableRequest) -> Result<DropTableResponse> {
1261 let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1262 source: "Table ID is required".into(),
1263 location: location!(),
1264 })?;
1265
1266 if table_id.is_empty() {
1267 return Err(Error::InvalidInput {
1268 source: "Table ID cannot be empty".into(),
1269 location: location!(),
1270 });
1271 }
1272
1273 let (namespace, table_name) = Self::split_object_id(table_id);
1274 let object_id = Self::build_object_id(&namespace, &table_name);
1275
1276 let table_info = self.query_manifest_for_table(&object_id).await?;
1278
1279 match table_info {
1280 Some(info) => {
1281 self.delete_from_manifest(&object_id).await?;
1283
1284 let table_path = self.base_path.child(info.location.as_str());
1286 let table_uri = Self::construct_full_uri(&self.root, &info.location)?;
1287
1288 self.object_store
1290 .remove_dir_all(table_path)
1291 .await
1292 .map_err(|e| Error::Namespace {
1293 source: format!("Failed to delete table directory: {}", e).into(),
1294 location: location!(),
1295 })?;
1296
1297 Ok(DropTableResponse {
1298 id: request.id.clone(),
1299 location: Some(table_uri),
1300 properties: None,
1301 transaction_id: None,
1302 })
1303 }
1304 None => Err(Error::Namespace {
1305 source: format!("Table '{}' not found", table_name).into(),
1306 location: location!(),
1307 }),
1308 }
1309 }
1310
1311 async fn list_namespaces(
1312 &self,
1313 request: ListNamespacesRequest,
1314 ) -> Result<ListNamespacesResponse> {
1315 let parent_namespace = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1316 source: "Namespace ID is required".into(),
1317 location: location!(),
1318 })?;
1319
1320 let filter = if parent_namespace.is_empty() {
1322 "object_type = 'namespace' AND NOT contains(object_id, '$')".to_string()
1324 } else {
1325 let prefix = parent_namespace.join(DELIMITER);
1327 format!(
1328 "object_type = 'namespace' AND starts_with(object_id, '{}{}') AND NOT contains(substring(object_id, {}), '$')",
1329 prefix, DELIMITER, prefix.len() + 2
1330 )
1331 };
1332
1333 let mut scanner = self.manifest_scanner().await?;
1334 scanner.filter(&filter).map_err(|e| Error::IO {
1335 source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
1336 location: location!(),
1337 })?;
1338 scanner.project(&["object_id"]).map_err(|e| Error::IO {
1339 source: box_error(std::io::Error::other(format!("Failed to project: {}", e))),
1340 location: location!(),
1341 })?;
1342
1343 let batches = Self::execute_scanner(scanner).await?;
1344 let mut namespaces = Vec::new();
1345
1346 for batch in batches {
1347 if batch.num_rows() == 0 {
1348 continue;
1349 }
1350
1351 let object_id_array = Self::get_string_column(&batch, "object_id")?;
1352 for i in 0..batch.num_rows() {
1353 let object_id = object_id_array.value(i);
1354 let (_namespace, name) = Self::parse_object_id(object_id);
1355 namespaces.push(name);
1356 }
1357 }
1358
1359 Ok(ListNamespacesResponse::new(namespaces))
1360 }
1361
1362 async fn describe_namespace(
1363 &self,
1364 request: DescribeNamespaceRequest,
1365 ) -> Result<DescribeNamespaceResponse> {
1366 let namespace_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1367 source: "Namespace ID is required".into(),
1368 location: location!(),
1369 })?;
1370
1371 if namespace_id.is_empty() {
1373 return Ok(DescribeNamespaceResponse {
1374 properties: Some(HashMap::new()),
1375 });
1376 }
1377
1378 let object_id = namespace_id.join(DELIMITER);
1380 let namespace_info = self.query_manifest_for_namespace(&object_id).await?;
1381
1382 match namespace_info {
1383 Some(info) => Ok(DescribeNamespaceResponse {
1384 properties: info.metadata,
1385 }),
1386 None => Err(Error::Namespace {
1387 source: format!("Namespace '{}' not found", object_id).into(),
1388 location: location!(),
1389 }),
1390 }
1391 }
1392
1393 async fn create_namespace(
1394 &self,
1395 request: CreateNamespaceRequest,
1396 ) -> Result<CreateNamespaceResponse> {
1397 let namespace_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1398 source: "Namespace ID is required".into(),
1399 location: location!(),
1400 })?;
1401
1402 if namespace_id.is_empty() {
1404 return Err(Error::Namespace {
1405 source: "Root namespace already exists and cannot be created".into(),
1406 location: location!(),
1407 });
1408 }
1409
1410 if namespace_id.len() > 1 {
1412 self.validate_namespace_levels_exist(&namespace_id[..namespace_id.len() - 1])
1413 .await?;
1414 }
1415
1416 let object_id = namespace_id.join(DELIMITER);
1417 if self.manifest_contains_object(&object_id).await? {
1418 return Err(Error::Namespace {
1419 source: format!("Namespace '{}' already exists", object_id).into(),
1420 location: location!(),
1421 });
1422 }
1423
1424 let metadata = request.properties.as_ref().and_then(|props| {
1426 if props.is_empty() {
1427 None
1428 } else {
1429 Some(serde_json::to_string(props).ok()?)
1430 }
1431 });
1432
1433 self.insert_into_manifest_with_metadata(
1434 object_id,
1435 ObjectType::Namespace,
1436 None,
1437 metadata,
1438 None,
1439 )
1440 .await?;
1441
1442 Ok(CreateNamespaceResponse {
1443 properties: request.properties,
1444 })
1445 }
1446
1447 async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<DropNamespaceResponse> {
1448 let namespace_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1449 source: "Namespace ID is required".into(),
1450 location: location!(),
1451 })?;
1452
1453 if namespace_id.is_empty() {
1455 return Err(Error::Namespace {
1456 source: "Root namespace cannot be dropped".into(),
1457 location: location!(),
1458 });
1459 }
1460
1461 let object_id = namespace_id.join(DELIMITER);
1462
1463 if !self.manifest_contains_object(&object_id).await? {
1465 return Err(Error::Namespace {
1466 source: format!("Namespace '{}' not found", object_id).into(),
1467 location: location!(),
1468 });
1469 }
1470
1471 let prefix = format!("{}{}", object_id, DELIMITER);
1473 let filter = format!("starts_with(object_id, '{}')", prefix);
1474 let mut scanner = self.manifest_scanner().await?;
1475 scanner.filter(&filter).map_err(|e| Error::IO {
1476 source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
1477 location: location!(),
1478 })?;
1479 scanner.project::<&str>(&[]).map_err(|e| Error::IO {
1480 source: box_error(std::io::Error::other(format!("Failed to project: {}", e))),
1481 location: location!(),
1482 })?;
1483 scanner.with_row_id();
1484 let count = scanner.count_rows().await.map_err(|e| Error::IO {
1485 source: box_error(std::io::Error::other(format!(
1486 "Failed to count rows: {}",
1487 e
1488 ))),
1489 location: location!(),
1490 })?;
1491
1492 if count > 0 {
1493 return Err(Error::Namespace {
1494 source: format!(
1495 "Namespace '{}' is not empty (contains {} child objects)",
1496 object_id, count
1497 )
1498 .into(),
1499 location: location!(),
1500 });
1501 }
1502
1503 self.delete_from_manifest(&object_id).await?;
1504
1505 Ok(DropNamespaceResponse {
1506 properties: None,
1507 transaction_id: None,
1508 })
1509 }
1510
1511 async fn namespace_exists(&self, request: NamespaceExistsRequest) -> Result<()> {
1512 let namespace_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1513 source: "Namespace ID is required".into(),
1514 location: location!(),
1515 })?;
1516
1517 if namespace_id.is_empty() {
1519 return Ok(());
1520 }
1521
1522 let object_id = namespace_id.join(DELIMITER);
1523 if self.manifest_contains_object(&object_id).await? {
1524 Ok(())
1525 } else {
1526 Err(Error::Namespace {
1527 source: format!("Namespace '{}' not found", object_id).into(),
1528 location: location!(),
1529 })
1530 }
1531 }
1532
1533 async fn create_empty_table(
1534 &self,
1535 request: CreateEmptyTableRequest,
1536 ) -> Result<CreateEmptyTableResponse> {
1537 let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1538 source: "Table ID is required".into(),
1539 location: location!(),
1540 })?;
1541
1542 if table_id.is_empty() {
1543 return Err(Error::InvalidInput {
1544 source: "Table ID cannot be empty".into(),
1545 location: location!(),
1546 });
1547 }
1548
1549 let (namespace, table_name) = Self::split_object_id(table_id);
1550 let object_id = Self::build_object_id(&namespace, &table_name);
1551
1552 let existing = self.query_manifest_for_table(&object_id).await?;
1554 if existing.is_some() {
1555 return Err(Error::Namespace {
1556 source: format!("Table '{}' already exists", table_name).into(),
1557 location: location!(),
1558 });
1559 }
1560
1561 let dir_name = if namespace.is_empty() && self.dir_listing_enabled {
1565 format!("{}.lance", table_name)
1567 } else {
1568 Self::generate_dir_name(&object_id)
1570 };
1571 let table_path = self.base_path.child(dir_name.as_str());
1572 let table_uri = Self::construct_full_uri(&self.root, &dir_name)?;
1573
1574 if let Some(req_location) = &request.location {
1576 let req_location = req_location.trim_end_matches('/');
1577 if req_location != table_uri {
1578 return Err(Error::Namespace {
1579 source: format!(
1580 "Cannot create table {} at location {}, must be at location {}",
1581 table_name, req_location, table_uri
1582 )
1583 .into(),
1584 location: location!(),
1585 });
1586 }
1587 }
1588
1589 let reserved_file_path = table_path.child(".lance-reserved");
1591
1592 self.object_store
1593 .create(&reserved_file_path)
1594 .await
1595 .map_err(|e| Error::Namespace {
1596 source: format!(
1597 "Failed to create .lance-reserved file for table {}: {}",
1598 table_name, e
1599 )
1600 .into(),
1601 location: location!(),
1602 })?
1603 .shutdown()
1604 .await
1605 .map_err(|e| Error::Namespace {
1606 source: format!(
1607 "Failed to finalize .lance-reserved file for table {}: {}",
1608 table_name, e
1609 )
1610 .into(),
1611 location: location!(),
1612 })?;
1613
1614 self.insert_into_manifest(object_id, ObjectType::Table, Some(dir_name))
1616 .await?;
1617
1618 log::info!(
1619 "Created empty table '{}' in manifest at {}",
1620 table_name,
1621 table_uri
1622 );
1623
1624 Ok(CreateEmptyTableResponse {
1625 location: Some(table_uri),
1626 properties: None,
1627 storage_options: self.storage_options.clone(),
1628 })
1629 }
1630
1631 async fn register_table(&self, request: RegisterTableRequest) -> Result<RegisterTableResponse> {
1632 let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1633 source: "Table ID is required".into(),
1634 location: location!(),
1635 })?;
1636
1637 if table_id.is_empty() {
1638 return Err(Error::InvalidInput {
1639 source: "Table ID cannot be empty".into(),
1640 location: location!(),
1641 });
1642 }
1643
1644 let location = request.location.clone();
1645
1646 if location.contains("://") {
1649 return Err(Error::InvalidInput {
1650 source: format!(
1651 "Absolute URIs are not allowed for register_table. Location must be a relative path within the root directory: {}",
1652 location
1653 ).into(),
1654 location: location!(),
1655 });
1656 }
1657
1658 if location.starts_with('/') {
1659 return Err(Error::InvalidInput {
1660 source: format!(
1661 "Absolute paths are not allowed for register_table. Location must be a relative path within the root directory: {}",
1662 location
1663 ).into(),
1664 location: location!(),
1665 });
1666 }
1667
1668 if location.contains("..") {
1670 return Err(Error::InvalidInput {
1671 source: format!(
1672 "Path traversal is not allowed. Location must be a relative path within the root directory: {}",
1673 location
1674 ).into(),
1675 location: location!(),
1676 });
1677 }
1678
1679 let (namespace, table_name) = Self::split_object_id(table_id);
1680 let object_id = Self::build_object_id(&namespace, &table_name);
1681
1682 if !namespace.is_empty() {
1684 self.validate_namespace_levels_exist(&namespace).await?;
1685 }
1686
1687 if self.manifest_contains_object(&object_id).await? {
1689 return Err(Error::Namespace {
1690 source: format!("Table '{}' already exists", object_id).into(),
1691 location: location!(),
1692 });
1693 }
1694
1695 self.insert_into_manifest(object_id, ObjectType::Table, Some(location.clone()))
1697 .await?;
1698
1699 Ok(RegisterTableResponse {
1700 location,
1701 properties: None,
1702 })
1703 }
1704
1705 async fn deregister_table(
1706 &self,
1707 request: DeregisterTableRequest,
1708 ) -> Result<DeregisterTableResponse> {
1709 let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1710 source: "Table ID is required".into(),
1711 location: location!(),
1712 })?;
1713
1714 if table_id.is_empty() {
1715 return Err(Error::InvalidInput {
1716 source: "Table ID cannot be empty".into(),
1717 location: location!(),
1718 });
1719 }
1720
1721 let (namespace, table_name) = Self::split_object_id(table_id);
1722 let object_id = Self::build_object_id(&namespace, &table_name);
1723
1724 let table_info = self.query_manifest_for_table(&object_id).await?;
1726
1727 let table_uri = match table_info {
1728 Some(info) => {
1729 self.delete_from_manifest(&object_id).await?;
1731 Self::construct_full_uri(&self.root, &info.location)?
1732 }
1733 None => {
1734 return Err(Error::Namespace {
1735 source: format!("Table '{}' not found", object_id).into(),
1736 location: location!(),
1737 });
1738 }
1739 };
1740
1741 Ok(DeregisterTableResponse {
1742 id: request.id.clone(),
1743 location: Some(table_uri),
1744 properties: None,
1745 })
1746 }
1747}
1748
1749#[cfg(test)]
1750mod tests {
1751 use crate::{DirectoryNamespaceBuilder, ManifestNamespace};
1752 use bytes::Bytes;
1753 use lance_core::utils::tempfile::TempStdDir;
1754 use lance_namespace::models::{
1755 CreateTableRequest, DescribeTableRequest, DropTableRequest, ListTablesRequest,
1756 TableExistsRequest,
1757 };
1758 use lance_namespace::LanceNamespace;
1759 use rstest::rstest;
1760
1761 fn create_test_ipc_data() -> Vec<u8> {
1762 use arrow::array::{Int32Array, StringArray};
1763 use arrow::datatypes::{DataType, Field, Schema};
1764 use arrow::ipc::writer::StreamWriter;
1765 use arrow::record_batch::RecordBatch;
1766 use std::sync::Arc;
1767
1768 let schema = Arc::new(Schema::new(vec![
1769 Field::new("id", DataType::Int32, false),
1770 Field::new("name", DataType::Utf8, false),
1771 ]));
1772
1773 let batch = RecordBatch::try_new(
1774 schema.clone(),
1775 vec![
1776 Arc::new(Int32Array::from(vec![1, 2, 3])),
1777 Arc::new(StringArray::from(vec!["a", "b", "c"])),
1778 ],
1779 )
1780 .unwrap();
1781
1782 let mut buffer = Vec::new();
1783 {
1784 let mut writer = StreamWriter::try_new(&mut buffer, &schema).unwrap();
1785 writer.write(&batch).unwrap();
1786 writer.finish().unwrap();
1787 }
1788 buffer
1789 }
1790
1791 #[rstest]
1792 #[case::with_optimization(true)]
1793 #[case::without_optimization(false)]
1794 #[tokio::test]
1795 async fn test_manifest_namespace_basic_create_and_list(#[case] inline_optimization: bool) {
1796 let temp_dir = TempStdDir::default();
1797 let temp_path = temp_dir.to_str().unwrap();
1798
1799 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
1801 .inline_optimization_enabled(inline_optimization)
1802 .build()
1803 .await
1804 .unwrap();
1805
1806 let mut request = ListTablesRequest::new();
1808 request.id = Some(vec![]);
1809 let response = dir_namespace.list_tables(request).await.unwrap();
1810 assert_eq!(response.tables.len(), 0);
1811
1812 let buffer = create_test_ipc_data();
1814 let mut create_request = CreateTableRequest::new();
1815 create_request.id = Some(vec!["test_table".to_string()]);
1816
1817 let _response = dir_namespace
1818 .create_table(create_request, Bytes::from(buffer))
1819 .await
1820 .unwrap();
1821
1822 let mut request = ListTablesRequest::new();
1824 request.id = Some(vec![]);
1825 let response = dir_namespace.list_tables(request).await.unwrap();
1826 assert_eq!(response.tables.len(), 1);
1827 assert_eq!(response.tables[0], "test_table");
1828 }
1829
1830 #[rstest]
1831 #[case::with_optimization(true)]
1832 #[case::without_optimization(false)]
1833 #[tokio::test]
1834 async fn test_manifest_namespace_table_exists(#[case] inline_optimization: bool) {
1835 let temp_dir = TempStdDir::default();
1836 let temp_path = temp_dir.to_str().unwrap();
1837
1838 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
1839 .inline_optimization_enabled(inline_optimization)
1840 .build()
1841 .await
1842 .unwrap();
1843
1844 let mut request = TableExistsRequest::new();
1846 request.id = Some(vec!["nonexistent".to_string()]);
1847 let result = dir_namespace.table_exists(request).await;
1848 assert!(result.is_err());
1849
1850 let buffer = create_test_ipc_data();
1852 let mut create_request = CreateTableRequest::new();
1853 create_request.id = Some(vec!["test_table".to_string()]);
1854 dir_namespace
1855 .create_table(create_request, Bytes::from(buffer))
1856 .await
1857 .unwrap();
1858
1859 let mut request = TableExistsRequest::new();
1861 request.id = Some(vec!["test_table".to_string()]);
1862 let result = dir_namespace.table_exists(request).await;
1863 assert!(result.is_ok());
1864 }
1865
1866 #[rstest]
1867 #[case::with_optimization(true)]
1868 #[case::without_optimization(false)]
1869 #[tokio::test]
1870 async fn test_manifest_namespace_describe_table(#[case] inline_optimization: bool) {
1871 let temp_dir = TempStdDir::default();
1872 let temp_path = temp_dir.to_str().unwrap();
1873
1874 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
1875 .inline_optimization_enabled(inline_optimization)
1876 .build()
1877 .await
1878 .unwrap();
1879
1880 let mut request = DescribeTableRequest::new();
1882 request.id = Some(vec!["nonexistent".to_string()]);
1883 let result = dir_namespace.describe_table(request).await;
1884 assert!(result.is_err());
1885
1886 let buffer = create_test_ipc_data();
1888 let mut create_request = CreateTableRequest::new();
1889 create_request.id = Some(vec!["test_table".to_string()]);
1890 dir_namespace
1891 .create_table(create_request, Bytes::from(buffer))
1892 .await
1893 .unwrap();
1894
1895 let mut request = DescribeTableRequest::new();
1897 request.id = Some(vec!["test_table".to_string()]);
1898 let response = dir_namespace.describe_table(request).await.unwrap();
1899 assert!(response.location.is_some());
1900 assert!(response.location.unwrap().contains("test_table"));
1901 }
1902
1903 #[rstest]
1904 #[case::with_optimization(true)]
1905 #[case::without_optimization(false)]
1906 #[tokio::test]
1907 async fn test_manifest_namespace_drop_table(#[case] inline_optimization: bool) {
1908 let temp_dir = TempStdDir::default();
1909 let temp_path = temp_dir.to_str().unwrap();
1910
1911 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
1912 .inline_optimization_enabled(inline_optimization)
1913 .build()
1914 .await
1915 .unwrap();
1916
1917 let buffer = create_test_ipc_data();
1919 let mut create_request = CreateTableRequest::new();
1920 create_request.id = Some(vec!["test_table".to_string()]);
1921 dir_namespace
1922 .create_table(create_request, Bytes::from(buffer))
1923 .await
1924 .unwrap();
1925
1926 let mut request = ListTablesRequest::new();
1928 request.id = Some(vec![]);
1929 let response = dir_namespace.list_tables(request).await.unwrap();
1930 assert_eq!(response.tables.len(), 1);
1931
1932 let mut drop_request = DropTableRequest::new();
1934 drop_request.id = Some(vec!["test_table".to_string()]);
1935 let _response = dir_namespace.drop_table(drop_request).await.unwrap();
1936
1937 let mut request = ListTablesRequest::new();
1939 request.id = Some(vec![]);
1940 let response = dir_namespace.list_tables(request).await.unwrap();
1941 assert_eq!(response.tables.len(), 0);
1942 }
1943
1944 #[rstest]
1945 #[case::with_optimization(true)]
1946 #[case::without_optimization(false)]
1947 #[tokio::test]
1948 async fn test_manifest_namespace_multiple_tables(#[case] inline_optimization: bool) {
1949 let temp_dir = TempStdDir::default();
1950 let temp_path = temp_dir.to_str().unwrap();
1951
1952 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
1953 .inline_optimization_enabled(inline_optimization)
1954 .build()
1955 .await
1956 .unwrap();
1957
1958 let buffer = create_test_ipc_data();
1960 for i in 1..=3 {
1961 let mut create_request = CreateTableRequest::new();
1962 create_request.id = Some(vec![format!("table{}", i)]);
1963 dir_namespace
1964 .create_table(create_request, Bytes::from(buffer.clone()))
1965 .await
1966 .unwrap();
1967 }
1968
1969 let mut request = ListTablesRequest::new();
1971 request.id = Some(vec![]);
1972 let response = dir_namespace.list_tables(request).await.unwrap();
1973 assert_eq!(response.tables.len(), 3);
1974 assert!(response.tables.contains(&"table1".to_string()));
1975 assert!(response.tables.contains(&"table2".to_string()));
1976 assert!(response.tables.contains(&"table3".to_string()));
1977 }
1978
1979 #[rstest]
1980 #[case::with_optimization(true)]
1981 #[case::without_optimization(false)]
1982 #[tokio::test]
1983 async fn test_directory_only_mode(#[case] inline_optimization: bool) {
1984 let temp_dir = TempStdDir::default();
1985 let temp_path = temp_dir.to_str().unwrap();
1986
1987 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
1989 .manifest_enabled(false)
1990 .inline_optimization_enabled(inline_optimization)
1991 .build()
1992 .await
1993 .unwrap();
1994
1995 let mut request = ListTablesRequest::new();
1997 request.id = Some(vec![]);
1998 let response = dir_namespace.list_tables(request).await.unwrap();
1999 assert_eq!(response.tables.len(), 0);
2000
2001 let buffer = create_test_ipc_data();
2003 let mut create_request = CreateTableRequest::new();
2004 create_request.id = Some(vec!["test_table".to_string()]);
2005
2006 let _response = dir_namespace
2008 .create_table(create_request, Bytes::from(buffer))
2009 .await
2010 .unwrap();
2011
2012 let mut request = ListTablesRequest::new();
2014 request.id = Some(vec![]);
2015 let response = dir_namespace.list_tables(request).await.unwrap();
2016 assert_eq!(response.tables.len(), 1);
2017 assert_eq!(response.tables[0], "test_table");
2018 }
2019
2020 #[rstest]
2021 #[case::with_optimization(true)]
2022 #[case::without_optimization(false)]
2023 #[tokio::test]
2024 async fn test_dual_mode_merge(#[case] inline_optimization: bool) {
2025 let temp_dir = TempStdDir::default();
2026 let temp_path = temp_dir.to_str().unwrap();
2027
2028 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2030 .manifest_enabled(true)
2031 .dir_listing_enabled(true)
2032 .inline_optimization_enabled(inline_optimization)
2033 .build()
2034 .await
2035 .unwrap();
2036
2037 let buffer = create_test_ipc_data();
2039 let mut create_request = CreateTableRequest::new();
2040 create_request.id = Some(vec!["table1".to_string()]);
2041 dir_namespace
2042 .create_table(create_request, Bytes::from(buffer))
2043 .await
2044 .unwrap();
2045
2046 let mut request = ListTablesRequest::new();
2048 request.id = Some(vec![]);
2049 let response = dir_namespace.list_tables(request).await.unwrap();
2050 assert_eq!(response.tables.len(), 1);
2051 assert_eq!(response.tables[0], "table1");
2052 }
2053
2054 #[rstest]
2055 #[case::with_optimization(true)]
2056 #[case::without_optimization(false)]
2057 #[tokio::test]
2058 async fn test_manifest_only_mode(#[case] inline_optimization: bool) {
2059 let temp_dir = TempStdDir::default();
2060 let temp_path = temp_dir.to_str().unwrap();
2061
2062 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2064 .manifest_enabled(true)
2065 .dir_listing_enabled(false)
2066 .inline_optimization_enabled(inline_optimization)
2067 .build()
2068 .await
2069 .unwrap();
2070
2071 let buffer = create_test_ipc_data();
2073 let mut create_request = CreateTableRequest::new();
2074 create_request.id = Some(vec!["test_table".to_string()]);
2075 dir_namespace
2076 .create_table(create_request, Bytes::from(buffer))
2077 .await
2078 .unwrap();
2079
2080 let mut request = ListTablesRequest::new();
2082 request.id = Some(vec![]);
2083 let response = dir_namespace.list_tables(request).await.unwrap();
2084 assert_eq!(response.tables.len(), 1);
2085 assert_eq!(response.tables[0], "test_table");
2086 }
2087
2088 #[rstest]
2089 #[case::with_optimization(true)]
2090 #[case::without_optimization(false)]
2091 #[tokio::test]
2092 async fn test_drop_nonexistent_table(#[case] inline_optimization: bool) {
2093 let temp_dir = TempStdDir::default();
2094 let temp_path = temp_dir.to_str().unwrap();
2095
2096 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2097 .inline_optimization_enabled(inline_optimization)
2098 .build()
2099 .await
2100 .unwrap();
2101
2102 let mut drop_request = DropTableRequest::new();
2104 drop_request.id = Some(vec!["nonexistent".to_string()]);
2105 let result = dir_namespace.drop_table(drop_request).await;
2106 assert!(result.is_err());
2107 }
2108
2109 #[rstest]
2110 #[case::with_optimization(true)]
2111 #[case::without_optimization(false)]
2112 #[tokio::test]
2113 async fn test_create_duplicate_table_fails(#[case] inline_optimization: bool) {
2114 let temp_dir = TempStdDir::default();
2115 let temp_path = temp_dir.to_str().unwrap();
2116
2117 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2118 .inline_optimization_enabled(inline_optimization)
2119 .build()
2120 .await
2121 .unwrap();
2122
2123 let buffer = create_test_ipc_data();
2125 let mut create_request = CreateTableRequest::new();
2126 create_request.id = Some(vec!["test_table".to_string()]);
2127 dir_namespace
2128 .create_table(create_request, Bytes::from(buffer.clone()))
2129 .await
2130 .unwrap();
2131
2132 let mut create_request = CreateTableRequest::new();
2134 create_request.id = Some(vec!["test_table".to_string()]);
2135 let result = dir_namespace
2136 .create_table(create_request, Bytes::from(buffer))
2137 .await;
2138 assert!(result.is_err());
2139 }
2140
2141 #[rstest]
2142 #[case::with_optimization(true)]
2143 #[case::without_optimization(false)]
2144 #[tokio::test]
2145 async fn test_create_child_namespace(#[case] inline_optimization: bool) {
2146 use lance_namespace::models::{
2147 CreateNamespaceRequest, ListNamespacesRequest, NamespaceExistsRequest,
2148 };
2149
2150 let temp_dir = TempStdDir::default();
2151 let temp_path = temp_dir.to_str().unwrap();
2152
2153 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2154 .inline_optimization_enabled(inline_optimization)
2155 .build()
2156 .await
2157 .unwrap();
2158
2159 let mut create_req = CreateNamespaceRequest::new();
2161 create_req.id = Some(vec!["ns1".to_string()]);
2162 let result = dir_namespace.create_namespace(create_req).await;
2163 assert!(
2164 result.is_ok(),
2165 "Failed to create child namespace: {:?}",
2166 result.err()
2167 );
2168
2169 let exists_req = NamespaceExistsRequest {
2171 id: Some(vec!["ns1".to_string()]),
2172 };
2173 let result = dir_namespace.namespace_exists(exists_req).await;
2174 assert!(result.is_ok(), "Namespace should exist");
2175
2176 let list_req = ListNamespacesRequest {
2178 id: Some(vec![]),
2179 page_token: None,
2180 limit: None,
2181 };
2182 let result = dir_namespace.list_namespaces(list_req).await;
2183 assert!(result.is_ok());
2184 let namespaces = result.unwrap();
2185 assert_eq!(namespaces.namespaces.len(), 1);
2186 assert_eq!(namespaces.namespaces[0], "ns1");
2187 }
2188
2189 #[rstest]
2190 #[case::with_optimization(true)]
2191 #[case::without_optimization(false)]
2192 #[tokio::test]
2193 async fn test_create_nested_namespace(#[case] inline_optimization: bool) {
2194 use lance_namespace::models::{
2195 CreateNamespaceRequest, ListNamespacesRequest, NamespaceExistsRequest,
2196 };
2197
2198 let temp_dir = TempStdDir::default();
2199 let temp_path = temp_dir.to_str().unwrap();
2200
2201 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2202 .inline_optimization_enabled(inline_optimization)
2203 .build()
2204 .await
2205 .unwrap();
2206
2207 let mut create_req = CreateNamespaceRequest::new();
2209 create_req.id = Some(vec!["parent".to_string()]);
2210 dir_namespace.create_namespace(create_req).await.unwrap();
2211
2212 let mut create_req = CreateNamespaceRequest::new();
2214 create_req.id = Some(vec!["parent".to_string(), "child".to_string()]);
2215 let result = dir_namespace.create_namespace(create_req).await;
2216 assert!(
2217 result.is_ok(),
2218 "Failed to create nested namespace: {:?}",
2219 result.err()
2220 );
2221
2222 let exists_req = NamespaceExistsRequest {
2224 id: Some(vec!["parent".to_string(), "child".to_string()]),
2225 };
2226 let result = dir_namespace.namespace_exists(exists_req).await;
2227 assert!(result.is_ok(), "Nested namespace should exist");
2228
2229 let list_req = ListNamespacesRequest {
2231 id: Some(vec!["parent".to_string()]),
2232 page_token: None,
2233 limit: None,
2234 };
2235 let result = dir_namespace.list_namespaces(list_req).await;
2236 assert!(result.is_ok());
2237 let namespaces = result.unwrap();
2238 assert_eq!(namespaces.namespaces.len(), 1);
2239 assert_eq!(namespaces.namespaces[0], "child");
2240 }
2241
2242 #[rstest]
2243 #[case::with_optimization(true)]
2244 #[case::without_optimization(false)]
2245 #[tokio::test]
2246 async fn test_create_namespace_without_parent_fails(#[case] inline_optimization: bool) {
2247 use lance_namespace::models::CreateNamespaceRequest;
2248
2249 let temp_dir = TempStdDir::default();
2250 let temp_path = temp_dir.to_str().unwrap();
2251
2252 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2253 .inline_optimization_enabled(inline_optimization)
2254 .build()
2255 .await
2256 .unwrap();
2257
2258 let mut create_req = CreateNamespaceRequest::new();
2260 create_req.id = Some(vec!["nonexistent_parent".to_string(), "child".to_string()]);
2261 let result = dir_namespace.create_namespace(create_req).await;
2262 assert!(result.is_err(), "Should fail when parent doesn't exist");
2263 }
2264
2265 #[rstest]
2266 #[case::with_optimization(true)]
2267 #[case::without_optimization(false)]
2268 #[tokio::test]
2269 async fn test_drop_child_namespace(#[case] inline_optimization: bool) {
2270 use lance_namespace::models::{
2271 CreateNamespaceRequest, DropNamespaceRequest, NamespaceExistsRequest,
2272 };
2273
2274 let temp_dir = TempStdDir::default();
2275 let temp_path = temp_dir.to_str().unwrap();
2276
2277 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2278 .inline_optimization_enabled(inline_optimization)
2279 .build()
2280 .await
2281 .unwrap();
2282
2283 let mut create_req = CreateNamespaceRequest::new();
2285 create_req.id = Some(vec!["ns1".to_string()]);
2286 dir_namespace.create_namespace(create_req).await.unwrap();
2287
2288 let mut drop_req = DropNamespaceRequest::new();
2290 drop_req.id = Some(vec!["ns1".to_string()]);
2291 let result = dir_namespace.drop_namespace(drop_req).await;
2292 assert!(
2293 result.is_ok(),
2294 "Failed to drop namespace: {:?}",
2295 result.err()
2296 );
2297
2298 let exists_req = NamespaceExistsRequest {
2300 id: Some(vec!["ns1".to_string()]),
2301 };
2302 let result = dir_namespace.namespace_exists(exists_req).await;
2303 assert!(result.is_err(), "Namespace should not exist after drop");
2304 }
2305
2306 #[rstest]
2307 #[case::with_optimization(true)]
2308 #[case::without_optimization(false)]
2309 #[tokio::test]
2310 async fn test_drop_namespace_with_children_fails(#[case] inline_optimization: bool) {
2311 use lance_namespace::models::{CreateNamespaceRequest, DropNamespaceRequest};
2312
2313 let temp_dir = TempStdDir::default();
2314 let temp_path = temp_dir.to_str().unwrap();
2315
2316 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2317 .inline_optimization_enabled(inline_optimization)
2318 .build()
2319 .await
2320 .unwrap();
2321
2322 let mut create_req = CreateNamespaceRequest::new();
2324 create_req.id = Some(vec!["parent".to_string()]);
2325 dir_namespace.create_namespace(create_req).await.unwrap();
2326
2327 let mut create_req = CreateNamespaceRequest::new();
2328 create_req.id = Some(vec!["parent".to_string(), "child".to_string()]);
2329 dir_namespace.create_namespace(create_req).await.unwrap();
2330
2331 let mut drop_req = DropNamespaceRequest::new();
2333 drop_req.id = Some(vec!["parent".to_string()]);
2334 let result = dir_namespace.drop_namespace(drop_req).await;
2335 assert!(result.is_err(), "Should fail when namespace has children");
2336 }
2337
2338 #[rstest]
2339 #[case::with_optimization(true)]
2340 #[case::without_optimization(false)]
2341 #[tokio::test]
2342 async fn test_create_table_in_child_namespace(#[case] inline_optimization: bool) {
2343 use lance_namespace::models::{
2344 CreateNamespaceRequest, CreateTableRequest, ListTablesRequest,
2345 };
2346
2347 let temp_dir = TempStdDir::default();
2348 let temp_path = temp_dir.to_str().unwrap();
2349
2350 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2351 .inline_optimization_enabled(inline_optimization)
2352 .build()
2353 .await
2354 .unwrap();
2355
2356 let mut create_ns_req = CreateNamespaceRequest::new();
2358 create_ns_req.id = Some(vec!["ns1".to_string()]);
2359 dir_namespace.create_namespace(create_ns_req).await.unwrap();
2360
2361 let buffer = create_test_ipc_data();
2363 let mut create_table_req = CreateTableRequest::new();
2364 create_table_req.id = Some(vec!["ns1".to_string(), "table1".to_string()]);
2365 let result = dir_namespace
2366 .create_table(create_table_req, Bytes::from(buffer))
2367 .await;
2368 assert!(
2369 result.is_ok(),
2370 "Failed to create table in child namespace: {:?}",
2371 result.err()
2372 );
2373
2374 let list_req = ListTablesRequest {
2376 id: Some(vec!["ns1".to_string()]),
2377 page_token: None,
2378 limit: None,
2379 };
2380 let result = dir_namespace.list_tables(list_req).await;
2381 assert!(result.is_ok());
2382 let tables = result.unwrap();
2383 assert_eq!(tables.tables.len(), 1);
2384 assert_eq!(tables.tables[0], "table1");
2385 }
2386
2387 #[rstest]
2388 #[case::with_optimization(true)]
2389 #[case::without_optimization(false)]
2390 #[tokio::test]
2391 async fn test_describe_child_namespace(#[case] inline_optimization: bool) {
2392 use lance_namespace::models::{CreateNamespaceRequest, DescribeNamespaceRequest};
2393
2394 let temp_dir = TempStdDir::default();
2395 let temp_path = temp_dir.to_str().unwrap();
2396
2397 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2398 .inline_optimization_enabled(inline_optimization)
2399 .build()
2400 .await
2401 .unwrap();
2402
2403 let mut properties = std::collections::HashMap::new();
2405 properties.insert("key1".to_string(), "value1".to_string());
2406
2407 let mut create_req = CreateNamespaceRequest::new();
2408 create_req.id = Some(vec!["ns1".to_string()]);
2409 create_req.properties = Some(properties.clone());
2410 dir_namespace.create_namespace(create_req).await.unwrap();
2411
2412 let describe_req = DescribeNamespaceRequest {
2414 id: Some(vec!["ns1".to_string()]),
2415 };
2416 let result = dir_namespace.describe_namespace(describe_req).await;
2417 assert!(
2418 result.is_ok(),
2419 "Failed to describe namespace: {:?}",
2420 result.err()
2421 );
2422 let response = result.unwrap();
2423 assert!(response.properties.is_some());
2424 assert_eq!(
2425 response.properties.unwrap().get("key1"),
2426 Some(&"value1".to_string())
2427 );
2428 }
2429
2430 #[test]
2431 fn test_construct_full_uri_with_cloud_urls() {
2432 let s3_result =
2434 ManifestNamespace::construct_full_uri("s3://bucket/path/subdir", "table.lance")
2435 .unwrap();
2436 assert_eq!(
2437 s3_result, "s3://bucket/path/subdir/table.lance",
2438 "S3 URL should correctly append table name to nested path"
2439 );
2440
2441 let az_result =
2443 ManifestNamespace::construct_full_uri("az://container/path/subdir", "table.lance")
2444 .unwrap();
2445 assert_eq!(
2446 az_result, "az://container/path/subdir/table.lance",
2447 "Azure URL should correctly append table name to nested path"
2448 );
2449
2450 let gs_result =
2452 ManifestNamespace::construct_full_uri("gs://bucket/path/subdir", "table.lance")
2453 .unwrap();
2454 assert_eq!(
2455 gs_result, "gs://bucket/path/subdir/table.lance",
2456 "GCS URL should correctly append table name to nested path"
2457 );
2458
2459 let deep_result =
2461 ManifestNamespace::construct_full_uri("s3://bucket/a/b/c/d", "my_table.lance").unwrap();
2462 assert_eq!(
2463 deep_result, "s3://bucket/a/b/c/d/my_table.lance",
2464 "Deeply nested path should work correctly"
2465 );
2466
2467 let shallow_result =
2469 ManifestNamespace::construct_full_uri("s3://bucket", "table.lance").unwrap();
2470 assert_eq!(
2471 shallow_result, "s3://bucket/table.lance",
2472 "Single-level nested path should work correctly"
2473 );
2474
2475 let trailing_slash_result =
2477 ManifestNamespace::construct_full_uri("s3://bucket/path/subdir/", "table.lance")
2478 .unwrap();
2479 assert_eq!(
2480 trailing_slash_result, "s3://bucket/path/subdir/table.lance",
2481 "URL with existing trailing slash should still work"
2482 );
2483 }
2484}