1use arrow::array::{Array, RecordBatch, RecordBatchIterator, StringArray};
10use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
11use arrow_ipc::reader::StreamReader;
12use async_trait::async_trait;
13use bytes::Bytes;
14use futures::stream::StreamExt;
15use lance::dataset::optimize::{compact_files, CompactionOptions};
16use lance::dataset::{builder::DatasetBuilder, WriteParams};
17use lance::session::Session;
18use lance::{dataset::scanner::Scanner, Dataset};
19use lance_core::{box_error, Error, Result};
20use lance_index::optimize::OptimizeOptions;
21use lance_index::scalar::{BuiltinIndexType, ScalarIndexParams};
22use lance_index::traits::DatasetIndexExt;
23use lance_index::IndexType;
24use lance_io::object_store::{ObjectStore, ObjectStoreParams};
25use lance_namespace::models::{
26 CreateEmptyTableRequest, CreateEmptyTableResponse, CreateNamespaceRequest,
27 CreateNamespaceResponse, CreateTableRequest, CreateTableResponse, DeregisterTableRequest,
28 DeregisterTableResponse, DescribeNamespaceRequest, DescribeNamespaceResponse,
29 DescribeTableRequest, DescribeTableResponse, DropNamespaceRequest, DropNamespaceResponse,
30 DropTableRequest, DropTableResponse, ListNamespacesRequest, ListNamespacesResponse,
31 ListTablesRequest, ListTablesResponse, NamespaceExistsRequest, RegisterTableRequest,
32 RegisterTableResponse, TableExistsRequest,
33};
34use lance_namespace::schema::arrow_schema_to_json;
35use lance_namespace::LanceNamespace;
36use object_store::path::Path;
37use snafu::location;
38use std::io::Cursor;
39use std::{
40 collections::HashMap,
41 hash::{DefaultHasher, Hash, Hasher},
42 ops::{Deref, DerefMut},
43 sync::Arc,
44};
45use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
46
/// Name of the manifest dataset; it is appended to the namespace root to form the manifest URI.
const MANIFEST_TABLE_NAME: &str = "__manifest";
/// Separator placed between namespace levels and the object name inside an object id.
const DELIMITER: &str = "$";

/// Name given to the BTree index created on the manifest's `object_id` column.
const OBJECT_ID_INDEX_NAME: &str = "object_id_btree";
/// Name given to the Bitmap index created on the manifest's `object_type` column.
const OBJECT_TYPE_INDEX_NAME: &str = "object_type_bitmap";
/// Name given to the LabelList index created on the manifest's `base_objects` column.
const BASE_OBJECTS_INDEX_NAME: &str = "base_objects_label_list";
57
58#[derive(Debug, Clone, Copy, PartialEq, Eq)]
60pub enum ObjectType {
61 Namespace,
62 Table,
63}
64
65impl ObjectType {
66 pub fn as_str(&self) -> &str {
67 match self {
68 Self::Namespace => "namespace",
69 Self::Table => "table",
70 }
71 }
72
73 pub fn parse(s: &str) -> Result<Self> {
74 match s {
75 "namespace" => Ok(Self::Namespace),
76 "table" => Ok(Self::Table),
77 _ => Err(Error::io(
78 format!("Invalid object type: {}", s),
79 location!(),
80 )),
81 }
82 }
83}
84
/// A table row read back from the manifest.
#[derive(Debug, Clone)]
pub struct TableInfo {
    /// Namespace levels parsed from the object id (empty for a root-level table).
    pub namespace: Vec<String>,
    /// Last component of the object id.
    pub name: String,
    /// Value of the manifest's `location` column for this table.
    pub location: String,
}

/// A namespace row read back from the manifest.
#[derive(Debug, Clone)]
pub struct NamespaceInfo {
    /// Parent namespace levels parsed from the object id.
    pub namespace: Vec<String>,
    /// Last component of the object id.
    pub name: String,
    /// Metadata decoded from the JSON in the `metadata` column; `None` when the column is null.
    pub metadata: Option<HashMap<String, String>>,
}
100
101#[derive(Debug, Clone)]
106pub struct DatasetConsistencyWrapper(Arc<RwLock<Dataset>>);
107
108impl DatasetConsistencyWrapper {
109 pub fn new(dataset: Dataset) -> Self {
111 Self(Arc::new(RwLock::new(dataset)))
112 }
113
114 pub async fn get(&self) -> Result<DatasetReadGuard<'_>> {
117 self.reload().await?;
118 Ok(DatasetReadGuard {
119 guard: self.0.read().await,
120 })
121 }
122
123 pub async fn get_mut(&self) -> Result<DatasetWriteGuard<'_>> {
126 self.reload().await?;
127 Ok(DatasetWriteGuard {
128 guard: self.0.write().await,
129 })
130 }
131
132 pub async fn set_latest(&self, dataset: Dataset) {
137 let mut write_guard = self.0.write().await;
138 if dataset.manifest().version > write_guard.manifest().version {
139 *write_guard = dataset;
140 }
141 }
142
143 async fn reload(&self) -> Result<()> {
145 let read_guard = self.0.read().await;
147 let dataset_uri = read_guard.uri().to_string();
148 let current_version = read_guard.version().version;
149 log::debug!(
150 "Reload starting for uri={}, current_version={}",
151 dataset_uri,
152 current_version
153 );
154 let latest_version = read_guard
155 .latest_version_id()
156 .await
157 .map_err(|e| Error::IO {
158 source: box_error(std::io::Error::other(format!(
159 "Failed to get latest version: {}",
160 e
161 ))),
162 location: location!(),
163 })?;
164 log::debug!(
165 "Reload got latest_version={} for uri={}, current_version={}",
166 latest_version,
167 dataset_uri,
168 current_version
169 );
170 drop(read_guard);
171
172 if latest_version == current_version {
174 log::debug!("Already up-to-date for uri={}", dataset_uri);
175 return Ok(());
176 }
177
178 let mut write_guard = self.0.write().await;
180
181 let latest_version = write_guard
183 .latest_version_id()
184 .await
185 .map_err(|e| Error::IO {
186 source: box_error(std::io::Error::other(format!(
187 "Failed to get latest version: {}",
188 e
189 ))),
190 location: location!(),
191 })?;
192
193 if latest_version != write_guard.version().version {
194 write_guard.checkout_latest().await.map_err(|e| Error::IO {
195 source: box_error(std::io::Error::other(format!(
196 "Failed to checkout latest: {}",
197 e
198 ))),
199 location: location!(),
200 })?;
201 }
202
203 Ok(())
204 }
205}
206
/// Read guard over the wrapped [`Dataset`]; dereferences to the dataset.
pub struct DatasetReadGuard<'a> {
    guard: RwLockReadGuard<'a, Dataset>,
}

impl Deref for DatasetReadGuard<'_> {
    type Target = Dataset;

    fn deref(&self) -> &Self::Target {
        &self.guard
    }
}

/// Write guard over the wrapped [`Dataset`]; dereferences (mutably) to the dataset.
pub struct DatasetWriteGuard<'a> {
    guard: RwLockWriteGuard<'a, Dataset>,
}

impl Deref for DatasetWriteGuard<'_> {
    type Target = Dataset;

    fn deref(&self) -> &Self::Target {
        &self.guard
    }
}

