1use arrow::array::{Array, RecordBatch, RecordBatchIterator, StringArray};
10use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
11use arrow_ipc::reader::StreamReader;
12use async_trait::async_trait;
13use bytes::Bytes;
14use futures::stream::StreamExt;
15use lance::dataset::optimize::{compact_files, CompactionOptions};
16use lance::dataset::{builder::DatasetBuilder, WriteParams};
17use lance::session::Session;
18use lance::{dataset::scanner::Scanner, Dataset};
19use lance_core::{box_error, Error, Result};
20use lance_index::optimize::OptimizeOptions;
21use lance_index::scalar::{BuiltinIndexType, ScalarIndexParams};
22use lance_index::traits::DatasetIndexExt;
23use lance_index::IndexType;
24use lance_io::object_store::{ObjectStore, ObjectStoreParams};
25use lance_namespace::models::{
26 CreateEmptyTableRequest, CreateEmptyTableResponse, CreateNamespaceRequest,
27 CreateNamespaceResponse, CreateTableRequest, CreateTableResponse, DeclareTableRequest,
28 DeclareTableResponse, DeregisterTableRequest, DeregisterTableResponse,
29 DescribeNamespaceRequest, DescribeNamespaceResponse, DescribeTableRequest,
30 DescribeTableResponse, DropNamespaceRequest, DropNamespaceResponse, DropTableRequest,
31 DropTableResponse, ListNamespacesRequest, ListNamespacesResponse, ListTablesRequest,
32 ListTablesResponse, NamespaceExistsRequest, RegisterTableRequest, RegisterTableResponse,
33 TableExistsRequest,
34};
35use lance_namespace::schema::arrow_schema_to_json;
36use lance_namespace::LanceNamespace;
37use object_store::path::Path;
38use snafu::location;
39use std::io::Cursor;
40use std::{
41 collections::HashMap,
42 hash::{DefaultHasher, Hash, Hasher},
43 ops::{Deref, DerefMut},
44 sync::Arc,
45};
46use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
47
/// Name of the manifest dataset; it is opened/created at `<root>/__manifest`.
const MANIFEST_TABLE_NAME: &str = "__manifest";
/// Separator placed between namespace levels and the object name inside an object id.
const DELIMITER: &str = "$";

