1use arrow::array::{Array, RecordBatch, RecordBatchIterator, StringArray};
10use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
11use arrow_ipc::reader::StreamReader;
12use async_trait::async_trait;
13use bytes::Bytes;
14use futures::{stream::StreamExt, FutureExt};
15use lance::dataset::optimize::{compact_files, CompactionOptions};
16use lance::dataset::{builder::DatasetBuilder, WriteParams};
17use lance::session::Session;
18use lance::{dataset::scanner::Scanner, Dataset};
19use lance_core::{box_error, Error, Result};
20use lance_index::optimize::OptimizeOptions;
21use lance_index::scalar::{BuiltinIndexType, ScalarIndexParams};
22use lance_index::traits::DatasetIndexExt;
23use lance_index::IndexType;
24use lance_io::object_store::{ObjectStore, ObjectStoreParams};
25use lance_namespace::models::{
26 CreateEmptyTableRequest, CreateEmptyTableResponse, CreateNamespaceRequest,
27 CreateNamespaceResponse, CreateTableRequest, CreateTableResponse, DeclareTableRequest,
28 DeclareTableResponse, DeregisterTableRequest, DeregisterTableResponse,
29 DescribeNamespaceRequest, DescribeNamespaceResponse, DescribeTableRequest,
30 DescribeTableResponse, DropNamespaceRequest, DropNamespaceResponse, DropTableRequest,
31 DropTableResponse, ListNamespacesRequest, ListNamespacesResponse, ListTablesRequest,
32 ListTablesResponse, NamespaceExistsRequest, RegisterTableRequest, RegisterTableResponse,
33 TableExistsRequest,
34};
35use lance_namespace::schema::arrow_schema_to_json;
36use lance_namespace::LanceNamespace;
37use object_store::path::Path;
38use snafu::location;
39use std::io::Cursor;
40use std::{
41 collections::HashMap,
42 hash::{DefaultHasher, Hash, Hasher},
43 ops::{Deref, DerefMut},
44 sync::Arc,
45};
46use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
47
/// Name of the Lance dataset, located directly under the namespace root, that stores the manifest.
const MANIFEST_TABLE_NAME: &str = "__manifest";
/// Separator placed between namespace levels (and the final object name) inside an object id.
const DELIMITER: &str = "$";