impl DerefMut for DatasetWriteGuard<'_> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.guard
    }
}
236
/// Namespace implementation that records its tables and namespaces as rows of a
/// `__manifest` Lance dataset stored under `root`.
#[derive(Debug)]
pub struct ManifestNamespace {
    /// Root URI; returned as the namespace id and used as the base for table URIs.
    root: String,
    /// Storage options used when opening the manifest and echoed back in table responses.
    storage_options: Option<HashMap<String, String>>,
    /// Optional session passed to the manifest dataset when it is opened or created.
    #[allow(dead_code)]
    session: Option<Arc<Session>>,
    /// Object store handle supplied by the caller of `from_directory`.
    #[allow(dead_code)]
    object_store: Arc<ObjectStore>,
    /// Object-store path supplied by the caller of `from_directory`.
    #[allow(dead_code)]
    base_path: Path,
    /// Handle to the manifest dataset.
    manifest_dataset: DatasetConsistencyWrapper,
    /// When set, root-level tables are created in a `<name>.lance` directory.
    dir_listing_enabled: bool,
    /// When set, index creation, compaction and index optimization run after manifest writes.
    inline_optimization_enabled: bool,
}
259
260impl ManifestNamespace {
261 pub async fn from_directory(
263 root: String,
264 storage_options: Option<HashMap<String, String>>,
265 session: Option<Arc<Session>>,
266 object_store: Arc<ObjectStore>,
267 base_path: Path,
268 dir_listing_enabled: bool,
269 inline_optimization_enabled: bool,
270 ) -> Result<Self> {
271 let manifest_dataset =
272 Self::create_or_get_manifest(&root, &storage_options, session.clone()).await?;
273
274 Ok(Self {
275 root,
276 storage_options,
277 session,
278 object_store,
279 base_path,
280 manifest_dataset,
281 dir_listing_enabled,
282 inline_optimization_enabled,
283 })
284 }
285
286 pub fn build_object_id(namespace: &[String], name: &str) -> String {
288 if namespace.is_empty() {
289 name.to_string()
290 } else {
291 let mut id = namespace.join(DELIMITER);
292 id.push_str(DELIMITER);
293 id.push_str(name);
294 id
295 }
296 }
297
298 pub fn parse_object_id(object_id: &str) -> (Vec<String>, String) {
300 let parts: Vec<&str> = object_id.split(DELIMITER).collect();
301 if parts.len() == 1 {
302 (Vec::new(), parts[0].to_string())
303 } else {
304 let namespace = parts[..parts.len() - 1]
305 .iter()
306 .map(|s| s.to_string())
307 .collect();
308 let name = parts[parts.len() - 1].to_string();
309 (namespace, name)
310 }
311 }
312
/// Splits a multi-part id into its leading namespace levels and its last component.
///
/// An empty `table_id` yields an empty namespace and an empty name instead of
/// panicking on the `len() - 1` underflow.
fn split_object_id(table_id: &[String]) -> (Vec<String>, String) {
    match table_id.split_last() {
        Some((name, namespace)) => (namespace.to_vec(), name.clone()),
        None => (Vec::new(), String::new()),
    }
}
324
325 fn str_object_id(table_id: &[String]) -> String {
327 table_id.join(DELIMITER)
328 }
329
330 fn generate_dir_name(object_id: &str) -> String {
337 let random_num: u64 = rand::random();
339
340 let mut hasher = DefaultHasher::new();
342 random_num.hash(&mut hasher);
343 object_id.hash(&mut hasher);
344 let hash = hasher.finish();
345
346 format!("{:08x}_{}", (hash & 0xFFFFFFFF) as u32, object_id)
348 }
349
350 fn construct_full_uri(&self, relative_location: &str) -> Result<String> {
352 let base_url = lance_io::object_store::uri_to_url(&self.root)?;
353 let full_url = base_url
354 .join(relative_location)
355 .map_err(|e| Error::InvalidInput {
356 source: format!(
357 "Failed to join URI '{}' with '{}': {}",
358 self.root, relative_location, e
359 )
360 .into(),
361 location: location!(),
362 })?;
363
364 Ok(full_url.to_string())
365 }
366
367 async fn run_inline_optimization(&self) -> Result<()> {
379 if !self.inline_optimization_enabled {
380 return Ok(());
381 }
382
383 let mut dataset_guard = self.manifest_dataset.get_mut().await?;
385 let dataset: &mut Dataset = &mut dataset_guard;
386
387 let indices = dataset.load_indices().await?;
389
390 let has_object_id_index = indices.iter().any(|idx| idx.name == OBJECT_ID_INDEX_NAME);
392 let has_object_type_index = indices.iter().any(|idx| idx.name == OBJECT_TYPE_INDEX_NAME);
393 let has_base_objects_index = indices
394 .iter()
395 .any(|idx| idx.name == BASE_OBJECTS_INDEX_NAME);
396
397 if !has_object_id_index {
399 log::debug!(
400 "Creating BTREE index '{}' on object_id for __manifest table",
401 OBJECT_ID_INDEX_NAME
402 );
403 let params = ScalarIndexParams::for_builtin(BuiltinIndexType::BTree);
404 if let Err(e) = dataset
405 .create_index(
406 &["object_id"],
407 IndexType::BTree,
408 Some(OBJECT_ID_INDEX_NAME.to_string()),
409 ¶ms,
410 true,
411 )
412 .await
413 {
414 log::warn!("Failed to create BTREE index on object_id for __manifest table: {:?}. Query performance may be impacted.", e);
415 } else {
416 log::info!(
417 "Created BTREE index '{}' on object_id for __manifest table",
418 OBJECT_ID_INDEX_NAME
419 );
420 }
421 }
422
423 if !has_object_type_index {
425 log::debug!(
426 "Creating Bitmap index '{}' on object_type for __manifest table",
427 OBJECT_TYPE_INDEX_NAME
428 );
429 let params = ScalarIndexParams::default();
430 if let Err(e) = dataset
431 .create_index(
432 &["object_type"],
433 IndexType::Bitmap,
434 Some(OBJECT_TYPE_INDEX_NAME.to_string()),
435 ¶ms,
436 true,
437 )
438 .await
439 {
440 log::warn!("Failed to create Bitmap index on object_type for __manifest table: {:?}. Query performance may be impacted.", e);
441 } else {
442 log::info!(
443 "Created Bitmap index '{}' on object_type for __manifest table",
444 OBJECT_TYPE_INDEX_NAME
445 );
446 }
447 }
448
449 if !has_base_objects_index {
451 log::debug!(
452 "Creating LabelList index '{}' on base_objects for __manifest table",
453 BASE_OBJECTS_INDEX_NAME
454 );
455 let params = ScalarIndexParams::default();
456 if let Err(e) = dataset
457 .create_index(
458 &["base_objects"],
459 IndexType::LabelList,
460 Some(BASE_OBJECTS_INDEX_NAME.to_string()),
461 ¶ms,
462 true,
463 )
464 .await
465 {
466 log::warn!("Failed to create LabelList index on base_objects for __manifest table: {:?}. Query performance may be impacted.", e);
467 } else {
468 log::info!(
469 "Created LabelList index '{}' on base_objects for __manifest table",
470 BASE_OBJECTS_INDEX_NAME
471 );
472 }
473 }
474
475 log::debug!("Running file compaction on __manifest table");
477 match compact_files(dataset, CompactionOptions::default(), None).await {
478 Ok(compaction_metrics) => {
479 if compaction_metrics.fragments_removed > 0 {
480 log::info!(
481 "Compacted __manifest table: removed {} fragments, added {} fragments",
482 compaction_metrics.fragments_removed,
483 compaction_metrics.fragments_added
484 );
485 }
486 }
487 Err(e) => {
488 log::warn!("Failed to compact files for __manifest table: {:?}. Continuing with optimization.", e);
489 }
490 }
491
492 log::debug!("Optimizing indices on __manifest table");
494 match dataset.optimize_indices(&OptimizeOptions::default()).await {
495 Ok(_) => {
496 log::info!("Successfully optimized indices on __manifest table");
497 }
498 Err(e) => {
499 log::warn!(
500 "Failed to optimize indices on __manifest table: {:?}. Continuing anyway.",
501 e
502 );
503 }
504 }
505
506 Ok(())
507 }
508
509 fn manifest_schema() -> Arc<ArrowSchema> {
511 Arc::new(ArrowSchema::new(vec![
512 Field::new("object_id", DataType::Utf8, false),
513 Field::new("object_type", DataType::Utf8, false),
514 Field::new("location", DataType::Utf8, true),
515 Field::new("metadata", DataType::Utf8, true),
516 Field::new(
517 "base_objects",
518 DataType::List(Arc::new(Field::new("object_id", DataType::Utf8, true))),
519 true,
520 ),
521 ]))
522 }
523
524 async fn manifest_scanner(&self) -> Result<Scanner> {
526 let dataset_guard = self.manifest_dataset.get().await?;
527 Ok(dataset_guard.scan())
528 }
529
530 async fn execute_scanner(scanner: Scanner) -> Result<Vec<RecordBatch>> {
532 let mut stream = scanner.try_into_stream().await.map_err(|e| Error::IO {
533 source: box_error(std::io::Error::other(format!(
534 "Failed to create stream: {}",
535 e
536 ))),
537 location: location!(),
538 })?;
539
540 let mut batches = Vec::new();
541 while let Some(batch) = stream.next().await {
542 batches.push(batch.map_err(|e| Error::IO {
543 source: box_error(std::io::Error::other(format!(
544 "Failed to read batch: {}",
545 e
546 ))),
547 location: location!(),
548 })?);
549 }
550
551 Ok(batches)
552 }
553
554 fn get_string_column<'a>(batch: &'a RecordBatch, column_name: &str) -> Result<&'a StringArray> {
556 let column = batch
557 .column_by_name(column_name)
558 .ok_or_else(|| Error::io(format!("Column '{}' not found", column_name), location!()))?;
559 column
560 .as_any()
561 .downcast_ref::<StringArray>()
562 .ok_or_else(|| {
563 Error::io(
564 format!("Column '{}' is not a string array", column_name),
565 location!(),
566 )
567 })
568 }
569
570 async fn manifest_contains_object(&self, object_id: &str) -> Result<bool> {
572 let filter = format!("object_id = '{}'", object_id);
573
574 let dataset_guard = self.manifest_dataset.get().await?;
575 let mut scanner = dataset_guard.scan();
576
577 scanner.filter(&filter).map_err(|e| Error::IO {
578 source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
579 location: location!(),
580 })?;
581
582 scanner.project::<&str>(&[]).map_err(|e| Error::IO {
584 source: box_error(std::io::Error::other(format!("Failed to project: {}", e))),
585 location: location!(),
586 })?;
587
588 scanner.with_row_id();
589
590 let count = scanner.count_rows().await.map_err(|e| Error::IO {
591 source: box_error(std::io::Error::other(format!(
592 "Failed to count rows: {}",
593 e
594 ))),
595 location: location!(),
596 })?;
597
598 Ok(count > 0)
599 }
600
601 async fn query_manifest_for_table(&self, object_id: &str) -> Result<Option<TableInfo>> {
603 let filter = format!("object_id = '{}' AND object_type = 'table'", object_id);
604 let mut scanner = self.manifest_scanner().await?;
605 scanner.filter(&filter).map_err(|e| Error::IO {
606 source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
607 location: location!(),
608 })?;
609 scanner
610 .project(&["object_id", "location"])
611 .map_err(|e| Error::IO {
612 source: box_error(std::io::Error::other(format!("Failed to project: {}", e))),
613 location: location!(),
614 })?;
615 let batches = Self::execute_scanner(scanner).await?;
616
617 let mut found_result: Option<TableInfo> = None;
618 let mut total_rows = 0;
619
620 for batch in batches {
621 if batch.num_rows() == 0 {
622 continue;
623 }
624
625 total_rows += batch.num_rows();
626 if total_rows > 1 {
627 return Err(Error::io(
628 format!(
629 "Expected exactly 1 table with id '{}', found {}",
630 object_id, total_rows
631 ),
632 location!(),
633 ));
634 }
635
636 let object_id_array = Self::get_string_column(&batch, "object_id")?;
637 let location_array = Self::get_string_column(&batch, "location")?;
638 let location = location_array.value(0).to_string();
639 let (namespace, name) = Self::parse_object_id(object_id_array.value(0));
640 found_result = Some(TableInfo {
641 namespace,
642 name,
643 location,
644 });
645 }
646
647 Ok(found_result)
648 }
649
650 pub async fn list_manifest_table_locations(&self) -> Result<std::collections::HashSet<String>> {
653 let filter = "object_type = 'table' AND NOT contains(object_id, '$')";
654 let mut scanner = self.manifest_scanner().await?;
655 scanner.filter(filter).map_err(|e| Error::IO {
656 source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
657 location: location!(),
658 })?;
659 scanner.project(&["location"]).map_err(|e| Error::IO {
660 source: box_error(std::io::Error::other(format!("Failed to project: {}", e))),
661 location: location!(),
662 })?;
663
664 let batches = Self::execute_scanner(scanner).await?;
665 let mut locations = std::collections::HashSet::new();
666
667 for batch in batches {
668 if batch.num_rows() == 0 {
669 continue;
670 }
671 let location_array = Self::get_string_column(&batch, "location")?;
672 for i in 0..location_array.len() {
673 locations.insert(location_array.value(i).to_string());
674 }
675 }
676
677 Ok(locations)
678 }
679
680 async fn insert_into_manifest(
682 &self,
683 object_id: String,
684 object_type: ObjectType,
685 location: Option<String>,
686 ) -> Result<()> {
687 self.insert_into_manifest_with_metadata(object_id, object_type, location, None, None)
688 .await
689 }
690
691 async fn insert_into_manifest_with_metadata(
693 &self,
694 object_id: String,
695 object_type: ObjectType,
696 location: Option<String>,
697 metadata: Option<String>,
698 base_objects: Option<Vec<String>>,
699 ) -> Result<()> {
700 use arrow::array::builder::{ListBuilder, StringBuilder};
701
702 let schema = Self::manifest_schema();
703
704 let string_builder = StringBuilder::new();
706 let mut list_builder = ListBuilder::new(string_builder).with_field(Arc::new(Field::new(
707 "object_id",
708 DataType::Utf8,
709 true,
710 )));
711
712 match base_objects {
713 Some(objects) => {
714 for obj in objects {
715 list_builder.values().append_value(obj);
716 }
717 list_builder.append(true);
718 }
719 None => {
720 list_builder.append_null();
721 }
722 }
723
724 let base_objects_array = list_builder.finish();
725
726 let location_array = match location {
728 Some(loc) => Arc::new(StringArray::from(vec![Some(loc)])),
729 None => Arc::new(StringArray::from(vec![None::<String>])),
730 };
731
732 let metadata_array = match metadata {
733 Some(meta) => Arc::new(StringArray::from(vec![Some(meta)])),
734 None => Arc::new(StringArray::from(vec![None::<String>])),
735 };
736
737 let batch = RecordBatch::try_new(
738 schema.clone(),
739 vec![
740 Arc::new(StringArray::from(vec![object_id.as_str()])),
741 Arc::new(StringArray::from(vec![object_type.as_str()])),
742 location_array,
743 metadata_array,
744 Arc::new(base_objects_array),
745 ],
746 )
747 .map_err(|e| {
748 Error::io(
749 format!("Failed to create manifest entry: {}", e),
750 location!(),
751 )
752 })?;
753
754 let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
755
756 let dataset_guard = self.manifest_dataset.get().await?;
758 let dataset_arc = Arc::new(dataset_guard.clone());
759 drop(dataset_guard); let mut merge_builder =
762 lance::dataset::MergeInsertBuilder::try_new(dataset_arc, vec!["object_id".to_string()])
763 .map_err(|e| Error::IO {
764 source: box_error(std::io::Error::other(format!(
765 "Failed to create merge builder: {}",
766 e
767 ))),
768 location: location!(),
769 })?;
770
771 merge_builder.when_matched(lance::dataset::WhenMatched::Fail);
772 merge_builder.when_not_matched(lance::dataset::WhenNotMatched::InsertAll);
773
774 let (new_dataset_arc, _merge_stats) = merge_builder
775 .try_build()
776 .map_err(|e| Error::IO {
777 source: box_error(std::io::Error::other(format!(
778 "Failed to build merge: {}",
779 e
780 ))),
781 location: location!(),
782 })?
783 .execute_reader(Box::new(reader))
784 .await
785 .map_err(|e| {
786 let error_msg = e.to_string();
788 if error_msg.contains("matched")
789 || error_msg.contains("duplicate")
790 || error_msg.contains("already exists")
791 {
792 Error::io(
793 format!("Object with id '{}' already exists in manifest", object_id),
794 location!(),
795 )
796 } else {
797 Error::IO {
798 source: box_error(std::io::Error::other(format!(
799 "Failed to execute merge: {}",
800 e
801 ))),
802 location: location!(),
803 }
804 }
805 })?;
806
807 let new_dataset = Arc::try_unwrap(new_dataset_arc).unwrap_or_else(|arc| (*arc).clone());
808 self.manifest_dataset.set_latest(new_dataset).await;
809
810 if let Err(e) = self.run_inline_optimization().await {
812 log::warn!(
813 "Unexpected failure when running inline optimization: {:?}",
814 e
815 );
816 }
817
818 Ok(())
819 }
820
821 pub async fn delete_from_manifest(&self, object_id: &str) -> Result<()> {
823 {
824 let predicate = format!("object_id = '{}'", object_id);
825 let mut dataset_guard = self.manifest_dataset.get_mut().await?;
826 dataset_guard
827 .delete(&predicate)
828 .await
829 .map_err(|e| Error::IO {
830 source: box_error(std::io::Error::other(format!("Failed to delete: {}", e))),
831 location: location!(),
832 })?;
833 } self.manifest_dataset.reload().await?;
836
837 if let Err(e) = self.run_inline_optimization().await {
839 log::warn!(
840 "Unexpected failure when running inline optimization: {:?}",
841 e
842 );
843 }
844
845 Ok(())
846 }
847
848 pub async fn register_table(&self, name: &str, location: String) -> Result<()> {
850 let object_id = Self::build_object_id(&[], name);
851 if self.manifest_contains_object(&object_id).await? {
852 return Err(Error::io(
853 format!("Table '{}' already exists", name),
854 location!(),
855 ));
856 }
857
858 self.insert_into_manifest(object_id, ObjectType::Table, Some(location))
859 .await
860 }
861
862 async fn validate_namespace_levels_exist(&self, namespace_path: &[String]) -> Result<()> {
864 for i in 1..=namespace_path.len() {
865 let partial_path = &namespace_path[..i];
866 let object_id = partial_path.join(DELIMITER);
867 if !self.manifest_contains_object(&object_id).await? {
868 return Err(Error::Namespace {
869 source: format!("Parent namespace '{}' does not exist", object_id).into(),
870 location: location!(),
871 });
872 }
873 }
874 Ok(())
875 }
876
877 async fn query_manifest_for_namespace(&self, object_id: &str) -> Result<Option<NamespaceInfo>> {
879 let filter = format!("object_id = '{}' AND object_type = 'namespace'", object_id);
880 let mut scanner = self.manifest_scanner().await?;
881 scanner.filter(&filter).map_err(|e| Error::IO {
882 source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
883 location: location!(),
884 })?;
885 scanner
886 .project(&["object_id", "metadata"])
887 .map_err(|e| Error::IO {
888 source: box_error(std::io::Error::other(format!("Failed to project: {}", e))),
889 location: location!(),
890 })?;
891 let batches = Self::execute_scanner(scanner).await?;
892
893 let mut found_result: Option<NamespaceInfo> = None;
894 let mut total_rows = 0;
895
896 for batch in batches {
897 if batch.num_rows() == 0 {
898 continue;
899 }
900
901 total_rows += batch.num_rows();
902 if total_rows > 1 {
903 return Err(Error::io(
904 format!(
905 "Expected exactly 1 namespace with id '{}', found {}",
906 object_id, total_rows
907 ),
908 location!(),
909 ));
910 }
911
912 let object_id_array = Self::get_string_column(&batch, "object_id")?;
913 let metadata_array = Self::get_string_column(&batch, "metadata")?;
914
915 let object_id_str = object_id_array.value(0);
916 let metadata = if !metadata_array.is_null(0) {
917 let metadata_str = metadata_array.value(0);
918 match serde_json::from_str::<HashMap<String, String>>(metadata_str) {
919 Ok(map) => Some(map),
920 Err(e) => {
921 return Err(Error::io(
922 format!(
923 "Failed to deserialize metadata for namespace '{}': {}",
924 object_id, e
925 ),
926 location!(),
927 ));
928 }
929 }
930 } else {
931 None
932 };
933
934 let (namespace, name) = Self::parse_object_id(object_id_str);
935 found_result = Some(NamespaceInfo {
936 namespace,
937 name,
938 metadata,
939 });
940 }
941
942 Ok(found_result)
943 }
944
945 async fn create_or_get_manifest(
947 root: &str,
948 storage_options: &Option<HashMap<String, String>>,
949 session: Option<Arc<Session>>,
950 ) -> Result<DatasetConsistencyWrapper> {
951 let manifest_path = format!("{}/{}", root, MANIFEST_TABLE_NAME);
952 log::debug!("Attempting to load manifest from {}", manifest_path);
953 let mut builder = DatasetBuilder::from_uri(&manifest_path);
954
955 if let Some(sess) = session.clone() {
956 builder = builder.with_session(sess);
957 }
958
959 if let Some(opts) = storage_options {
960 builder = builder.with_storage_options(opts.clone());
961 }
962
963 let dataset_result = builder.load().await;
964 if let Ok(dataset) = dataset_result {
965 Ok(DatasetConsistencyWrapper::new(dataset))
966 } else {
967 log::info!("Creating new manifest table at {}", manifest_path);
968 let schema = Self::manifest_schema();
969 let empty_batch = RecordBatch::new_empty(schema.clone());
970 let reader = RecordBatchIterator::new(vec![Ok(empty_batch)], schema.clone());
971
972 let write_params = WriteParams {
973 session,
974 store_params: storage_options.as_ref().map(|opts| ObjectStoreParams {
975 storage_options: Some(opts.clone()),
976 ..Default::default()
977 }),
978 ..Default::default()
979 };
980
981 let dataset = Dataset::write(Box::new(reader), &manifest_path, Some(write_params))
982 .await
983 .map_err(|e| Error::IO {
984 source: box_error(std::io::Error::other(format!(
985 "Failed to create manifest dataset: {}",
986 e
987 ))),
988 location: location!(),
989 })?;
990
991 log::info!(
992 "Successfully created manifest table at {}, version={}, uri={}",
993 manifest_path,
994 dataset.version().version,
995 dataset.uri()
996 );
997 Ok(DatasetConsistencyWrapper::new(dataset))
998 }
999 }
1000}
1001
1002#[async_trait]
1003impl LanceNamespace for ManifestNamespace {
1004 fn namespace_id(&self) -> String {
1005 self.root.clone()
1006 }
1007
1008 async fn list_tables(&self, request: ListTablesRequest) -> Result<ListTablesResponse> {
1009 let namespace_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1010 source: "Namespace ID is required".into(),
1011 location: location!(),
1012 })?;
1013
1014 let filter = if namespace_id.is_empty() {
1016 "object_type = 'table' AND NOT contains(object_id, '$')".to_string()
1018 } else {
1019 let prefix = namespace_id.join(DELIMITER);
1021 format!(
1022 "object_type = 'table' AND starts_with(object_id, '{}{}') AND NOT contains(substring(object_id, {}), '$')",
1023 prefix, DELIMITER, prefix.len() + 2
1024 )
1025 };
1026
1027 let mut scanner = self.manifest_scanner().await?;
1028 scanner.filter(&filter).map_err(|e| Error::IO {
1029 source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
1030 location: location!(),
1031 })?;
1032 scanner.project(&["object_id"]).map_err(|e| Error::IO {
1033 source: box_error(std::io::Error::other(format!("Failed to project: {}", e))),
1034 location: location!(),
1035 })?;
1036
1037 let batches = Self::execute_scanner(scanner).await?;
1038
1039 let mut tables = Vec::new();
1040 for batch in batches {
1041 if batch.num_rows() == 0 {
1042 continue;
1043 }
1044
1045 let object_id_array = Self::get_string_column(&batch, "object_id")?;
1046 for i in 0..batch.num_rows() {
1047 let object_id = object_id_array.value(i);
1048 let (_namespace, name) = Self::parse_object_id(object_id);
1049 tables.push(name);
1050 }
1051 }
1052
1053 Ok(ListTablesResponse::new(tables))
1054 }
1055
1056 async fn describe_table(&self, request: DescribeTableRequest) -> Result<DescribeTableResponse> {
1057 let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1058 source: "Table ID is required".into(),
1059 location: location!(),
1060 })?;
1061
1062 if table_id.is_empty() {
1063 return Err(Error::InvalidInput {
1064 source: "Table ID cannot be empty".into(),
1065 location: location!(),
1066 });
1067 }
1068
1069 let object_id = Self::str_object_id(table_id);
1070 let table_info = self.query_manifest_for_table(&object_id).await?;
1071
1072 match table_info {
1073 Some(info) => {
1074 let table_uri = self.construct_full_uri(&info.location)?;
1076
1077 match Dataset::open(&table_uri).await {
1079 Ok(mut dataset) => {
1080 if let Some(requested_version) = request.version {
1082 dataset = dataset.checkout_version(requested_version as u64).await?;
1083 }
1084
1085 let version = dataset.version().version;
1086 let lance_schema = dataset.schema();
1087 let arrow_schema: arrow_schema::Schema = lance_schema.into();
1088 let json_schema = arrow_schema_to_json(&arrow_schema)?;
1089
1090 Ok(DescribeTableResponse {
1091 version: Some(version as i64),
1092 location: Some(table_uri),
1093 schema: Some(Box::new(json_schema)),
1094 properties: None,
1095 storage_options: self.storage_options.clone(),
1096 })
1097 }
1098 Err(_) => {
1099 Ok(DescribeTableResponse {
1101 version: None,
1102 location: Some(table_uri),
1103 schema: None,
1104 properties: None,
1105 storage_options: self.storage_options.clone(),
1106 })
1107 }
1108 }
1109 }
1110 None => Err(Error::Namespace {
1111 source: format!("Table '{}' not found", object_id).into(),
1112 location: location!(),
1113 }),
1114 }
1115 }
1116
1117 async fn table_exists(&self, request: TableExistsRequest) -> Result<()> {
1118 let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1119 source: "Table ID is required".into(),
1120 location: location!(),
1121 })?;
1122
1123 if table_id.is_empty() {
1124 return Err(Error::InvalidInput {
1125 source: "Table ID cannot be empty".into(),
1126 location: location!(),
1127 });
1128 }
1129
1130 let (namespace, table_name) = Self::split_object_id(table_id);
1131 let object_id = Self::build_object_id(&namespace, &table_name);
1132 let exists = self.manifest_contains_object(&object_id).await?;
1133 if exists {
1134 Ok(())
1135 } else {
1136 Err(Error::Namespace {
1137 source: format!("Table '{}' not found", table_name).into(),
1138 location: location!(),
1139 })
1140 }
1141 }
1142
1143 async fn create_table(
1144 &self,
1145 request: CreateTableRequest,
1146 data: Bytes,
1147 ) -> Result<CreateTableResponse> {
1148 let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1149 source: "Table ID is required".into(),
1150 location: location!(),
1151 })?;
1152
1153 if table_id.is_empty() {
1154 return Err(Error::InvalidInput {
1155 source: "Table ID cannot be empty".into(),
1156 location: location!(),
1157 });
1158 }
1159
1160 let (namespace, table_name) = Self::split_object_id(table_id);
1161 let object_id = Self::build_object_id(&namespace, &table_name);
1162
1163 if self.manifest_contains_object(&object_id).await? {
1165 return Err(Error::io(
1166 format!("Table '{}' already exists", table_name),
1167 location!(),
1168 ));
1169 }
1170
1171 let dir_name = if namespace.is_empty() && self.dir_listing_enabled {
1175 format!("{}.lance", table_name)
1177 } else {
1178 Self::generate_dir_name(&object_id)
1180 };
1181 let table_uri = self.construct_full_uri(&dir_name)?;
1182
1183 if data.is_empty() {
1185 return Err(Error::Namespace {
1186 source: "Request data (Arrow IPC stream) is required for create_table".into(),
1187 location: location!(),
1188 });
1189 }
1190
1191 if let Some(location) = &request.location {
1193 let location = location.trim_end_matches('/');
1194 if location != table_uri {
1195 return Err(Error::Namespace {
1196 source: format!(
1197 "Cannot create table {} at location {}, must be at location {}",
1198 table_name, location, table_uri
1199 )
1200 .into(),
1201 location: location!(),
1202 });
1203 }
1204 }
1205
1206 let cursor = Cursor::new(data.to_vec());
1208 let stream_reader = StreamReader::try_new(cursor, None)
1209 .map_err(|e| Error::io(format!("Failed to read IPC stream: {}", e), location!()))?;
1210
1211 let batches: Vec<RecordBatch> =
1212 stream_reader
1213 .collect::<std::result::Result<Vec<_>, _>>()
1214 .map_err(|e| Error::io(format!("Failed to collect batches: {}", e), location!()))?;
1215
1216 if batches.is_empty() {
1217 return Err(Error::io(
1218 "No data provided for table creation",
1219 location!(),
1220 ));
1221 }
1222
1223 let schema = batches[0].schema();
1224 let batch_results: Vec<std::result::Result<RecordBatch, arrow_schema::ArrowError>> =
1225 batches.into_iter().map(Ok).collect();
1226 let reader = RecordBatchIterator::new(batch_results, schema);
1227
1228 let write_params = WriteParams::default();
1229 let _dataset = Dataset::write(Box::new(reader), &table_uri, Some(write_params))
1230 .await
1231 .map_err(|e| Error::IO {
1232 source: box_error(std::io::Error::other(format!(
1233 "Failed to write dataset: {}",
1234 e
1235 ))),
1236 location: location!(),
1237 })?;
1238
1239 self.insert_into_manifest(object_id, ObjectType::Table, Some(dir_name))
1241 .await?;
1242
1243 Ok(CreateTableResponse {
1244 version: Some(1),
1245 location: Some(table_uri),
1246 properties: None,
1247 storage_options: self.storage_options.clone(),
1248 })
1249 }
1250
1251 async fn drop_table(&self, request: DropTableRequest) -> Result<DropTableResponse> {
1252 let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1253 source: "Table ID is required".into(),
1254 location: location!(),
1255 })?;
1256
1257 if table_id.is_empty() {
1258 return Err(Error::InvalidInput {
1259 source: "Table ID cannot be empty".into(),
1260 location: location!(),
1261 });
1262 }
1263
1264 let (namespace, table_name) = Self::split_object_id(table_id);
1265 let object_id = Self::build_object_id(&namespace, &table_name);
1266
1267 let table_info = self.query_manifest_for_table(&object_id).await?;
1269
1270 match table_info {
1271 Some(info) => {
1272 self.delete_from_manifest(&object_id).await?;
1274
1275 let table_path = self.base_path.child(info.location.as_str());
1277 let table_uri = self.construct_full_uri(&info.location)?;
1278
1279 self.object_store
1281 .remove_dir_all(table_path)
1282 .await
1283 .map_err(|e| Error::Namespace {
1284 source: format!("Failed to delete table directory: {}", e).into(),
1285 location: location!(),
1286 })?;
1287
1288 Ok(DropTableResponse {
1289 id: request.id.clone(),
1290 location: Some(table_uri),
1291 properties: None,
1292 transaction_id: None,
1293 })
1294 }
1295 None => Err(Error::Namespace {
1296 source: format!("Table '{}' not found", table_name).into(),
1297 location: location!(),
1298 }),
1299 }
1300 }
1301
1302 async fn list_namespaces(
1303 &self,
1304 request: ListNamespacesRequest,
1305 ) -> Result<ListNamespacesResponse> {
1306 let parent_namespace = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1307 source: "Namespace ID is required".into(),
1308 location: location!(),
1309 })?;
1310
1311 let filter = if parent_namespace.is_empty() {
1313 "object_type = 'namespace' AND NOT contains(object_id, '$')".to_string()
1315 } else {
1316 let prefix = parent_namespace.join(DELIMITER);
1318 format!(
1319 "object_type = 'namespace' AND starts_with(object_id, '{}{}') AND NOT contains(substring(object_id, {}), '$')",
1320 prefix, DELIMITER, prefix.len() + 2
1321 )
1322 };
1323
1324 let mut scanner = self.manifest_scanner().await?;
1325 scanner.filter(&filter).map_err(|e| Error::IO {
1326 source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
1327 location: location!(),
1328 })?;
1329 scanner.project(&["object_id"]).map_err(|e| Error::IO {
1330 source: box_error(std::io::Error::other(format!("Failed to project: {}", e))),
1331 location: location!(),
1332 })?;
1333
1334 let batches = Self::execute_scanner(scanner).await?;
1335 let mut namespaces = Vec::new();
1336
1337 for batch in batches {
1338 if batch.num_rows() == 0 {
1339 continue;
1340 }
1341
1342 let object_id_array = Self::get_string_column(&batch, "object_id")?;
1343 for i in 0..batch.num_rows() {
1344 let object_id = object_id_array.value(i);
1345 let (_namespace, name) = Self::parse_object_id(object_id);
1346 namespaces.push(name);
1347 }
1348 }
1349
1350 Ok(ListNamespacesResponse::new(namespaces))
1351 }
1352
1353 async fn describe_namespace(
1354 &self,
1355 request: DescribeNamespaceRequest,
1356 ) -> Result<DescribeNamespaceResponse> {
1357 let namespace_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1358 source: "Namespace ID is required".into(),
1359 location: location!(),
1360 })?;
1361
1362 if namespace_id.is_empty() {
1364 return Ok(DescribeNamespaceResponse {
1365 properties: Some(HashMap::new()),
1366 });
1367 }
1368
1369 let object_id = namespace_id.join(DELIMITER);
1371 let namespace_info = self.query_manifest_for_namespace(&object_id).await?;
1372
1373 match namespace_info {
1374 Some(info) => Ok(DescribeNamespaceResponse {
1375 properties: info.metadata,
1376 }),
1377 None => Err(Error::Namespace {
1378 source: format!("Namespace '{}' not found", object_id).into(),
1379 location: location!(),
1380 }),
1381 }
1382 }
1383
1384 async fn create_namespace(
1385 &self,
1386 request: CreateNamespaceRequest,
1387 ) -> Result<CreateNamespaceResponse> {
1388 let namespace_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1389 source: "Namespace ID is required".into(),
1390 location: location!(),
1391 })?;
1392
1393 if namespace_id.is_empty() {
1395 return Err(Error::Namespace {
1396 source: "Root namespace already exists and cannot be created".into(),
1397 location: location!(),
1398 });
1399 }
1400
1401 if namespace_id.len() > 1 {
1403 self.validate_namespace_levels_exist(&namespace_id[..namespace_id.len() - 1])
1404 .await?;
1405 }
1406
1407 let object_id = namespace_id.join(DELIMITER);
1408 if self.manifest_contains_object(&object_id).await? {
1409 return Err(Error::Namespace {
1410 source: format!("Namespace '{}' already exists", object_id).into(),
1411 location: location!(),
1412 });
1413 }
1414
1415 let metadata = request.properties.as_ref().and_then(|props| {
1417 if props.is_empty() {
1418 None
1419 } else {
1420 Some(serde_json::to_string(props).ok()?)
1421 }
1422 });
1423
1424 self.insert_into_manifest_with_metadata(
1425 object_id,
1426 ObjectType::Namespace,
1427 None,
1428 metadata,
1429 None,
1430 )
1431 .await?;
1432
1433 Ok(CreateNamespaceResponse {
1434 properties: request.properties,
1435 })
1436 }
1437
1438 async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<DropNamespaceResponse> {
1439 let namespace_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1440 source: "Namespace ID is required".into(),
1441 location: location!(),
1442 })?;
1443
1444 if namespace_id.is_empty() {
1446 return Err(Error::Namespace {
1447 source: "Root namespace cannot be dropped".into(),
1448 location: location!(),
1449 });
1450 }
1451
1452 let object_id = namespace_id.join(DELIMITER);
1453
1454 if !self.manifest_contains_object(&object_id).await? {
1456 return Err(Error::Namespace {
1457 source: format!("Namespace '{}' not found", object_id).into(),
1458 location: location!(),
1459 });
1460 }
1461
1462 let prefix = format!("{}{}", object_id, DELIMITER);
1464 let filter = format!("starts_with(object_id, '{}')", prefix);
1465 let mut scanner = self.manifest_scanner().await?;
1466 scanner.filter(&filter).map_err(|e| Error::IO {
1467 source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
1468 location: location!(),
1469 })?;
1470 scanner.project::<&str>(&[]).map_err(|e| Error::IO {
1471 source: box_error(std::io::Error::other(format!("Failed to project: {}", e))),
1472 location: location!(),
1473 })?;
1474 scanner.with_row_id();
1475 let count = scanner.count_rows().await.map_err(|e| Error::IO {
1476 source: box_error(std::io::Error::other(format!(
1477 "Failed to count rows: {}",
1478 e
1479 ))),
1480 location: location!(),
1481 })?;
1482
1483 if count > 0 {
1484 return Err(Error::Namespace {
1485 source: format!(
1486 "Namespace '{}' is not empty (contains {} child objects)",
1487 object_id, count
1488 )
1489 .into(),
1490 location: location!(),
1491 });
1492 }
1493
1494 self.delete_from_manifest(&object_id).await?;
1495
1496 Ok(DropNamespaceResponse {
1497 properties: None,
1498 transaction_id: None,
1499 })
1500 }
1501
1502 async fn namespace_exists(&self, request: NamespaceExistsRequest) -> Result<()> {
1503 let namespace_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1504 source: "Namespace ID is required".into(),
1505 location: location!(),
1506 })?;
1507
1508 if namespace_id.is_empty() {
1510 return Ok(());
1511 }
1512
1513 let object_id = namespace_id.join(DELIMITER);
1514 if self.manifest_contains_object(&object_id).await? {
1515 Ok(())
1516 } else {
1517 Err(Error::Namespace {
1518 source: format!("Namespace '{}' not found", object_id).into(),
1519 location: location!(),
1520 })
1521 }
1522 }
1523
1524 async fn create_empty_table(
1525 &self,
1526 request: CreateEmptyTableRequest,
1527 ) -> Result<CreateEmptyTableResponse> {
1528 let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1529 source: "Table ID is required".into(),
1530 location: location!(),
1531 })?;
1532
1533 if table_id.is_empty() {
1534 return Err(Error::InvalidInput {
1535 source: "Table ID cannot be empty".into(),
1536 location: location!(),
1537 });
1538 }
1539
1540 let (namespace, table_name) = Self::split_object_id(table_id);
1541 let object_id = Self::build_object_id(&namespace, &table_name);
1542
1543 let existing = self.query_manifest_for_table(&object_id).await?;
1545 if existing.is_some() {
1546 return Err(Error::Namespace {
1547 source: format!("Table '{}' already exists", table_name).into(),
1548 location: location!(),
1549 });
1550 }
1551
1552 let dir_name = if namespace.is_empty() && self.dir_listing_enabled {
1556 format!("{}.lance", table_name)
1558 } else {
1559 Self::generate_dir_name(&object_id)
1561 };
1562 let table_path = self.base_path.child(dir_name.as_str());
1563 let table_uri = self.construct_full_uri(&dir_name)?;
1564
1565 if let Some(req_location) = &request.location {
1567 let req_location = req_location.trim_end_matches('/');
1568 if req_location != table_uri {
1569 return Err(Error::Namespace {
1570 source: format!(
1571 "Cannot create table {} at location {}, must be at location {}",
1572 table_name, req_location, table_uri
1573 )
1574 .into(),
1575 location: location!(),
1576 });
1577 }
1578 }
1579
1580 let reserved_file_path = table_path.child(".lance-reserved");
1582
1583 self.object_store
1584 .create(&reserved_file_path)
1585 .await
1586 .map_err(|e| Error::Namespace {
1587 source: format!(
1588 "Failed to create .lance-reserved file for table {}: {}",
1589 table_name, e
1590 )
1591 .into(),
1592 location: location!(),
1593 })?
1594 .shutdown()
1595 .await
1596 .map_err(|e| Error::Namespace {
1597 source: format!(
1598 "Failed to finalize .lance-reserved file for table {}: {}",
1599 table_name, e
1600 )
1601 .into(),
1602 location: location!(),
1603 })?;
1604
1605 self.insert_into_manifest(object_id, ObjectType::Table, Some(dir_name))
1607 .await?;
1608
1609 log::info!(
1610 "Created empty table '{}' in manifest at {}",
1611 table_name,
1612 table_uri
1613 );
1614
1615 Ok(CreateEmptyTableResponse {
1616 location: Some(table_uri),
1617 properties: None,
1618 storage_options: self.storage_options.clone(),
1619 })
1620 }
1621
1622 async fn register_table(&self, request: RegisterTableRequest) -> Result<RegisterTableResponse> {
1623 let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1624 source: "Table ID is required".into(),
1625 location: location!(),
1626 })?;
1627
1628 if table_id.is_empty() {
1629 return Err(Error::InvalidInput {
1630 source: "Table ID cannot be empty".into(),
1631 location: location!(),
1632 });
1633 }
1634
1635 let location = request.location.clone();
1636
1637 if location.contains("://") {
1640 return Err(Error::InvalidInput {
1641 source: format!(
1642 "Absolute URIs are not allowed for register_table. Location must be a relative path within the root directory: {}",
1643 location
1644 ).into(),
1645 location: location!(),
1646 });
1647 }
1648
1649 if location.starts_with('/') {
1650 return Err(Error::InvalidInput {
1651 source: format!(
1652 "Absolute paths are not allowed for register_table. Location must be a relative path within the root directory: {}",
1653 location
1654 ).into(),
1655 location: location!(),
1656 });
1657 }
1658
1659 if location.contains("..") {
1661 return Err(Error::InvalidInput {
1662 source: format!(
1663 "Path traversal is not allowed. Location must be a relative path within the root directory: {}",
1664 location
1665 ).into(),
1666 location: location!(),
1667 });
1668 }
1669
1670 let (namespace, table_name) = Self::split_object_id(table_id);
1671 let object_id = Self::build_object_id(&namespace, &table_name);
1672
1673 if !namespace.is_empty() {
1675 self.validate_namespace_levels_exist(&namespace).await?;
1676 }
1677
1678 if self.manifest_contains_object(&object_id).await? {
1680 return Err(Error::Namespace {
1681 source: format!("Table '{}' already exists", object_id).into(),
1682 location: location!(),
1683 });
1684 }
1685
1686 self.insert_into_manifest(object_id, ObjectType::Table, Some(location.clone()))
1688 .await?;
1689
1690 Ok(RegisterTableResponse {
1691 location,
1692 properties: None,
1693 })
1694 }
1695
1696 async fn deregister_table(
1697 &self,
1698 request: DeregisterTableRequest,
1699 ) -> Result<DeregisterTableResponse> {
1700 let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1701 source: "Table ID is required".into(),
1702 location: location!(),
1703 })?;
1704
1705 if table_id.is_empty() {
1706 return Err(Error::InvalidInput {
1707 source: "Table ID cannot be empty".into(),
1708 location: location!(),
1709 });
1710 }
1711
1712 let (namespace, table_name) = Self::split_object_id(table_id);
1713 let object_id = Self::build_object_id(&namespace, &table_name);
1714
1715 let table_info = self.query_manifest_for_table(&object_id).await?;
1717
1718 let table_uri = match table_info {
1719 Some(info) => {
1720 self.delete_from_manifest(&object_id).await?;
1722
1723 self.construct_full_uri(&info.location)?
1725 }
1726 None => {
1727 return Err(Error::Namespace {
1728 source: format!("Table '{}' not found", object_id).into(),
1729 location: location!(),
1730 });
1731 }
1732 };
1733
1734 Ok(DeregisterTableResponse {
1735 id: request.id.clone(),
1736 location: Some(table_uri),
1737 properties: None,
1738 })
1739 }
1740}
1741
1742#[cfg(test)]
1743mod tests {
1744 use crate::DirectoryNamespaceBuilder;
1745 use bytes::Bytes;
1746 use lance_core::utils::tempfile::TempStdDir;
1747 use lance_namespace::models::{
1748 CreateTableRequest, DescribeTableRequest, DropTableRequest, ListTablesRequest,
1749 TableExistsRequest,
1750 };
1751 use lance_namespace::LanceNamespace;
1752 use rstest::rstest;
1753
1754 fn create_test_ipc_data() -> Vec<u8> {
1755 use arrow::array::{Int32Array, StringArray};
1756 use arrow::datatypes::{DataType, Field, Schema};
1757 use arrow::ipc::writer::StreamWriter;
1758 use arrow::record_batch::RecordBatch;
1759 use std::sync::Arc;
1760
1761 let schema = Arc::new(Schema::new(vec![
1762 Field::new("id", DataType::Int32, false),
1763 Field::new("name", DataType::Utf8, false),
1764 ]));
1765
1766 let batch = RecordBatch::try_new(
1767 schema.clone(),
1768 vec![
1769 Arc::new(Int32Array::from(vec![1, 2, 3])),
1770 Arc::new(StringArray::from(vec!["a", "b", "c"])),
1771 ],
1772 )
1773 .unwrap();
1774
1775 let mut buffer = Vec::new();
1776 {
1777 let mut writer = StreamWriter::try_new(&mut buffer, &schema).unwrap();
1778 writer.write(&batch).unwrap();
1779 writer.finish().unwrap();
1780 }
1781 buffer
1782 }
1783
1784 #[rstest]
1785 #[case::with_optimization(true)]
1786 #[case::without_optimization(false)]
1787 #[tokio::test]
1788 async fn test_manifest_namespace_basic_create_and_list(#[case] inline_optimization: bool) {
1789 let temp_dir = TempStdDir::default();
1790 let temp_path = temp_dir.to_str().unwrap();
1791
1792 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
1794 .inline_optimization_enabled(inline_optimization)
1795 .build()
1796 .await
1797 .unwrap();
1798
1799 let mut request = ListTablesRequest::new();
1801 request.id = Some(vec![]);
1802 let response = dir_namespace.list_tables(request).await.unwrap();
1803 assert_eq!(response.tables.len(), 0);
1804
1805 let buffer = create_test_ipc_data();
1807 let mut create_request = CreateTableRequest::new();
1808 create_request.id = Some(vec!["test_table".to_string()]);
1809
1810 let _response = dir_namespace
1811 .create_table(create_request, Bytes::from(buffer))
1812 .await
1813 .unwrap();
1814
1815 let mut request = ListTablesRequest::new();
1817 request.id = Some(vec![]);
1818 let response = dir_namespace.list_tables(request).await.unwrap();
1819 assert_eq!(response.tables.len(), 1);
1820 assert_eq!(response.tables[0], "test_table");
1821 }
1822
1823 #[rstest]
1824 #[case::with_optimization(true)]
1825 #[case::without_optimization(false)]
1826 #[tokio::test]
1827 async fn test_manifest_namespace_table_exists(#[case] inline_optimization: bool) {
1828 let temp_dir = TempStdDir::default();
1829 let temp_path = temp_dir.to_str().unwrap();
1830
1831 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
1832 .inline_optimization_enabled(inline_optimization)
1833 .build()
1834 .await
1835 .unwrap();
1836
1837 let mut request = TableExistsRequest::new();
1839 request.id = Some(vec!["nonexistent".to_string()]);
1840 let result = dir_namespace.table_exists(request).await;
1841 assert!(result.is_err());
1842
1843 let buffer = create_test_ipc_data();
1845 let mut create_request = CreateTableRequest::new();
1846 create_request.id = Some(vec!["test_table".to_string()]);
1847 dir_namespace
1848 .create_table(create_request, Bytes::from(buffer))
1849 .await
1850 .unwrap();
1851
1852 let mut request = TableExistsRequest::new();
1854 request.id = Some(vec!["test_table".to_string()]);
1855 let result = dir_namespace.table_exists(request).await;
1856 assert!(result.is_ok());
1857 }
1858
1859 #[rstest]
1860 #[case::with_optimization(true)]
1861 #[case::without_optimization(false)]
1862 #[tokio::test]
1863 async fn test_manifest_namespace_describe_table(#[case] inline_optimization: bool) {
1864 let temp_dir = TempStdDir::default();
1865 let temp_path = temp_dir.to_str().unwrap();
1866
1867 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
1868 .inline_optimization_enabled(inline_optimization)
1869 .build()
1870 .await
1871 .unwrap();
1872
1873 let mut request = DescribeTableRequest::new();
1875 request.id = Some(vec!["nonexistent".to_string()]);
1876 let result = dir_namespace.describe_table(request).await;
1877 assert!(result.is_err());
1878
1879 let buffer = create_test_ipc_data();
1881 let mut create_request = CreateTableRequest::new();
1882 create_request.id = Some(vec!["test_table".to_string()]);
1883 dir_namespace
1884 .create_table(create_request, Bytes::from(buffer))
1885 .await
1886 .unwrap();
1887
1888 let mut request = DescribeTableRequest::new();
1890 request.id = Some(vec!["test_table".to_string()]);
1891 let response = dir_namespace.describe_table(request).await.unwrap();
1892 assert!(response.location.is_some());
1893 assert!(response.location.unwrap().contains("test_table"));
1894 }
1895
1896 #[rstest]
1897 #[case::with_optimization(true)]
1898 #[case::without_optimization(false)]
1899 #[tokio::test]
1900 async fn test_manifest_namespace_drop_table(#[case] inline_optimization: bool) {
1901 let temp_dir = TempStdDir::default();
1902 let temp_path = temp_dir.to_str().unwrap();
1903
1904 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
1905 .inline_optimization_enabled(inline_optimization)
1906 .build()
1907 .await
1908 .unwrap();
1909
1910 let buffer = create_test_ipc_data();
1912 let mut create_request = CreateTableRequest::new();
1913 create_request.id = Some(vec!["test_table".to_string()]);
1914 dir_namespace
1915 .create_table(create_request, Bytes::from(buffer))
1916 .await
1917 .unwrap();
1918
1919 let mut request = ListTablesRequest::new();
1921 request.id = Some(vec![]);
1922 let response = dir_namespace.list_tables(request).await.unwrap();
1923 assert_eq!(response.tables.len(), 1);
1924
1925 let mut drop_request = DropTableRequest::new();
1927 drop_request.id = Some(vec!["test_table".to_string()]);
1928 let _response = dir_namespace.drop_table(drop_request).await.unwrap();
1929
1930 let mut request = ListTablesRequest::new();
1932 request.id = Some(vec![]);
1933 let response = dir_namespace.list_tables(request).await.unwrap();
1934 assert_eq!(response.tables.len(), 0);
1935 }
1936
1937 #[rstest]
1938 #[case::with_optimization(true)]
1939 #[case::without_optimization(false)]
1940 #[tokio::test]
1941 async fn test_manifest_namespace_multiple_tables(#[case] inline_optimization: bool) {
1942 let temp_dir = TempStdDir::default();
1943 let temp_path = temp_dir.to_str().unwrap();
1944
1945 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
1946 .inline_optimization_enabled(inline_optimization)
1947 .build()
1948 .await
1949 .unwrap();
1950
1951 let buffer = create_test_ipc_data();
1953 for i in 1..=3 {
1954 let mut create_request = CreateTableRequest::new();
1955 create_request.id = Some(vec![format!("table{}", i)]);
1956 dir_namespace
1957 .create_table(create_request, Bytes::from(buffer.clone()))
1958 .await
1959 .unwrap();
1960 }
1961
1962 let mut request = ListTablesRequest::new();
1964 request.id = Some(vec![]);
1965 let response = dir_namespace.list_tables(request).await.unwrap();
1966 assert_eq!(response.tables.len(), 3);
1967 assert!(response.tables.contains(&"table1".to_string()));
1968 assert!(response.tables.contains(&"table2".to_string()));
1969 assert!(response.tables.contains(&"table3".to_string()));
1970 }
1971
1972 #[rstest]
1973 #[case::with_optimization(true)]
1974 #[case::without_optimization(false)]
1975 #[tokio::test]
1976 async fn test_directory_only_mode(#[case] inline_optimization: bool) {
1977 let temp_dir = TempStdDir::default();
1978 let temp_path = temp_dir.to_str().unwrap();
1979
1980 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
1982 .manifest_enabled(false)
1983 .inline_optimization_enabled(inline_optimization)
1984 .build()
1985 .await
1986 .unwrap();
1987
1988 let mut request = ListTablesRequest::new();
1990 request.id = Some(vec![]);
1991 let response = dir_namespace.list_tables(request).await.unwrap();
1992 assert_eq!(response.tables.len(), 0);
1993
1994 let buffer = create_test_ipc_data();
1996 let mut create_request = CreateTableRequest::new();
1997 create_request.id = Some(vec!["test_table".to_string()]);
1998
1999 let _response = dir_namespace
2001 .create_table(create_request, Bytes::from(buffer))
2002 .await
2003 .unwrap();
2004
2005 let mut request = ListTablesRequest::new();
2007 request.id = Some(vec![]);
2008 let response = dir_namespace.list_tables(request).await.unwrap();
2009 assert_eq!(response.tables.len(), 1);
2010 assert_eq!(response.tables[0], "test_table");
2011 }
2012
2013 #[rstest]
2014 #[case::with_optimization(true)]
2015 #[case::without_optimization(false)]
2016 #[tokio::test]
2017 async fn test_dual_mode_merge(#[case] inline_optimization: bool) {
2018 let temp_dir = TempStdDir::default();
2019 let temp_path = temp_dir.to_str().unwrap();
2020
2021 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2023 .manifest_enabled(true)
2024 .dir_listing_enabled(true)
2025 .inline_optimization_enabled(inline_optimization)
2026 .build()
2027 .await
2028 .unwrap();
2029
2030 let buffer = create_test_ipc_data();
2032 let mut create_request = CreateTableRequest::new();
2033 create_request.id = Some(vec!["table1".to_string()]);
2034 dir_namespace
2035 .create_table(create_request, Bytes::from(buffer))
2036 .await
2037 .unwrap();
2038
2039 let mut request = ListTablesRequest::new();
2041 request.id = Some(vec![]);
2042 let response = dir_namespace.list_tables(request).await.unwrap();
2043 assert_eq!(response.tables.len(), 1);
2044 assert_eq!(response.tables[0], "table1");
2045 }
2046
2047 #[rstest]
2048 #[case::with_optimization(true)]
2049 #[case::without_optimization(false)]
2050 #[tokio::test]
2051 async fn test_manifest_only_mode(#[case] inline_optimization: bool) {
2052 let temp_dir = TempStdDir::default();
2053 let temp_path = temp_dir.to_str().unwrap();
2054
2055 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2057 .manifest_enabled(true)
2058 .dir_listing_enabled(false)
2059 .inline_optimization_enabled(inline_optimization)
2060 .build()
2061 .await
2062 .unwrap();
2063
2064 let buffer = create_test_ipc_data();
2066 let mut create_request = CreateTableRequest::new();
2067 create_request.id = Some(vec!["test_table".to_string()]);
2068 dir_namespace
2069 .create_table(create_request, Bytes::from(buffer))
2070 .await
2071 .unwrap();
2072
2073 let mut request = ListTablesRequest::new();
2075 request.id = Some(vec![]);
2076 let response = dir_namespace.list_tables(request).await.unwrap();
2077 assert_eq!(response.tables.len(), 1);
2078 assert_eq!(response.tables[0], "test_table");
2079 }
2080
2081 #[rstest]
2082 #[case::with_optimization(true)]
2083 #[case::without_optimization(false)]
2084 #[tokio::test]
2085 async fn test_drop_nonexistent_table(#[case] inline_optimization: bool) {
2086 let temp_dir = TempStdDir::default();
2087 let temp_path = temp_dir.to_str().unwrap();
2088
2089 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2090 .inline_optimization_enabled(inline_optimization)
2091 .build()
2092 .await
2093 .unwrap();
2094
2095 let mut drop_request = DropTableRequest::new();
2097 drop_request.id = Some(vec!["nonexistent".to_string()]);
2098 let result = dir_namespace.drop_table(drop_request).await;
2099 assert!(result.is_err());
2100 }
2101
2102 #[rstest]
2103 #[case::with_optimization(true)]
2104 #[case::without_optimization(false)]
2105 #[tokio::test]
2106 async fn test_create_duplicate_table_fails(#[case] inline_optimization: bool) {
2107 let temp_dir = TempStdDir::default();
2108 let temp_path = temp_dir.to_str().unwrap();
2109
2110 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2111 .inline_optimization_enabled(inline_optimization)
2112 .build()
2113 .await
2114 .unwrap();
2115
2116 let buffer = create_test_ipc_data();
2118 let mut create_request = CreateTableRequest::new();
2119 create_request.id = Some(vec!["test_table".to_string()]);
2120 dir_namespace
2121 .create_table(create_request, Bytes::from(buffer.clone()))
2122 .await
2123 .unwrap();
2124
2125 let mut create_request = CreateTableRequest::new();
2127 create_request.id = Some(vec!["test_table".to_string()]);
2128 let result = dir_namespace
2129 .create_table(create_request, Bytes::from(buffer))
2130 .await;
2131 assert!(result.is_err());
2132 }
2133
2134 #[rstest]
2135 #[case::with_optimization(true)]
2136 #[case::without_optimization(false)]
2137 #[tokio::test]
2138 async fn test_create_child_namespace(#[case] inline_optimization: bool) {
2139 use lance_namespace::models::{
2140 CreateNamespaceRequest, ListNamespacesRequest, NamespaceExistsRequest,
2141 };
2142
2143 let temp_dir = TempStdDir::default();
2144 let temp_path = temp_dir.to_str().unwrap();
2145
2146 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2147 .inline_optimization_enabled(inline_optimization)
2148 .build()
2149 .await
2150 .unwrap();
2151
2152 let mut create_req = CreateNamespaceRequest::new();
2154 create_req.id = Some(vec!["ns1".to_string()]);
2155 let result = dir_namespace.create_namespace(create_req).await;
2156 assert!(
2157 result.is_ok(),
2158 "Failed to create child namespace: {:?}",
2159 result.err()
2160 );
2161
2162 let exists_req = NamespaceExistsRequest {
2164 id: Some(vec!["ns1".to_string()]),
2165 };
2166 let result = dir_namespace.namespace_exists(exists_req).await;
2167 assert!(result.is_ok(), "Namespace should exist");
2168
2169 let list_req = ListNamespacesRequest {
2171 id: Some(vec![]),
2172 page_token: None,
2173 limit: None,
2174 };
2175 let result = dir_namespace.list_namespaces(list_req).await;
2176 assert!(result.is_ok());
2177 let namespaces = result.unwrap();
2178 assert_eq!(namespaces.namespaces.len(), 1);
2179 assert_eq!(namespaces.namespaces[0], "ns1");
2180 }
2181
2182 #[rstest]
2183 #[case::with_optimization(true)]
2184 #[case::without_optimization(false)]
2185 #[tokio::test]
2186 async fn test_create_nested_namespace(#[case] inline_optimization: bool) {
2187 use lance_namespace::models::{
2188 CreateNamespaceRequest, ListNamespacesRequest, NamespaceExistsRequest,
2189 };
2190
2191 let temp_dir = TempStdDir::default();
2192 let temp_path = temp_dir.to_str().unwrap();
2193
2194 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2195 .inline_optimization_enabled(inline_optimization)
2196 .build()
2197 .await
2198 .unwrap();
2199
2200 let mut create_req = CreateNamespaceRequest::new();
2202 create_req.id = Some(vec!["parent".to_string()]);
2203 dir_namespace.create_namespace(create_req).await.unwrap();
2204
2205 let mut create_req = CreateNamespaceRequest::new();
2207 create_req.id = Some(vec!["parent".to_string(), "child".to_string()]);
2208 let result = dir_namespace.create_namespace(create_req).await;
2209 assert!(
2210 result.is_ok(),
2211 "Failed to create nested namespace: {:?}",
2212 result.err()
2213 );
2214
2215 let exists_req = NamespaceExistsRequest {
2217 id: Some(vec!["parent".to_string(), "child".to_string()]),
2218 };
2219 let result = dir_namespace.namespace_exists(exists_req).await;
2220 assert!(result.is_ok(), "Nested namespace should exist");
2221
2222 let list_req = ListNamespacesRequest {
2224 id: Some(vec!["parent".to_string()]),
2225 page_token: None,
2226 limit: None,
2227 };
2228 let result = dir_namespace.list_namespaces(list_req).await;
2229 assert!(result.is_ok());
2230 let namespaces = result.unwrap();
2231 assert_eq!(namespaces.namespaces.len(), 1);
2232 assert_eq!(namespaces.namespaces[0], "child");
2233 }
2234
2235 #[rstest]
2236 #[case::with_optimization(true)]
2237 #[case::without_optimization(false)]
2238 #[tokio::test]
2239 async fn test_create_namespace_without_parent_fails(#[case] inline_optimization: bool) {
2240 use lance_namespace::models::CreateNamespaceRequest;
2241
2242 let temp_dir = TempStdDir::default();
2243 let temp_path = temp_dir.to_str().unwrap();
2244
2245 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2246 .inline_optimization_enabled(inline_optimization)
2247 .build()
2248 .await
2249 .unwrap();
2250
2251 let mut create_req = CreateNamespaceRequest::new();
2253 create_req.id = Some(vec!["nonexistent_parent".to_string(), "child".to_string()]);
2254 let result = dir_namespace.create_namespace(create_req).await;
2255 assert!(result.is_err(), "Should fail when parent doesn't exist");
2256 }
2257
2258 #[rstest]
2259 #[case::with_optimization(true)]
2260 #[case::without_optimization(false)]
2261 #[tokio::test]
2262 async fn test_drop_child_namespace(#[case] inline_optimization: bool) {
2263 use lance_namespace::models::{
2264 CreateNamespaceRequest, DropNamespaceRequest, NamespaceExistsRequest,
2265 };
2266
2267 let temp_dir = TempStdDir::default();
2268 let temp_path = temp_dir.to_str().unwrap();
2269
2270 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2271 .inline_optimization_enabled(inline_optimization)
2272 .build()
2273 .await
2274 .unwrap();
2275
2276 let mut create_req = CreateNamespaceRequest::new();
2278 create_req.id = Some(vec!["ns1".to_string()]);
2279 dir_namespace.create_namespace(create_req).await.unwrap();
2280
2281 let mut drop_req = DropNamespaceRequest::new();
2283 drop_req.id = Some(vec!["ns1".to_string()]);
2284 let result = dir_namespace.drop_namespace(drop_req).await;
2285 assert!(
2286 result.is_ok(),
2287 "Failed to drop namespace: {:?}",
2288 result.err()
2289 );
2290
2291 let exists_req = NamespaceExistsRequest {
2293 id: Some(vec!["ns1".to_string()]),
2294 };
2295 let result = dir_namespace.namespace_exists(exists_req).await;
2296 assert!(result.is_err(), "Namespace should not exist after drop");
2297 }
2298
2299 #[rstest]
2300 #[case::with_optimization(true)]
2301 #[case::without_optimization(false)]
2302 #[tokio::test]
2303 async fn test_drop_namespace_with_children_fails(#[case] inline_optimization: bool) {
2304 use lance_namespace::models::{CreateNamespaceRequest, DropNamespaceRequest};
2305
2306 let temp_dir = TempStdDir::default();
2307 let temp_path = temp_dir.to_str().unwrap();
2308
2309 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2310 .inline_optimization_enabled(inline_optimization)
2311 .build()
2312 .await
2313 .unwrap();
2314
2315 let mut create_req = CreateNamespaceRequest::new();
2317 create_req.id = Some(vec!["parent".to_string()]);
2318 dir_namespace.create_namespace(create_req).await.unwrap();
2319
2320 let mut create_req = CreateNamespaceRequest::new();
2321 create_req.id = Some(vec!["parent".to_string(), "child".to_string()]);
2322 dir_namespace.create_namespace(create_req).await.unwrap();
2323
2324 let mut drop_req = DropNamespaceRequest::new();
2326 drop_req.id = Some(vec!["parent".to_string()]);
2327 let result = dir_namespace.drop_namespace(drop_req).await;
2328 assert!(result.is_err(), "Should fail when namespace has children");
2329 }
2330
2331 #[rstest]
2332 #[case::with_optimization(true)]
2333 #[case::without_optimization(false)]
2334 #[tokio::test]
2335 async fn test_create_table_in_child_namespace(#[case] inline_optimization: bool) {
2336 use lance_namespace::models::{
2337 CreateNamespaceRequest, CreateTableRequest, ListTablesRequest,
2338 };
2339
2340 let temp_dir = TempStdDir::default();
2341 let temp_path = temp_dir.to_str().unwrap();
2342
2343 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2344 .inline_optimization_enabled(inline_optimization)
2345 .build()
2346 .await
2347 .unwrap();
2348
2349 let mut create_ns_req = CreateNamespaceRequest::new();
2351 create_ns_req.id = Some(vec!["ns1".to_string()]);
2352 dir_namespace.create_namespace(create_ns_req).await.unwrap();
2353
2354 let buffer = create_test_ipc_data();
2356 let mut create_table_req = CreateTableRequest::new();
2357 create_table_req.id = Some(vec!["ns1".to_string(), "table1".to_string()]);
2358 let result = dir_namespace
2359 .create_table(create_table_req, Bytes::from(buffer))
2360 .await;
2361 assert!(
2362 result.is_ok(),
2363 "Failed to create table in child namespace: {:?}",
2364 result.err()
2365 );
2366
2367 let list_req = ListTablesRequest {
2369 id: Some(vec!["ns1".to_string()]),
2370 page_token: None,
2371 limit: None,
2372 };
2373 let result = dir_namespace.list_tables(list_req).await;
2374 assert!(result.is_ok());
2375 let tables = result.unwrap();
2376 assert_eq!(tables.tables.len(), 1);
2377 assert_eq!(tables.tables[0], "table1");
2378 }
2379
2380 #[rstest]
2381 #[case::with_optimization(true)]
2382 #[case::without_optimization(false)]
2383 #[tokio::test]
2384 async fn test_describe_child_namespace(#[case] inline_optimization: bool) {
2385 use lance_namespace::models::{CreateNamespaceRequest, DescribeNamespaceRequest};
2386
2387 let temp_dir = TempStdDir::default();
2388 let temp_path = temp_dir.to_str().unwrap();
2389
2390 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2391 .inline_optimization_enabled(inline_optimization)
2392 .build()
2393 .await
2394 .unwrap();
2395
2396 let mut properties = std::collections::HashMap::new();
2398 properties.insert("key1".to_string(), "value1".to_string());
2399
2400 let mut create_req = CreateNamespaceRequest::new();
2401 create_req.id = Some(vec!["ns1".to_string()]);
2402 create_req.properties = Some(properties.clone());
2403 dir_namespace.create_namespace(create_req).await.unwrap();
2404
2405 let describe_req = DescribeNamespaceRequest {
2407 id: Some(vec!["ns1".to_string()]),
2408 };
2409 let result = dir_namespace.describe_namespace(describe_req).await;
2410 assert!(
2411 result.is_ok(),
2412 "Failed to describe namespace: {:?}",
2413 result.err()
2414 );
2415 let response = result.unwrap();
2416 assert!(response.properties.is_some());
2417 assert_eq!(
2418 response.properties.unwrap().get("key1"),
2419 Some(&"value1".to_string())
2420 );
2421 }
2422}