/// Name of the BTree scalar index created on the manifest's `object_id` column.
const OBJECT_ID_INDEX_NAME: &str = "object_id_btree";
/// Name of the Bitmap scalar index created on the manifest's `object_type` column.
const OBJECT_TYPE_INDEX_NAME: &str = "object_type_bitmap";
/// Name of the LabelList scalar index created on the manifest's `base_objects` column.
const BASE_OBJECTS_INDEX_NAME: &str = "base_objects_label_list";
58
59#[derive(Debug, Clone, Copy, PartialEq, Eq)]
61pub enum ObjectType {
62 Namespace,
63 Table,
64}
65
66impl ObjectType {
67 pub fn as_str(&self) -> &str {
68 match self {
69 Self::Namespace => "namespace",
70 Self::Table => "table",
71 }
72 }
73
74 pub fn parse(s: &str) -> Result<Self> {
75 match s {
76 "namespace" => Ok(Self::Namespace),
77 "table" => Ok(Self::Table),
78 _ => Err(Error::io(
79 format!("Invalid object type: {}", s),
80 location!(),
81 )),
82 }
83 }
84}
85
/// Information about a table as recorded in the manifest.
#[derive(Debug, Clone)]
pub struct TableInfo {
    /// Namespace levels parsed from the table's object id (empty when the id has no delimiter).
    pub namespace: Vec<String>,
    /// Table name: the last segment of the object id.
    pub name: String,
    /// Value of the manifest's `location` column for this table.
    pub location: String,
}
93
/// Information about a namespace as recorded in the manifest.
#[derive(Debug, Clone)]
pub struct NamespaceInfo {
    /// Parent namespace levels parsed from the object id (empty when the id has no delimiter).
    pub namespace: Vec<String>,
    /// Namespace name: the last segment of the object id.
    pub name: String,
    /// Key/value metadata decoded from the manifest's JSON `metadata` column; `None` when
    /// that column is null.
    pub metadata: Option<HashMap<String, String>>,
}
101
102#[derive(Debug, Clone)]
107pub struct DatasetConsistencyWrapper(Arc<RwLock<Dataset>>);
108
109impl DatasetConsistencyWrapper {
110 pub fn new(dataset: Dataset) -> Self {
112 Self(Arc::new(RwLock::new(dataset)))
113 }
114
115 pub async fn get(&self) -> Result<DatasetReadGuard<'_>> {
118 self.reload().await?;
119 Ok(DatasetReadGuard {
120 guard: self.0.read().await,
121 })
122 }
123
124 pub async fn get_mut(&self) -> Result<DatasetWriteGuard<'_>> {
127 self.reload().await?;
128 Ok(DatasetWriteGuard {
129 guard: self.0.write().await,
130 })
131 }
132
133 pub async fn set_latest(&self, dataset: Dataset) {
138 let mut write_guard = self.0.write().await;
139 if dataset.manifest().version > write_guard.manifest().version {
140 *write_guard = dataset;
141 }
142 }
143
144 async fn reload(&self) -> Result<()> {
146 let read_guard = self.0.read().await;
148 let dataset_uri = read_guard.uri().to_string();
149 let current_version = read_guard.version().version;
150 log::debug!(
151 "Reload starting for uri={}, current_version={}",
152 dataset_uri,
153 current_version
154 );
155 let latest_version = read_guard
156 .latest_version_id()
157 .await
158 .map_err(|e| Error::IO {
159 source: box_error(std::io::Error::other(format!(
160 "Failed to get latest version: {}",
161 e
162 ))),
163 location: location!(),
164 })?;
165 log::debug!(
166 "Reload got latest_version={} for uri={}, current_version={}",
167 latest_version,
168 dataset_uri,
169 current_version
170 );
171 drop(read_guard);
172
173 if latest_version == current_version {
175 log::debug!("Already up-to-date for uri={}", dataset_uri);
176 return Ok(());
177 }
178
179 let mut write_guard = self.0.write().await;
181
182 let latest_version = write_guard
184 .latest_version_id()
185 .await
186 .map_err(|e| Error::IO {
187 source: box_error(std::io::Error::other(format!(
188 "Failed to get latest version: {}",
189 e
190 ))),
191 location: location!(),
192 })?;
193
194 if latest_version != write_guard.version().version {
195 write_guard.checkout_latest().await.map_err(|e| Error::IO {
196 source: box_error(std::io::Error::other(format!(
197 "Failed to checkout latest: {}",
198 e
199 ))),
200 location: location!(),
201 })?;
202 }
203
204 Ok(())
205 }
206}
207
208pub struct DatasetReadGuard<'a> {
209 guard: RwLockReadGuard<'a, Dataset>,
210}
211
212impl Deref for DatasetReadGuard<'_> {
213 type Target = Dataset;
214
215 fn deref(&self) -> &Self::Target {
216 &self.guard
217 }
218}
219
220pub struct DatasetWriteGuard<'a> {
221 guard: RwLockWriteGuard<'a, Dataset>,
222}
223
224impl Deref for DatasetWriteGuard<'_> {
225 type Target = Dataset;
226
227 fn deref(&self) -> &Self::Target {
228 &self.guard
229 }
230}
231
232impl DerefMut for DatasetWriteGuard<'_> {
233 fn deref_mut(&mut self) -> &mut Self::Target {
234 &mut self.guard
235 }
236}
237
/// Namespace implementation that tracks tables and namespaces as rows of a manifest
/// dataset stored under `root`.
#[derive(Debug)]
pub struct ManifestNamespace {
    /// Root URI; the manifest dataset and relative table locations are resolved against it.
    root: String,
    /// Storage options used when opening/creating the manifest and returned in responses.
    storage_options: Option<HashMap<String, String>>,
    /// Optional session supplied at construction; used when opening/creating the manifest.
    #[allow(dead_code)]
    session: Option<Arc<Session>>,
    /// Object store supplied at construction (not read by the code visible here).
    #[allow(dead_code)]
    object_store: Arc<ObjectStore>,
    /// Base path supplied at construction (not read by the code visible here).
    #[allow(dead_code)]
    base_path: Path,
    /// Handle to the manifest dataset, reloaded before each access.
    manifest_dataset: DatasetConsistencyWrapper,
    /// When true, root-level tables are created in a `<table_name>.lance` directory
    /// instead of a generated directory name.
    dir_listing_enabled: bool,
    /// When true, index creation, compaction and index optimization run after manifest writes.
    inline_optimization_enabled: bool,
}
260
261impl ManifestNamespace {
262 pub async fn from_directory(
264 root: String,
265 storage_options: Option<HashMap<String, String>>,
266 session: Option<Arc<Session>>,
267 object_store: Arc<ObjectStore>,
268 base_path: Path,
269 dir_listing_enabled: bool,
270 inline_optimization_enabled: bool,
271 ) -> Result<Self> {
272 let manifest_dataset =
273 Self::create_or_get_manifest(&root, &storage_options, session.clone()).await?;
274
275 Ok(Self {
276 root,
277 storage_options,
278 session,
279 object_store,
280 base_path,
281 manifest_dataset,
282 dir_listing_enabled,
283 inline_optimization_enabled,
284 })
285 }
286
287 pub fn build_object_id(namespace: &[String], name: &str) -> String {
289 if namespace.is_empty() {
290 name.to_string()
291 } else {
292 let mut id = namespace.join(DELIMITER);
293 id.push_str(DELIMITER);
294 id.push_str(name);
295 id
296 }
297 }
298
299 pub fn parse_object_id(object_id: &str) -> (Vec<String>, String) {
301 let parts: Vec<&str> = object_id.split(DELIMITER).collect();
302 if parts.len() == 1 {
303 (Vec::new(), parts[0].to_string())
304 } else {
305 let namespace = parts[..parts.len() - 1]
306 .iter()
307 .map(|s| s.to_string())
308 .collect();
309 let name = parts[parts.len() - 1].to_string();
310 (namespace, name)
311 }
312 }
313
/// Splits an id path into `(namespace levels, name)`, where the name is the last element.
///
/// An empty `table_id` yields an empty namespace and an empty name instead of panicking
/// on the `len() - 1` index.
fn split_object_id(table_id: &[String]) -> (Vec<String>, String) {
    match table_id.split_last() {
        Some((name, namespace)) => (namespace.to_vec(), name.clone()),
        None => (Vec::new(), String::new()),
    }
}
325
326 fn str_object_id(table_id: &[String]) -> String {
328 table_id.join(DELIMITER)
329 }
330
331 fn generate_dir_name(object_id: &str) -> String {
338 let random_num: u64 = rand::random();
340
341 let mut hasher = DefaultHasher::new();
343 random_num.hash(&mut hasher);
344 object_id.hash(&mut hasher);
345 let hash = hasher.finish();
346
347 format!("{:08x}_{}", (hash & 0xFFFFFFFF) as u32, object_id)
349 }
350
351 pub(crate) fn construct_full_uri(root: &str, relative_location: &str) -> Result<String> {
353 let mut base_url = lance_io::object_store::uri_to_url(root)?;
354
355 if !base_url.path().ends_with('/') {
360 base_url.set_path(&format!("{}/", base_url.path()));
361 }
362
363 let full_url = base_url
364 .join(relative_location)
365 .map_err(|e| Error::InvalidInput {
366 source: format!(
367 "Failed to join URI '{}' with '{}': {:?}",
368 root, relative_location, e
369 )
370 .into(),
371 location: location!(),
372 })?;
373
374 Ok(full_url.to_string())
375 }
376
377 async fn run_inline_optimization(&self) -> Result<()> {
389 if !self.inline_optimization_enabled {
390 return Ok(());
391 }
392
393 let mut dataset_guard = self.manifest_dataset.get_mut().await?;
395 let dataset: &mut Dataset = &mut dataset_guard;
396
397 let indices = dataset.load_indices().await?;
399
400 let has_object_id_index = indices.iter().any(|idx| idx.name == OBJECT_ID_INDEX_NAME);
402 let has_object_type_index = indices.iter().any(|idx| idx.name == OBJECT_TYPE_INDEX_NAME);
403 let has_base_objects_index = indices
404 .iter()
405 .any(|idx| idx.name == BASE_OBJECTS_INDEX_NAME);
406
407 if !has_object_id_index {
409 log::debug!(
410 "Creating BTREE index '{}' on object_id for __manifest table",
411 OBJECT_ID_INDEX_NAME
412 );
413 let params = ScalarIndexParams::for_builtin(BuiltinIndexType::BTree);
414 if let Err(e) = dataset
415 .create_index(
416 &["object_id"],
417 IndexType::BTree,
418 Some(OBJECT_ID_INDEX_NAME.to_string()),
419 ¶ms,
420 true,
421 )
422 .await
423 {
424 log::warn!("Failed to create BTREE index on object_id for __manifest table: {:?}. Query performance may be impacted.", e);
425 } else {
426 log::info!(
427 "Created BTREE index '{}' on object_id for __manifest table",
428 OBJECT_ID_INDEX_NAME
429 );
430 }
431 }
432
433 if !has_object_type_index {
435 log::debug!(
436 "Creating Bitmap index '{}' on object_type for __manifest table",
437 OBJECT_TYPE_INDEX_NAME
438 );
439 let params = ScalarIndexParams::default();
440 if let Err(e) = dataset
441 .create_index(
442 &["object_type"],
443 IndexType::Bitmap,
444 Some(OBJECT_TYPE_INDEX_NAME.to_string()),
445 ¶ms,
446 true,
447 )
448 .await
449 {
450 log::warn!("Failed to create Bitmap index on object_type for __manifest table: {:?}. Query performance may be impacted.", e);
451 } else {
452 log::info!(
453 "Created Bitmap index '{}' on object_type for __manifest table",
454 OBJECT_TYPE_INDEX_NAME
455 );
456 }
457 }
458
459 if !has_base_objects_index {
461 log::debug!(
462 "Creating LabelList index '{}' on base_objects for __manifest table",
463 BASE_OBJECTS_INDEX_NAME
464 );
465 let params = ScalarIndexParams::default();
466 if let Err(e) = dataset
467 .create_index(
468 &["base_objects"],
469 IndexType::LabelList,
470 Some(BASE_OBJECTS_INDEX_NAME.to_string()),
471 ¶ms,
472 true,
473 )
474 .await
475 {
476 log::warn!("Failed to create LabelList index on base_objects for __manifest table: {:?}. Query performance may be impacted.", e);
477 } else {
478 log::info!(
479 "Created LabelList index '{}' on base_objects for __manifest table",
480 BASE_OBJECTS_INDEX_NAME
481 );
482 }
483 }
484
485 log::debug!("Running file compaction on __manifest table");
487 match compact_files(dataset, CompactionOptions::default(), None).await {
488 Ok(compaction_metrics) => {
489 if compaction_metrics.fragments_removed > 0 {
490 log::info!(
491 "Compacted __manifest table: removed {} fragments, added {} fragments",
492 compaction_metrics.fragments_removed,
493 compaction_metrics.fragments_added
494 );
495 }
496 }
497 Err(e) => {
498 log::warn!("Failed to compact files for __manifest table: {:?}. Continuing with optimization.", e);
499 }
500 }
501
502 log::debug!("Optimizing indices on __manifest table");
504 match dataset.optimize_indices(&OptimizeOptions::default()).await {
505 Ok(_) => {
506 log::info!("Successfully optimized indices on __manifest table");
507 }
508 Err(e) => {
509 log::warn!(
510 "Failed to optimize indices on __manifest table: {:?}. Continuing anyway.",
511 e
512 );
513 }
514 }
515
516 Ok(())
517 }
518
519 fn manifest_schema() -> Arc<ArrowSchema> {
521 Arc::new(ArrowSchema::new(vec![
522 Field::new("object_id", DataType::Utf8, false),
523 Field::new("object_type", DataType::Utf8, false),
524 Field::new("location", DataType::Utf8, true),
525 Field::new("metadata", DataType::Utf8, true),
526 Field::new(
527 "base_objects",
528 DataType::List(Arc::new(Field::new("object_id", DataType::Utf8, true))),
529 true,
530 ),
531 ]))
532 }
533
534 async fn manifest_scanner(&self) -> Result<Scanner> {
536 let dataset_guard = self.manifest_dataset.get().await?;
537 Ok(dataset_guard.scan())
538 }
539
540 async fn execute_scanner(scanner: Scanner) -> Result<Vec<RecordBatch>> {
542 let mut stream = scanner.try_into_stream().await.map_err(|e| Error::IO {
543 source: box_error(std::io::Error::other(format!(
544 "Failed to create stream: {}",
545 e
546 ))),
547 location: location!(),
548 })?;
549
550 let mut batches = Vec::new();
551 while let Some(batch) = stream.next().await {
552 batches.push(batch.map_err(|e| Error::IO {
553 source: box_error(std::io::Error::other(format!(
554 "Failed to read batch: {}",
555 e
556 ))),
557 location: location!(),
558 })?);
559 }
560
561 Ok(batches)
562 }
563
564 fn get_string_column<'a>(batch: &'a RecordBatch, column_name: &str) -> Result<&'a StringArray> {
566 let column = batch
567 .column_by_name(column_name)
568 .ok_or_else(|| Error::io(format!("Column '{}' not found", column_name), location!()))?;
569 column
570 .as_any()
571 .downcast_ref::<StringArray>()
572 .ok_or_else(|| {
573 Error::io(
574 format!("Column '{}' is not a string array", column_name),
575 location!(),
576 )
577 })
578 }
579
580 async fn manifest_contains_object(&self, object_id: &str) -> Result<bool> {
582 let filter = format!("object_id = '{}'", object_id);
583
584 let dataset_guard = self.manifest_dataset.get().await?;
585 let mut scanner = dataset_guard.scan();
586
587 scanner.filter(&filter).map_err(|e| Error::IO {
588 source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
589 location: location!(),
590 })?;
591
592 scanner.project::<&str>(&[]).map_err(|e| Error::IO {
594 source: box_error(std::io::Error::other(format!("Failed to project: {}", e))),
595 location: location!(),
596 })?;
597
598 scanner.with_row_id();
599
600 let count = scanner.count_rows().await.map_err(|e| Error::IO {
601 source: box_error(std::io::Error::other(format!(
602 "Failed to count rows: {}",
603 e
604 ))),
605 location: location!(),
606 })?;
607
608 Ok(count > 0)
609 }
610
611 async fn query_manifest_for_table(&self, object_id: &str) -> Result<Option<TableInfo>> {
613 let filter = format!("object_id = '{}' AND object_type = 'table'", object_id);
614 let mut scanner = self.manifest_scanner().await?;
615 scanner.filter(&filter).map_err(|e| Error::IO {
616 source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
617 location: location!(),
618 })?;
619 scanner
620 .project(&["object_id", "location"])
621 .map_err(|e| Error::IO {
622 source: box_error(std::io::Error::other(format!("Failed to project: {}", e))),
623 location: location!(),
624 })?;
625 let batches = Self::execute_scanner(scanner).await?;
626
627 let mut found_result: Option<TableInfo> = None;
628 let mut total_rows = 0;
629
630 for batch in batches {
631 if batch.num_rows() == 0 {
632 continue;
633 }
634
635 total_rows += batch.num_rows();
636 if total_rows > 1 {
637 return Err(Error::io(
638 format!(
639 "Expected exactly 1 table with id '{}', found {}",
640 object_id, total_rows
641 ),
642 location!(),
643 ));
644 }
645
646 let object_id_array = Self::get_string_column(&batch, "object_id")?;
647 let location_array = Self::get_string_column(&batch, "location")?;
648 let location = location_array.value(0).to_string();
649 let (namespace, name) = Self::parse_object_id(object_id_array.value(0));
650 found_result = Some(TableInfo {
651 namespace,
652 name,
653 location,
654 });
655 }
656
657 Ok(found_result)
658 }
659
660 pub async fn list_manifest_table_locations(&self) -> Result<std::collections::HashSet<String>> {
663 let filter = "object_type = 'table' AND NOT contains(object_id, '$')";
664 let mut scanner = self.manifest_scanner().await?;
665 scanner.filter(filter).map_err(|e| Error::IO {
666 source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
667 location: location!(),
668 })?;
669 scanner.project(&["location"]).map_err(|e| Error::IO {
670 source: box_error(std::io::Error::other(format!("Failed to project: {}", e))),
671 location: location!(),
672 })?;
673
674 let batches = Self::execute_scanner(scanner).await?;
675 let mut locations = std::collections::HashSet::new();
676
677 for batch in batches {
678 if batch.num_rows() == 0 {
679 continue;
680 }
681 let location_array = Self::get_string_column(&batch, "location")?;
682 for i in 0..location_array.len() {
683 locations.insert(location_array.value(i).to_string());
684 }
685 }
686
687 Ok(locations)
688 }
689
690 async fn insert_into_manifest(
692 &self,
693 object_id: String,
694 object_type: ObjectType,
695 location: Option<String>,
696 ) -> Result<()> {
697 self.insert_into_manifest_with_metadata(object_id, object_type, location, None, None)
698 .await
699 }
700
701 async fn insert_into_manifest_with_metadata(
703 &self,
704 object_id: String,
705 object_type: ObjectType,
706 location: Option<String>,
707 metadata: Option<String>,
708 base_objects: Option<Vec<String>>,
709 ) -> Result<()> {
710 use arrow::array::builder::{ListBuilder, StringBuilder};
711
712 let schema = Self::manifest_schema();
713
714 let string_builder = StringBuilder::new();
716 let mut list_builder = ListBuilder::new(string_builder).with_field(Arc::new(Field::new(
717 "object_id",
718 DataType::Utf8,
719 true,
720 )));
721
722 match base_objects {
723 Some(objects) => {
724 for obj in objects {
725 list_builder.values().append_value(obj);
726 }
727 list_builder.append(true);
728 }
729 None => {
730 list_builder.append_null();
731 }
732 }
733
734 let base_objects_array = list_builder.finish();
735
736 let location_array = match location {
738 Some(loc) => Arc::new(StringArray::from(vec![Some(loc)])),
739 None => Arc::new(StringArray::from(vec![None::<String>])),
740 };
741
742 let metadata_array = match metadata {
743 Some(meta) => Arc::new(StringArray::from(vec![Some(meta)])),
744 None => Arc::new(StringArray::from(vec![None::<String>])),
745 };
746
747 let batch = RecordBatch::try_new(
748 schema.clone(),
749 vec![
750 Arc::new(StringArray::from(vec![object_id.as_str()])),
751 Arc::new(StringArray::from(vec![object_type.as_str()])),
752 location_array,
753 metadata_array,
754 Arc::new(base_objects_array),
755 ],
756 )
757 .map_err(|e| {
758 Error::io(
759 format!("Failed to create manifest entry: {}", e),
760 location!(),
761 )
762 })?;
763
764 let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
765
766 let dataset_guard = self.manifest_dataset.get().await?;
768 let dataset_arc = Arc::new(dataset_guard.clone());
769 drop(dataset_guard); let mut merge_builder =
772 lance::dataset::MergeInsertBuilder::try_new(dataset_arc, vec!["object_id".to_string()])
773 .map_err(|e| Error::IO {
774 source: box_error(std::io::Error::other(format!(
775 "Failed to create merge builder: {}",
776 e
777 ))),
778 location: location!(),
779 })?;
780
781 merge_builder.when_matched(lance::dataset::WhenMatched::Fail);
782 merge_builder.when_not_matched(lance::dataset::WhenNotMatched::InsertAll);
783
784 let (new_dataset_arc, _merge_stats) = merge_builder
785 .try_build()
786 .map_err(|e| Error::IO {
787 source: box_error(std::io::Error::other(format!(
788 "Failed to build merge: {}",
789 e
790 ))),
791 location: location!(),
792 })?
793 .execute_reader(Box::new(reader))
794 .await
795 .map_err(|e| {
796 let error_msg = e.to_string();
798 if error_msg.contains("matched")
799 || error_msg.contains("duplicate")
800 || error_msg.contains("already exists")
801 {
802 Error::io(
803 format!("Object with id '{}' already exists in manifest", object_id),
804 location!(),
805 )
806 } else {
807 Error::IO {
808 source: box_error(std::io::Error::other(format!(
809 "Failed to execute merge: {}",
810 e
811 ))),
812 location: location!(),
813 }
814 }
815 })?;
816
817 let new_dataset = Arc::try_unwrap(new_dataset_arc).unwrap_or_else(|arc| (*arc).clone());
818 self.manifest_dataset.set_latest(new_dataset).await;
819
820 if let Err(e) = self.run_inline_optimization().await {
822 log::warn!(
823 "Unexpected failure when running inline optimization: {:?}",
824 e
825 );
826 }
827
828 Ok(())
829 }
830
831 pub async fn delete_from_manifest(&self, object_id: &str) -> Result<()> {
833 {
834 let predicate = format!("object_id = '{}'", object_id);
835 let mut dataset_guard = self.manifest_dataset.get_mut().await?;
836 dataset_guard
837 .delete(&predicate)
838 .await
839 .map_err(|e| Error::IO {
840 source: box_error(std::io::Error::other(format!("Failed to delete: {}", e))),
841 location: location!(),
842 })?;
843 } self.manifest_dataset.reload().await?;
846
847 if let Err(e) = self.run_inline_optimization().await {
849 log::warn!(
850 "Unexpected failure when running inline optimization: {:?}",
851 e
852 );
853 }
854
855 Ok(())
856 }
857
858 pub async fn register_table(&self, name: &str, location: String) -> Result<()> {
860 let object_id = Self::build_object_id(&[], name);
861 if self.manifest_contains_object(&object_id).await? {
862 return Err(Error::io(
863 format!("Table '{}' already exists", name),
864 location!(),
865 ));
866 }
867
868 self.insert_into_manifest(object_id, ObjectType::Table, Some(location))
869 .await
870 }
871
872 async fn validate_namespace_levels_exist(&self, namespace_path: &[String]) -> Result<()> {
874 for i in 1..=namespace_path.len() {
875 let partial_path = &namespace_path[..i];
876 let object_id = partial_path.join(DELIMITER);
877 if !self.manifest_contains_object(&object_id).await? {
878 return Err(Error::Namespace {
879 source: format!("Parent namespace '{}' does not exist", object_id).into(),
880 location: location!(),
881 });
882 }
883 }
884 Ok(())
885 }
886
887 async fn query_manifest_for_namespace(&self, object_id: &str) -> Result<Option<NamespaceInfo>> {
889 let filter = format!("object_id = '{}' AND object_type = 'namespace'", object_id);
890 let mut scanner = self.manifest_scanner().await?;
891 scanner.filter(&filter).map_err(|e| Error::IO {
892 source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
893 location: location!(),
894 })?;
895 scanner
896 .project(&["object_id", "metadata"])
897 .map_err(|e| Error::IO {
898 source: box_error(std::io::Error::other(format!("Failed to project: {}", e))),
899 location: location!(),
900 })?;
901 let batches = Self::execute_scanner(scanner).await?;
902
903 let mut found_result: Option<NamespaceInfo> = None;
904 let mut total_rows = 0;
905
906 for batch in batches {
907 if batch.num_rows() == 0 {
908 continue;
909 }
910
911 total_rows += batch.num_rows();
912 if total_rows > 1 {
913 return Err(Error::io(
914 format!(
915 "Expected exactly 1 namespace with id '{}', found {}",
916 object_id, total_rows
917 ),
918 location!(),
919 ));
920 }
921
922 let object_id_array = Self::get_string_column(&batch, "object_id")?;
923 let metadata_array = Self::get_string_column(&batch, "metadata")?;
924
925 let object_id_str = object_id_array.value(0);
926 let metadata = if !metadata_array.is_null(0) {
927 let metadata_str = metadata_array.value(0);
928 match serde_json::from_str::<HashMap<String, String>>(metadata_str) {
929 Ok(map) => Some(map),
930 Err(e) => {
931 return Err(Error::io(
932 format!(
933 "Failed to deserialize metadata for namespace '{}': {}",
934 object_id, e
935 ),
936 location!(),
937 ));
938 }
939 }
940 } else {
941 None
942 };
943
944 let (namespace, name) = Self::parse_object_id(object_id_str);
945 found_result = Some(NamespaceInfo {
946 namespace,
947 name,
948 metadata,
949 });
950 }
951
952 Ok(found_result)
953 }
954
955 async fn create_or_get_manifest(
957 root: &str,
958 storage_options: &Option<HashMap<String, String>>,
959 session: Option<Arc<Session>>,
960 ) -> Result<DatasetConsistencyWrapper> {
961 let manifest_path = format!("{}/{}", root, MANIFEST_TABLE_NAME);
962 log::debug!("Attempting to load manifest from {}", manifest_path);
963 let mut builder = DatasetBuilder::from_uri(&manifest_path);
964
965 if let Some(sess) = session.clone() {
966 builder = builder.with_session(sess);
967 }
968
969 if let Some(opts) = storage_options {
970 builder = builder.with_storage_options(opts.clone());
971 }
972
973 let dataset_result = builder.load().await;
974 if let Ok(dataset) = dataset_result {
975 Ok(DatasetConsistencyWrapper::new(dataset))
976 } else {
977 log::info!("Creating new manifest table at {}", manifest_path);
978 let schema = Self::manifest_schema();
979 let empty_batch = RecordBatch::new_empty(schema.clone());
980 let reader = RecordBatchIterator::new(vec![Ok(empty_batch)], schema.clone());
981
982 let write_params = WriteParams {
983 session,
984 store_params: storage_options.as_ref().map(|opts| ObjectStoreParams {
985 storage_options_accessor: Some(Arc::new(
986 lance_io::object_store::StorageOptionsAccessor::with_static_options(
987 opts.clone(),
988 ),
989 )),
990 ..Default::default()
991 }),
992 ..Default::default()
993 };
994
995 let dataset = Dataset::write(Box::new(reader), &manifest_path, Some(write_params))
996 .await
997 .map_err(|e| Error::IO {
998 source: box_error(std::io::Error::other(format!(
999 "Failed to create manifest dataset: {}",
1000 e
1001 ))),
1002 location: location!(),
1003 })?;
1004
1005 log::info!(
1006 "Successfully created manifest table at {}, version={}, uri={}",
1007 manifest_path,
1008 dataset.version().version,
1009 dataset.uri()
1010 );
1011 Ok(DatasetConsistencyWrapper::new(dataset))
1012 }
1013 }
1014}
1015
1016#[async_trait]
1017impl LanceNamespace for ManifestNamespace {
1018 fn namespace_id(&self) -> String {
1019 self.root.clone()
1020 }
1021
1022 async fn list_tables(&self, request: ListTablesRequest) -> Result<ListTablesResponse> {
1023 let namespace_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1024 source: "Namespace ID is required".into(),
1025 location: location!(),
1026 })?;
1027
1028 let filter = if namespace_id.is_empty() {
1030 "object_type = 'table' AND NOT contains(object_id, '$')".to_string()
1032 } else {
1033 let prefix = namespace_id.join(DELIMITER);
1035 format!(
1036 "object_type = 'table' AND starts_with(object_id, '{}{}') AND NOT contains(substring(object_id, {}), '$')",
1037 prefix, DELIMITER, prefix.len() + 2
1038 )
1039 };
1040
1041 let mut scanner = self.manifest_scanner().await?;
1042 scanner.filter(&filter).map_err(|e| Error::IO {
1043 source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
1044 location: location!(),
1045 })?;
1046 scanner.project(&["object_id"]).map_err(|e| Error::IO {
1047 source: box_error(std::io::Error::other(format!("Failed to project: {}", e))),
1048 location: location!(),
1049 })?;
1050
1051 let batches = Self::execute_scanner(scanner).await?;
1052
1053 let mut tables = Vec::new();
1054 for batch in batches {
1055 if batch.num_rows() == 0 {
1056 continue;
1057 }
1058
1059 let object_id_array = Self::get_string_column(&batch, "object_id")?;
1060 for i in 0..batch.num_rows() {
1061 let object_id = object_id_array.value(i);
1062 let (_namespace, name) = Self::parse_object_id(object_id);
1063 tables.push(name);
1064 }
1065 }
1066
1067 Ok(ListTablesResponse::new(tables))
1068 }
1069
1070 async fn describe_table(&self, request: DescribeTableRequest) -> Result<DescribeTableResponse> {
1071 let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1072 source: "Table ID is required".into(),
1073 location: location!(),
1074 })?;
1075
1076 if table_id.is_empty() {
1077 return Err(Error::InvalidInput {
1078 source: "Table ID cannot be empty".into(),
1079 location: location!(),
1080 });
1081 }
1082
1083 let object_id = Self::str_object_id(table_id);
1084 let table_info = self.query_manifest_for_table(&object_id).await?;
1085
1086 let table_name = table_id.last().cloned().unwrap_or_default();
1088 let namespace_id: Vec<String> = if table_id.len() > 1 {
1089 table_id[..table_id.len() - 1].to_vec()
1090 } else {
1091 vec![]
1092 };
1093
1094 let load_detailed_metadata = request.load_detailed_metadata.unwrap_or(false);
1095 let vend_credentials = request.vend_credentials.unwrap_or(true);
1097
1098 match table_info {
1099 Some(info) => {
1100 let table_uri = Self::construct_full_uri(&self.root, &info.location)?;
1102
1103 let storage_options = if vend_credentials {
1104 self.storage_options.clone()
1105 } else {
1106 None
1107 };
1108
1109 if !load_detailed_metadata {
1111 return Ok(DescribeTableResponse {
1112 table: Some(table_name),
1113 namespace: Some(namespace_id),
1114 location: Some(table_uri.clone()),
1115 table_uri: Some(table_uri),
1116 storage_options,
1117 ..Default::default()
1118 });
1119 }
1120
1121 match Dataset::open(&table_uri).await {
1123 Ok(mut dataset) => {
1124 if let Some(requested_version) = request.version {
1126 dataset = dataset.checkout_version(requested_version as u64).await?;
1127 }
1128
1129 let version = dataset.version().version;
1130 let lance_schema = dataset.schema();
1131 let arrow_schema: arrow_schema::Schema = lance_schema.into();
1132 let json_schema = arrow_schema_to_json(&arrow_schema)?;
1133
1134 Ok(DescribeTableResponse {
1135 table: Some(table_name.clone()),
1136 namespace: Some(namespace_id.clone()),
1137 version: Some(version as i64),
1138 location: Some(table_uri.clone()),
1139 table_uri: Some(table_uri),
1140 schema: Some(Box::new(json_schema)),
1141 storage_options,
1142 ..Default::default()
1143 })
1144 }
1145 Err(_) => {
1146 Ok(DescribeTableResponse {
1148 table: Some(table_name),
1149 namespace: Some(namespace_id),
1150 location: Some(table_uri.clone()),
1151 table_uri: Some(table_uri),
1152 storage_options,
1153 ..Default::default()
1154 })
1155 }
1156 }
1157 }
1158 None => Err(Error::Namespace {
1159 source: format!("Table '{}' not found", object_id).into(),
1160 location: location!(),
1161 }),
1162 }
1163 }
1164
1165 async fn table_exists(&self, request: TableExistsRequest) -> Result<()> {
1166 let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1167 source: "Table ID is required".into(),
1168 location: location!(),
1169 })?;
1170
1171 if table_id.is_empty() {
1172 return Err(Error::InvalidInput {
1173 source: "Table ID cannot be empty".into(),
1174 location: location!(),
1175 });
1176 }
1177
1178 let (namespace, table_name) = Self::split_object_id(table_id);
1179 let object_id = Self::build_object_id(&namespace, &table_name);
1180 let exists = self.manifest_contains_object(&object_id).await?;
1181 if exists {
1182 Ok(())
1183 } else {
1184 Err(Error::Namespace {
1185 source: format!("Table '{}' not found", table_name).into(),
1186 location: location!(),
1187 })
1188 }
1189 }
1190
1191 async fn create_table(
1192 &self,
1193 request: CreateTableRequest,
1194 data: Bytes,
1195 ) -> Result<CreateTableResponse> {
1196 let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1197 source: "Table ID is required".into(),
1198 location: location!(),
1199 })?;
1200
1201 if table_id.is_empty() {
1202 return Err(Error::InvalidInput {
1203 source: "Table ID cannot be empty".into(),
1204 location: location!(),
1205 });
1206 }
1207
1208 let (namespace, table_name) = Self::split_object_id(table_id);
1209 let object_id = Self::build_object_id(&namespace, &table_name);
1210
1211 if self.manifest_contains_object(&object_id).await? {
1213 return Err(Error::io(
1214 format!("Table '{}' already exists", table_name),
1215 location!(),
1216 ));
1217 }
1218
1219 let dir_name = if namespace.is_empty() && self.dir_listing_enabled {
1223 format!("{}.lance", table_name)
1225 } else {
1226 Self::generate_dir_name(&object_id)
1228 };
1229 let table_uri = Self::construct_full_uri(&self.root, &dir_name)?;
1230
1231 if data.is_empty() {
1233 return Err(Error::Namespace {
1234 source: "Request data (Arrow IPC stream) is required for create_table".into(),
1235 location: location!(),
1236 });
1237 }
1238
1239 let cursor = Cursor::new(data.to_vec());
1241 let stream_reader = StreamReader::try_new(cursor, None)
1242 .map_err(|e| Error::io(format!("Failed to read IPC stream: {}", e), location!()))?;
1243
1244 let batches: Vec<RecordBatch> =
1245 stream_reader
1246 .collect::<std::result::Result<Vec<_>, _>>()
1247 .map_err(|e| Error::io(format!("Failed to collect batches: {}", e), location!()))?;
1248
1249 if batches.is_empty() {
1250 return Err(Error::io(
1251 "No data provided for table creation",
1252 location!(),
1253 ));
1254 }
1255
1256 let schema = batches[0].schema();
1257 let batch_results: Vec<std::result::Result<RecordBatch, arrow_schema::ArrowError>> =
1258 batches.into_iter().map(Ok).collect();
1259 let reader = RecordBatchIterator::new(batch_results, schema);
1260
1261 let write_params = WriteParams::default();
1262 let _dataset = Dataset::write(Box::new(reader), &table_uri, Some(write_params))
1263 .await
1264 .map_err(|e| Error::IO {
1265 source: box_error(std::io::Error::other(format!(
1266 "Failed to write dataset: {}",
1267 e
1268 ))),
1269 location: location!(),
1270 })?;
1271
1272 self.insert_into_manifest(object_id, ObjectType::Table, Some(dir_name))
1274 .await?;
1275
1276 Ok(CreateTableResponse {
1277 version: Some(1),
1278 location: Some(table_uri),
1279 storage_options: self.storage_options.clone(),
1280 ..Default::default()
1281 })
1282 }
1283
1284 async fn drop_table(&self, request: DropTableRequest) -> Result<DropTableResponse> {
1285 let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1286 source: "Table ID is required".into(),
1287 location: location!(),
1288 })?;
1289
1290 if table_id.is_empty() {
1291 return Err(Error::InvalidInput {
1292 source: "Table ID cannot be empty".into(),
1293 location: location!(),
1294 });
1295 }
1296
1297 let (namespace, table_name) = Self::split_object_id(table_id);
1298 let object_id = Self::build_object_id(&namespace, &table_name);
1299
1300 let table_info = self.query_manifest_for_table(&object_id).await?;
1302
1303 match table_info {
1304 Some(info) => {
1305 self.delete_from_manifest(&object_id).await?;
1307
1308 let table_path = self.base_path.child(info.location.as_str());
1310 let table_uri = Self::construct_full_uri(&self.root, &info.location)?;
1311
1312 self.object_store
1314 .remove_dir_all(table_path)
1315 .await
1316 .map_err(|e| Error::Namespace {
1317 source: format!("Failed to delete table directory: {}", e).into(),
1318 location: location!(),
1319 })?;
1320
1321 Ok(DropTableResponse {
1322 id: request.id.clone(),
1323 location: Some(table_uri),
1324 ..Default::default()
1325 })
1326 }
1327 None => Err(Error::Namespace {
1328 source: format!("Table '{}' not found", table_name).into(),
1329 location: location!(),
1330 }),
1331 }
1332 }
1333
1334 async fn list_namespaces(
1335 &self,
1336 request: ListNamespacesRequest,
1337 ) -> Result<ListNamespacesResponse> {
1338 let parent_namespace = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1339 source: "Namespace ID is required".into(),
1340 location: location!(),
1341 })?;
1342
1343 let filter = if parent_namespace.is_empty() {
1345 "object_type = 'namespace' AND NOT contains(object_id, '$')".to_string()
1347 } else {
1348 let prefix = parent_namespace.join(DELIMITER);
1350 format!(
1351 "object_type = 'namespace' AND starts_with(object_id, '{}{}') AND NOT contains(substring(object_id, {}), '$')",
1352 prefix, DELIMITER, prefix.len() + 2
1353 )
1354 };
1355
1356 let mut scanner = self.manifest_scanner().await?;
1357 scanner.filter(&filter).map_err(|e| Error::IO {
1358 source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
1359 location: location!(),
1360 })?;
1361 scanner.project(&["object_id"]).map_err(|e| Error::IO {
1362 source: box_error(std::io::Error::other(format!("Failed to project: {}", e))),
1363 location: location!(),
1364 })?;
1365
1366 let batches = Self::execute_scanner(scanner).await?;
1367 let mut namespaces = Vec::new();
1368
1369 for batch in batches {
1370 if batch.num_rows() == 0 {
1371 continue;
1372 }
1373
1374 let object_id_array = Self::get_string_column(&batch, "object_id")?;
1375 for i in 0..batch.num_rows() {
1376 let object_id = object_id_array.value(i);
1377 let (_namespace, name) = Self::parse_object_id(object_id);
1378 namespaces.push(name);
1379 }
1380 }
1381
1382 Ok(ListNamespacesResponse::new(namespaces))
1383 }
1384
1385 async fn describe_namespace(
1386 &self,
1387 request: DescribeNamespaceRequest,
1388 ) -> Result<DescribeNamespaceResponse> {
1389 let namespace_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1390 source: "Namespace ID is required".into(),
1391 location: location!(),
1392 })?;
1393
1394 if namespace_id.is_empty() {
1396 #[allow(clippy::needless_update)]
1397 return Ok(DescribeNamespaceResponse {
1398 properties: Some(HashMap::new()),
1399 ..Default::default()
1400 });
1401 }
1402
1403 let object_id = namespace_id.join(DELIMITER);
1405 let namespace_info = self.query_manifest_for_namespace(&object_id).await?;
1406
1407 match namespace_info {
1408 #[allow(clippy::needless_update)]
1409 Some(info) => Ok(DescribeNamespaceResponse {
1410 properties: info.metadata,
1411 ..Default::default()
1412 }),
1413 None => Err(Error::Namespace {
1414 source: format!("Namespace '{}' not found", object_id).into(),
1415 location: location!(),
1416 }),
1417 }
1418 }
1419
1420 async fn create_namespace(
1421 &self,
1422 request: CreateNamespaceRequest,
1423 ) -> Result<CreateNamespaceResponse> {
1424 let namespace_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1425 source: "Namespace ID is required".into(),
1426 location: location!(),
1427 })?;
1428
1429 if namespace_id.is_empty() {
1431 return Err(Error::Namespace {
1432 source: "Root namespace already exists and cannot be created".into(),
1433 location: location!(),
1434 });
1435 }
1436
1437 if namespace_id.len() > 1 {
1439 self.validate_namespace_levels_exist(&namespace_id[..namespace_id.len() - 1])
1440 .await?;
1441 }
1442
1443 let object_id = namespace_id.join(DELIMITER);
1444 if self.manifest_contains_object(&object_id).await? {
1445 return Err(Error::Namespace {
1446 source: format!("Namespace '{}' already exists", object_id).into(),
1447 location: location!(),
1448 });
1449 }
1450
1451 let metadata = request.properties.as_ref().and_then(|props| {
1453 if props.is_empty() {
1454 None
1455 } else {
1456 Some(serde_json::to_string(props).ok()?)
1457 }
1458 });
1459
1460 self.insert_into_manifest_with_metadata(
1461 object_id,
1462 ObjectType::Namespace,
1463 None,
1464 metadata,
1465 None,
1466 )
1467 .await?;
1468
1469 Ok(CreateNamespaceResponse {
1470 properties: request.properties,
1471 ..Default::default()
1472 })
1473 }
1474
1475 async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<DropNamespaceResponse> {
1476 let namespace_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1477 source: "Namespace ID is required".into(),
1478 location: location!(),
1479 })?;
1480
1481 if namespace_id.is_empty() {
1483 return Err(Error::Namespace {
1484 source: "Root namespace cannot be dropped".into(),
1485 location: location!(),
1486 });
1487 }
1488
1489 let object_id = namespace_id.join(DELIMITER);
1490
1491 if !self.manifest_contains_object(&object_id).await? {
1493 return Err(Error::Namespace {
1494 source: format!("Namespace '{}' not found", object_id).into(),
1495 location: location!(),
1496 });
1497 }
1498
1499 let prefix = format!("{}{}", object_id, DELIMITER);
1501 let filter = format!("starts_with(object_id, '{}')", prefix);
1502 let mut scanner = self.manifest_scanner().await?;
1503 scanner.filter(&filter).map_err(|e| Error::IO {
1504 source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
1505 location: location!(),
1506 })?;
1507 scanner.project::<&str>(&[]).map_err(|e| Error::IO {
1508 source: box_error(std::io::Error::other(format!("Failed to project: {}", e))),
1509 location: location!(),
1510 })?;
1511 scanner.with_row_id();
1512 let count = scanner.count_rows().await.map_err(|e| Error::IO {
1513 source: box_error(std::io::Error::other(format!(
1514 "Failed to count rows: {}",
1515 e
1516 ))),
1517 location: location!(),
1518 })?;
1519
1520 if count > 0 {
1521 return Err(Error::Namespace {
1522 source: format!(
1523 "Namespace '{}' is not empty (contains {} child objects)",
1524 object_id, count
1525 )
1526 .into(),
1527 location: location!(),
1528 });
1529 }
1530
1531 self.delete_from_manifest(&object_id).await?;
1532
1533 Ok(DropNamespaceResponse::default())
1534 }
1535
1536 async fn namespace_exists(&self, request: NamespaceExistsRequest) -> Result<()> {
1537 let namespace_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1538 source: "Namespace ID is required".into(),
1539 location: location!(),
1540 })?;
1541
1542 if namespace_id.is_empty() {
1544 return Ok(());
1545 }
1546
1547 let object_id = namespace_id.join(DELIMITER);
1548 if self.manifest_contains_object(&object_id).await? {
1549 Ok(())
1550 } else {
1551 Err(Error::Namespace {
1552 source: format!("Namespace '{}' not found", object_id).into(),
1553 location: location!(),
1554 })
1555 }
1556 }
1557
1558 async fn create_empty_table(
1559 &self,
1560 request: CreateEmptyTableRequest,
1561 ) -> Result<CreateEmptyTableResponse> {
1562 let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1563 source: "Table ID is required".into(),
1564 location: location!(),
1565 })?;
1566
1567 if table_id.is_empty() {
1568 return Err(Error::InvalidInput {
1569 source: "Table ID cannot be empty".into(),
1570 location: location!(),
1571 });
1572 }
1573
1574 let (namespace, table_name) = Self::split_object_id(table_id);
1575 let object_id = Self::build_object_id(&namespace, &table_name);
1576
1577 let existing = self.query_manifest_for_table(&object_id).await?;
1579 if existing.is_some() {
1580 return Err(Error::Namespace {
1581 source: format!("Table '{}' already exists", table_name).into(),
1582 location: location!(),
1583 });
1584 }
1585
1586 let dir_name = if namespace.is_empty() && self.dir_listing_enabled {
1590 format!("{}.lance", table_name)
1592 } else {
1593 Self::generate_dir_name(&object_id)
1595 };
1596 let table_path = self.base_path.child(dir_name.as_str());
1597 let table_uri = Self::construct_full_uri(&self.root, &dir_name)?;
1598
1599 if let Some(req_location) = &request.location {
1601 let req_location = req_location.trim_end_matches('/');
1602 if req_location != table_uri {
1603 return Err(Error::Namespace {
1604 source: format!(
1605 "Cannot create table {} at location {}, must be at location {}",
1606 table_name, req_location, table_uri
1607 )
1608 .into(),
1609 location: location!(),
1610 });
1611 }
1612 }
1613
1614 let reserved_file_path = table_path.child(".lance-reserved");
1616
1617 self.object_store
1618 .create(&reserved_file_path)
1619 .await
1620 .map_err(|e| Error::Namespace {
1621 source: format!(
1622 "Failed to create .lance-reserved file for table {}: {}",
1623 table_name, e
1624 )
1625 .into(),
1626 location: location!(),
1627 })?
1628 .shutdown()
1629 .await
1630 .map_err(|e| Error::Namespace {
1631 source: format!(
1632 "Failed to finalize .lance-reserved file for table {}: {}",
1633 table_name, e
1634 )
1635 .into(),
1636 location: location!(),
1637 })?;
1638
1639 self.insert_into_manifest(object_id, ObjectType::Table, Some(dir_name))
1641 .await?;
1642
1643 log::info!(
1644 "Created empty table '{}' in manifest at {}",
1645 table_name,
1646 table_uri
1647 );
1648
1649 let vend_credentials = request.vend_credentials.unwrap_or(true);
1651 let storage_options = if vend_credentials {
1652 self.storage_options.clone()
1653 } else {
1654 None
1655 };
1656
1657 Ok(CreateEmptyTableResponse {
1658 location: Some(table_uri),
1659 storage_options,
1660 ..Default::default()
1661 })
1662 }
1663
1664 async fn declare_table(&self, request: DeclareTableRequest) -> Result<DeclareTableResponse> {
1665 let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1666 source: "Table ID is required".into(),
1667 location: location!(),
1668 })?;
1669
1670 if table_id.is_empty() {
1671 return Err(Error::InvalidInput {
1672 source: "Table ID cannot be empty".into(),
1673 location: location!(),
1674 });
1675 }
1676
1677 let (namespace, table_name) = Self::split_object_id(table_id);
1678 let object_id = Self::build_object_id(&namespace, &table_name);
1679
1680 let existing = self.query_manifest_for_table(&object_id).await?;
1682 if existing.is_some() {
1683 return Err(Error::Namespace {
1684 source: format!("Table '{}' already exists", table_name).into(),
1685 location: location!(),
1686 });
1687 }
1688
1689 let dir_name = if namespace.is_empty() && self.dir_listing_enabled {
1693 format!("{}.lance", table_name)
1695 } else {
1696 Self::generate_dir_name(&object_id)
1698 };
1699 let table_path = self.base_path.child(dir_name.as_str());
1700 let table_uri = Self::construct_full_uri(&self.root, &dir_name)?;
1701
1702 if let Some(req_location) = &request.location {
1704 let req_location = req_location.trim_end_matches('/');
1705 if req_location != table_uri {
1706 return Err(Error::Namespace {
1707 source: format!(
1708 "Cannot declare table {} at location {}, must be at location {}",
1709 table_name, req_location, table_uri
1710 )
1711 .into(),
1712 location: location!(),
1713 });
1714 }
1715 }
1716
1717 let reserved_file_path = table_path.child(".lance-reserved");
1719
1720 self.object_store
1721 .create(&reserved_file_path)
1722 .await
1723 .map_err(|e| Error::Namespace {
1724 source: format!(
1725 "Failed to create .lance-reserved file for table {}: {}",
1726 table_name, e
1727 )
1728 .into(),
1729 location: location!(),
1730 })?
1731 .shutdown()
1732 .await
1733 .map_err(|e| Error::Namespace {
1734 source: format!(
1735 "Failed to finalize .lance-reserved file for table {}: {}",
1736 table_name, e
1737 )
1738 .into(),
1739 location: location!(),
1740 })?;
1741
1742 self.insert_into_manifest(object_id, ObjectType::Table, Some(dir_name))
1744 .await?;
1745
1746 log::info!(
1747 "Declared table '{}' in manifest at {}",
1748 table_name,
1749 table_uri
1750 );
1751
1752 let vend_credentials = request.vend_credentials.unwrap_or(true);
1754 let storage_options = if vend_credentials {
1755 self.storage_options.clone()
1756 } else {
1757 None
1758 };
1759
1760 Ok(DeclareTableResponse {
1761 location: Some(table_uri),
1762 storage_options,
1763 ..Default::default()
1764 })
1765 }
1766
1767 async fn register_table(&self, request: RegisterTableRequest) -> Result<RegisterTableResponse> {
1768 let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1769 source: "Table ID is required".into(),
1770 location: location!(),
1771 })?;
1772
1773 if table_id.is_empty() {
1774 return Err(Error::InvalidInput {
1775 source: "Table ID cannot be empty".into(),
1776 location: location!(),
1777 });
1778 }
1779
1780 let location = request.location.clone();
1781
1782 if location.contains("://") {
1785 return Err(Error::InvalidInput {
1786 source: format!(
1787 "Absolute URIs are not allowed for register_table. Location must be a relative path within the root directory: {}",
1788 location
1789 ).into(),
1790 location: location!(),
1791 });
1792 }
1793
1794 if location.starts_with('/') {
1795 return Err(Error::InvalidInput {
1796 source: format!(
1797 "Absolute paths are not allowed for register_table. Location must be a relative path within the root directory: {}",
1798 location
1799 ).into(),
1800 location: location!(),
1801 });
1802 }
1803
1804 if location.contains("..") {
1806 return Err(Error::InvalidInput {
1807 source: format!(
1808 "Path traversal is not allowed. Location must be a relative path within the root directory: {}",
1809 location
1810 ).into(),
1811 location: location!(),
1812 });
1813 }
1814
1815 let (namespace, table_name) = Self::split_object_id(table_id);
1816 let object_id = Self::build_object_id(&namespace, &table_name);
1817
1818 if !namespace.is_empty() {
1820 self.validate_namespace_levels_exist(&namespace).await?;
1821 }
1822
1823 if self.manifest_contains_object(&object_id).await? {
1825 return Err(Error::Namespace {
1826 source: format!("Table '{}' already exists", object_id).into(),
1827 location: location!(),
1828 });
1829 }
1830
1831 self.insert_into_manifest(object_id, ObjectType::Table, Some(location.clone()))
1833 .await?;
1834
1835 Ok(RegisterTableResponse {
1836 location: Some(location),
1837 ..Default::default()
1838 })
1839 }
1840
1841 async fn deregister_table(
1842 &self,
1843 request: DeregisterTableRequest,
1844 ) -> Result<DeregisterTableResponse> {
1845 let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1846 source: "Table ID is required".into(),
1847 location: location!(),
1848 })?;
1849
1850 if table_id.is_empty() {
1851 return Err(Error::InvalidInput {
1852 source: "Table ID cannot be empty".into(),
1853 location: location!(),
1854 });
1855 }
1856
1857 let (namespace, table_name) = Self::split_object_id(table_id);
1858 let object_id = Self::build_object_id(&namespace, &table_name);
1859
1860 let table_info = self.query_manifest_for_table(&object_id).await?;
1862
1863 let table_uri = match table_info {
1864 Some(info) => {
1865 self.delete_from_manifest(&object_id).await?;
1867 Self::construct_full_uri(&self.root, &info.location)?
1868 }
1869 None => {
1870 return Err(Error::Namespace {
1871 source: format!("Table '{}' not found", object_id).into(),
1872 location: location!(),
1873 });
1874 }
1875 };
1876
1877 Ok(DeregisterTableResponse {
1878 id: request.id.clone(),
1879 location: Some(table_uri),
1880 ..Default::default()
1881 })
1882 }
1883}
1884
1885#[cfg(test)]
1886mod tests {
1887 use crate::{DirectoryNamespaceBuilder, ManifestNamespace};
1888 use bytes::Bytes;
1889 use lance_core::utils::tempfile::TempStdDir;
1890 use lance_namespace::models::{
1891 CreateTableRequest, DescribeTableRequest, DropTableRequest, ListTablesRequest,
1892 TableExistsRequest,
1893 };
1894 use lance_namespace::LanceNamespace;
1895 use rstest::rstest;
1896
1897 fn create_test_ipc_data() -> Vec<u8> {
1898 use arrow::array::{Int32Array, StringArray};
1899 use arrow::datatypes::{DataType, Field, Schema};
1900 use arrow::ipc::writer::StreamWriter;
1901 use arrow::record_batch::RecordBatch;
1902 use std::sync::Arc;
1903
1904 let schema = Arc::new(Schema::new(vec![
1905 Field::new("id", DataType::Int32, false),
1906 Field::new("name", DataType::Utf8, false),
1907 ]));
1908
1909 let batch = RecordBatch::try_new(
1910 schema.clone(),
1911 vec![
1912 Arc::new(Int32Array::from(vec![1, 2, 3])),
1913 Arc::new(StringArray::from(vec!["a", "b", "c"])),
1914 ],
1915 )
1916 .unwrap();
1917
1918 let mut buffer = Vec::new();
1919 {
1920 let mut writer = StreamWriter::try_new(&mut buffer, &schema).unwrap();
1921 writer.write(&batch).unwrap();
1922 writer.finish().unwrap();
1923 }
1924 buffer
1925 }
1926
1927 #[rstest]
1928 #[case::with_optimization(true)]
1929 #[case::without_optimization(false)]
1930 #[tokio::test]
1931 async fn test_manifest_namespace_basic_create_and_list(#[case] inline_optimization: bool) {
1932 let temp_dir = TempStdDir::default();
1933 let temp_path = temp_dir.to_str().unwrap();
1934
1935 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
1937 .inline_optimization_enabled(inline_optimization)
1938 .build()
1939 .await
1940 .unwrap();
1941
1942 let mut request = ListTablesRequest::new();
1944 request.id = Some(vec![]);
1945 let response = dir_namespace.list_tables(request).await.unwrap();
1946 assert_eq!(response.tables.len(), 0);
1947
1948 let buffer = create_test_ipc_data();
1950 let mut create_request = CreateTableRequest::new();
1951 create_request.id = Some(vec!["test_table".to_string()]);
1952
1953 let _response = dir_namespace
1954 .create_table(create_request, Bytes::from(buffer))
1955 .await
1956 .unwrap();
1957
1958 let mut request = ListTablesRequest::new();
1960 request.id = Some(vec![]);
1961 let response = dir_namespace.list_tables(request).await.unwrap();
1962 assert_eq!(response.tables.len(), 1);
1963 assert_eq!(response.tables[0], "test_table");
1964 }
1965
1966 #[rstest]
1967 #[case::with_optimization(true)]
1968 #[case::without_optimization(false)]
1969 #[tokio::test]
1970 async fn test_manifest_namespace_table_exists(#[case] inline_optimization: bool) {
1971 let temp_dir = TempStdDir::default();
1972 let temp_path = temp_dir.to_str().unwrap();
1973
1974 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
1975 .inline_optimization_enabled(inline_optimization)
1976 .build()
1977 .await
1978 .unwrap();
1979
1980 let mut request = TableExistsRequest::new();
1982 request.id = Some(vec!["nonexistent".to_string()]);
1983 let result = dir_namespace.table_exists(request).await;
1984 assert!(result.is_err());
1985
1986 let buffer = create_test_ipc_data();
1988 let mut create_request = CreateTableRequest::new();
1989 create_request.id = Some(vec!["test_table".to_string()]);
1990 dir_namespace
1991 .create_table(create_request, Bytes::from(buffer))
1992 .await
1993 .unwrap();
1994
1995 let mut request = TableExistsRequest::new();
1997 request.id = Some(vec!["test_table".to_string()]);
1998 let result = dir_namespace.table_exists(request).await;
1999 assert!(result.is_ok());
2000 }
2001
2002 #[rstest]
2003 #[case::with_optimization(true)]
2004 #[case::without_optimization(false)]
2005 #[tokio::test]
2006 async fn test_manifest_namespace_describe_table(#[case] inline_optimization: bool) {
2007 let temp_dir = TempStdDir::default();
2008 let temp_path = temp_dir.to_str().unwrap();
2009
2010 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2011 .inline_optimization_enabled(inline_optimization)
2012 .build()
2013 .await
2014 .unwrap();
2015
2016 let mut request = DescribeTableRequest::new();
2018 request.id = Some(vec!["nonexistent".to_string()]);
2019 let result = dir_namespace.describe_table(request).await;
2020 assert!(result.is_err());
2021
2022 let buffer = create_test_ipc_data();
2024 let mut create_request = CreateTableRequest::new();
2025 create_request.id = Some(vec!["test_table".to_string()]);
2026 dir_namespace
2027 .create_table(create_request, Bytes::from(buffer))
2028 .await
2029 .unwrap();
2030
2031 let mut request = DescribeTableRequest::new();
2033 request.id = Some(vec!["test_table".to_string()]);
2034 let response = dir_namespace.describe_table(request).await.unwrap();
2035 assert!(response.location.is_some());
2036 assert!(response.location.unwrap().contains("test_table"));
2037 }
2038
2039 #[rstest]
2040 #[case::with_optimization(true)]
2041 #[case::without_optimization(false)]
2042 #[tokio::test]
2043 async fn test_manifest_namespace_drop_table(#[case] inline_optimization: bool) {
2044 let temp_dir = TempStdDir::default();
2045 let temp_path = temp_dir.to_str().unwrap();
2046
2047 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2048 .inline_optimization_enabled(inline_optimization)
2049 .build()
2050 .await
2051 .unwrap();
2052
2053 let buffer = create_test_ipc_data();
2055 let mut create_request = CreateTableRequest::new();
2056 create_request.id = Some(vec!["test_table".to_string()]);
2057 dir_namespace
2058 .create_table(create_request, Bytes::from(buffer))
2059 .await
2060 .unwrap();
2061
2062 let mut request = ListTablesRequest::new();
2064 request.id = Some(vec![]);
2065 let response = dir_namespace.list_tables(request).await.unwrap();
2066 assert_eq!(response.tables.len(), 1);
2067
2068 let mut drop_request = DropTableRequest::new();
2070 drop_request.id = Some(vec!["test_table".to_string()]);
2071 let _response = dir_namespace.drop_table(drop_request).await.unwrap();
2072
2073 let mut request = ListTablesRequest::new();
2075 request.id = Some(vec![]);
2076 let response = dir_namespace.list_tables(request).await.unwrap();
2077 assert_eq!(response.tables.len(), 0);
2078 }
2079
2080 #[rstest]
2081 #[case::with_optimization(true)]
2082 #[case::without_optimization(false)]
2083 #[tokio::test]
2084 async fn test_manifest_namespace_multiple_tables(#[case] inline_optimization: bool) {
2085 let temp_dir = TempStdDir::default();
2086 let temp_path = temp_dir.to_str().unwrap();
2087
2088 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2089 .inline_optimization_enabled(inline_optimization)
2090 .build()
2091 .await
2092 .unwrap();
2093
2094 let buffer = create_test_ipc_data();
2096 for i in 1..=3 {
2097 let mut create_request = CreateTableRequest::new();
2098 create_request.id = Some(vec![format!("table{}", i)]);
2099 dir_namespace
2100 .create_table(create_request, Bytes::from(buffer.clone()))
2101 .await
2102 .unwrap();
2103 }
2104
2105 let mut request = ListTablesRequest::new();
2107 request.id = Some(vec![]);
2108 let response = dir_namespace.list_tables(request).await.unwrap();
2109 assert_eq!(response.tables.len(), 3);
2110 assert!(response.tables.contains(&"table1".to_string()));
2111 assert!(response.tables.contains(&"table2".to_string()));
2112 assert!(response.tables.contains(&"table3".to_string()));
2113 }
2114
2115 #[rstest]
2116 #[case::with_optimization(true)]
2117 #[case::without_optimization(false)]
2118 #[tokio::test]
2119 async fn test_directory_only_mode(#[case] inline_optimization: bool) {
2120 let temp_dir = TempStdDir::default();
2121 let temp_path = temp_dir.to_str().unwrap();
2122
2123 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2125 .manifest_enabled(false)
2126 .inline_optimization_enabled(inline_optimization)
2127 .build()
2128 .await
2129 .unwrap();
2130
2131 let mut request = ListTablesRequest::new();
2133 request.id = Some(vec![]);
2134 let response = dir_namespace.list_tables(request).await.unwrap();
2135 assert_eq!(response.tables.len(), 0);
2136
2137 let buffer = create_test_ipc_data();
2139 let mut create_request = CreateTableRequest::new();
2140 create_request.id = Some(vec!["test_table".to_string()]);
2141
2142 let _response = dir_namespace
2144 .create_table(create_request, Bytes::from(buffer))
2145 .await
2146 .unwrap();
2147
2148 let mut request = ListTablesRequest::new();
2150 request.id = Some(vec![]);
2151 let response = dir_namespace.list_tables(request).await.unwrap();
2152 assert_eq!(response.tables.len(), 1);
2153 assert_eq!(response.tables[0], "test_table");
2154 }
2155
2156 #[rstest]
2157 #[case::with_optimization(true)]
2158 #[case::without_optimization(false)]
2159 #[tokio::test]
2160 async fn test_dual_mode_merge(#[case] inline_optimization: bool) {
2161 let temp_dir = TempStdDir::default();
2162 let temp_path = temp_dir.to_str().unwrap();
2163
2164 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2166 .manifest_enabled(true)
2167 .dir_listing_enabled(true)
2168 .inline_optimization_enabled(inline_optimization)
2169 .build()
2170 .await
2171 .unwrap();
2172
2173 let buffer = create_test_ipc_data();
2175 let mut create_request = CreateTableRequest::new();
2176 create_request.id = Some(vec!["table1".to_string()]);
2177 dir_namespace
2178 .create_table(create_request, Bytes::from(buffer))
2179 .await
2180 .unwrap();
2181
2182 let mut request = ListTablesRequest::new();
2184 request.id = Some(vec![]);
2185 let response = dir_namespace.list_tables(request).await.unwrap();
2186 assert_eq!(response.tables.len(), 1);
2187 assert_eq!(response.tables[0], "table1");
2188 }
2189
2190 #[rstest]
2191 #[case::with_optimization(true)]
2192 #[case::without_optimization(false)]
2193 #[tokio::test]
2194 async fn test_manifest_only_mode(#[case] inline_optimization: bool) {
2195 let temp_dir = TempStdDir::default();
2196 let temp_path = temp_dir.to_str().unwrap();
2197
2198 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2200 .manifest_enabled(true)
2201 .dir_listing_enabled(false)
2202 .inline_optimization_enabled(inline_optimization)
2203 .build()
2204 .await
2205 .unwrap();
2206
2207 let buffer = create_test_ipc_data();
2209 let mut create_request = CreateTableRequest::new();
2210 create_request.id = Some(vec!["test_table".to_string()]);
2211 dir_namespace
2212 .create_table(create_request, Bytes::from(buffer))
2213 .await
2214 .unwrap();
2215
2216 let mut request = ListTablesRequest::new();
2218 request.id = Some(vec![]);
2219 let response = dir_namespace.list_tables(request).await.unwrap();
2220 assert_eq!(response.tables.len(), 1);
2221 assert_eq!(response.tables[0], "test_table");
2222 }
2223
2224 #[rstest]
2225 #[case::with_optimization(true)]
2226 #[case::without_optimization(false)]
2227 #[tokio::test]
2228 async fn test_drop_nonexistent_table(#[case] inline_optimization: bool) {
2229 let temp_dir = TempStdDir::default();
2230 let temp_path = temp_dir.to_str().unwrap();
2231
2232 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2233 .inline_optimization_enabled(inline_optimization)
2234 .build()
2235 .await
2236 .unwrap();
2237
2238 let mut drop_request = DropTableRequest::new();
2240 drop_request.id = Some(vec!["nonexistent".to_string()]);
2241 let result = dir_namespace.drop_table(drop_request).await;
2242 assert!(result.is_err());
2243 }
2244
2245 #[rstest]
2246 #[case::with_optimization(true)]
2247 #[case::without_optimization(false)]
2248 #[tokio::test]
2249 async fn test_create_duplicate_table_fails(#[case] inline_optimization: bool) {
2250 let temp_dir = TempStdDir::default();
2251 let temp_path = temp_dir.to_str().unwrap();
2252
2253 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2254 .inline_optimization_enabled(inline_optimization)
2255 .build()
2256 .await
2257 .unwrap();
2258
2259 let buffer = create_test_ipc_data();
2261 let mut create_request = CreateTableRequest::new();
2262 create_request.id = Some(vec!["test_table".to_string()]);
2263 dir_namespace
2264 .create_table(create_request, Bytes::from(buffer.clone()))
2265 .await
2266 .unwrap();
2267
2268 let mut create_request = CreateTableRequest::new();
2270 create_request.id = Some(vec!["test_table".to_string()]);
2271 let result = dir_namespace
2272 .create_table(create_request, Bytes::from(buffer))
2273 .await;
2274 assert!(result.is_err());
2275 }
2276
2277 #[rstest]
2278 #[case::with_optimization(true)]
2279 #[case::without_optimization(false)]
2280 #[tokio::test]
2281 async fn test_create_child_namespace(#[case] inline_optimization: bool) {
2282 use lance_namespace::models::{
2283 CreateNamespaceRequest, ListNamespacesRequest, NamespaceExistsRequest,
2284 };
2285
2286 let temp_dir = TempStdDir::default();
2287 let temp_path = temp_dir.to_str().unwrap();
2288
2289 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2290 .inline_optimization_enabled(inline_optimization)
2291 .build()
2292 .await
2293 .unwrap();
2294
2295 let mut create_req = CreateNamespaceRequest::new();
2297 create_req.id = Some(vec!["ns1".to_string()]);
2298 let result = dir_namespace.create_namespace(create_req).await;
2299 assert!(
2300 result.is_ok(),
2301 "Failed to create child namespace: {:?}",
2302 result.err()
2303 );
2304
2305 let exists_req = NamespaceExistsRequest {
2307 id: Some(vec!["ns1".to_string()]),
2308 ..Default::default()
2309 };
2310 let result = dir_namespace.namespace_exists(exists_req).await;
2311 assert!(result.is_ok(), "Namespace should exist");
2312
2313 let list_req = ListNamespacesRequest {
2315 id: Some(vec![]),
2316 page_token: None,
2317 limit: None,
2318 ..Default::default()
2319 };
2320 let result = dir_namespace.list_namespaces(list_req).await;
2321 assert!(result.is_ok());
2322 let namespaces = result.unwrap();
2323 assert_eq!(namespaces.namespaces.len(), 1);
2324 assert_eq!(namespaces.namespaces[0], "ns1");
2325 }
2326
2327 #[rstest]
2328 #[case::with_optimization(true)]
2329 #[case::without_optimization(false)]
2330 #[tokio::test]
2331 async fn test_create_nested_namespace(#[case] inline_optimization: bool) {
2332 use lance_namespace::models::{
2333 CreateNamespaceRequest, ListNamespacesRequest, NamespaceExistsRequest,
2334 };
2335
2336 let temp_dir = TempStdDir::default();
2337 let temp_path = temp_dir.to_str().unwrap();
2338
2339 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2340 .inline_optimization_enabled(inline_optimization)
2341 .build()
2342 .await
2343 .unwrap();
2344
2345 let mut create_req = CreateNamespaceRequest::new();
2347 create_req.id = Some(vec!["parent".to_string()]);
2348 dir_namespace.create_namespace(create_req).await.unwrap();
2349
2350 let mut create_req = CreateNamespaceRequest::new();
2352 create_req.id = Some(vec!["parent".to_string(), "child".to_string()]);
2353 let result = dir_namespace.create_namespace(create_req).await;
2354 assert!(
2355 result.is_ok(),
2356 "Failed to create nested namespace: {:?}",
2357 result.err()
2358 );
2359
2360 let exists_req = NamespaceExistsRequest {
2362 id: Some(vec!["parent".to_string(), "child".to_string()]),
2363 ..Default::default()
2364 };
2365 let result = dir_namespace.namespace_exists(exists_req).await;
2366 assert!(result.is_ok(), "Nested namespace should exist");
2367
2368 let list_req = ListNamespacesRequest {
2370 id: Some(vec!["parent".to_string()]),
2371 page_token: None,
2372 limit: None,
2373 ..Default::default()
2374 };
2375 let result = dir_namespace.list_namespaces(list_req).await;
2376 assert!(result.is_ok());
2377 let namespaces = result.unwrap();
2378 assert_eq!(namespaces.namespaces.len(), 1);
2379 assert_eq!(namespaces.namespaces[0], "child");
2380 }
2381
2382 #[rstest]
2383 #[case::with_optimization(true)]
2384 #[case::without_optimization(false)]
2385 #[tokio::test]
2386 async fn test_create_namespace_without_parent_fails(#[case] inline_optimization: bool) {
2387 use lance_namespace::models::CreateNamespaceRequest;
2388
2389 let temp_dir = TempStdDir::default();
2390 let temp_path = temp_dir.to_str().unwrap();
2391
2392 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2393 .inline_optimization_enabled(inline_optimization)
2394 .build()
2395 .await
2396 .unwrap();
2397
2398 let mut create_req = CreateNamespaceRequest::new();
2400 create_req.id = Some(vec!["nonexistent_parent".to_string(), "child".to_string()]);
2401 let result = dir_namespace.create_namespace(create_req).await;
2402 assert!(result.is_err(), "Should fail when parent doesn't exist");
2403 }
2404
2405 #[rstest]
2406 #[case::with_optimization(true)]
2407 #[case::without_optimization(false)]
2408 #[tokio::test]
2409 async fn test_drop_child_namespace(#[case] inline_optimization: bool) {
2410 use lance_namespace::models::{
2411 CreateNamespaceRequest, DropNamespaceRequest, NamespaceExistsRequest,
2412 };
2413
2414 let temp_dir = TempStdDir::default();
2415 let temp_path = temp_dir.to_str().unwrap();
2416
2417 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2418 .inline_optimization_enabled(inline_optimization)
2419 .build()
2420 .await
2421 .unwrap();
2422
2423 let mut create_req = CreateNamespaceRequest::new();
2425 create_req.id = Some(vec!["ns1".to_string()]);
2426 dir_namespace.create_namespace(create_req).await.unwrap();
2427
2428 let mut drop_req = DropNamespaceRequest::new();
2430 drop_req.id = Some(vec!["ns1".to_string()]);
2431 let result = dir_namespace.drop_namespace(drop_req).await;
2432 assert!(
2433 result.is_ok(),
2434 "Failed to drop namespace: {:?}",
2435 result.err()
2436 );
2437
2438 let exists_req = NamespaceExistsRequest {
2440 id: Some(vec!["ns1".to_string()]),
2441 ..Default::default()
2442 };
2443 let result = dir_namespace.namespace_exists(exists_req).await;
2444 assert!(result.is_err(), "Namespace should not exist after drop");
2445 }
2446
2447 #[rstest]
2448 #[case::with_optimization(true)]
2449 #[case::without_optimization(false)]
2450 #[tokio::test]
2451 async fn test_drop_namespace_with_children_fails(#[case] inline_optimization: bool) {
2452 use lance_namespace::models::{CreateNamespaceRequest, DropNamespaceRequest};
2453
2454 let temp_dir = TempStdDir::default();
2455 let temp_path = temp_dir.to_str().unwrap();
2456
2457 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2458 .inline_optimization_enabled(inline_optimization)
2459 .build()
2460 .await
2461 .unwrap();
2462
2463 let mut create_req = CreateNamespaceRequest::new();
2465 create_req.id = Some(vec!["parent".to_string()]);
2466 dir_namespace.create_namespace(create_req).await.unwrap();
2467
2468 let mut create_req = CreateNamespaceRequest::new();
2469 create_req.id = Some(vec!["parent".to_string(), "child".to_string()]);
2470 dir_namespace.create_namespace(create_req).await.unwrap();
2471
2472 let mut drop_req = DropNamespaceRequest::new();
2474 drop_req.id = Some(vec!["parent".to_string()]);
2475 let result = dir_namespace.drop_namespace(drop_req).await;
2476 assert!(result.is_err(), "Should fail when namespace has children");
2477 }
2478
2479 #[rstest]
2480 #[case::with_optimization(true)]
2481 #[case::without_optimization(false)]
2482 #[tokio::test]
2483 async fn test_create_table_in_child_namespace(#[case] inline_optimization: bool) {
2484 use lance_namespace::models::{
2485 CreateNamespaceRequest, CreateTableRequest, ListTablesRequest,
2486 };
2487
2488 let temp_dir = TempStdDir::default();
2489 let temp_path = temp_dir.to_str().unwrap();
2490
2491 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2492 .inline_optimization_enabled(inline_optimization)
2493 .build()
2494 .await
2495 .unwrap();
2496
2497 let mut create_ns_req = CreateNamespaceRequest::new();
2499 create_ns_req.id = Some(vec!["ns1".to_string()]);
2500 dir_namespace.create_namespace(create_ns_req).await.unwrap();
2501
2502 let buffer = create_test_ipc_data();
2504 let mut create_table_req = CreateTableRequest::new();
2505 create_table_req.id = Some(vec!["ns1".to_string(), "table1".to_string()]);
2506 let result = dir_namespace
2507 .create_table(create_table_req, Bytes::from(buffer))
2508 .await;
2509 assert!(
2510 result.is_ok(),
2511 "Failed to create table in child namespace: {:?}",
2512 result.err()
2513 );
2514
2515 let list_req = ListTablesRequest {
2517 id: Some(vec!["ns1".to_string()]),
2518 page_token: None,
2519 limit: None,
2520 ..Default::default()
2521 };
2522 let result = dir_namespace.list_tables(list_req).await;
2523 assert!(result.is_ok());
2524 let tables = result.unwrap();
2525 assert_eq!(tables.tables.len(), 1);
2526 assert_eq!(tables.tables[0], "table1");
2527 }
2528
2529 #[rstest]
2530 #[case::with_optimization(true)]
2531 #[case::without_optimization(false)]
2532 #[tokio::test]
2533 async fn test_describe_child_namespace(#[case] inline_optimization: bool) {
2534 use lance_namespace::models::{CreateNamespaceRequest, DescribeNamespaceRequest};
2535
2536 let temp_dir = TempStdDir::default();
2537 let temp_path = temp_dir.to_str().unwrap();
2538
2539 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2540 .inline_optimization_enabled(inline_optimization)
2541 .build()
2542 .await
2543 .unwrap();
2544
2545 let mut properties = std::collections::HashMap::new();
2547 properties.insert("key1".to_string(), "value1".to_string());
2548
2549 let mut create_req = CreateNamespaceRequest::new();
2550 create_req.id = Some(vec!["ns1".to_string()]);
2551 create_req.properties = Some(properties.clone());
2552 dir_namespace.create_namespace(create_req).await.unwrap();
2553
2554 let describe_req = DescribeNamespaceRequest {
2556 id: Some(vec!["ns1".to_string()]),
2557 ..Default::default()
2558 };
2559 let result = dir_namespace.describe_namespace(describe_req).await;
2560 assert!(
2561 result.is_ok(),
2562 "Failed to describe namespace: {:?}",
2563 result.err()
2564 );
2565 let response = result.unwrap();
2566 assert!(response.properties.is_some());
2567 assert_eq!(
2568 response.properties.unwrap().get("key1"),
2569 Some(&"value1".to_string())
2570 );
2571 }
2572
2573 #[test]
2574 fn test_construct_full_uri_with_cloud_urls() {
2575 let s3_result =
2577 ManifestNamespace::construct_full_uri("s3://bucket/path/subdir", "table.lance")
2578 .unwrap();
2579 assert_eq!(
2580 s3_result, "s3://bucket/path/subdir/table.lance",
2581 "S3 URL should correctly append table name to nested path"
2582 );
2583
2584 let az_result =
2586 ManifestNamespace::construct_full_uri("az://container/path/subdir", "table.lance")
2587 .unwrap();
2588 assert_eq!(
2589 az_result, "az://container/path/subdir/table.lance",
2590 "Azure URL should correctly append table name to nested path"
2591 );
2592
2593 let gs_result =
2595 ManifestNamespace::construct_full_uri("gs://bucket/path/subdir", "table.lance")
2596 .unwrap();
2597 assert_eq!(
2598 gs_result, "gs://bucket/path/subdir/table.lance",
2599 "GCS URL should correctly append table name to nested path"
2600 );
2601
2602 let deep_result =
2604 ManifestNamespace::construct_full_uri("s3://bucket/a/b/c/d", "my_table.lance").unwrap();
2605 assert_eq!(
2606 deep_result, "s3://bucket/a/b/c/d/my_table.lance",
2607 "Deeply nested path should work correctly"
2608 );
2609
2610 let shallow_result =
2612 ManifestNamespace::construct_full_uri("s3://bucket", "table.lance").unwrap();
2613 assert_eq!(
2614 shallow_result, "s3://bucket/table.lance",
2615 "Single-level nested path should work correctly"
2616 );
2617
2618 let trailing_slash_result =
2620 ManifestNamespace::construct_full_uri("s3://bucket/path/subdir/", "table.lance")
2621 .unwrap();
2622 assert_eq!(
2623 trailing_slash_result, "s3://bucket/path/subdir/table.lance",
2624 "URL with existing trailing slash should still work"
2625 );
2626 }
2627}