/// Name of the BTree scalar index created on the manifest's `object_id` column.
const OBJECT_ID_INDEX_NAME: &str = "object_id_btree";
/// Name of the Bitmap scalar index created on the manifest's `object_type` column.
const OBJECT_TYPE_INDEX_NAME: &str = "object_type_bitmap";
/// Name of the LabelList scalar index created on the manifest's `base_objects` column.
const BASE_OBJECTS_INDEX_NAME: &str = "base_objects_label_list";
58
59#[derive(Debug, Clone, Copy, PartialEq, Eq)]
61pub enum ObjectType {
62 Namespace,
63 Table,
64}
65
66impl ObjectType {
67 pub fn as_str(&self) -> &str {
68 match self {
69 Self::Namespace => "namespace",
70 Self::Table => "table",
71 }
72 }
73
74 pub fn parse(s: &str) -> Result<Self> {
75 match s {
76 "namespace" => Ok(Self::Namespace),
77 "table" => Ok(Self::Table),
78 _ => Err(Error::io(
79 format!("Invalid object type: {}", s),
80 location!(),
81 )),
82 }
83 }
84}
85
/// A table entry read back from the manifest.
///
/// Derives `PartialEq`/`Eq` so entries can be compared directly (for example in assertions).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TableInfo {
    /// Namespace levels that precede the table name in its object id.
    pub namespace: Vec<String>,
    /// Final component of the object id.
    pub name: String,
    /// Value of the manifest's `location` column for this table.
    pub location: String,
}
93
/// A namespace entry read back from the manifest.
///
/// Derives `PartialEq`/`Eq` so entries can be compared directly (for example in assertions).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NamespaceInfo {
    /// Namespace levels that precede this namespace's own name in its object id.
    pub namespace: Vec<String>,
    /// Final component of the object id.
    pub name: String,
    /// Key/value metadata decoded from the manifest's JSON `metadata` column, if the column is set.
    pub metadata: Option<HashMap<String, String>>,
}
101
102#[derive(Debug, Clone)]
107pub struct DatasetConsistencyWrapper(Arc<RwLock<Dataset>>);
108
109impl DatasetConsistencyWrapper {
110 pub fn new(dataset: Dataset) -> Self {
112 Self(Arc::new(RwLock::new(dataset)))
113 }
114
115 pub async fn get(&self) -> Result<DatasetReadGuard<'_>> {
118 self.reload().await?;
119 Ok(DatasetReadGuard {
120 guard: self.0.read().await,
121 })
122 }
123
124 pub async fn get_mut(&self) -> Result<DatasetWriteGuard<'_>> {
127 self.reload().await?;
128 Ok(DatasetWriteGuard {
129 guard: self.0.write().await,
130 })
131 }
132
133 pub async fn set_latest(&self, dataset: Dataset) {
138 let mut write_guard = self.0.write().await;
139 if dataset.manifest().version > write_guard.manifest().version {
140 *write_guard = dataset;
141 }
142 }
143
144 async fn reload(&self) -> Result<()> {
146 let read_guard = self.0.read().await;
148 let dataset_uri = read_guard.uri().to_string();
149 let current_version = read_guard.version().version;
150 log::debug!(
151 "Reload starting for uri={}, current_version={}",
152 dataset_uri,
153 current_version
154 );
155 let latest_version = read_guard
156 .latest_version_id()
157 .await
158 .map_err(|e| Error::IO {
159 source: box_error(std::io::Error::other(format!(
160 "Failed to get latest version: {}",
161 e
162 ))),
163 location: location!(),
164 })?;
165 log::debug!(
166 "Reload got latest_version={} for uri={}, current_version={}",
167 latest_version,
168 dataset_uri,
169 current_version
170 );
171 drop(read_guard);
172
173 if latest_version == current_version {
175 log::debug!("Already up-to-date for uri={}", dataset_uri);
176 return Ok(());
177 }
178
179 let mut write_guard = self.0.write().await;
181
182 let latest_version = write_guard
184 .latest_version_id()
185 .await
186 .map_err(|e| Error::IO {
187 source: box_error(std::io::Error::other(format!(
188 "Failed to get latest version: {}",
189 e
190 ))),
191 location: location!(),
192 })?;
193
194 if latest_version != write_guard.version().version {
195 write_guard.checkout_latest().await.map_err(|e| Error::IO {
196 source: box_error(std::io::Error::other(format!(
197 "Failed to checkout latest: {}",
198 e
199 ))),
200 location: location!(),
201 })?;
202 }
203
204 Ok(())
205 }
206}
207
208pub struct DatasetReadGuard<'a> {
209 guard: RwLockReadGuard<'a, Dataset>,
210}
211
212impl Deref for DatasetReadGuard<'_> {
213 type Target = Dataset;
214
215 fn deref(&self) -> &Self::Target {
216 &self.guard
217 }
218}
219
220pub struct DatasetWriteGuard<'a> {
221 guard: RwLockWriteGuard<'a, Dataset>,
222}
223
224impl Deref for DatasetWriteGuard<'_> {
225 type Target = Dataset;
226
227 fn deref(&self) -> &Self::Target {
228 &self.guard
229 }
230}
231
232impl DerefMut for DatasetWriteGuard<'_> {
233 fn deref_mut(&mut self) -> &mut Self::Target {
234 &mut self.guard
235 }
236}
237
/// Namespace implementation that records its tables and namespaces as rows of a
/// `__manifest` Lance dataset stored under `root`.
#[derive(Debug)]
pub struct ManifestNamespace {
    /// Root URI of the namespace; the manifest path and table URIs are derived from it.
    root: String,
    /// Storage options used when opening or creating the manifest dataset; also handed back
    /// to callers in table responses.
    storage_options: Option<HashMap<String, String>>,
    // Stored at construction; not read by the code visible in this part of the file.
    #[allow(dead_code)]
    session: Option<Arc<Session>>,
    // Stored at construction; not read by the code visible in this part of the file.
    #[allow(dead_code)]
    object_store: Arc<ObjectStore>,
    // Stored at construction; not read by the code visible in this part of the file.
    #[allow(dead_code)]
    base_path: Path,
    /// Handle to the manifest dataset; refreshed to the latest version on every access.
    manifest_dataset: DatasetConsistencyWrapper,
    /// When true, tables created in the root namespace are stored in a `<table_name>.lance`
    /// directory instead of a generated directory name.
    dir_listing_enabled: bool,
    /// When true, manifest writes are followed by index creation, file compaction and index
    /// optimization on the manifest dataset.
    inline_optimization_enabled: bool,
}
260
261impl ManifestNamespace {
262 pub async fn from_directory(
264 root: String,
265 storage_options: Option<HashMap<String, String>>,
266 session: Option<Arc<Session>>,
267 object_store: Arc<ObjectStore>,
268 base_path: Path,
269 dir_listing_enabled: bool,
270 inline_optimization_enabled: bool,
271 ) -> Result<Self> {
272 let manifest_dataset =
273 Self::create_or_get_manifest(&root, &storage_options, session.clone()).await?;
274
275 Ok(Self {
276 root,
277 storage_options,
278 session,
279 object_store,
280 base_path,
281 manifest_dataset,
282 dir_listing_enabled,
283 inline_optimization_enabled,
284 })
285 }
286
287 pub fn build_object_id(namespace: &[String], name: &str) -> String {
289 if namespace.is_empty() {
290 name.to_string()
291 } else {
292 let mut id = namespace.join(DELIMITER);
293 id.push_str(DELIMITER);
294 id.push_str(name);
295 id
296 }
297 }
298
299 pub fn parse_object_id(object_id: &str) -> (Vec<String>, String) {
301 let parts: Vec<&str> = object_id.split(DELIMITER).collect();
302 if parts.len() == 1 {
303 (Vec::new(), parts[0].to_string())
304 } else {
305 let namespace = parts[..parts.len() - 1]
306 .iter()
307 .map(|s| s.to_string())
308 .collect();
309 let name = parts[parts.len() - 1].to_string();
310 (namespace, name)
311 }
312 }
313
/// Splits an id path into its parent namespace levels and its last component.
///
/// An empty `table_id` yields an empty namespace and an empty name instead of panicking on
/// the `len() - 1` underflow; callers in this file reject empty ids before calling.
fn split_object_id(table_id: &[String]) -> (Vec<String>, String) {
    match table_id.split_last() {
        Some((name, namespace)) => (namespace.to_vec(), name.clone()),
        None => (Vec::new(), String::new()),
    }
}
325
/// Joins all components of an id path with [`DELIMITER`] to form the manifest object id.
fn str_object_id(table_id: &[String]) -> String {
    table_id.join(DELIMITER)
}
330
331 fn generate_dir_name(object_id: &str) -> String {
338 let random_num: u64 = rand::random();
340
341 let mut hasher = DefaultHasher::new();
343 random_num.hash(&mut hasher);
344 object_id.hash(&mut hasher);
345 let hash = hasher.finish();
346
347 format!("{:08x}_{}", (hash & 0xFFFFFFFF) as u32, object_id)
349 }
350
351 pub(crate) fn construct_full_uri(root: &str, relative_location: &str) -> Result<String> {
353 let mut base_url = lance_io::object_store::uri_to_url(root)?;
354
355 if !base_url.path().ends_with('/') {
360 base_url.set_path(&format!("{}/", base_url.path()));
361 }
362
363 let full_url = base_url
364 .join(relative_location)
365 .map_err(|e| Error::InvalidInput {
366 source: format!(
367 "Failed to join URI '{}' with '{}': {:?}",
368 root, relative_location, e
369 )
370 .into(),
371 location: location!(),
372 })?;
373
374 Ok(full_url.to_string())
375 }
376
377 async fn run_inline_optimization(&self) -> Result<()> {
389 if !self.inline_optimization_enabled {
390 return Ok(());
391 }
392
393 let mut dataset_guard = self.manifest_dataset.get_mut().await?;
395 let dataset: &mut Dataset = &mut dataset_guard;
396
397 let indices = dataset.load_indices().await?;
399
400 let has_object_id_index = indices.iter().any(|idx| idx.name == OBJECT_ID_INDEX_NAME);
402 let has_object_type_index = indices.iter().any(|idx| idx.name == OBJECT_TYPE_INDEX_NAME);
403 let has_base_objects_index = indices
404 .iter()
405 .any(|idx| idx.name == BASE_OBJECTS_INDEX_NAME);
406
407 if !has_object_id_index {
409 log::debug!(
410 "Creating BTREE index '{}' on object_id for __manifest table",
411 OBJECT_ID_INDEX_NAME
412 );
413 let params = ScalarIndexParams::for_builtin(BuiltinIndexType::BTree);
414 if let Err(e) = dataset
415 .create_index(
416 &["object_id"],
417 IndexType::BTree,
418 Some(OBJECT_ID_INDEX_NAME.to_string()),
419 ¶ms,
420 true,
421 )
422 .await
423 {
424 log::warn!("Failed to create BTREE index on object_id for __manifest table: {:?}. Query performance may be impacted.", e);
425 } else {
426 log::info!(
427 "Created BTREE index '{}' on object_id for __manifest table",
428 OBJECT_ID_INDEX_NAME
429 );
430 }
431 }
432
433 if !has_object_type_index {
435 log::debug!(
436 "Creating Bitmap index '{}' on object_type for __manifest table",
437 OBJECT_TYPE_INDEX_NAME
438 );
439 let params = ScalarIndexParams::default();
440 if let Err(e) = dataset
441 .create_index(
442 &["object_type"],
443 IndexType::Bitmap,
444 Some(OBJECT_TYPE_INDEX_NAME.to_string()),
445 ¶ms,
446 true,
447 )
448 .await
449 {
450 log::warn!("Failed to create Bitmap index on object_type for __manifest table: {:?}. Query performance may be impacted.", e);
451 } else {
452 log::info!(
453 "Created Bitmap index '{}' on object_type for __manifest table",
454 OBJECT_TYPE_INDEX_NAME
455 );
456 }
457 }
458
459 if !has_base_objects_index {
461 log::debug!(
462 "Creating LabelList index '{}' on base_objects for __manifest table",
463 BASE_OBJECTS_INDEX_NAME
464 );
465 let params = ScalarIndexParams::default();
466 if let Err(e) = dataset
467 .create_index(
468 &["base_objects"],
469 IndexType::LabelList,
470 Some(BASE_OBJECTS_INDEX_NAME.to_string()),
471 ¶ms,
472 true,
473 )
474 .await
475 {
476 log::warn!("Failed to create LabelList index on base_objects for __manifest table: {:?}. Query performance may be impacted.", e);
477 } else {
478 log::info!(
479 "Created LabelList index '{}' on base_objects for __manifest table",
480 BASE_OBJECTS_INDEX_NAME
481 );
482 }
483 }
484
485 log::debug!("Running file compaction on __manifest table");
487 match compact_files(dataset, CompactionOptions::default(), None).await {
488 Ok(compaction_metrics) => {
489 if compaction_metrics.fragments_removed > 0 {
490 log::info!(
491 "Compacted __manifest table: removed {} fragments, added {} fragments",
492 compaction_metrics.fragments_removed,
493 compaction_metrics.fragments_added
494 );
495 }
496 }
497 Err(e) => {
498 log::warn!("Failed to compact files for __manifest table: {:?}. Continuing with optimization.", e);
499 }
500 }
501
502 log::debug!("Optimizing indices on __manifest table");
504 match dataset.optimize_indices(&OptimizeOptions::default()).await {
505 Ok(_) => {
506 log::info!("Successfully optimized indices on __manifest table");
507 }
508 Err(e) => {
509 log::warn!(
510 "Failed to optimize indices on __manifest table: {:?}. Continuing anyway.",
511 e
512 );
513 }
514 }
515
516 Ok(())
517 }
518
519 fn manifest_schema() -> Arc<ArrowSchema> {
521 Arc::new(ArrowSchema::new(vec![
522 Field::new("object_id", DataType::Utf8, false),
523 Field::new("object_type", DataType::Utf8, false),
524 Field::new("location", DataType::Utf8, true),
525 Field::new("metadata", DataType::Utf8, true),
526 Field::new(
527 "base_objects",
528 DataType::List(Arc::new(Field::new("object_id", DataType::Utf8, true))),
529 true,
530 ),
531 ]))
532 }
533
534 async fn manifest_scanner(&self) -> Result<Scanner> {
536 let dataset_guard = self.manifest_dataset.get().await?;
537 Ok(dataset_guard.scan())
538 }
539
540 async fn execute_scanner(scanner: Scanner) -> Result<Vec<RecordBatch>> {
542 let mut stream = scanner.try_into_stream().await.map_err(|e| Error::IO {
543 source: box_error(std::io::Error::other(format!(
544 "Failed to create stream: {}",
545 e
546 ))),
547 location: location!(),
548 })?;
549
550 let mut batches = Vec::new();
551 while let Some(batch) = stream.next().await {
552 batches.push(batch.map_err(|e| Error::IO {
553 source: box_error(std::io::Error::other(format!(
554 "Failed to read batch: {}",
555 e
556 ))),
557 location: location!(),
558 })?);
559 }
560
561 Ok(batches)
562 }
563
564 fn get_string_column<'a>(batch: &'a RecordBatch, column_name: &str) -> Result<&'a StringArray> {
566 let column = batch
567 .column_by_name(column_name)
568 .ok_or_else(|| Error::io(format!("Column '{}' not found", column_name), location!()))?;
569 column
570 .as_any()
571 .downcast_ref::<StringArray>()
572 .ok_or_else(|| {
573 Error::io(
574 format!("Column '{}' is not a string array", column_name),
575 location!(),
576 )
577 })
578 }
579
580 async fn manifest_contains_object(&self, object_id: &str) -> Result<bool> {
582 let filter = format!("object_id = '{}'", object_id);
583
584 let dataset_guard = self.manifest_dataset.get().await?;
585 let mut scanner = dataset_guard.scan();
586
587 scanner.filter(&filter).map_err(|e| Error::IO {
588 source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
589 location: location!(),
590 })?;
591
592 scanner.project::<&str>(&[]).map_err(|e| Error::IO {
594 source: box_error(std::io::Error::other(format!("Failed to project: {}", e))),
595 location: location!(),
596 })?;
597
598 scanner.with_row_id();
599
600 let count = scanner.count_rows().await.map_err(|e| Error::IO {
601 source: box_error(std::io::Error::other(format!(
602 "Failed to count rows: {}",
603 e
604 ))),
605 location: location!(),
606 })?;
607
608 Ok(count > 0)
609 }
610
611 async fn query_manifest_for_table(&self, object_id: &str) -> Result<Option<TableInfo>> {
613 let filter = format!("object_id = '{}' AND object_type = 'table'", object_id);
614 let mut scanner = self.manifest_scanner().await?;
615 scanner.filter(&filter).map_err(|e| Error::IO {
616 source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
617 location: location!(),
618 })?;
619 scanner
620 .project(&["object_id", "location"])
621 .map_err(|e| Error::IO {
622 source: box_error(std::io::Error::other(format!("Failed to project: {}", e))),
623 location: location!(),
624 })?;
625 let batches = Self::execute_scanner(scanner).await?;
626
627 let mut found_result: Option<TableInfo> = None;
628 let mut total_rows = 0;
629
630 for batch in batches {
631 if batch.num_rows() == 0 {
632 continue;
633 }
634
635 total_rows += batch.num_rows();
636 if total_rows > 1 {
637 return Err(Error::io(
638 format!(
639 "Expected exactly 1 table with id '{}', found {}",
640 object_id, total_rows
641 ),
642 location!(),
643 ));
644 }
645
646 let object_id_array = Self::get_string_column(&batch, "object_id")?;
647 let location_array = Self::get_string_column(&batch, "location")?;
648 let location = location_array.value(0).to_string();
649 let (namespace, name) = Self::parse_object_id(object_id_array.value(0));
650 found_result = Some(TableInfo {
651 namespace,
652 name,
653 location,
654 });
655 }
656
657 Ok(found_result)
658 }
659
660 pub async fn list_manifest_table_locations(&self) -> Result<std::collections::HashSet<String>> {
663 let filter = "object_type = 'table' AND NOT contains(object_id, '$')";
664 let mut scanner = self.manifest_scanner().await?;
665 scanner.filter(filter).map_err(|e| Error::IO {
666 source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
667 location: location!(),
668 })?;
669 scanner.project(&["location"]).map_err(|e| Error::IO {
670 source: box_error(std::io::Error::other(format!("Failed to project: {}", e))),
671 location: location!(),
672 })?;
673
674 let batches = Self::execute_scanner(scanner).await?;
675 let mut locations = std::collections::HashSet::new();
676
677 for batch in batches {
678 if batch.num_rows() == 0 {
679 continue;
680 }
681 let location_array = Self::get_string_column(&batch, "location")?;
682 for i in 0..location_array.len() {
683 locations.insert(location_array.value(i).to_string());
684 }
685 }
686
687 Ok(locations)
688 }
689
690 async fn insert_into_manifest(
692 &self,
693 object_id: String,
694 object_type: ObjectType,
695 location: Option<String>,
696 ) -> Result<()> {
697 self.insert_into_manifest_with_metadata(object_id, object_type, location, None, None)
698 .await
699 }
700
701 async fn insert_into_manifest_with_metadata(
703 &self,
704 object_id: String,
705 object_type: ObjectType,
706 location: Option<String>,
707 metadata: Option<String>,
708 base_objects: Option<Vec<String>>,
709 ) -> Result<()> {
710 use arrow::array::builder::{ListBuilder, StringBuilder};
711
712 let schema = Self::manifest_schema();
713
714 let string_builder = StringBuilder::new();
716 let mut list_builder = ListBuilder::new(string_builder).with_field(Arc::new(Field::new(
717 "object_id",
718 DataType::Utf8,
719 true,
720 )));
721
722 match base_objects {
723 Some(objects) => {
724 for obj in objects {
725 list_builder.values().append_value(obj);
726 }
727 list_builder.append(true);
728 }
729 None => {
730 list_builder.append_null();
731 }
732 }
733
734 let base_objects_array = list_builder.finish();
735
736 let location_array = match location {
738 Some(loc) => Arc::new(StringArray::from(vec![Some(loc)])),
739 None => Arc::new(StringArray::from(vec![None::<String>])),
740 };
741
742 let metadata_array = match metadata {
743 Some(meta) => Arc::new(StringArray::from(vec![Some(meta)])),
744 None => Arc::new(StringArray::from(vec![None::<String>])),
745 };
746
747 let batch = RecordBatch::try_new(
748 schema.clone(),
749 vec![
750 Arc::new(StringArray::from(vec![object_id.as_str()])),
751 Arc::new(StringArray::from(vec![object_type.as_str()])),
752 location_array,
753 metadata_array,
754 Arc::new(base_objects_array),
755 ],
756 )
757 .map_err(|e| {
758 Error::io(
759 format!("Failed to create manifest entry: {}", e),
760 location!(),
761 )
762 })?;
763
764 let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
765
766 let dataset_guard = self.manifest_dataset.get().await?;
768 let dataset_arc = Arc::new(dataset_guard.clone());
769 drop(dataset_guard); let mut merge_builder =
772 lance::dataset::MergeInsertBuilder::try_new(dataset_arc, vec!["object_id".to_string()])
773 .map_err(|e| Error::IO {
774 source: box_error(std::io::Error::other(format!(
775 "Failed to create merge builder: {}",
776 e
777 ))),
778 location: location!(),
779 })?;
780
781 merge_builder.when_matched(lance::dataset::WhenMatched::Fail);
782 merge_builder.when_not_matched(lance::dataset::WhenNotMatched::InsertAll);
783
784 let (new_dataset_arc, _merge_stats) = merge_builder
785 .try_build()
786 .map_err(|e| Error::IO {
787 source: box_error(std::io::Error::other(format!(
788 "Failed to build merge: {}",
789 e
790 ))),
791 location: location!(),
792 })?
793 .execute_reader(Box::new(reader))
794 .await
795 .map_err(|e| {
796 let error_msg = e.to_string();
798 if error_msg.contains("matched")
799 || error_msg.contains("duplicate")
800 || error_msg.contains("already exists")
801 {
802 Error::io(
803 format!("Object with id '{}' already exists in manifest", object_id),
804 location!(),
805 )
806 } else {
807 Error::IO {
808 source: box_error(std::io::Error::other(format!(
809 "Failed to execute merge: {}",
810 e
811 ))),
812 location: location!(),
813 }
814 }
815 })?;
816
817 let new_dataset = Arc::try_unwrap(new_dataset_arc).unwrap_or_else(|arc| (*arc).clone());
818 self.manifest_dataset.set_latest(new_dataset).await;
819
820 if let Err(e) = self.run_inline_optimization().await {
822 log::warn!(
823 "Unexpected failure when running inline optimization: {:?}",
824 e
825 );
826 }
827
828 Ok(())
829 }
830
831 pub async fn delete_from_manifest(&self, object_id: &str) -> Result<()> {
833 {
834 let predicate = format!("object_id = '{}'", object_id);
835 let mut dataset_guard = self.manifest_dataset.get_mut().await?;
836 dataset_guard
837 .delete(&predicate)
838 .await
839 .map_err(|e| Error::IO {
840 source: box_error(std::io::Error::other(format!("Failed to delete: {}", e))),
841 location: location!(),
842 })?;
843 } self.manifest_dataset.reload().await?;
846
847 if let Err(e) = self.run_inline_optimization().await {
849 log::warn!(
850 "Unexpected failure when running inline optimization: {:?}",
851 e
852 );
853 }
854
855 Ok(())
856 }
857
858 pub async fn register_table(&self, name: &str, location: String) -> Result<()> {
860 let object_id = Self::build_object_id(&[], name);
861 if self.manifest_contains_object(&object_id).await? {
862 return Err(Error::io(
863 format!("Table '{}' already exists", name),
864 location!(),
865 ));
866 }
867
868 self.insert_into_manifest(object_id, ObjectType::Table, Some(location))
869 .await
870 }
871
872 async fn validate_namespace_levels_exist(&self, namespace_path: &[String]) -> Result<()> {
874 for i in 1..=namespace_path.len() {
875 let partial_path = &namespace_path[..i];
876 let object_id = partial_path.join(DELIMITER);
877 if !self.manifest_contains_object(&object_id).await? {
878 return Err(Error::Namespace {
879 source: format!("Parent namespace '{}' does not exist", object_id).into(),
880 location: location!(),
881 });
882 }
883 }
884 Ok(())
885 }
886
887 async fn query_manifest_for_namespace(&self, object_id: &str) -> Result<Option<NamespaceInfo>> {
889 let filter = format!("object_id = '{}' AND object_type = 'namespace'", object_id);
890 let mut scanner = self.manifest_scanner().await?;
891 scanner.filter(&filter).map_err(|e| Error::IO {
892 source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
893 location: location!(),
894 })?;
895 scanner
896 .project(&["object_id", "metadata"])
897 .map_err(|e| Error::IO {
898 source: box_error(std::io::Error::other(format!("Failed to project: {}", e))),
899 location: location!(),
900 })?;
901 let batches = Self::execute_scanner(scanner).await?;
902
903 let mut found_result: Option<NamespaceInfo> = None;
904 let mut total_rows = 0;
905
906 for batch in batches {
907 if batch.num_rows() == 0 {
908 continue;
909 }
910
911 total_rows += batch.num_rows();
912 if total_rows > 1 {
913 return Err(Error::io(
914 format!(
915 "Expected exactly 1 namespace with id '{}', found {}",
916 object_id, total_rows
917 ),
918 location!(),
919 ));
920 }
921
922 let object_id_array = Self::get_string_column(&batch, "object_id")?;
923 let metadata_array = Self::get_string_column(&batch, "metadata")?;
924
925 let object_id_str = object_id_array.value(0);
926 let metadata = if !metadata_array.is_null(0) {
927 let metadata_str = metadata_array.value(0);
928 match serde_json::from_str::<HashMap<String, String>>(metadata_str) {
929 Ok(map) => Some(map),
930 Err(e) => {
931 return Err(Error::io(
932 format!(
933 "Failed to deserialize metadata for namespace '{}': {}",
934 object_id, e
935 ),
936 location!(),
937 ));
938 }
939 }
940 } else {
941 None
942 };
943
944 let (namespace, name) = Self::parse_object_id(object_id_str);
945 found_result = Some(NamespaceInfo {
946 namespace,
947 name,
948 metadata,
949 });
950 }
951
952 Ok(found_result)
953 }
954
955 async fn create_or_get_manifest(
957 root: &str,
958 storage_options: &Option<HashMap<String, String>>,
959 session: Option<Arc<Session>>,
960 ) -> Result<DatasetConsistencyWrapper> {
961 let manifest_path = format!("{}/{}", root, MANIFEST_TABLE_NAME);
962 log::debug!("Attempting to load manifest from {}", manifest_path);
963 let mut builder = DatasetBuilder::from_uri(&manifest_path);
964
965 if let Some(sess) = session.clone() {
966 builder = builder.with_session(sess);
967 }
968
969 if let Some(opts) = storage_options {
970 builder = builder.with_storage_options(opts.clone());
971 }
972
973 let dataset_result = builder.load().await;
974 if let Ok(dataset) = dataset_result {
975 Ok(DatasetConsistencyWrapper::new(dataset))
976 } else {
977 log::info!("Creating new manifest table at {}", manifest_path);
978 let schema = Self::manifest_schema();
979 let empty_batch = RecordBatch::new_empty(schema.clone());
980 let reader = RecordBatchIterator::new(vec![Ok(empty_batch)], schema.clone());
981
982 let write_params = WriteParams {
983 session,
984 store_params: storage_options.as_ref().map(|opts| ObjectStoreParams {
985 storage_options_accessor: Some(Arc::new(
986 lance_io::object_store::StorageOptionsAccessor::with_static_options(
987 opts.clone(),
988 ),
989 )),
990 ..Default::default()
991 }),
992 ..Default::default()
993 };
994
995 let dataset = Dataset::write(Box::new(reader), &manifest_path, Some(write_params))
996 .await
997 .map_err(|e| Error::IO {
998 source: box_error(std::io::Error::other(format!(
999 "Failed to create manifest dataset: {}",
1000 e
1001 ))),
1002 location: location!(),
1003 })?;
1004
1005 log::info!(
1006 "Successfully created manifest table at {}, version={}, uri={}",
1007 manifest_path,
1008 dataset.version().version,
1009 dataset.uri()
1010 );
1011 Ok(DatasetConsistencyWrapper::new(dataset))
1012 }
1013 }
1014}
1015
1016#[async_trait]
1017impl LanceNamespace for ManifestNamespace {
1018 fn namespace_id(&self) -> String {
1019 self.root.clone()
1020 }
1021
1022 async fn list_tables(&self, request: ListTablesRequest) -> Result<ListTablesResponse> {
1023 let namespace_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1024 source: "Namespace ID is required".into(),
1025 location: location!(),
1026 })?;
1027
1028 let filter = if namespace_id.is_empty() {
1030 "object_type = 'table' AND NOT contains(object_id, '$')".to_string()
1032 } else {
1033 let prefix = namespace_id.join(DELIMITER);
1035 format!(
1036 "object_type = 'table' AND starts_with(object_id, '{}{}') AND NOT contains(substring(object_id, {}), '$')",
1037 prefix, DELIMITER, prefix.len() + 2
1038 )
1039 };
1040
1041 let mut scanner = self.manifest_scanner().await?;
1042 scanner.filter(&filter).map_err(|e| Error::IO {
1043 source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
1044 location: location!(),
1045 })?;
1046 scanner.project(&["object_id"]).map_err(|e| Error::IO {
1047 source: box_error(std::io::Error::other(format!("Failed to project: {}", e))),
1048 location: location!(),
1049 })?;
1050
1051 let batches = Self::execute_scanner(scanner).await?;
1052
1053 let mut tables = Vec::new();
1054 for batch in batches {
1055 if batch.num_rows() == 0 {
1056 continue;
1057 }
1058
1059 let object_id_array = Self::get_string_column(&batch, "object_id")?;
1060 for i in 0..batch.num_rows() {
1061 let object_id = object_id_array.value(i);
1062 let (_namespace, name) = Self::parse_object_id(object_id);
1063 tables.push(name);
1064 }
1065 }
1066
1067 Ok(ListTablesResponse::new(tables))
1068 }
1069
1070 async fn describe_table(&self, request: DescribeTableRequest) -> Result<DescribeTableResponse> {
1071 let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1072 source: "Table ID is required".into(),
1073 location: location!(),
1074 })?;
1075
1076 if table_id.is_empty() {
1077 return Err(Error::InvalidInput {
1078 source: "Table ID cannot be empty".into(),
1079 location: location!(),
1080 });
1081 }
1082
1083 let object_id = Self::str_object_id(table_id);
1084 let table_info = self.query_manifest_for_table(&object_id).boxed().await?;
1085
1086 let table_name = table_id.last().cloned().unwrap_or_default();
1088 let namespace_id: Vec<String> = if table_id.len() > 1 {
1089 table_id[..table_id.len() - 1].to_vec()
1090 } else {
1091 vec![]
1092 };
1093
1094 let load_detailed_metadata = request.load_detailed_metadata.unwrap_or(false);
1095 let vend_credentials = request.vend_credentials.unwrap_or(true);
1097
1098 match table_info {
1099 Some(info) => {
1100 let table_uri = Self::construct_full_uri(&self.root, &info.location)?;
1102
1103 let storage_options = if vend_credentials {
1104 self.storage_options.clone()
1105 } else {
1106 None
1107 };
1108
1109 if !load_detailed_metadata {
1111 return Ok(DescribeTableResponse {
1112 table: Some(table_name),
1113 namespace: Some(namespace_id),
1114 location: Some(table_uri.clone()),
1115 table_uri: Some(table_uri),
1116 storage_options,
1117 ..Default::default()
1118 });
1119 }
1120
1121 match Dataset::open(&table_uri).await {
1123 Ok(mut dataset) => {
1124 if let Some(requested_version) = request.version {
1126 dataset = dataset.checkout_version(requested_version as u64).await?;
1127 }
1128
1129 let version = dataset.version().version;
1130 let lance_schema = dataset.schema();
1131 let arrow_schema: arrow_schema::Schema = lance_schema.into();
1132 let json_schema = arrow_schema_to_json(&arrow_schema)?;
1133
1134 Ok(DescribeTableResponse {
1135 table: Some(table_name.clone()),
1136 namespace: Some(namespace_id.clone()),
1137 version: Some(version as i64),
1138 location: Some(table_uri.clone()),
1139 table_uri: Some(table_uri),
1140 schema: Some(Box::new(json_schema)),
1141 storage_options,
1142 ..Default::default()
1143 })
1144 }
1145 Err(_) => {
1146 Ok(DescribeTableResponse {
1148 table: Some(table_name),
1149 namespace: Some(namespace_id),
1150 location: Some(table_uri.clone()),
1151 table_uri: Some(table_uri),
1152 storage_options,
1153 ..Default::default()
1154 })
1155 }
1156 }
1157 }
1158 None => Err(Error::Namespace {
1159 source: format!("Table '{}' not found", object_id).into(),
1160 location: location!(),
1161 }),
1162 }
1163 }
1164
1165 async fn table_exists(&self, request: TableExistsRequest) -> Result<()> {
1166 let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1167 source: "Table ID is required".into(),
1168 location: location!(),
1169 })?;
1170
1171 if table_id.is_empty() {
1172 return Err(Error::InvalidInput {
1173 source: "Table ID cannot be empty".into(),
1174 location: location!(),
1175 });
1176 }
1177
1178 let (namespace, table_name) = Self::split_object_id(table_id);
1179 let object_id = Self::build_object_id(&namespace, &table_name);
1180 let exists = self.manifest_contains_object(&object_id).await?;
1181 if exists {
1182 Ok(())
1183 } else {
1184 Err(Error::Namespace {
1185 source: format!("Table '{}' not found", table_name).into(),
1186 location: location!(),
1187 })
1188 }
1189 }
1190
1191 async fn create_table(
1192 &self,
1193 request: CreateTableRequest,
1194 data: Bytes,
1195 ) -> Result<CreateTableResponse> {
1196 let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1197 source: "Table ID is required".into(),
1198 location: location!(),
1199 })?;
1200
1201 if table_id.is_empty() {
1202 return Err(Error::InvalidInput {
1203 source: "Table ID cannot be empty".into(),
1204 location: location!(),
1205 });
1206 }
1207
1208 let (namespace, table_name) = Self::split_object_id(table_id);
1209 let object_id = Self::build_object_id(&namespace, &table_name);
1210
1211 if self.manifest_contains_object(&object_id).await? {
1213 return Err(Error::io(
1214 format!("Table '{}' already exists", table_name),
1215 location!(),
1216 ));
1217 }
1218
1219 let dir_name = if namespace.is_empty() && self.dir_listing_enabled {
1223 format!("{}.lance", table_name)
1225 } else {
1226 Self::generate_dir_name(&object_id)
1228 };
1229 let table_uri = Self::construct_full_uri(&self.root, &dir_name)?;
1230
1231 if data.is_empty() {
1233 return Err(Error::Namespace {
1234 source: "Request data (Arrow IPC stream) is required for create_table".into(),
1235 location: location!(),
1236 });
1237 }
1238
1239 let cursor = Cursor::new(data.to_vec());
1241 let stream_reader = StreamReader::try_new(cursor, None)
1242 .map_err(|e| Error::io(format!("Failed to read IPC stream: {}", e), location!()))?;
1243
1244 let batches: Vec<RecordBatch> =
1245 stream_reader
1246 .collect::<std::result::Result<Vec<_>, _>>()
1247 .map_err(|e| Error::io(format!("Failed to collect batches: {}", e), location!()))?;
1248
1249 if batches.is_empty() {
1250 return Err(Error::io(
1251 "No data provided for table creation",
1252 location!(),
1253 ));
1254 }
1255
1256 let schema = batches[0].schema();
1257 let batch_results: Vec<std::result::Result<RecordBatch, arrow_schema::ArrowError>> =
1258 batches.into_iter().map(Ok).collect();
1259 let reader = RecordBatchIterator::new(batch_results, schema);
1260
1261 let write_params = WriteParams::default();
1262 let _dataset = Dataset::write(Box::new(reader), &table_uri, Some(write_params))
1263 .await
1264 .map_err(|e| Error::IO {
1265 source: box_error(std::io::Error::other(format!(
1266 "Failed to write dataset: {}",
1267 e
1268 ))),
1269 location: location!(),
1270 })?;
1271
1272 self.insert_into_manifest(object_id, ObjectType::Table, Some(dir_name))
1274 .await?;
1275
1276 Ok(CreateTableResponse {
1277 version: Some(1),
1278 location: Some(table_uri),
1279 storage_options: self.storage_options.clone(),
1280 ..Default::default()
1281 })
1282 }
1283
1284 async fn drop_table(&self, request: DropTableRequest) -> Result<DropTableResponse> {
1285 let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1286 source: "Table ID is required".into(),
1287 location: location!(),
1288 })?;
1289
1290 if table_id.is_empty() {
1291 return Err(Error::InvalidInput {
1292 source: "Table ID cannot be empty".into(),
1293 location: location!(),
1294 });
1295 }
1296
1297 let (namespace, table_name) = Self::split_object_id(table_id);
1298 let object_id = Self::build_object_id(&namespace, &table_name);
1299
1300 let table_info = self.query_manifest_for_table(&object_id).boxed().await?;
1302
1303 match table_info {
1304 Some(info) => {
1305 self.delete_from_manifest(&object_id).boxed().await?;
1307
1308 let table_path = self.base_path.child(info.location.as_str());
1310 let table_uri = Self::construct_full_uri(&self.root, &info.location)?;
1311
1312 self.object_store
1314 .remove_dir_all(table_path)
1315 .boxed()
1316 .await
1317 .map_err(|e| Error::Namespace {
1318 source: format!("Failed to delete table directory: {}", e).into(),
1319 location: location!(),
1320 })?;
1321
1322 Ok(DropTableResponse {
1323 id: request.id.clone(),
1324 location: Some(table_uri),
1325 ..Default::default()
1326 })
1327 }
1328 None => Err(Error::Namespace {
1329 source: format!("Table '{}' not found", table_name).into(),
1330 location: location!(),
1331 }),
1332 }
1333 }
1334
1335 async fn list_namespaces(
1336 &self,
1337 request: ListNamespacesRequest,
1338 ) -> Result<ListNamespacesResponse> {
1339 let parent_namespace = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1340 source: "Namespace ID is required".into(),
1341 location: location!(),
1342 })?;
1343
1344 let filter = if parent_namespace.is_empty() {
1346 "object_type = 'namespace' AND NOT contains(object_id, '$')".to_string()
1348 } else {
1349 let prefix = parent_namespace.join(DELIMITER);
1351 format!(
1352 "object_type = 'namespace' AND starts_with(object_id, '{}{}') AND NOT contains(substring(object_id, {}), '$')",
1353 prefix, DELIMITER, prefix.len() + 2
1354 )
1355 };
1356
1357 let mut scanner = self.manifest_scanner().await?;
1358 scanner.filter(&filter).map_err(|e| Error::IO {
1359 source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
1360 location: location!(),
1361 })?;
1362 scanner.project(&["object_id"]).map_err(|e| Error::IO {
1363 source: box_error(std::io::Error::other(format!("Failed to project: {}", e))),
1364 location: location!(),
1365 })?;
1366
1367 let batches = Self::execute_scanner(scanner).await?;
1368 let mut namespaces = Vec::new();
1369
1370 for batch in batches {
1371 if batch.num_rows() == 0 {
1372 continue;
1373 }
1374
1375 let object_id_array = Self::get_string_column(&batch, "object_id")?;
1376 for i in 0..batch.num_rows() {
1377 let object_id = object_id_array.value(i);
1378 let (_namespace, name) = Self::parse_object_id(object_id);
1379 namespaces.push(name);
1380 }
1381 }
1382
1383 Ok(ListNamespacesResponse::new(namespaces))
1384 }
1385
1386 async fn describe_namespace(
1387 &self,
1388 request: DescribeNamespaceRequest,
1389 ) -> Result<DescribeNamespaceResponse> {
1390 let namespace_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1391 source: "Namespace ID is required".into(),
1392 location: location!(),
1393 })?;
1394
1395 if namespace_id.is_empty() {
1397 #[allow(clippy::needless_update)]
1398 return Ok(DescribeNamespaceResponse {
1399 properties: Some(HashMap::new()),
1400 ..Default::default()
1401 });
1402 }
1403
1404 let object_id = namespace_id.join(DELIMITER);
1406 let namespace_info = self.query_manifest_for_namespace(&object_id).await?;
1407
1408 match namespace_info {
1409 #[allow(clippy::needless_update)]
1410 Some(info) => Ok(DescribeNamespaceResponse {
1411 properties: info.metadata,
1412 ..Default::default()
1413 }),
1414 None => Err(Error::Namespace {
1415 source: format!("Namespace '{}' not found", object_id).into(),
1416 location: location!(),
1417 }),
1418 }
1419 }
1420
1421 async fn create_namespace(
1422 &self,
1423 request: CreateNamespaceRequest,
1424 ) -> Result<CreateNamespaceResponse> {
1425 let namespace_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1426 source: "Namespace ID is required".into(),
1427 location: location!(),
1428 })?;
1429
1430 if namespace_id.is_empty() {
1432 return Err(Error::Namespace {
1433 source: "Root namespace already exists and cannot be created".into(),
1434 location: location!(),
1435 });
1436 }
1437
1438 if namespace_id.len() > 1 {
1440 self.validate_namespace_levels_exist(&namespace_id[..namespace_id.len() - 1])
1441 .await?;
1442 }
1443
1444 let object_id = namespace_id.join(DELIMITER);
1445 if self.manifest_contains_object(&object_id).await? {
1446 return Err(Error::Namespace {
1447 source: format!("Namespace '{}' already exists", object_id).into(),
1448 location: location!(),
1449 });
1450 }
1451
1452 let metadata = request.properties.as_ref().and_then(|props| {
1454 if props.is_empty() {
1455 None
1456 } else {
1457 Some(serde_json::to_string(props).ok()?)
1458 }
1459 });
1460
1461 self.insert_into_manifest_with_metadata(
1462 object_id,
1463 ObjectType::Namespace,
1464 None,
1465 metadata,
1466 None,
1467 )
1468 .await?;
1469
1470 Ok(CreateNamespaceResponse {
1471 properties: request.properties,
1472 ..Default::default()
1473 })
1474 }
1475
1476 async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<DropNamespaceResponse> {
1477 let namespace_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1478 source: "Namespace ID is required".into(),
1479 location: location!(),
1480 })?;
1481
1482 if namespace_id.is_empty() {
1484 return Err(Error::Namespace {
1485 source: "Root namespace cannot be dropped".into(),
1486 location: location!(),
1487 });
1488 }
1489
1490 let object_id = namespace_id.join(DELIMITER);
1491
1492 if !self.manifest_contains_object(&object_id).boxed().await? {
1494 return Err(Error::Namespace {
1495 source: format!("Namespace '{}' not found", object_id).into(),
1496 location: location!(),
1497 });
1498 }
1499
1500 let prefix = format!("{}{}", object_id, DELIMITER);
1502 let filter = format!("starts_with(object_id, '{}')", prefix);
1503 let mut scanner = self.manifest_scanner().boxed().await?;
1504 scanner.filter(&filter).map_err(|e| Error::IO {
1505 source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
1506 location: location!(),
1507 })?;
1508 scanner.project::<&str>(&[]).map_err(|e| Error::IO {
1509 source: box_error(std::io::Error::other(format!("Failed to project: {}", e))),
1510 location: location!(),
1511 })?;
1512 scanner.with_row_id();
1513 let count = scanner.count_rows().boxed().await.map_err(|e| Error::IO {
1514 source: box_error(std::io::Error::other(format!(
1515 "Failed to count rows: {}",
1516 e
1517 ))),
1518 location: location!(),
1519 })?;
1520
1521 if count > 0 {
1522 return Err(Error::Namespace {
1523 source: format!(
1524 "Namespace '{}' is not empty (contains {} child objects)",
1525 object_id, count
1526 )
1527 .into(),
1528 location: location!(),
1529 });
1530 }
1531
1532 self.delete_from_manifest(&object_id).boxed().await?;
1533
1534 Ok(DropNamespaceResponse::default())
1535 }
1536
1537 async fn namespace_exists(&self, request: NamespaceExistsRequest) -> Result<()> {
1538 let namespace_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1539 source: "Namespace ID is required".into(),
1540 location: location!(),
1541 })?;
1542
1543 if namespace_id.is_empty() {
1545 return Ok(());
1546 }
1547
1548 let object_id = namespace_id.join(DELIMITER);
1549 if self.manifest_contains_object(&object_id).await? {
1550 Ok(())
1551 } else {
1552 Err(Error::Namespace {
1553 source: format!("Namespace '{}' not found", object_id).into(),
1554 location: location!(),
1555 })
1556 }
1557 }
1558
1559 async fn create_empty_table(
1560 &self,
1561 request: CreateEmptyTableRequest,
1562 ) -> Result<CreateEmptyTableResponse> {
1563 let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1564 source: "Table ID is required".into(),
1565 location: location!(),
1566 })?;
1567
1568 if table_id.is_empty() {
1569 return Err(Error::InvalidInput {
1570 source: "Table ID cannot be empty".into(),
1571 location: location!(),
1572 });
1573 }
1574
1575 let (namespace, table_name) = Self::split_object_id(table_id);
1576 let object_id = Self::build_object_id(&namespace, &table_name);
1577
1578 let existing = self.query_manifest_for_table(&object_id).await?;
1580 if existing.is_some() {
1581 return Err(Error::Namespace {
1582 source: format!("Table '{}' already exists", table_name).into(),
1583 location: location!(),
1584 });
1585 }
1586
1587 let dir_name = if namespace.is_empty() && self.dir_listing_enabled {
1591 format!("{}.lance", table_name)
1593 } else {
1594 Self::generate_dir_name(&object_id)
1596 };
1597 let table_path = self.base_path.child(dir_name.as_str());
1598 let table_uri = Self::construct_full_uri(&self.root, &dir_name)?;
1599
1600 if let Some(req_location) = &request.location {
1602 let req_location = req_location.trim_end_matches('/');
1603 if req_location != table_uri {
1604 return Err(Error::Namespace {
1605 source: format!(
1606 "Cannot create table {} at location {}, must be at location {}",
1607 table_name, req_location, table_uri
1608 )
1609 .into(),
1610 location: location!(),
1611 });
1612 }
1613 }
1614
1615 let reserved_file_path = table_path.child(".lance-reserved");
1617
1618 self.object_store
1619 .create(&reserved_file_path)
1620 .await
1621 .map_err(|e| Error::Namespace {
1622 source: format!(
1623 "Failed to create .lance-reserved file for table {}: {}",
1624 table_name, e
1625 )
1626 .into(),
1627 location: location!(),
1628 })?
1629 .shutdown()
1630 .await
1631 .map_err(|e| Error::Namespace {
1632 source: format!(
1633 "Failed to finalize .lance-reserved file for table {}: {}",
1634 table_name, e
1635 )
1636 .into(),
1637 location: location!(),
1638 })?;
1639
1640 self.insert_into_manifest(object_id, ObjectType::Table, Some(dir_name))
1642 .await?;
1643
1644 log::info!(
1645 "Created empty table '{}' in manifest at {}",
1646 table_name,
1647 table_uri
1648 );
1649
1650 let vend_credentials = request.vend_credentials.unwrap_or(true);
1652 let storage_options = if vend_credentials {
1653 self.storage_options.clone()
1654 } else {
1655 None
1656 };
1657
1658 Ok(CreateEmptyTableResponse {
1659 location: Some(table_uri),
1660 storage_options,
1661 ..Default::default()
1662 })
1663 }
1664
1665 async fn declare_table(&self, request: DeclareTableRequest) -> Result<DeclareTableResponse> {
1666 let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1667 source: "Table ID is required".into(),
1668 location: location!(),
1669 })?;
1670
1671 if table_id.is_empty() {
1672 return Err(Error::InvalidInput {
1673 source: "Table ID cannot be empty".into(),
1674 location: location!(),
1675 });
1676 }
1677
1678 let (namespace, table_name) = Self::split_object_id(table_id);
1679 let object_id = Self::build_object_id(&namespace, &table_name);
1680
1681 let existing = self.query_manifest_for_table(&object_id).await?;
1683 if existing.is_some() {
1684 return Err(Error::Namespace {
1685 source: format!("Table '{}' already exists", table_name).into(),
1686 location: location!(),
1687 });
1688 }
1689
1690 let dir_name = if namespace.is_empty() && self.dir_listing_enabled {
1694 format!("{}.lance", table_name)
1696 } else {
1697 Self::generate_dir_name(&object_id)
1699 };
1700 let table_path = self.base_path.child(dir_name.as_str());
1701 let table_uri = Self::construct_full_uri(&self.root, &dir_name)?;
1702
1703 if let Some(req_location) = &request.location {
1705 let req_location = req_location.trim_end_matches('/');
1706 if req_location != table_uri {
1707 return Err(Error::Namespace {
1708 source: format!(
1709 "Cannot declare table {} at location {}, must be at location {}",
1710 table_name, req_location, table_uri
1711 )
1712 .into(),
1713 location: location!(),
1714 });
1715 }
1716 }
1717
1718 let reserved_file_path = table_path.child(".lance-reserved");
1720
1721 self.object_store
1722 .create(&reserved_file_path)
1723 .await
1724 .map_err(|e| Error::Namespace {
1725 source: format!(
1726 "Failed to create .lance-reserved file for table {}: {}",
1727 table_name, e
1728 )
1729 .into(),
1730 location: location!(),
1731 })?
1732 .shutdown()
1733 .await
1734 .map_err(|e| Error::Namespace {
1735 source: format!(
1736 "Failed to finalize .lance-reserved file for table {}: {}",
1737 table_name, e
1738 )
1739 .into(),
1740 location: location!(),
1741 })?;
1742
1743 self.insert_into_manifest(object_id, ObjectType::Table, Some(dir_name))
1745 .await?;
1746
1747 log::info!(
1748 "Declared table '{}' in manifest at {}",
1749 table_name,
1750 table_uri
1751 );
1752
1753 let vend_credentials = request.vend_credentials.unwrap_or(true);
1755 let storage_options = if vend_credentials {
1756 self.storage_options.clone()
1757 } else {
1758 None
1759 };
1760
1761 Ok(DeclareTableResponse {
1762 location: Some(table_uri),
1763 storage_options,
1764 ..Default::default()
1765 })
1766 }
1767
1768 async fn register_table(&self, request: RegisterTableRequest) -> Result<RegisterTableResponse> {
1769 let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1770 source: "Table ID is required".into(),
1771 location: location!(),
1772 })?;
1773
1774 if table_id.is_empty() {
1775 return Err(Error::InvalidInput {
1776 source: "Table ID cannot be empty".into(),
1777 location: location!(),
1778 });
1779 }
1780
1781 let location = request.location.clone();
1782
1783 if location.contains("://") {
1786 return Err(Error::InvalidInput {
1787 source: format!(
1788 "Absolute URIs are not allowed for register_table. Location must be a relative path within the root directory: {}",
1789 location
1790 ).into(),
1791 location: location!(),
1792 });
1793 }
1794
1795 if location.starts_with('/') {
1796 return Err(Error::InvalidInput {
1797 source: format!(
1798 "Absolute paths are not allowed for register_table. Location must be a relative path within the root directory: {}",
1799 location
1800 ).into(),
1801 location: location!(),
1802 });
1803 }
1804
1805 if location.contains("..") {
1807 return Err(Error::InvalidInput {
1808 source: format!(
1809 "Path traversal is not allowed. Location must be a relative path within the root directory: {}",
1810 location
1811 ).into(),
1812 location: location!(),
1813 });
1814 }
1815
1816 let (namespace, table_name) = Self::split_object_id(table_id);
1817 let object_id = Self::build_object_id(&namespace, &table_name);
1818
1819 if !namespace.is_empty() {
1821 self.validate_namespace_levels_exist(&namespace).await?;
1822 }
1823
1824 if self.manifest_contains_object(&object_id).await? {
1826 return Err(Error::Namespace {
1827 source: format!("Table '{}' already exists", object_id).into(),
1828 location: location!(),
1829 });
1830 }
1831
1832 self.insert_into_manifest(object_id, ObjectType::Table, Some(location.clone()))
1834 .await?;
1835
1836 Ok(RegisterTableResponse {
1837 location: Some(location),
1838 ..Default::default()
1839 })
1840 }
1841
1842 async fn deregister_table(
1843 &self,
1844 request: DeregisterTableRequest,
1845 ) -> Result<DeregisterTableResponse> {
1846 let table_id = request.id.as_ref().ok_or_else(|| Error::InvalidInput {
1847 source: "Table ID is required".into(),
1848 location: location!(),
1849 })?;
1850
1851 if table_id.is_empty() {
1852 return Err(Error::InvalidInput {
1853 source: "Table ID cannot be empty".into(),
1854 location: location!(),
1855 });
1856 }
1857
1858 let (namespace, table_name) = Self::split_object_id(table_id);
1859 let object_id = Self::build_object_id(&namespace, &table_name);
1860
1861 let table_info = self.query_manifest_for_table(&object_id).await?;
1863
1864 let table_uri = match table_info {
1865 Some(info) => {
1866 self.delete_from_manifest(&object_id).boxed().await?;
1868 Self::construct_full_uri(&self.root, &info.location)?
1869 }
1870 None => {
1871 return Err(Error::Namespace {
1872 source: format!("Table '{}' not found", object_id).into(),
1873 location: location!(),
1874 });
1875 }
1876 };
1877
1878 Ok(DeregisterTableResponse {
1879 id: request.id.clone(),
1880 location: Some(table_uri),
1881 ..Default::default()
1882 })
1883 }
1884}
1885
1886#[cfg(test)]
1887mod tests {
1888 use crate::{DirectoryNamespaceBuilder, ManifestNamespace};
1889 use bytes::Bytes;
1890 use lance_core::utils::tempfile::TempStdDir;
1891 use lance_namespace::models::{
1892 CreateTableRequest, DescribeTableRequest, DropTableRequest, ListTablesRequest,
1893 TableExistsRequest,
1894 };
1895 use lance_namespace::LanceNamespace;
1896 use rstest::rstest;
1897
1898 fn create_test_ipc_data() -> Vec<u8> {
1899 use arrow::array::{Int32Array, StringArray};
1900 use arrow::datatypes::{DataType, Field, Schema};
1901 use arrow::ipc::writer::StreamWriter;
1902 use arrow::record_batch::RecordBatch;
1903 use std::sync::Arc;
1904
1905 let schema = Arc::new(Schema::new(vec![
1906 Field::new("id", DataType::Int32, false),
1907 Field::new("name", DataType::Utf8, false),
1908 ]));
1909
1910 let batch = RecordBatch::try_new(
1911 schema.clone(),
1912 vec![
1913 Arc::new(Int32Array::from(vec![1, 2, 3])),
1914 Arc::new(StringArray::from(vec!["a", "b", "c"])),
1915 ],
1916 )
1917 .unwrap();
1918
1919 let mut buffer = Vec::new();
1920 {
1921 let mut writer = StreamWriter::try_new(&mut buffer, &schema).unwrap();
1922 writer.write(&batch).unwrap();
1923 writer.finish().unwrap();
1924 }
1925 buffer
1926 }
1927
1928 #[rstest]
1929 #[case::with_optimization(true)]
1930 #[case::without_optimization(false)]
1931 #[tokio::test]
1932 async fn test_manifest_namespace_basic_create_and_list(#[case] inline_optimization: bool) {
1933 let temp_dir = TempStdDir::default();
1934 let temp_path = temp_dir.to_str().unwrap();
1935
1936 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
1938 .inline_optimization_enabled(inline_optimization)
1939 .build()
1940 .await
1941 .unwrap();
1942
1943 let mut request = ListTablesRequest::new();
1945 request.id = Some(vec![]);
1946 let response = dir_namespace.list_tables(request).await.unwrap();
1947 assert_eq!(response.tables.len(), 0);
1948
1949 let buffer = create_test_ipc_data();
1951 let mut create_request = CreateTableRequest::new();
1952 create_request.id = Some(vec!["test_table".to_string()]);
1953
1954 let _response = dir_namespace
1955 .create_table(create_request, Bytes::from(buffer))
1956 .await
1957 .unwrap();
1958
1959 let mut request = ListTablesRequest::new();
1961 request.id = Some(vec![]);
1962 let response = dir_namespace.list_tables(request).await.unwrap();
1963 assert_eq!(response.tables.len(), 1);
1964 assert_eq!(response.tables[0], "test_table");
1965 }
1966
1967 #[rstest]
1968 #[case::with_optimization(true)]
1969 #[case::without_optimization(false)]
1970 #[tokio::test]
1971 async fn test_manifest_namespace_table_exists(#[case] inline_optimization: bool) {
1972 let temp_dir = TempStdDir::default();
1973 let temp_path = temp_dir.to_str().unwrap();
1974
1975 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
1976 .inline_optimization_enabled(inline_optimization)
1977 .build()
1978 .await
1979 .unwrap();
1980
1981 let mut request = TableExistsRequest::new();
1983 request.id = Some(vec!["nonexistent".to_string()]);
1984 let result = dir_namespace.table_exists(request).await;
1985 assert!(result.is_err());
1986
1987 let buffer = create_test_ipc_data();
1989 let mut create_request = CreateTableRequest::new();
1990 create_request.id = Some(vec!["test_table".to_string()]);
1991 dir_namespace
1992 .create_table(create_request, Bytes::from(buffer))
1993 .await
1994 .unwrap();
1995
1996 let mut request = TableExistsRequest::new();
1998 request.id = Some(vec!["test_table".to_string()]);
1999 let result = dir_namespace.table_exists(request).await;
2000 assert!(result.is_ok());
2001 }
2002
2003 #[rstest]
2004 #[case::with_optimization(true)]
2005 #[case::without_optimization(false)]
2006 #[tokio::test]
2007 async fn test_manifest_namespace_describe_table(#[case] inline_optimization: bool) {
2008 let temp_dir = TempStdDir::default();
2009 let temp_path = temp_dir.to_str().unwrap();
2010
2011 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2012 .inline_optimization_enabled(inline_optimization)
2013 .build()
2014 .await
2015 .unwrap();
2016
2017 let mut request = DescribeTableRequest::new();
2019 request.id = Some(vec!["nonexistent".to_string()]);
2020 let result = dir_namespace.describe_table(request).await;
2021 assert!(result.is_err());
2022
2023 let buffer = create_test_ipc_data();
2025 let mut create_request = CreateTableRequest::new();
2026 create_request.id = Some(vec!["test_table".to_string()]);
2027 dir_namespace
2028 .create_table(create_request, Bytes::from(buffer))
2029 .await
2030 .unwrap();
2031
2032 let mut request = DescribeTableRequest::new();
2034 request.id = Some(vec!["test_table".to_string()]);
2035 let response = dir_namespace.describe_table(request).await.unwrap();
2036 assert!(response.location.is_some());
2037 assert!(response.location.unwrap().contains("test_table"));
2038 }
2039
2040 #[rstest]
2041 #[case::with_optimization(true)]
2042 #[case::without_optimization(false)]
2043 #[tokio::test]
2044 async fn test_manifest_namespace_drop_table(#[case] inline_optimization: bool) {
2045 let temp_dir = TempStdDir::default();
2046 let temp_path = temp_dir.to_str().unwrap();
2047
2048 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2049 .inline_optimization_enabled(inline_optimization)
2050 .build()
2051 .await
2052 .unwrap();
2053
2054 let buffer = create_test_ipc_data();
2056 let mut create_request = CreateTableRequest::new();
2057 create_request.id = Some(vec!["test_table".to_string()]);
2058 dir_namespace
2059 .create_table(create_request, Bytes::from(buffer))
2060 .await
2061 .unwrap();
2062
2063 let mut request = ListTablesRequest::new();
2065 request.id = Some(vec![]);
2066 let response = dir_namespace.list_tables(request).await.unwrap();
2067 assert_eq!(response.tables.len(), 1);
2068
2069 let mut drop_request = DropTableRequest::new();
2071 drop_request.id = Some(vec!["test_table".to_string()]);
2072 let _response = dir_namespace.drop_table(drop_request).await.unwrap();
2073
2074 let mut request = ListTablesRequest::new();
2076 request.id = Some(vec![]);
2077 let response = dir_namespace.list_tables(request).await.unwrap();
2078 assert_eq!(response.tables.len(), 0);
2079 }
2080
2081 #[rstest]
2082 #[case::with_optimization(true)]
2083 #[case::without_optimization(false)]
2084 #[tokio::test]
2085 async fn test_manifest_namespace_multiple_tables(#[case] inline_optimization: bool) {
2086 let temp_dir = TempStdDir::default();
2087 let temp_path = temp_dir.to_str().unwrap();
2088
2089 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2090 .inline_optimization_enabled(inline_optimization)
2091 .build()
2092 .await
2093 .unwrap();
2094
2095 let buffer = create_test_ipc_data();
2097 for i in 1..=3 {
2098 let mut create_request = CreateTableRequest::new();
2099 create_request.id = Some(vec![format!("table{}", i)]);
2100 dir_namespace
2101 .create_table(create_request, Bytes::from(buffer.clone()))
2102 .await
2103 .unwrap();
2104 }
2105
2106 let mut request = ListTablesRequest::new();
2108 request.id = Some(vec![]);
2109 let response = dir_namespace.list_tables(request).await.unwrap();
2110 assert_eq!(response.tables.len(), 3);
2111 assert!(response.tables.contains(&"table1".to_string()));
2112 assert!(response.tables.contains(&"table2".to_string()));
2113 assert!(response.tables.contains(&"table3".to_string()));
2114 }
2115
2116 #[rstest]
2117 #[case::with_optimization(true)]
2118 #[case::without_optimization(false)]
2119 #[tokio::test]
2120 async fn test_directory_only_mode(#[case] inline_optimization: bool) {
2121 let temp_dir = TempStdDir::default();
2122 let temp_path = temp_dir.to_str().unwrap();
2123
2124 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2126 .manifest_enabled(false)
2127 .inline_optimization_enabled(inline_optimization)
2128 .build()
2129 .await
2130 .unwrap();
2131
2132 let mut request = ListTablesRequest::new();
2134 request.id = Some(vec![]);
2135 let response = dir_namespace.list_tables(request).await.unwrap();
2136 assert_eq!(response.tables.len(), 0);
2137
2138 let buffer = create_test_ipc_data();
2140 let mut create_request = CreateTableRequest::new();
2141 create_request.id = Some(vec!["test_table".to_string()]);
2142
2143 let _response = dir_namespace
2145 .create_table(create_request, Bytes::from(buffer))
2146 .await
2147 .unwrap();
2148
2149 let mut request = ListTablesRequest::new();
2151 request.id = Some(vec![]);
2152 let response = dir_namespace.list_tables(request).await.unwrap();
2153 assert_eq!(response.tables.len(), 1);
2154 assert_eq!(response.tables[0], "test_table");
2155 }
2156
2157 #[rstest]
2158 #[case::with_optimization(true)]
2159 #[case::without_optimization(false)]
2160 #[tokio::test]
2161 async fn test_dual_mode_merge(#[case] inline_optimization: bool) {
2162 let temp_dir = TempStdDir::default();
2163 let temp_path = temp_dir.to_str().unwrap();
2164
2165 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2167 .manifest_enabled(true)
2168 .dir_listing_enabled(true)
2169 .inline_optimization_enabled(inline_optimization)
2170 .build()
2171 .await
2172 .unwrap();
2173
2174 let buffer = create_test_ipc_data();
2176 let mut create_request = CreateTableRequest::new();
2177 create_request.id = Some(vec!["table1".to_string()]);
2178 dir_namespace
2179 .create_table(create_request, Bytes::from(buffer))
2180 .await
2181 .unwrap();
2182
2183 let mut request = ListTablesRequest::new();
2185 request.id = Some(vec![]);
2186 let response = dir_namespace.list_tables(request).await.unwrap();
2187 assert_eq!(response.tables.len(), 1);
2188 assert_eq!(response.tables[0], "table1");
2189 }
2190
2191 #[rstest]
2192 #[case::with_optimization(true)]
2193 #[case::without_optimization(false)]
2194 #[tokio::test]
2195 async fn test_manifest_only_mode(#[case] inline_optimization: bool) {
2196 let temp_dir = TempStdDir::default();
2197 let temp_path = temp_dir.to_str().unwrap();
2198
2199 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2201 .manifest_enabled(true)
2202 .dir_listing_enabled(false)
2203 .inline_optimization_enabled(inline_optimization)
2204 .build()
2205 .await
2206 .unwrap();
2207
2208 let buffer = create_test_ipc_data();
2210 let mut create_request = CreateTableRequest::new();
2211 create_request.id = Some(vec!["test_table".to_string()]);
2212 dir_namespace
2213 .create_table(create_request, Bytes::from(buffer))
2214 .await
2215 .unwrap();
2216
2217 let mut request = ListTablesRequest::new();
2219 request.id = Some(vec![]);
2220 let response = dir_namespace.list_tables(request).await.unwrap();
2221 assert_eq!(response.tables.len(), 1);
2222 assert_eq!(response.tables[0], "test_table");
2223 }
2224
2225 #[rstest]
2226 #[case::with_optimization(true)]
2227 #[case::without_optimization(false)]
2228 #[tokio::test]
2229 async fn test_drop_nonexistent_table(#[case] inline_optimization: bool) {
2230 let temp_dir = TempStdDir::default();
2231 let temp_path = temp_dir.to_str().unwrap();
2232
2233 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2234 .inline_optimization_enabled(inline_optimization)
2235 .build()
2236 .await
2237 .unwrap();
2238
2239 let mut drop_request = DropTableRequest::new();
2241 drop_request.id = Some(vec!["nonexistent".to_string()]);
2242 let result = dir_namespace.drop_table(drop_request).await;
2243 assert!(result.is_err());
2244 }
2245
2246 #[rstest]
2247 #[case::with_optimization(true)]
2248 #[case::without_optimization(false)]
2249 #[tokio::test]
2250 async fn test_create_duplicate_table_fails(#[case] inline_optimization: bool) {
2251 let temp_dir = TempStdDir::default();
2252 let temp_path = temp_dir.to_str().unwrap();
2253
2254 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2255 .inline_optimization_enabled(inline_optimization)
2256 .build()
2257 .await
2258 .unwrap();
2259
2260 let buffer = create_test_ipc_data();
2262 let mut create_request = CreateTableRequest::new();
2263 create_request.id = Some(vec!["test_table".to_string()]);
2264 dir_namespace
2265 .create_table(create_request, Bytes::from(buffer.clone()))
2266 .await
2267 .unwrap();
2268
2269 let mut create_request = CreateTableRequest::new();
2271 create_request.id = Some(vec!["test_table".to_string()]);
2272 let result = dir_namespace
2273 .create_table(create_request, Bytes::from(buffer))
2274 .await;
2275 assert!(result.is_err());
2276 }
2277
2278 #[rstest]
2279 #[case::with_optimization(true)]
2280 #[case::without_optimization(false)]
2281 #[tokio::test]
2282 async fn test_create_child_namespace(#[case] inline_optimization: bool) {
2283 use lance_namespace::models::{
2284 CreateNamespaceRequest, ListNamespacesRequest, NamespaceExistsRequest,
2285 };
2286
2287 let temp_dir = TempStdDir::default();
2288 let temp_path = temp_dir.to_str().unwrap();
2289
2290 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2291 .inline_optimization_enabled(inline_optimization)
2292 .build()
2293 .await
2294 .unwrap();
2295
2296 let mut create_req = CreateNamespaceRequest::new();
2298 create_req.id = Some(vec!["ns1".to_string()]);
2299 let result = dir_namespace.create_namespace(create_req).await;
2300 assert!(
2301 result.is_ok(),
2302 "Failed to create child namespace: {:?}",
2303 result.err()
2304 );
2305
2306 let exists_req = NamespaceExistsRequest {
2308 id: Some(vec!["ns1".to_string()]),
2309 ..Default::default()
2310 };
2311 let result = dir_namespace.namespace_exists(exists_req).await;
2312 assert!(result.is_ok(), "Namespace should exist");
2313
2314 let list_req = ListNamespacesRequest {
2316 id: Some(vec![]),
2317 page_token: None,
2318 limit: None,
2319 ..Default::default()
2320 };
2321 let result = dir_namespace.list_namespaces(list_req).await;
2322 assert!(result.is_ok());
2323 let namespaces = result.unwrap();
2324 assert_eq!(namespaces.namespaces.len(), 1);
2325 assert_eq!(namespaces.namespaces[0], "ns1");
2326 }
2327
2328 #[rstest]
2329 #[case::with_optimization(true)]
2330 #[case::without_optimization(false)]
2331 #[tokio::test]
2332 async fn test_create_nested_namespace(#[case] inline_optimization: bool) {
2333 use lance_namespace::models::{
2334 CreateNamespaceRequest, ListNamespacesRequest, NamespaceExistsRequest,
2335 };
2336
2337 let temp_dir = TempStdDir::default();
2338 let temp_path = temp_dir.to_str().unwrap();
2339
2340 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2341 .inline_optimization_enabled(inline_optimization)
2342 .build()
2343 .await
2344 .unwrap();
2345
2346 let mut create_req = CreateNamespaceRequest::new();
2348 create_req.id = Some(vec!["parent".to_string()]);
2349 dir_namespace.create_namespace(create_req).await.unwrap();
2350
2351 let mut create_req = CreateNamespaceRequest::new();
2353 create_req.id = Some(vec!["parent".to_string(), "child".to_string()]);
2354 let result = dir_namespace.create_namespace(create_req).await;
2355 assert!(
2356 result.is_ok(),
2357 "Failed to create nested namespace: {:?}",
2358 result.err()
2359 );
2360
2361 let exists_req = NamespaceExistsRequest {
2363 id: Some(vec!["parent".to_string(), "child".to_string()]),
2364 ..Default::default()
2365 };
2366 let result = dir_namespace.namespace_exists(exists_req).await;
2367 assert!(result.is_ok(), "Nested namespace should exist");
2368
2369 let list_req = ListNamespacesRequest {
2371 id: Some(vec!["parent".to_string()]),
2372 page_token: None,
2373 limit: None,
2374 ..Default::default()
2375 };
2376 let result = dir_namespace.list_namespaces(list_req).await;
2377 assert!(result.is_ok());
2378 let namespaces = result.unwrap();
2379 assert_eq!(namespaces.namespaces.len(), 1);
2380 assert_eq!(namespaces.namespaces[0], "child");
2381 }
2382
2383 #[rstest]
2384 #[case::with_optimization(true)]
2385 #[case::without_optimization(false)]
2386 #[tokio::test]
2387 async fn test_create_namespace_without_parent_fails(#[case] inline_optimization: bool) {
2388 use lance_namespace::models::CreateNamespaceRequest;
2389
2390 let temp_dir = TempStdDir::default();
2391 let temp_path = temp_dir.to_str().unwrap();
2392
2393 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2394 .inline_optimization_enabled(inline_optimization)
2395 .build()
2396 .await
2397 .unwrap();
2398
2399 let mut create_req = CreateNamespaceRequest::new();
2401 create_req.id = Some(vec!["nonexistent_parent".to_string(), "child".to_string()]);
2402 let result = dir_namespace.create_namespace(create_req).await;
2403 assert!(result.is_err(), "Should fail when parent doesn't exist");
2404 }
2405
2406 #[rstest]
2407 #[case::with_optimization(true)]
2408 #[case::without_optimization(false)]
2409 #[tokio::test]
2410 async fn test_drop_child_namespace(#[case] inline_optimization: bool) {
2411 use lance_namespace::models::{
2412 CreateNamespaceRequest, DropNamespaceRequest, NamespaceExistsRequest,
2413 };
2414
2415 let temp_dir = TempStdDir::default();
2416 let temp_path = temp_dir.to_str().unwrap();
2417
2418 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2419 .inline_optimization_enabled(inline_optimization)
2420 .build()
2421 .await
2422 .unwrap();
2423
2424 let mut create_req = CreateNamespaceRequest::new();
2426 create_req.id = Some(vec!["ns1".to_string()]);
2427 dir_namespace.create_namespace(create_req).await.unwrap();
2428
2429 let mut drop_req = DropNamespaceRequest::new();
2431 drop_req.id = Some(vec!["ns1".to_string()]);
2432 let result = dir_namespace.drop_namespace(drop_req).await;
2433 assert!(
2434 result.is_ok(),
2435 "Failed to drop namespace: {:?}",
2436 result.err()
2437 );
2438
2439 let exists_req = NamespaceExistsRequest {
2441 id: Some(vec!["ns1".to_string()]),
2442 ..Default::default()
2443 };
2444 let result = dir_namespace.namespace_exists(exists_req).await;
2445 assert!(result.is_err(), "Namespace should not exist after drop");
2446 }
2447
2448 #[rstest]
2449 #[case::with_optimization(true)]
2450 #[case::without_optimization(false)]
2451 #[tokio::test]
2452 async fn test_drop_namespace_with_children_fails(#[case] inline_optimization: bool) {
2453 use lance_namespace::models::{CreateNamespaceRequest, DropNamespaceRequest};
2454
2455 let temp_dir = TempStdDir::default();
2456 let temp_path = temp_dir.to_str().unwrap();
2457
2458 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2459 .inline_optimization_enabled(inline_optimization)
2460 .build()
2461 .await
2462 .unwrap();
2463
2464 let mut create_req = CreateNamespaceRequest::new();
2466 create_req.id = Some(vec!["parent".to_string()]);
2467 dir_namespace.create_namespace(create_req).await.unwrap();
2468
2469 let mut create_req = CreateNamespaceRequest::new();
2470 create_req.id = Some(vec!["parent".to_string(), "child".to_string()]);
2471 dir_namespace.create_namespace(create_req).await.unwrap();
2472
2473 let mut drop_req = DropNamespaceRequest::new();
2475 drop_req.id = Some(vec!["parent".to_string()]);
2476 let result = dir_namespace.drop_namespace(drop_req).await;
2477 assert!(result.is_err(), "Should fail when namespace has children");
2478 }
2479
2480 #[rstest]
2481 #[case::with_optimization(true)]
2482 #[case::without_optimization(false)]
2483 #[tokio::test]
2484 async fn test_create_table_in_child_namespace(#[case] inline_optimization: bool) {
2485 use lance_namespace::models::{
2486 CreateNamespaceRequest, CreateTableRequest, ListTablesRequest,
2487 };
2488
2489 let temp_dir = TempStdDir::default();
2490 let temp_path = temp_dir.to_str().unwrap();
2491
2492 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2493 .inline_optimization_enabled(inline_optimization)
2494 .build()
2495 .await
2496 .unwrap();
2497
2498 let mut create_ns_req = CreateNamespaceRequest::new();
2500 create_ns_req.id = Some(vec!["ns1".to_string()]);
2501 dir_namespace.create_namespace(create_ns_req).await.unwrap();
2502
2503 let buffer = create_test_ipc_data();
2505 let mut create_table_req = CreateTableRequest::new();
2506 create_table_req.id = Some(vec!["ns1".to_string(), "table1".to_string()]);
2507 let result = dir_namespace
2508 .create_table(create_table_req, Bytes::from(buffer))
2509 .await;
2510 assert!(
2511 result.is_ok(),
2512 "Failed to create table in child namespace: {:?}",
2513 result.err()
2514 );
2515
2516 let list_req = ListTablesRequest {
2518 id: Some(vec!["ns1".to_string()]),
2519 page_token: None,
2520 limit: None,
2521 ..Default::default()
2522 };
2523 let result = dir_namespace.list_tables(list_req).await;
2524 assert!(result.is_ok());
2525 let tables = result.unwrap();
2526 assert_eq!(tables.tables.len(), 1);
2527 assert_eq!(tables.tables[0], "table1");
2528 }
2529
2530 #[rstest]
2531 #[case::with_optimization(true)]
2532 #[case::without_optimization(false)]
2533 #[tokio::test]
2534 async fn test_describe_child_namespace(#[case] inline_optimization: bool) {
2535 use lance_namespace::models::{CreateNamespaceRequest, DescribeNamespaceRequest};
2536
2537 let temp_dir = TempStdDir::default();
2538 let temp_path = temp_dir.to_str().unwrap();
2539
2540 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2541 .inline_optimization_enabled(inline_optimization)
2542 .build()
2543 .await
2544 .unwrap();
2545
2546 let mut properties = std::collections::HashMap::new();
2548 properties.insert("key1".to_string(), "value1".to_string());
2549
2550 let mut create_req = CreateNamespaceRequest::new();
2551 create_req.id = Some(vec!["ns1".to_string()]);
2552 create_req.properties = Some(properties.clone());
2553 dir_namespace.create_namespace(create_req).await.unwrap();
2554
2555 let describe_req = DescribeNamespaceRequest {
2557 id: Some(vec!["ns1".to_string()]),
2558 ..Default::default()
2559 };
2560 let result = dir_namespace.describe_namespace(describe_req).await;
2561 assert!(
2562 result.is_ok(),
2563 "Failed to describe namespace: {:?}",
2564 result.err()
2565 );
2566 let response = result.unwrap();
2567 assert!(response.properties.is_some());
2568 assert_eq!(
2569 response.properties.unwrap().get("key1"),
2570 Some(&"value1".to_string())
2571 );
2572 }
2573
/// Checks that `ManifestNamespace::construct_full_uri` joins a cloud-style
/// base URI and a table name into the expected full URI.
///
/// Covered bases: `s3://`, `az://` and `gs://` URLs with a nested path, a
/// deeply nested path, a bucket-only URL, and a base that already ends in `/`.
#[test]
fn test_construct_full_uri_with_cloud_urls() {
    // Each row: (base URI, table name, expected full URI, message shown on mismatch).
    let cases = [
        (
            "s3://bucket/path/subdir",
            "table.lance",
            "s3://bucket/path/subdir/table.lance",
            "S3 URL should correctly append table name to nested path",
        ),
        (
            "az://container/path/subdir",
            "table.lance",
            "az://container/path/subdir/table.lance",
            "Azure URL should correctly append table name to nested path",
        ),
        (
            "gs://bucket/path/subdir",
            "table.lance",
            "gs://bucket/path/subdir/table.lance",
            "GCS URL should correctly append table name to nested path",
        ),
        (
            "s3://bucket/a/b/c/d",
            "my_table.lance",
            "s3://bucket/a/b/c/d/my_table.lance",
            "Deeply nested path should work correctly",
        ),
        (
            "s3://bucket",
            "table.lance",
            "s3://bucket/table.lance",
            "Single-level nested path should work correctly",
        ),
        (
            "s3://bucket/path/subdir/",
            "table.lance",
            "s3://bucket/path/subdir/table.lance",
            "URL with existing trailing slash should still work",
        ),
    ];

    // Rows are checked in order, so the first failing case stops the test.
    for (base_uri, table_name, expected, message) in cases {
        let full_uri = ManifestNamespace::construct_full_uri(base_uri, table_name).unwrap();
        assert_eq!(full_uri, expected, "{}", message);
    }
}
2628}