1use arrow::array::{Array, RecordBatch, RecordBatchIterator, StringArray};
10use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
11use arrow_ipc::reader::StreamReader;
12use async_trait::async_trait;
13use bytes::Bytes;
14use futures::{FutureExt, stream::StreamExt};
15use lance::dataset::optimize::{CompactionOptions, compact_files};
16use lance::dataset::{ReadParams, WriteParams, builder::DatasetBuilder};
17use lance::session::Session;
18use lance::{Dataset, dataset::scanner::Scanner};
19use lance_core::Error as LanceError;
20use lance_core::datatypes::LANCE_UNENFORCED_PRIMARY_KEY_POSITION;
21use lance_core::{Error, Result, box_error};
22use lance_index::IndexType;
23use lance_index::optimize::OptimizeOptions;
24use lance_index::scalar::{BuiltinIndexType, ScalarIndexParams};
25use lance_index::traits::DatasetIndexExt;
26use lance_io::object_store::{ObjectStore, ObjectStoreParams};
27use lance_namespace::LanceNamespace;
28use lance_namespace::error::NamespaceError;
29use lance_namespace::models::{
30 CreateNamespaceRequest, CreateNamespaceResponse, CreateTableRequest, CreateTableResponse,
31 DeclareTableRequest, DeclareTableResponse, DeregisterTableRequest, DeregisterTableResponse,
32 DescribeNamespaceRequest, DescribeNamespaceResponse, DescribeTableRequest,
33 DescribeTableResponse, DropNamespaceRequest, DropNamespaceResponse, DropTableRequest,
34 DropTableResponse, ListNamespacesRequest, ListNamespacesResponse, ListTablesRequest,
35 ListTablesResponse, NamespaceExistsRequest, RegisterTableRequest, RegisterTableResponse,
36 TableExistsRequest,
37};
38use lance_namespace::schema::arrow_schema_to_json;
39use object_store::path::Path;
40use std::io::Cursor;
41use std::{
42 collections::HashMap,
43 hash::{DefaultHasher, Hash, Hasher},
44 ops::{Deref, DerefMut},
45 sync::Arc,
46};
47use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
48
/// Name of the reserved Lance dataset that stores the namespace/table manifest.
const MANIFEST_TABLE_NAME: &str = "__manifest";
/// Separator used to flatten a namespace path plus object name into a single
/// `object_id` string (e.g. `ns1$ns2$table`).
const DELIMITER: &str = "$";

/// BTREE index on `object_id`, used for point lookups in the manifest.
const OBJECT_ID_INDEX_NAME: &str = "object_id_btree";
/// Bitmap index on `object_type` (low cardinality: "namespace" / "table").
const OBJECT_TYPE_INDEX_NAME: &str = "object_type_bitmap";
/// LabelList index on the `base_objects` list column.
const BASE_OBJECTS_INDEX_NAME: &str = "base_objects_label_list";
59
60#[derive(Debug, Clone, Copy, PartialEq, Eq)]
62pub enum ObjectType {
63 Namespace,
64 Table,
65}
66
67impl ObjectType {
68 pub fn as_str(&self) -> &str {
69 match self {
70 Self::Namespace => "namespace",
71 Self::Table => "table",
72 }
73 }
74
75 pub fn parse(s: &str) -> Result<Self> {
76 match s {
77 "namespace" => Ok(Self::Namespace),
78 "table" => Ok(Self::Table),
79 _ => Err(Error::io(format!("Invalid object type: {}", s))),
80 }
81 }
82}
83
/// A table entry resolved from the manifest.
#[derive(Debug, Clone)]
pub struct TableInfo {
    /// Parent namespace path; empty for root-level tables.
    pub namespace: Vec<String>,
    /// Table name (last segment of the object id).
    pub name: String,
    /// Table location as stored in the manifest (relative to the root —
    /// see `construct_full_uri` usage in `describe_table`).
    pub location: String,
}
91
/// A namespace entry resolved from the manifest.
#[derive(Debug, Clone)]
pub struct NamespaceInfo {
    /// Parent namespace path; empty for root-level namespaces.
    pub namespace: Vec<String>,
    /// Namespace name (last segment of the object id).
    pub name: String,
    /// Optional key/value properties, stored JSON-encoded in the manifest's
    /// `metadata` column.
    pub metadata: Option<HashMap<String, String>>,
}
99
/// Cloneable, shared handle to the manifest [`Dataset`] that refreshes to the
/// latest committed version before every read or write access.
#[derive(Debug, Clone)]
pub struct DatasetConsistencyWrapper(Arc<RwLock<Dataset>>);
106
impl DatasetConsistencyWrapper {
    /// Wraps a freshly opened dataset.
    pub fn new(dataset: Dataset) -> Self {
        Self(Arc::new(RwLock::new(dataset)))
    }

    /// Returns a read guard over the dataset after refreshing it to the
    /// latest committed version.
    pub async fn get(&self) -> Result<DatasetReadGuard<'_>> {
        self.reload().await?;
        Ok(DatasetReadGuard {
            guard: self.0.read().await,
        })
    }

    /// Returns a write guard over the dataset after refreshing it to the
    /// latest committed version.
    pub async fn get_mut(&self) -> Result<DatasetWriteGuard<'_>> {
        self.reload().await?;
        Ok(DatasetWriteGuard {
            guard: self.0.write().await,
        })
    }

    /// Replaces the cached dataset with `dataset` only if it is strictly
    /// newer than the one currently held; stale hand-ins are dropped.
    pub async fn set_latest(&self, dataset: Dataset) {
        let mut write_guard = self.0.write().await;
        if dataset.manifest().version > write_guard.manifest().version {
            *write_guard = dataset;
        }
    }

    /// Checks out the latest dataset version if the cached one is behind.
    ///
    /// Uses the read lock for the cheap version comparison, then — only when
    /// an update is needed — takes the write lock and re-checks the version
    /// (another task may have refreshed while we were unlocked) before
    /// calling `checkout_latest`.
    async fn reload(&self) -> Result<()> {
        let read_guard = self.0.read().await;
        let dataset_uri = read_guard.uri().to_string();
        let current_version = read_guard.version().version;
        log::debug!(
            "Reload starting for uri={}, current_version={}",
            dataset_uri,
            current_version
        );
        let latest_version = read_guard.latest_version_id().await.map_err(|e| {
            Error::io_source(box_error(std::io::Error::other(format!(
                "Failed to get latest version: {}",
                e
            ))))
        })?;
        log::debug!(
            "Reload got latest_version={} for uri={}, current_version={}",
            latest_version,
            dataset_uri,
            current_version
        );
        // Release the read lock before (possibly) taking the write lock.
        drop(read_guard);

        if latest_version == current_version {
            log::debug!("Already up-to-date for uri={}", dataset_uri);
            return Ok(());
        }

        let mut write_guard = self.0.write().await;

        // Re-fetch under the write lock: another task may have advanced or
        // already checked out the latest version in the meantime.
        let latest_version = write_guard.latest_version_id().await.map_err(|e| {
            Error::io_source(box_error(std::io::Error::other(format!(
                "Failed to get latest version: {}",
                e
            ))))
        })?;

        if latest_version != write_guard.version().version {
            write_guard.checkout_latest().await.map_err(|e| {
                Error::io_source(box_error(std::io::Error::other(format!(
                    "Failed to checkout latest: {}",
                    e
                ))))
            })?;
        }

        Ok(())
    }
}
196
/// Read guard handed out by [`DatasetConsistencyWrapper::get`]; derefs to the
/// underlying [`Dataset`].
pub struct DatasetReadGuard<'a> {
    guard: RwLockReadGuard<'a, Dataset>,
}

impl Deref for DatasetReadGuard<'_> {
    type Target = Dataset;

    fn deref(&self) -> &Self::Target {
        &self.guard
    }
}
208
/// Write guard handed out by [`DatasetConsistencyWrapper::get_mut`]; derefs
/// (mutably) to the underlying [`Dataset`].
pub struct DatasetWriteGuard<'a> {
    guard: RwLockWriteGuard<'a, Dataset>,
}

impl Deref for DatasetWriteGuard<'_> {
    type Target = Dataset;

    fn deref(&self) -> &Self::Target {
        &self.guard
    }
}

impl DerefMut for DatasetWriteGuard<'_> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.guard
    }
}
226
/// A Lance namespace backed by a single `__manifest` Lance dataset stored
/// under `root`, which records every namespace and table as one row.
pub struct ManifestNamespace {
    /// Root URI under which the manifest table (and table data) live.
    root: String,
    /// Storage options for object-store access; also vended to clients by
    /// `describe_table` when credential vending is requested.
    storage_options: Option<HashMap<String, String>>,
    #[allow(dead_code)]
    session: Option<Arc<Session>>,
    #[allow(dead_code)]
    object_store: Arc<ObjectStore>,
    #[allow(dead_code)]
    base_path: Path,
    /// Consistency-managed handle to the `__manifest` dataset.
    manifest_dataset: DatasetConsistencyWrapper,
    // NOTE(review): presumably enables a directory-listing fallback for
    // discovering tables — not exercised in this chunk; confirm semantics.
    dir_listing_enabled: bool,
    /// When true, manifest writes trigger inline index creation,
    /// compaction and index optimization (see `run_inline_optimization`).
    inline_optimization_enabled: bool,
    /// Optional override for the number of commit retries on manifest
    /// merge-inserts.
    commit_retries: Option<u32>,
}
251
252impl std::fmt::Debug for ManifestNamespace {
253 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
254 f.debug_struct("ManifestNamespace")
255 .field("root", &self.root)
256 .field("storage_options", &self.storage_options)
257 .field("dir_listing_enabled", &self.dir_listing_enabled)
258 .field(
259 "inline_optimization_enabled",
260 &self.inline_optimization_enabled,
261 )
262 .finish()
263 }
264}
265
266fn convert_lance_commit_error(e: &LanceError, operation: &str, object_id: Option<&str>) -> Error {
275 match e {
276 LanceError::CommitConflict { .. } => NamespaceError::Throttled {
278 message: format!("Too many concurrent writes, please retry later: {:?}", e),
279 }
280 .into(),
281 LanceError::TooMuchWriteContention { .. } | LanceError::IncompatibleTransaction { .. } => {
284 let message = if let Some(id) = object_id {
285 format!(
286 "Object '{}' was concurrently modified by another operation: {:?}",
287 id, e
288 )
289 } else {
290 format!(
291 "Object was concurrently modified by another operation: {:?}",
292 e
293 )
294 };
295 NamespaceError::ConcurrentModification { message }.into()
296 }
297 _ => {
299 let error_msg = e.to_string();
300 if error_msg.contains("matched")
301 || error_msg.contains("duplicate")
302 || error_msg.contains("already exists")
303 {
304 let message = if let Some(id) = object_id {
305 format!(
306 "Object '{}' was concurrently created by another operation: {:?}",
307 id, e
308 )
309 } else {
310 format!(
311 "Object was concurrently created by another operation: {:?}",
312 e
313 )
314 };
315 return NamespaceError::ConcurrentModification { message }.into();
316 }
317 Error::io_source(box_error(std::io::Error::other(format!(
318 "{}: {:?}",
319 operation, e
320 ))))
321 }
322 }
323}
324
325impl ManifestNamespace {
    /// Creates a `ManifestNamespace` rooted at `root`, loading the
    /// `__manifest` table (and creating it if it does not exist yet).
    ///
    /// All other fields are stored as-is; `session` is cloned into the
    /// manifest dataset's read/write parameters.
    #[allow(clippy::too_many_arguments)]
    pub async fn from_directory(
        root: String,
        storage_options: Option<HashMap<String, String>>,
        session: Option<Arc<Session>>,
        object_store: Arc<ObjectStore>,
        base_path: Path,
        dir_listing_enabled: bool,
        inline_optimization_enabled: bool,
        commit_retries: Option<u32>,
    ) -> Result<Self> {
        let manifest_dataset =
            Self::ensure_manifest_table_up_to_date(&root, &storage_options, session.clone())
                .await?;

        Ok(Self {
            root,
            storage_options,
            session,
            object_store,
            base_path,
            manifest_dataset,
            dir_listing_enabled,
            inline_optimization_enabled,
            commit_retries,
        })
    }
354
355 pub fn build_object_id(namespace: &[String], name: &str) -> String {
357 if namespace.is_empty() {
358 name.to_string()
359 } else {
360 let mut id = namespace.join(DELIMITER);
361 id.push_str(DELIMITER);
362 id.push_str(name);
363 id
364 }
365 }
366
367 pub fn parse_object_id(object_id: &str) -> (Vec<String>, String) {
369 let parts: Vec<&str> = object_id.split(DELIMITER).collect();
370 if parts.len() == 1 {
371 (Vec::new(), parts[0].to_string())
372 } else {
373 let namespace = parts[..parts.len() - 1]
374 .iter()
375 .map(|s| s.to_string())
376 .collect();
377 let name = parts[parts.len() - 1].to_string();
378 (namespace, name)
379 }
380 }
381
382 fn split_object_id(table_id: &[String]) -> (Vec<String>, String) {
384 if table_id.len() == 1 {
385 (vec![], table_id[0].clone())
386 } else {
387 (
388 table_id[..table_id.len() - 1].to_vec(),
389 table_id[table_id.len() - 1].clone(),
390 )
391 }
392 }
393
    /// Flattens a full id path (namespace segments plus name) into the
    /// single delimiter-joined `object_id` string stored in the manifest.
    fn str_object_id(table_id: &[String]) -> String {
        table_id.join(DELIMITER)
    }
398
399 fn generate_dir_name(object_id: &str) -> String {
406 let random_num: u64 = rand::random();
408
409 let mut hasher = DefaultHasher::new();
411 random_num.hash(&mut hasher);
412 object_id.hash(&mut hasher);
413 let hash = hasher.finish();
414
415 format!("{:08x}_{}", (hash & 0xFFFFFFFF) as u32, object_id)
417 }
418
    /// Resolves `relative_location` against the namespace `root` URI,
    /// returning an absolute URI string.
    ///
    /// A trailing slash is forced onto the root path so that `Url::join`
    /// appends rather than replaces the last path segment.
    // NOTE(review): per RFC 3986 join semantics, a `relative_location`
    // beginning with '/' (or containing a scheme) would replace the root
    // path entirely — confirm stored locations are always relative.
    pub(crate) fn construct_full_uri(root: &str, relative_location: &str) -> Result<String> {
        let mut base_url = lance_io::object_store::uri_to_url(root)?;

        if !base_url.path().ends_with('/') {
            base_url.set_path(&format!("{}/", base_url.path()));
        }

        let full_url = base_url.join(relative_location).map_err(|e| {
            Error::invalid_input_source(
                format!(
                    "Failed to join URI '{}' with '{}': {:?}",
                    root, relative_location, e
                )
                .into(),
            )
        })?;

        Ok(full_url.to_string())
    }
443
444 async fn run_inline_optimization(&self) -> Result<()> {
456 if !self.inline_optimization_enabled {
457 return Ok(());
458 }
459
460 let mut dataset_guard = self.manifest_dataset.get_mut().await?;
462 let dataset: &mut Dataset = &mut dataset_guard;
463
464 let indices = dataset.load_indices().await?;
466
467 let has_object_id_index = indices.iter().any(|idx| idx.name == OBJECT_ID_INDEX_NAME);
469 let has_object_type_index = indices.iter().any(|idx| idx.name == OBJECT_TYPE_INDEX_NAME);
470 let has_base_objects_index = indices
471 .iter()
472 .any(|idx| idx.name == BASE_OBJECTS_INDEX_NAME);
473
474 if !has_object_id_index {
476 log::debug!(
477 "Creating BTREE index '{}' on object_id for __manifest table",
478 OBJECT_ID_INDEX_NAME
479 );
480 let params = ScalarIndexParams::for_builtin(BuiltinIndexType::BTree);
481 if let Err(e) = dataset
482 .create_index(
483 &["object_id"],
484 IndexType::BTree,
485 Some(OBJECT_ID_INDEX_NAME.to_string()),
486 ¶ms,
487 true,
488 )
489 .await
490 {
491 log::warn!(
492 "Failed to create BTREE index on object_id for __manifest table: {:?}. Query performance may be impacted.",
493 e
494 );
495 } else {
496 log::info!(
497 "Created BTREE index '{}' on object_id for __manifest table",
498 OBJECT_ID_INDEX_NAME
499 );
500 }
501 }
502
503 if !has_object_type_index {
505 log::debug!(
506 "Creating Bitmap index '{}' on object_type for __manifest table",
507 OBJECT_TYPE_INDEX_NAME
508 );
509 let params = ScalarIndexParams::default();
510 if let Err(e) = dataset
511 .create_index(
512 &["object_type"],
513 IndexType::Bitmap,
514 Some(OBJECT_TYPE_INDEX_NAME.to_string()),
515 ¶ms,
516 true,
517 )
518 .await
519 {
520 log::warn!(
521 "Failed to create Bitmap index on object_type for __manifest table: {:?}. Query performance may be impacted.",
522 e
523 );
524 } else {
525 log::info!(
526 "Created Bitmap index '{}' on object_type for __manifest table",
527 OBJECT_TYPE_INDEX_NAME
528 );
529 }
530 }
531
532 if !has_base_objects_index {
534 log::debug!(
535 "Creating LabelList index '{}' on base_objects for __manifest table",
536 BASE_OBJECTS_INDEX_NAME
537 );
538 let params = ScalarIndexParams::default();
539 if let Err(e) = dataset
540 .create_index(
541 &["base_objects"],
542 IndexType::LabelList,
543 Some(BASE_OBJECTS_INDEX_NAME.to_string()),
544 ¶ms,
545 true,
546 )
547 .await
548 {
549 log::warn!(
550 "Failed to create LabelList index on base_objects for __manifest table: {:?}. Query performance may be impacted.",
551 e
552 );
553 } else {
554 log::info!(
555 "Created LabelList index '{}' on base_objects for __manifest table",
556 BASE_OBJECTS_INDEX_NAME
557 );
558 }
559 }
560
561 log::debug!("Running file compaction on __manifest table");
563 match compact_files(dataset, CompactionOptions::default(), None).await {
564 Ok(compaction_metrics) => {
565 if compaction_metrics.fragments_removed > 0 {
566 log::info!(
567 "Compacted __manifest table: removed {} fragments, added {} fragments",
568 compaction_metrics.fragments_removed,
569 compaction_metrics.fragments_added
570 );
571 }
572 }
573 Err(e) => {
574 log::warn!(
575 "Failed to compact files for __manifest table: {:?}. Continuing with optimization.",
576 e
577 );
578 }
579 }
580
581 log::debug!("Optimizing indices on __manifest table");
583 match dataset.optimize_indices(&OptimizeOptions::default()).await {
584 Ok(_) => {
585 log::info!("Successfully optimized indices on __manifest table");
586 }
587 Err(e) => {
588 log::warn!(
589 "Failed to optimize indices on __manifest table: {:?}. Continuing anyway.",
590 e
591 );
592 }
593 }
594
595 Ok(())
596 }
597
598 fn manifest_schema() -> Arc<ArrowSchema> {
600 Arc::new(ArrowSchema::new(vec![
601 Field::new("object_id", DataType::Utf8, false).with_metadata(
603 [(
604 LANCE_UNENFORCED_PRIMARY_KEY_POSITION.to_string(),
605 "0".to_string(),
606 )]
607 .into_iter()
608 .collect(),
609 ),
610 Field::new("object_type", DataType::Utf8, false),
611 Field::new("location", DataType::Utf8, true),
612 Field::new("metadata", DataType::Utf8, true),
613 Field::new(
614 "base_objects",
615 DataType::List(Arc::new(Field::new("object_id", DataType::Utf8, true))),
616 true,
617 ),
618 ]))
619 }
620
    /// Returns a fresh scanner over the (version-refreshed) manifest dataset.
    async fn manifest_scanner(&self) -> Result<Scanner> {
        let dataset_guard = self.manifest_dataset.get().await?;
        Ok(dataset_guard.scan())
    }
626
627 async fn execute_scanner(scanner: Scanner) -> Result<Vec<RecordBatch>> {
629 let mut stream = scanner.try_into_stream().await.map_err(|e| {
630 Error::io_source(box_error(std::io::Error::other(format!(
631 "Failed to create stream: {}",
632 e
633 ))))
634 })?;
635
636 let mut batches = Vec::new();
637 while let Some(batch) = stream.next().await {
638 batches.push(batch.map_err(|e| {
639 Error::io_source(box_error(std::io::Error::other(format!(
640 "Failed to read batch: {}",
641 e
642 ))))
643 })?);
644 }
645
646 Ok(batches)
647 }
648
649 fn get_string_column<'a>(batch: &'a RecordBatch, column_name: &str) -> Result<&'a StringArray> {
651 let column = batch
652 .column_by_name(column_name)
653 .ok_or_else(|| Error::io(format!("Column '{}' not found", column_name)))?;
654 column
655 .as_any()
656 .downcast_ref::<StringArray>()
657 .ok_or_else(|| Error::io(format!("Column '{}' is not a string array", column_name)))
658 }
659
    /// Returns whether any manifest row (namespace or table) has exactly
    /// this `object_id`.
    ///
    // NOTE(review): `object_id` is interpolated into the filter string
    // unescaped; an id containing a single quote would break or alter the
    // filter. Confirm that upstream validation rejects such names.
    async fn manifest_contains_object(&self, object_id: &str) -> Result<bool> {
        let filter = format!("object_id = '{}'", object_id);

        let dataset_guard = self.manifest_dataset.get().await?;
        let mut scanner = dataset_guard.scan();

        scanner.filter(&filter).map_err(|e| {
            Error::io_source(box_error(std::io::Error::other(format!(
                "Failed to filter: {}",
                e
            ))))
        })?;

        // Empty projection plus row-id: we only need a count, no column data.
        scanner.project::<&str>(&[]).map_err(|e| {
            Error::io_source(box_error(std::io::Error::other(format!(
                "Failed to project: {}",
                e
            ))))
        })?;

        scanner.with_row_id();

        let count = scanner.count_rows().await.map_err(|e| {
            Error::io_source(box_error(std::io::Error::other(format!(
                "Failed to count rows: {}",
                e
            ))))
        })?;

        Ok(count > 0)
    }
693
    /// Looks up a single table row by `object_id`.
    ///
    /// Returns `None` when no matching row exists, and an error if more
    /// than one row matches (the manifest's primary-key invariant is
    /// violated).
    async fn query_manifest_for_table(&self, object_id: &str) -> Result<Option<TableInfo>> {
        let filter = format!("object_id = '{}' AND object_type = 'table'", object_id);
        let mut scanner = self.manifest_scanner().await?;
        scanner.filter(&filter).map_err(|e| {
            Error::io_source(box_error(std::io::Error::other(format!(
                "Failed to filter: {}",
                e
            ))))
        })?;
        scanner.project(&["object_id", "location"]).map_err(|e| {
            Error::io_source(box_error(std::io::Error::other(format!(
                "Failed to project: {}",
                e
            ))))
        })?;
        let batches = Self::execute_scanner(scanner).await?;

        let mut found_result: Option<TableInfo> = None;
        let mut total_rows = 0;

        for batch in batches {
            if batch.num_rows() == 0 {
                continue;
            }

            // Enforce uniqueness across all batches, not just within one.
            total_rows += batch.num_rows();
            if total_rows > 1 {
                return Err(Error::io(format!(
                    "Expected exactly 1 table with id '{}', found {}",
                    object_id, total_rows
                )));
            }

            let object_id_array = Self::get_string_column(&batch, "object_id")?;
            let location_array = Self::get_string_column(&batch, "location")?;
            let location = location_array.value(0).to_string();
            let (namespace, name) = Self::parse_object_id(object_id_array.value(0));
            found_result = Some(TableInfo {
                namespace,
                name,
                location,
            });
        }

        Ok(found_result)
    }
741
742 pub async fn list_manifest_table_locations(&self) -> Result<std::collections::HashSet<String>> {
745 let filter = "object_type = 'table' AND NOT contains(object_id, '$')";
746 let mut scanner = self.manifest_scanner().await?;
747 scanner.filter(filter).map_err(|e| {
748 Error::io_source(box_error(std::io::Error::other(format!(
749 "Failed to filter: {}",
750 e
751 ))))
752 })?;
753 scanner.project(&["location"]).map_err(|e| {
754 Error::io_source(box_error(std::io::Error::other(format!(
755 "Failed to project: {}",
756 e
757 ))))
758 })?;
759
760 let batches = Self::execute_scanner(scanner).await?;
761 let mut locations = std::collections::HashSet::new();
762
763 for batch in batches {
764 if batch.num_rows() == 0 {
765 continue;
766 }
767 let location_array = Self::get_string_column(&batch, "location")?;
768 for i in 0..location_array.len() {
769 locations.insert(location_array.value(i).to_string());
770 }
771 }
772
773 Ok(locations)
774 }
775
    /// Inserts a manifest row with no metadata or base objects; thin wrapper
    /// over `insert_into_manifest_with_metadata`.
    async fn insert_into_manifest(
        &self,
        object_id: String,
        object_type: ObjectType,
        location: Option<String>,
    ) -> Result<()> {
        self.insert_into_manifest_with_metadata(object_id, object_type, location, None, None)
            .await
    }
786
    /// Inserts a single row into the manifest via merge-insert keyed on
    /// `object_id`.
    ///
    /// The merge is configured to fail when the key already exists
    /// (`WhenMatched::Fail`), so concurrent duplicate creation surfaces as a
    /// `ConcurrentModification` error via `convert_lance_commit_error`.
    /// After a successful commit the cached dataset is advanced and a
    /// best-effort inline optimization pass is run.
    async fn insert_into_manifest_with_metadata(
        &self,
        object_id: String,
        object_type: ObjectType,
        location: Option<String>,
        metadata: Option<String>,
        base_objects: Option<Vec<String>>,
    ) -> Result<()> {
        use arrow::array::builder::{ListBuilder, StringBuilder};

        let schema = Self::manifest_schema();

        // Build the base_objects list column; the child field must match the
        // schema's ("object_id", Utf8, nullable).
        let string_builder = StringBuilder::new();
        let mut list_builder = ListBuilder::new(string_builder).with_field(Arc::new(Field::new(
            "object_id",
            DataType::Utf8,
            true,
        )));

        match base_objects {
            Some(objects) => {
                for obj in objects {
                    list_builder.values().append_value(obj);
                }
                list_builder.append(true);
            }
            None => {
                list_builder.append_null();
            }
        }

        let base_objects_array = list_builder.finish();

        let location_array = match location {
            Some(loc) => Arc::new(StringArray::from(vec![Some(loc)])),
            None => Arc::new(StringArray::from(vec![None::<String>])),
        };

        let metadata_array = match metadata {
            Some(meta) => Arc::new(StringArray::from(vec![Some(meta)])),
            None => Arc::new(StringArray::from(vec![None::<String>])),
        };

        // Single-row batch matching manifest_schema's column order.
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec![object_id.as_str()])),
                Arc::new(StringArray::from(vec![object_type.as_str()])),
                location_array,
                metadata_array,
                Arc::new(base_objects_array),
            ],
        )
        .map_err(|e| Error::io(format!("Failed to create manifest entry: {}", e)))?;

        let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());

        // Clone the dataset out of the guard so the lock is not held across
        // the (potentially slow) merge commit.
        let dataset_guard = self.manifest_dataset.get().await?;
        let dataset_arc = Arc::new(dataset_guard.clone());
        drop(dataset_guard); let mut merge_builder =
            lance::dataset::MergeInsertBuilder::try_new(dataset_arc, vec!["object_id".to_string()])
                .map_err(|e| {
                    Error::io_source(box_error(std::io::Error::other(format!(
                        "Failed to create merge builder: {}",
                        e
                    ))))
                })?;

        merge_builder.when_matched(lance::dataset::WhenMatched::Fail);
        merge_builder.when_not_matched(lance::dataset::WhenNotMatched::InsertAll);
        merge_builder.conflict_retries(5);
        merge_builder.use_index(false);
        if let Some(retries) = self.commit_retries {
            merge_builder.commit_retries(retries);
        }

        let (new_dataset_arc, _merge_stats) = merge_builder
            .try_build()
            .map_err(|e| {
                Error::io_source(box_error(std::io::Error::other(format!(
                    "Failed to build merge: {}",
                    e
                ))))
            })?
            .execute_reader(Box::new(reader))
            .await
            .map_err(|e| {
                convert_lance_commit_error(&e, "Failed to execute merge", Some(&object_id))
            })?;

        // Advance the cached dataset to the post-commit version.
        let new_dataset = Arc::try_unwrap(new_dataset_arc).unwrap_or_else(|arc| (*arc).clone());
        self.manifest_dataset.set_latest(new_dataset).await;

        // Optimization is best-effort; the insert itself has succeeded.
        if let Err(e) = self.run_inline_optimization().await {
            log::warn!(
                "Unexpected failure when running inline optimization: {:?}",
                e
            );
        }

        Ok(())
    }
905
    /// Deletes the manifest row with exactly this `object_id` (any type).
    ///
    /// After the commit, the cached dataset is advanced and a best-effort
    /// inline optimization pass is run.
    pub async fn delete_from_manifest(&self, object_id: &str) -> Result<()> {
        let predicate = format!("object_id = '{}'", object_id);

        // Clone the dataset out of the guard so the lock is not held across
        // the delete commit.
        let dataset_guard = self.manifest_dataset.get().await?;
        let dataset = Arc::new(dataset_guard.clone());
        drop(dataset_guard); let new_dataset = lance::dataset::DeleteBuilder::new(dataset, &predicate)
            .execute()
            .await
            .map_err(|e| convert_lance_commit_error(&e, "Failed to delete", None))?;

        self.manifest_dataset
            .set_latest(
                Arc::try_unwrap(new_dataset.new_dataset).unwrap_or_else(|arc| (*arc).clone()),
            )
            .await;

        // Optimization is best-effort; the delete itself has succeeded.
        if let Err(e) = self.run_inline_optimization().await {
            log::warn!(
                "Unexpected failure when running inline optimization: {:?}",
                e
            );
        }

        Ok(())
    }
937
    /// Registers an existing table at root level under `name`, pointing at
    /// `location`.
    ///
    /// The existence check here is a fast pre-check only; the merge-insert
    /// in `insert_into_manifest` uses `WhenMatched::Fail`, so a concurrent
    /// duplicate registration still cannot slip through the race window.
    pub async fn register_table(&self, name: &str, location: String) -> Result<()> {
        let object_id = Self::build_object_id(&[], name);
        if self.manifest_contains_object(&object_id).await? {
            return Err(Error::io(format!("Table '{}' already exists", name)));
        }

        self.insert_into_manifest(object_id, ObjectType::Table, Some(location))
            .await
    }
948
949 async fn validate_namespace_levels_exist(&self, namespace_path: &[String]) -> Result<()> {
951 for i in 1..=namespace_path.len() {
952 let partial_path = &namespace_path[..i];
953 let object_id = partial_path.join(DELIMITER);
954 if !self.manifest_contains_object(&object_id).await? {
955 return Err(Error::namespace_source(
956 format!("Parent namespace '{}' does not exist", object_id).into(),
957 ));
958 }
959 }
960 Ok(())
961 }
962
    /// Looks up a single namespace row by `object_id`.
    ///
    /// Returns `None` when no matching row exists, and an error if more than
    /// one row matches or the stored `metadata` JSON cannot be parsed into a
    /// string map.
    async fn query_manifest_for_namespace(&self, object_id: &str) -> Result<Option<NamespaceInfo>> {
        let filter = format!("object_id = '{}' AND object_type = 'namespace'", object_id);
        let mut scanner = self.manifest_scanner().await?;
        scanner.filter(&filter).map_err(|e| {
            Error::io_source(box_error(std::io::Error::other(format!(
                "Failed to filter: {}",
                e
            ))))
        })?;
        scanner.project(&["object_id", "metadata"]).map_err(|e| {
            Error::io_source(box_error(std::io::Error::other(format!(
                "Failed to project: {}",
                e
            ))))
        })?;
        let batches = Self::execute_scanner(scanner).await?;

        let mut found_result: Option<NamespaceInfo> = None;
        let mut total_rows = 0;

        for batch in batches {
            if batch.num_rows() == 0 {
                continue;
            }

            // Enforce uniqueness across all batches, not just within one.
            total_rows += batch.num_rows();
            if total_rows > 1 {
                return Err(Error::io(format!(
                    "Expected exactly 1 namespace with id '{}', found {}",
                    object_id, total_rows
                )));
            }

            let object_id_array = Self::get_string_column(&batch, "object_id")?;
            let metadata_array = Self::get_string_column(&batch, "metadata")?;

            let object_id_str = object_id_array.value(0);
            // metadata is a nullable JSON-encoded map column.
            let metadata = if !metadata_array.is_null(0) {
                let metadata_str = metadata_array.value(0);
                match serde_json::from_str::<HashMap<String, String>>(metadata_str) {
                    Ok(map) => Some(map),
                    Err(e) => {
                        return Err(Error::io(format!(
                            "Failed to deserialize metadata for namespace '{}': {}",
                            object_id, e
                        )));
                    }
                }
            } else {
                None
            };

            let (namespace, name) = Self::parse_object_id(object_id_str);
            found_result = Some(NamespaceInfo {
                namespace,
                name,
                metadata,
            });
        }

        Ok(found_result)
    }
1026
1027 async fn ensure_manifest_table_up_to_date(
1034 root: &str,
1035 storage_options: &Option<HashMap<String, String>>,
1036 session: Option<Arc<Session>>,
1037 ) -> Result<DatasetConsistencyWrapper> {
1038 let manifest_path = format!("{}/{}", root, MANIFEST_TABLE_NAME);
1039 log::debug!("Attempting to load manifest from {}", manifest_path);
1040 let store_options = ObjectStoreParams {
1041 storage_options_accessor: storage_options.as_ref().map(|opts| {
1042 Arc::new(
1043 lance_io::object_store::StorageOptionsAccessor::with_static_options(
1044 opts.clone(),
1045 ),
1046 )
1047 }),
1048 ..Default::default()
1049 };
1050 let read_params = ReadParams {
1051 session: session.clone(),
1052 store_options: Some(store_options.clone()),
1053 ..Default::default()
1054 };
1055 let dataset_result = DatasetBuilder::from_uri(&manifest_path)
1056 .with_read_params(read_params)
1057 .load()
1058 .await;
1059 if let Ok(mut dataset) = dataset_result {
1060 let needs_pk_migration = dataset
1062 .schema()
1063 .field("object_id")
1064 .map(|f| {
1065 !f.metadata
1066 .contains_key(LANCE_UNENFORCED_PRIMARY_KEY_POSITION)
1067 })
1068 .unwrap_or(false);
1069
1070 if needs_pk_migration {
1071 log::info!("Migrating __manifest table to add primary key metadata on object_id");
1072 dataset
1073 .update_field_metadata()
1074 .update("object_id", [(LANCE_UNENFORCED_PRIMARY_KEY_POSITION, "0")])
1075 .map_err(|e| {
1076 Error::io_source(box_error(std::io::Error::other(format!(
1077 "Failed to find object_id field for migration: {}",
1078 e
1079 ))))
1080 })?
1081 .await
1082 .map_err(|e| {
1083 Error::io_source(box_error(std::io::Error::other(format!(
1084 "Failed to migrate primary key metadata: {}",
1085 e
1086 ))))
1087 })?;
1088 }
1089
1090 Ok(DatasetConsistencyWrapper::new(dataset))
1091 } else {
1092 log::info!("Creating new manifest table at {}", manifest_path);
1093 let schema = Self::manifest_schema();
1094 let empty_batch = RecordBatch::new_empty(schema.clone());
1095 let reader = RecordBatchIterator::new(vec![Ok(empty_batch)], schema.clone());
1096
1097 let store_params = ObjectStoreParams {
1098 storage_options_accessor: storage_options.as_ref().map(|opts| {
1099 Arc::new(
1100 lance_io::object_store::StorageOptionsAccessor::with_static_options(
1101 opts.clone(),
1102 ),
1103 )
1104 }),
1105 ..Default::default()
1106 };
1107 let write_params = WriteParams {
1108 session: session.clone(),
1109 store_params: Some(store_params),
1110 ..Default::default()
1111 };
1112
1113 let dataset =
1114 Dataset::write(Box::new(reader), &manifest_path, Some(write_params)).await;
1115
1116 match dataset {
1118 Ok(dataset) => {
1119 log::info!(
1120 "Successfully created manifest table at {}, version={}, uri={}",
1121 manifest_path,
1122 dataset.version().version,
1123 dataset.uri()
1124 );
1125 Ok(DatasetConsistencyWrapper::new(dataset))
1126 }
1127 Err(ref e)
1128 if matches!(
1129 e,
1130 LanceError::DatasetAlreadyExists { .. }
1131 | LanceError::CommitConflict { .. }
1132 | LanceError::IncompatibleTransaction { .. }
1133 | LanceError::RetryableCommitConflict { .. }
1134 ) =>
1135 {
1136 log::info!(
1138 "Manifest table was created by another process, loading it: {}",
1139 manifest_path
1140 );
1141 let recovery_store_options = ObjectStoreParams {
1142 storage_options_accessor: storage_options.as_ref().map(|opts| {
1143 Arc::new(
1144 lance_io::object_store::StorageOptionsAccessor::with_static_options(
1145 opts.clone(),
1146 ),
1147 )
1148 }),
1149 ..Default::default()
1150 };
1151 let recovery_read_params = ReadParams {
1152 session,
1153 store_options: Some(recovery_store_options),
1154 ..Default::default()
1155 };
1156 let dataset = DatasetBuilder::from_uri(&manifest_path)
1157 .with_read_params(recovery_read_params)
1158 .load()
1159 .await
1160 .map_err(|e| {
1161 Error::io_source(box_error(std::io::Error::other(format!(
1162 "Failed to load manifest dataset after creation conflict: {}",
1163 e
1164 ))))
1165 })?;
1166 Ok(DatasetConsistencyWrapper::new(dataset))
1167 }
1168 Err(e) => Err(Error::io_source(box_error(std::io::Error::other(format!(
1169 "Failed to create manifest dataset: {}",
1170 e
1171 ))))),
1172 }
1173 }
1174 }
1175}
1176
1177#[async_trait]
1178impl LanceNamespace for ManifestNamespace {
    /// Identifier of this namespace instance: its root URI.
    fn namespace_id(&self) -> String {
        self.root.clone()
    }
1182
    /// Lists the names of tables that are direct children of the requested
    /// namespace (root when the id is empty).
    async fn list_tables(&self, request: ListTablesRequest) -> Result<ListTablesResponse> {
        let namespace_id = request
            .id
            .as_ref()
            .ok_or_else(|| Error::invalid_input_source("Namespace ID is required".into()))?;

        let filter = if namespace_id.is_empty() {
            // Root level: table ids with no '$' separator at all.
            "object_type = 'table' AND NOT contains(object_id, '$')".to_string()
        } else {
            let prefix = namespace_id.join(DELIMITER);
            // Direct children only: id starts with "prefix$" and the
            // remainder (SQL substring is 1-based, hence prefix.len() + 2 to
            // skip the prefix and its trailing delimiter) has no further '$'.
            // NOTE(review): prefix.len() is a byte count while SQL substring
            // indexes characters — confirm namespace names are ASCII-only.
            format!(
                "object_type = 'table' AND starts_with(object_id, '{}{}') AND NOT contains(substring(object_id, {}), '$')",
                prefix,
                DELIMITER,
                prefix.len() + 2
            )
        };

        let mut scanner = self.manifest_scanner().await?;
        scanner.filter(&filter).map_err(|e| {
            Error::io_source(box_error(std::io::Error::other(format!(
                "Failed to filter: {}",
                e
            ))))
        })?;
        scanner.project(&["object_id"]).map_err(|e| {
            Error::io_source(box_error(std::io::Error::other(format!(
                "Failed to project: {}",
                e
            ))))
        })?;

        let batches = Self::execute_scanner(scanner).await?;

        let mut tables = Vec::new();
        for batch in batches {
            if batch.num_rows() == 0 {
                continue;
            }

            let object_id_array = Self::get_string_column(&batch, "object_id")?;
            for i in 0..batch.num_rows() {
                let object_id = object_id_array.value(i);
                // Only the final segment (the table name) is returned.
                let (_namespace, name) = Self::parse_object_id(object_id);
                tables.push(name);
            }
        }

        Ok(ListTablesResponse::new(tables))
    }
1236
    /// Describes a table: resolves its manifest entry to a full URI and,
    /// when `load_detailed_metadata` is set, also opens the dataset to
    /// report its version and JSON schema.
    ///
    /// Opening the dataset is best-effort: if it fails, the response still
    /// carries the location/URI but no version or schema.
    async fn describe_table(&self, request: DescribeTableRequest) -> Result<DescribeTableResponse> {
        let table_id = request
            .id
            .as_ref()
            .ok_or_else(|| Error::invalid_input_source("Table ID is required".into()))?;

        if table_id.is_empty() {
            return Err(Error::invalid_input_source(
                "Table ID cannot be empty".into(),
            ));
        }

        let object_id = Self::str_object_id(table_id);
        let table_info = self.query_manifest_for_table(&object_id).boxed().await?;

        // Split the request id into table name and parent namespace path.
        let table_name = table_id.last().cloned().unwrap_or_default();
        let namespace_id: Vec<String> = if table_id.len() > 1 {
            table_id[..table_id.len() - 1].to_vec()
        } else {
            vec![]
        };

        let load_detailed_metadata = request.load_detailed_metadata.unwrap_or(false);
        // Credentials are vended by default unless explicitly disabled.
        let vend_credentials = request.vend_credentials.unwrap_or(true);

        match table_info {
            Some(info) => {
                let table_uri = Self::construct_full_uri(&self.root, &info.location)?;

                let storage_options = if vend_credentials {
                    self.storage_options.clone()
                } else {
                    None
                };

                if !load_detailed_metadata {
                    return Ok(DescribeTableResponse {
                        table: Some(table_name),
                        namespace: Some(namespace_id),
                        location: Some(table_uri.clone()),
                        table_uri: Some(table_uri),
                        storage_options,
                        ..Default::default()
                    });
                }

                // NOTE(review): opens without self.storage_options/session —
                // confirm this works for non-local object stores.
                match Dataset::open(&table_uri).await {
                    Ok(mut dataset) => {
                        if let Some(requested_version) = request.version {
                            dataset = dataset.checkout_version(requested_version as u64).await?;
                        }

                        let version = dataset.version().version;
                        let lance_schema = dataset.schema();
                        let arrow_schema: arrow_schema::Schema = lance_schema.into();
                        let json_schema = arrow_schema_to_json(&arrow_schema)?;

                        Ok(DescribeTableResponse {
                            table: Some(table_name.clone()),
                            namespace: Some(namespace_id.clone()),
                            version: Some(version as i64),
                            location: Some(table_uri.clone()),
                            table_uri: Some(table_uri),
                            schema: Some(Box::new(json_schema)),
                            storage_options,
                            ..Default::default()
                        })
                    }
                    Err(_) => {
                        // Dataset unreadable: degrade to the basic response.
                        Ok(DescribeTableResponse {
                            table: Some(table_name),
                            namespace: Some(namespace_id),
                            location: Some(table_uri.clone()),
                            table_uri: Some(table_uri),
                            storage_options,
                            ..Default::default()
                        })
                    }
                }
            }
            None => Err(Error::namespace_source(
                format!("Table '{}' not found", object_id).into(),
            )),
        }
    }
1329
1330 async fn table_exists(&self, request: TableExistsRequest) -> Result<()> {
1331 let table_id = request
1332 .id
1333 .as_ref()
1334 .ok_or_else(|| Error::invalid_input_source("Table ID is required".into()))?;
1335
1336 if table_id.is_empty() {
1337 return Err(Error::invalid_input_source(
1338 "Table ID cannot be empty".into(),
1339 ));
1340 }
1341
1342 let (namespace, table_name) = Self::split_object_id(table_id);
1343 let object_id = Self::build_object_id(&namespace, &table_name);
1344 let exists = self.manifest_contains_object(&object_id).await?;
1345 if exists {
1346 Ok(())
1347 } else {
1348 Err(Error::namespace_source(
1349 format!("Table '{}' not found", table_name).into(),
1350 ))
1351 }
1352 }
1353
1354 async fn create_table(
1355 &self,
1356 request: CreateTableRequest,
1357 data: Bytes,
1358 ) -> Result<CreateTableResponse> {
1359 let table_id = request
1360 .id
1361 .as_ref()
1362 .ok_or_else(|| Error::invalid_input_source("Table ID is required".into()))?;
1363
1364 if table_id.is_empty() {
1365 return Err(Error::invalid_input_source(
1366 "Table ID cannot be empty".into(),
1367 ));
1368 }
1369
1370 let (namespace, table_name) = Self::split_object_id(table_id);
1371 let object_id = Self::build_object_id(&namespace, &table_name);
1372
1373 if self.manifest_contains_object(&object_id).await? {
1375 return Err(Error::io(format!("Table '{}' already exists", table_name)));
1376 }
1377
1378 let dir_name = if namespace.is_empty() && self.dir_listing_enabled {
1382 format!("{}.lance", table_name)
1384 } else {
1385 Self::generate_dir_name(&object_id)
1387 };
1388 let table_uri = Self::construct_full_uri(&self.root, &dir_name)?;
1389
1390 if data.is_empty() {
1392 return Err(Error::namespace_source(
1393 "Request data (Arrow IPC stream) is required for create_table".into(),
1394 ));
1395 }
1396
1397 let cursor = Cursor::new(data.to_vec());
1399 let stream_reader = StreamReader::try_new(cursor, None)
1400 .map_err(|e| Error::io(format!("Failed to read IPC stream: {}", e)))?;
1401
1402 let batches: Vec<RecordBatch> =
1403 stream_reader
1404 .collect::<std::result::Result<Vec<_>, _>>()
1405 .map_err(|e| Error::io(format!("Failed to collect batches: {}", e)))?;
1406
1407 if batches.is_empty() {
1408 return Err(Error::io("No data provided for table creation"));
1409 }
1410
1411 let schema = batches[0].schema();
1412 let batch_results: Vec<std::result::Result<RecordBatch, arrow_schema::ArrowError>> =
1413 batches.into_iter().map(Ok).collect();
1414 let reader = RecordBatchIterator::new(batch_results, schema);
1415
1416 let store_params = ObjectStoreParams {
1417 storage_options_accessor: self.storage_options.as_ref().map(|opts| {
1418 Arc::new(
1419 lance_io::object_store::StorageOptionsAccessor::with_static_options(
1420 opts.clone(),
1421 ),
1422 )
1423 }),
1424 ..Default::default()
1425 };
1426 let write_params = WriteParams {
1427 session: self.session.clone(),
1428 store_params: Some(store_params),
1429 ..Default::default()
1430 };
1431 let _dataset = Dataset::write(Box::new(reader), &table_uri, Some(write_params))
1432 .await
1433 .map_err(|e| {
1434 Error::io_source(box_error(std::io::Error::other(format!(
1435 "Failed to write dataset: {}",
1436 e
1437 ))))
1438 })?;
1439
1440 self.insert_into_manifest(object_id, ObjectType::Table, Some(dir_name))
1442 .await?;
1443
1444 Ok(CreateTableResponse {
1445 version: Some(1),
1446 location: Some(table_uri),
1447 storage_options: self.storage_options.clone(),
1448 ..Default::default()
1449 })
1450 }
1451
1452 async fn drop_table(&self, request: DropTableRequest) -> Result<DropTableResponse> {
1453 let table_id = request
1454 .id
1455 .as_ref()
1456 .ok_or_else(|| Error::invalid_input_source("Table ID is required".into()))?;
1457
1458 if table_id.is_empty() {
1459 return Err(Error::invalid_input_source(
1460 "Table ID cannot be empty".into(),
1461 ));
1462 }
1463
1464 let (namespace, table_name) = Self::split_object_id(table_id);
1465 let object_id = Self::build_object_id(&namespace, &table_name);
1466
1467 let table_info = self.query_manifest_for_table(&object_id).boxed().await?;
1469
1470 match table_info {
1471 Some(info) => {
1472 self.delete_from_manifest(&object_id).boxed().await?;
1474
1475 let table_path = self.base_path.child(info.location.as_str());
1477 let table_uri = Self::construct_full_uri(&self.root, &info.location)?;
1478
1479 self.object_store
1481 .remove_dir_all(table_path)
1482 .boxed()
1483 .await
1484 .map_err(|e| {
1485 Error::namespace_source(
1486 format!("Failed to delete table directory: {}", e).into(),
1487 )
1488 })?;
1489
1490 Ok(DropTableResponse {
1491 id: request.id.clone(),
1492 location: Some(table_uri),
1493 ..Default::default()
1494 })
1495 }
1496 None => Err(Error::namespace_source(
1497 format!("Table '{}' not found", table_name).into(),
1498 )),
1499 }
1500 }
1501
1502 async fn list_namespaces(
1503 &self,
1504 request: ListNamespacesRequest,
1505 ) -> Result<ListNamespacesResponse> {
1506 let parent_namespace = request
1507 .id
1508 .as_ref()
1509 .ok_or_else(|| Error::invalid_input_source("Namespace ID is required".into()))?;
1510
1511 let filter = if parent_namespace.is_empty() {
1513 "object_type = 'namespace' AND NOT contains(object_id, '$')".to_string()
1515 } else {
1516 let prefix = parent_namespace.join(DELIMITER);
1518 format!(
1519 "object_type = 'namespace' AND starts_with(object_id, '{}{}') AND NOT contains(substring(object_id, {}), '$')",
1520 prefix,
1521 DELIMITER,
1522 prefix.len() + 2
1523 )
1524 };
1525
1526 let mut scanner = self.manifest_scanner().await?;
1527 scanner.filter(&filter).map_err(|e| {
1528 Error::io_source(box_error(std::io::Error::other(format!(
1529 "Failed to filter: {}",
1530 e
1531 ))))
1532 })?;
1533 scanner.project(&["object_id"]).map_err(|e| {
1534 Error::io_source(box_error(std::io::Error::other(format!(
1535 "Failed to project: {}",
1536 e
1537 ))))
1538 })?;
1539
1540 let batches = Self::execute_scanner(scanner).await?;
1541 let mut namespaces = Vec::new();
1542
1543 for batch in batches {
1544 if batch.num_rows() == 0 {
1545 continue;
1546 }
1547
1548 let object_id_array = Self::get_string_column(&batch, "object_id")?;
1549 for i in 0..batch.num_rows() {
1550 let object_id = object_id_array.value(i);
1551 let (_namespace, name) = Self::parse_object_id(object_id);
1552 namespaces.push(name);
1553 }
1554 }
1555
1556 Ok(ListNamespacesResponse::new(namespaces))
1557 }
1558
1559 async fn describe_namespace(
1560 &self,
1561 request: DescribeNamespaceRequest,
1562 ) -> Result<DescribeNamespaceResponse> {
1563 let namespace_id = request
1564 .id
1565 .as_ref()
1566 .ok_or_else(|| Error::invalid_input_source("Namespace ID is required".into()))?;
1567
1568 if namespace_id.is_empty() {
1570 #[allow(clippy::needless_update)]
1571 return Ok(DescribeNamespaceResponse {
1572 properties: Some(HashMap::new()),
1573 ..Default::default()
1574 });
1575 }
1576
1577 let object_id = namespace_id.join(DELIMITER);
1579 let namespace_info = self.query_manifest_for_namespace(&object_id).await?;
1580
1581 match namespace_info {
1582 #[allow(clippy::needless_update)]
1583 Some(info) => Ok(DescribeNamespaceResponse {
1584 properties: info.metadata,
1585 ..Default::default()
1586 }),
1587 None => Err(Error::namespace_source(
1588 format!("Namespace '{}' not found", object_id).into(),
1589 )),
1590 }
1591 }
1592
1593 async fn create_namespace(
1594 &self,
1595 request: CreateNamespaceRequest,
1596 ) -> Result<CreateNamespaceResponse> {
1597 let namespace_id = request
1598 .id
1599 .as_ref()
1600 .ok_or_else(|| Error::invalid_input_source("Namespace ID is required".into()))?;
1601
1602 if namespace_id.is_empty() {
1604 return Err(Error::namespace_source(
1605 "Root namespace already exists and cannot be created".into(),
1606 ));
1607 }
1608
1609 if namespace_id.len() > 1 {
1611 self.validate_namespace_levels_exist(&namespace_id[..namespace_id.len() - 1])
1612 .await?;
1613 }
1614
1615 let object_id = namespace_id.join(DELIMITER);
1616 if self.manifest_contains_object(&object_id).await? {
1617 return Err(Error::namespace_source(
1618 format!("Namespace '{}' already exists", object_id).into(),
1619 ));
1620 }
1621
1622 let metadata = request.properties.as_ref().and_then(|props| {
1624 if props.is_empty() {
1625 None
1626 } else {
1627 Some(serde_json::to_string(props).ok()?)
1628 }
1629 });
1630
1631 self.insert_into_manifest_with_metadata(
1632 object_id,
1633 ObjectType::Namespace,
1634 None,
1635 metadata,
1636 None,
1637 )
1638 .await?;
1639
1640 Ok(CreateNamespaceResponse {
1641 properties: request.properties,
1642 ..Default::default()
1643 })
1644 }
1645
1646 async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<DropNamespaceResponse> {
1647 let namespace_id = request
1648 .id
1649 .as_ref()
1650 .ok_or_else(|| Error::invalid_input_source("Namespace ID is required".into()))?;
1651
1652 if namespace_id.is_empty() {
1654 return Err(Error::namespace_source(
1655 "Root namespace cannot be dropped".into(),
1656 ));
1657 }
1658
1659 let object_id = namespace_id.join(DELIMITER);
1660
1661 if !self.manifest_contains_object(&object_id).boxed().await? {
1663 return Err(Error::namespace_source(
1664 format!("Namespace '{}' not found", object_id).into(),
1665 ));
1666 }
1667
1668 let prefix = format!("{}{}", object_id, DELIMITER);
1670 let filter = format!("starts_with(object_id, '{}')", prefix);
1671 let mut scanner = self.manifest_scanner().boxed().await?;
1672 scanner.filter(&filter).map_err(|e| {
1673 Error::io_source(box_error(std::io::Error::other(format!(
1674 "Failed to filter: {}",
1675 e
1676 ))))
1677 })?;
1678 scanner.project::<&str>(&[]).map_err(|e| {
1679 Error::io_source(box_error(std::io::Error::other(format!(
1680 "Failed to project: {}",
1681 e
1682 ))))
1683 })?;
1684 scanner.with_row_id();
1685 let count = scanner.count_rows().boxed().await.map_err(|e| {
1686 Error::io_source(box_error(std::io::Error::other(format!(
1687 "Failed to count rows: {}",
1688 e
1689 ))))
1690 })?;
1691
1692 if count > 0 {
1693 return Err(Error::namespace_source(
1694 format!(
1695 "Namespace '{}' is not empty (contains {} child objects)",
1696 object_id, count
1697 )
1698 .into(),
1699 ));
1700 }
1701
1702 self.delete_from_manifest(&object_id).boxed().await?;
1703
1704 Ok(DropNamespaceResponse::default())
1705 }
1706
1707 async fn namespace_exists(&self, request: NamespaceExistsRequest) -> Result<()> {
1708 let namespace_id = request
1709 .id
1710 .as_ref()
1711 .ok_or_else(|| Error::invalid_input_source("Namespace ID is required".into()))?;
1712
1713 if namespace_id.is_empty() {
1715 return Ok(());
1716 }
1717
1718 let object_id = namespace_id.join(DELIMITER);
1719 if self.manifest_contains_object(&object_id).await? {
1720 Ok(())
1721 } else {
1722 Err(Error::namespace_source(
1723 format!("Namespace '{}' not found", object_id).into(),
1724 ))
1725 }
1726 }
1727
1728 async fn declare_table(&self, request: DeclareTableRequest) -> Result<DeclareTableResponse> {
1729 let table_id = request
1730 .id
1731 .as_ref()
1732 .ok_or_else(|| Error::invalid_input_source("Table ID is required".into()))?;
1733
1734 if table_id.is_empty() {
1735 return Err(Error::invalid_input_source(
1736 "Table ID cannot be empty".into(),
1737 ));
1738 }
1739
1740 let (namespace, table_name) = Self::split_object_id(table_id);
1741 let object_id = Self::build_object_id(&namespace, &table_name);
1742
1743 let existing = self.query_manifest_for_table(&object_id).await?;
1745 if existing.is_some() {
1746 return Err(Error::namespace_source(
1747 format!("Table '{}' already exists", table_name).into(),
1748 ));
1749 }
1750
1751 let dir_name = if namespace.is_empty() && self.dir_listing_enabled {
1755 format!("{}.lance", table_name)
1757 } else {
1758 Self::generate_dir_name(&object_id)
1760 };
1761 let table_path = self.base_path.child(dir_name.as_str());
1762 let table_uri = Self::construct_full_uri(&self.root, &dir_name)?;
1763
1764 if let Some(req_location) = &request.location {
1766 let req_location = req_location.trim_end_matches('/');
1767 if req_location != table_uri {
1768 return Err(Error::namespace_source(
1769 format!(
1770 "Cannot declare table {} at location {}, must be at location {}",
1771 table_name, req_location, table_uri
1772 )
1773 .into(),
1774 ));
1775 }
1776 }
1777
1778 let reserved_file_path = table_path.child(".lance-reserved");
1780
1781 self.object_store
1782 .create(&reserved_file_path)
1783 .await
1784 .map_err(|e| {
1785 Error::namespace_source(
1786 format!(
1787 "Failed to create .lance-reserved file for table {}: {}",
1788 table_name, e
1789 )
1790 .into(),
1791 )
1792 })?
1793 .shutdown()
1794 .await
1795 .map_err(|e| {
1796 Error::namespace_source(
1797 format!(
1798 "Failed to finalize .lance-reserved file for table {}: {}",
1799 table_name, e
1800 )
1801 .into(),
1802 )
1803 })?;
1804
1805 self.insert_into_manifest(object_id, ObjectType::Table, Some(dir_name))
1807 .await?;
1808
1809 log::info!(
1810 "Declared table '{}' in manifest at {}",
1811 table_name,
1812 table_uri
1813 );
1814
1815 let vend_credentials = request.vend_credentials.unwrap_or(true);
1817 let storage_options = if vend_credentials {
1818 self.storage_options.clone()
1819 } else {
1820 None
1821 };
1822
1823 Ok(DeclareTableResponse {
1824 location: Some(table_uri),
1825 storage_options,
1826 ..Default::default()
1827 })
1828 }
1829
1830 async fn register_table(&self, request: RegisterTableRequest) -> Result<RegisterTableResponse> {
1831 let table_id = request
1832 .id
1833 .as_ref()
1834 .ok_or_else(|| Error::invalid_input_source("Table ID is required".into()))?;
1835
1836 if table_id.is_empty() {
1837 return Err(Error::invalid_input_source(
1838 "Table ID cannot be empty".into(),
1839 ));
1840 }
1841
1842 let location = request.location.clone();
1843
1844 if location.contains("://") {
1847 return Err(Error::invalid_input_source(format!(
1848 "Absolute URIs are not allowed for register_table. Location must be a relative path within the root directory: {}",
1849 location
1850 ).into()));
1851 }
1852
1853 if location.starts_with('/') {
1854 return Err(Error::invalid_input_source(format!(
1855 "Absolute paths are not allowed for register_table. Location must be a relative path within the root directory: {}",
1856 location
1857 ).into()));
1858 }
1859
1860 if location.contains("..") {
1862 return Err(Error::invalid_input_source(format!(
1863 "Path traversal is not allowed. Location must be a relative path within the root directory: {}",
1864 location
1865 ).into()));
1866 }
1867
1868 let (namespace, table_name) = Self::split_object_id(table_id);
1869 let object_id = Self::build_object_id(&namespace, &table_name);
1870
1871 if !namespace.is_empty() {
1873 self.validate_namespace_levels_exist(&namespace).await?;
1874 }
1875
1876 if self.manifest_contains_object(&object_id).await? {
1878 return Err(Error::namespace_source(
1879 format!("Table '{}' already exists", object_id).into(),
1880 ));
1881 }
1882
1883 self.insert_into_manifest(object_id, ObjectType::Table, Some(location.clone()))
1885 .await?;
1886
1887 Ok(RegisterTableResponse {
1888 location: Some(location),
1889 ..Default::default()
1890 })
1891 }
1892
1893 async fn deregister_table(
1894 &self,
1895 request: DeregisterTableRequest,
1896 ) -> Result<DeregisterTableResponse> {
1897 let table_id = request
1898 .id
1899 .as_ref()
1900 .ok_or_else(|| Error::invalid_input_source("Table ID is required".into()))?;
1901
1902 if table_id.is_empty() {
1903 return Err(Error::invalid_input_source(
1904 "Table ID cannot be empty".into(),
1905 ));
1906 }
1907
1908 let (namespace, table_name) = Self::split_object_id(table_id);
1909 let object_id = Self::build_object_id(&namespace, &table_name);
1910
1911 let table_info = self.query_manifest_for_table(&object_id).await?;
1913
1914 let table_uri = match table_info {
1915 Some(info) => {
1916 self.delete_from_manifest(&object_id).boxed().await?;
1918 Self::construct_full_uri(&self.root, &info.location)?
1919 }
1920 None => {
1921 return Err(Error::namespace_source(
1922 format!("Table '{}' not found", object_id).into(),
1923 ));
1924 }
1925 };
1926
1927 Ok(DeregisterTableResponse {
1928 id: request.id.clone(),
1929 location: Some(table_uri),
1930 ..Default::default()
1931 })
1932 }
1933}
1934
1935#[cfg(test)]
1936mod tests {
1937 use crate::{DirectoryNamespaceBuilder, ManifestNamespace};
1938 use bytes::Bytes;
1939 use lance_core::utils::tempfile::TempStdDir;
1940 use lance_namespace::LanceNamespace;
1941 use lance_namespace::models::{
1942 CreateNamespaceRequest, CreateTableRequest, DescribeTableRequest, DropTableRequest,
1943 ListTablesRequest, TableExistsRequest,
1944 };
1945 use rstest::rstest;
1946
1947 fn create_test_ipc_data() -> Vec<u8> {
1948 use arrow::array::{Int32Array, StringArray};
1949 use arrow::datatypes::{DataType, Field, Schema};
1950 use arrow::ipc::writer::StreamWriter;
1951 use arrow::record_batch::RecordBatch;
1952 use std::sync::Arc;
1953
1954 let schema = Arc::new(Schema::new(vec![
1955 Field::new("id", DataType::Int32, false),
1956 Field::new("name", DataType::Utf8, false),
1957 ]));
1958
1959 let batch = RecordBatch::try_new(
1960 schema.clone(),
1961 vec![
1962 Arc::new(Int32Array::from(vec![1, 2, 3])),
1963 Arc::new(StringArray::from(vec!["a", "b", "c"])),
1964 ],
1965 )
1966 .unwrap();
1967
1968 let mut buffer = Vec::new();
1969 {
1970 let mut writer = StreamWriter::try_new(&mut buffer, &schema).unwrap();
1971 writer.write(&batch).unwrap();
1972 writer.finish().unwrap();
1973 }
1974 buffer
1975 }
1976
1977 #[rstest]
1978 #[case::with_optimization(true)]
1979 #[case::without_optimization(false)]
1980 #[tokio::test]
1981 async fn test_manifest_namespace_basic_create_and_list(#[case] inline_optimization: bool) {
1982 let temp_dir = TempStdDir::default();
1983 let temp_path = temp_dir.to_str().unwrap();
1984
1985 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
1987 .inline_optimization_enabled(inline_optimization)
1988 .build()
1989 .await
1990 .unwrap();
1991
1992 let mut request = ListTablesRequest::new();
1994 request.id = Some(vec![]);
1995 let response = dir_namespace.list_tables(request).await.unwrap();
1996 assert_eq!(response.tables.len(), 0);
1997
1998 let buffer = create_test_ipc_data();
2000 let mut create_request = CreateTableRequest::new();
2001 create_request.id = Some(vec!["test_table".to_string()]);
2002
2003 let _response = dir_namespace
2004 .create_table(create_request, Bytes::from(buffer))
2005 .await
2006 .unwrap();
2007
2008 let mut request = ListTablesRequest::new();
2010 request.id = Some(vec![]);
2011 let response = dir_namespace.list_tables(request).await.unwrap();
2012 assert_eq!(response.tables.len(), 1);
2013 assert_eq!(response.tables[0], "test_table");
2014 }
2015
2016 #[rstest]
2017 #[case::with_optimization(true)]
2018 #[case::without_optimization(false)]
2019 #[tokio::test]
2020 async fn test_manifest_namespace_table_exists(#[case] inline_optimization: bool) {
2021 let temp_dir = TempStdDir::default();
2022 let temp_path = temp_dir.to_str().unwrap();
2023
2024 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2025 .inline_optimization_enabled(inline_optimization)
2026 .build()
2027 .await
2028 .unwrap();
2029
2030 let mut request = TableExistsRequest::new();
2032 request.id = Some(vec!["nonexistent".to_string()]);
2033 let result = dir_namespace.table_exists(request).await;
2034 assert!(result.is_err());
2035
2036 let buffer = create_test_ipc_data();
2038 let mut create_request = CreateTableRequest::new();
2039 create_request.id = Some(vec!["test_table".to_string()]);
2040 dir_namespace
2041 .create_table(create_request, Bytes::from(buffer))
2042 .await
2043 .unwrap();
2044
2045 let mut request = TableExistsRequest::new();
2047 request.id = Some(vec!["test_table".to_string()]);
2048 let result = dir_namespace.table_exists(request).await;
2049 assert!(result.is_ok());
2050 }
2051
2052 #[rstest]
2053 #[case::with_optimization(true)]
2054 #[case::without_optimization(false)]
2055 #[tokio::test]
2056 async fn test_manifest_namespace_describe_table(#[case] inline_optimization: bool) {
2057 let temp_dir = TempStdDir::default();
2058 let temp_path = temp_dir.to_str().unwrap();
2059
2060 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2061 .inline_optimization_enabled(inline_optimization)
2062 .build()
2063 .await
2064 .unwrap();
2065
2066 let mut request = DescribeTableRequest::new();
2068 request.id = Some(vec!["nonexistent".to_string()]);
2069 let result = dir_namespace.describe_table(request).await;
2070 assert!(result.is_err());
2071
2072 let buffer = create_test_ipc_data();
2074 let mut create_request = CreateTableRequest::new();
2075 create_request.id = Some(vec!["test_table".to_string()]);
2076 dir_namespace
2077 .create_table(create_request, Bytes::from(buffer))
2078 .await
2079 .unwrap();
2080
2081 let mut request = DescribeTableRequest::new();
2083 request.id = Some(vec!["test_table".to_string()]);
2084 let response = dir_namespace.describe_table(request).await.unwrap();
2085 assert!(response.location.is_some());
2086 assert!(response.location.unwrap().contains("test_table"));
2087 }
2088
2089 #[rstest]
2090 #[case::with_optimization(true)]
2091 #[case::without_optimization(false)]
2092 #[tokio::test]
2093 async fn test_manifest_namespace_drop_table(#[case] inline_optimization: bool) {
2094 let temp_dir = TempStdDir::default();
2095 let temp_path = temp_dir.to_str().unwrap();
2096
2097 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2098 .inline_optimization_enabled(inline_optimization)
2099 .build()
2100 .await
2101 .unwrap();
2102
2103 let buffer = create_test_ipc_data();
2105 let mut create_request = CreateTableRequest::new();
2106 create_request.id = Some(vec!["test_table".to_string()]);
2107 dir_namespace
2108 .create_table(create_request, Bytes::from(buffer))
2109 .await
2110 .unwrap();
2111
2112 let mut request = ListTablesRequest::new();
2114 request.id = Some(vec![]);
2115 let response = dir_namespace.list_tables(request).await.unwrap();
2116 assert_eq!(response.tables.len(), 1);
2117
2118 let mut drop_request = DropTableRequest::new();
2120 drop_request.id = Some(vec!["test_table".to_string()]);
2121 let _response = dir_namespace.drop_table(drop_request).await.unwrap();
2122
2123 let mut request = ListTablesRequest::new();
2125 request.id = Some(vec![]);
2126 let response = dir_namespace.list_tables(request).await.unwrap();
2127 assert_eq!(response.tables.len(), 0);
2128 }
2129
2130 #[rstest]
2131 #[case::with_optimization(true)]
2132 #[case::without_optimization(false)]
2133 #[tokio::test]
2134 async fn test_manifest_namespace_multiple_tables(#[case] inline_optimization: bool) {
2135 let temp_dir = TempStdDir::default();
2136 let temp_path = temp_dir.to_str().unwrap();
2137
2138 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2139 .inline_optimization_enabled(inline_optimization)
2140 .build()
2141 .await
2142 .unwrap();
2143
2144 let buffer = create_test_ipc_data();
2146 for i in 1..=3 {
2147 let mut create_request = CreateTableRequest::new();
2148 create_request.id = Some(vec![format!("table{}", i)]);
2149 dir_namespace
2150 .create_table(create_request, Bytes::from(buffer.clone()))
2151 .await
2152 .unwrap();
2153 }
2154
2155 let mut request = ListTablesRequest::new();
2157 request.id = Some(vec![]);
2158 let response = dir_namespace.list_tables(request).await.unwrap();
2159 assert_eq!(response.tables.len(), 3);
2160 assert!(response.tables.contains(&"table1".to_string()));
2161 assert!(response.tables.contains(&"table2".to_string()));
2162 assert!(response.tables.contains(&"table3".to_string()));
2163 }
2164
2165 #[rstest]
2166 #[case::with_optimization(true)]
2167 #[case::without_optimization(false)]
2168 #[tokio::test]
2169 async fn test_directory_only_mode(#[case] inline_optimization: bool) {
2170 let temp_dir = TempStdDir::default();
2171 let temp_path = temp_dir.to_str().unwrap();
2172
2173 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2175 .manifest_enabled(false)
2176 .inline_optimization_enabled(inline_optimization)
2177 .build()
2178 .await
2179 .unwrap();
2180
2181 let mut request = ListTablesRequest::new();
2183 request.id = Some(vec![]);
2184 let response = dir_namespace.list_tables(request).await.unwrap();
2185 assert_eq!(response.tables.len(), 0);
2186
2187 let buffer = create_test_ipc_data();
2189 let mut create_request = CreateTableRequest::new();
2190 create_request.id = Some(vec!["test_table".to_string()]);
2191
2192 let _response = dir_namespace
2194 .create_table(create_request, Bytes::from(buffer))
2195 .await
2196 .unwrap();
2197
2198 let mut request = ListTablesRequest::new();
2200 request.id = Some(vec![]);
2201 let response = dir_namespace.list_tables(request).await.unwrap();
2202 assert_eq!(response.tables.len(), 1);
2203 assert_eq!(response.tables[0], "test_table");
2204 }
2205
2206 #[rstest]
2207 #[case::with_optimization(true)]
2208 #[case::without_optimization(false)]
2209 #[tokio::test]
2210 async fn test_dual_mode_merge(#[case] inline_optimization: bool) {
2211 let temp_dir = TempStdDir::default();
2212 let temp_path = temp_dir.to_str().unwrap();
2213
2214 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2216 .manifest_enabled(true)
2217 .dir_listing_enabled(true)
2218 .inline_optimization_enabled(inline_optimization)
2219 .build()
2220 .await
2221 .unwrap();
2222
2223 let buffer = create_test_ipc_data();
2225 let mut create_request = CreateTableRequest::new();
2226 create_request.id = Some(vec!["table1".to_string()]);
2227 dir_namespace
2228 .create_table(create_request, Bytes::from(buffer))
2229 .await
2230 .unwrap();
2231
2232 let mut request = ListTablesRequest::new();
2234 request.id = Some(vec![]);
2235 let response = dir_namespace.list_tables(request).await.unwrap();
2236 assert_eq!(response.tables.len(), 1);
2237 assert_eq!(response.tables[0], "table1");
2238 }
2239
2240 #[rstest]
2241 #[case::with_optimization(true)]
2242 #[case::without_optimization(false)]
2243 #[tokio::test]
2244 async fn test_manifest_only_mode(#[case] inline_optimization: bool) {
2245 let temp_dir = TempStdDir::default();
2246 let temp_path = temp_dir.to_str().unwrap();
2247
2248 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2250 .manifest_enabled(true)
2251 .dir_listing_enabled(false)
2252 .inline_optimization_enabled(inline_optimization)
2253 .build()
2254 .await
2255 .unwrap();
2256
2257 let buffer = create_test_ipc_data();
2259 let mut create_request = CreateTableRequest::new();
2260 create_request.id = Some(vec!["test_table".to_string()]);
2261 dir_namespace
2262 .create_table(create_request, Bytes::from(buffer))
2263 .await
2264 .unwrap();
2265
2266 let mut request = ListTablesRequest::new();
2268 request.id = Some(vec![]);
2269 let response = dir_namespace.list_tables(request).await.unwrap();
2270 assert_eq!(response.tables.len(), 1);
2271 assert_eq!(response.tables[0], "test_table");
2272 }
2273
2274 #[rstest]
2275 #[case::with_optimization(true)]
2276 #[case::without_optimization(false)]
2277 #[tokio::test]
2278 async fn test_drop_nonexistent_table(#[case] inline_optimization: bool) {
2279 let temp_dir = TempStdDir::default();
2280 let temp_path = temp_dir.to_str().unwrap();
2281
2282 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2283 .inline_optimization_enabled(inline_optimization)
2284 .build()
2285 .await
2286 .unwrap();
2287
2288 let mut drop_request = DropTableRequest::new();
2290 drop_request.id = Some(vec!["nonexistent".to_string()]);
2291 let result = dir_namespace.drop_table(drop_request).await;
2292 assert!(result.is_err());
2293 }
2294
2295 #[rstest]
2296 #[case::with_optimization(true)]
2297 #[case::without_optimization(false)]
2298 #[tokio::test]
2299 async fn test_create_duplicate_table_fails(#[case] inline_optimization: bool) {
2300 let temp_dir = TempStdDir::default();
2301 let temp_path = temp_dir.to_str().unwrap();
2302
2303 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2304 .inline_optimization_enabled(inline_optimization)
2305 .build()
2306 .await
2307 .unwrap();
2308
2309 let buffer = create_test_ipc_data();
2311 let mut create_request = CreateTableRequest::new();
2312 create_request.id = Some(vec!["test_table".to_string()]);
2313 dir_namespace
2314 .create_table(create_request, Bytes::from(buffer.clone()))
2315 .await
2316 .unwrap();
2317
2318 let mut create_request = CreateTableRequest::new();
2320 create_request.id = Some(vec!["test_table".to_string()]);
2321 let result = dir_namespace
2322 .create_table(create_request, Bytes::from(buffer))
2323 .await;
2324 assert!(result.is_err());
2325 }
2326
2327 #[rstest]
2328 #[case::with_optimization(true)]
2329 #[case::without_optimization(false)]
2330 #[tokio::test]
2331 async fn test_create_child_namespace(#[case] inline_optimization: bool) {
2332 use lance_namespace::models::{
2333 CreateNamespaceRequest, ListNamespacesRequest, NamespaceExistsRequest,
2334 };
2335
2336 let temp_dir = TempStdDir::default();
2337 let temp_path = temp_dir.to_str().unwrap();
2338
2339 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2340 .inline_optimization_enabled(inline_optimization)
2341 .build()
2342 .await
2343 .unwrap();
2344
2345 let mut create_req = CreateNamespaceRequest::new();
2347 create_req.id = Some(vec!["ns1".to_string()]);
2348 let result = dir_namespace.create_namespace(create_req).await;
2349 assert!(
2350 result.is_ok(),
2351 "Failed to create child namespace: {:?}",
2352 result.err()
2353 );
2354
2355 let exists_req = NamespaceExistsRequest {
2357 id: Some(vec!["ns1".to_string()]),
2358 ..Default::default()
2359 };
2360 let result = dir_namespace.namespace_exists(exists_req).await;
2361 assert!(result.is_ok(), "Namespace should exist");
2362
2363 let list_req = ListNamespacesRequest {
2365 id: Some(vec![]),
2366 page_token: None,
2367 limit: None,
2368 ..Default::default()
2369 };
2370 let result = dir_namespace.list_namespaces(list_req).await;
2371 assert!(result.is_ok());
2372 let namespaces = result.unwrap();
2373 assert_eq!(namespaces.namespaces.len(), 1);
2374 assert_eq!(namespaces.namespaces[0], "ns1");
2375 }
2376
2377 #[rstest]
2378 #[case::with_optimization(true)]
2379 #[case::without_optimization(false)]
2380 #[tokio::test]
2381 async fn test_create_nested_namespace(#[case] inline_optimization: bool) {
2382 use lance_namespace::models::{
2383 CreateNamespaceRequest, ListNamespacesRequest, NamespaceExistsRequest,
2384 };
2385
2386 let temp_dir = TempStdDir::default();
2387 let temp_path = temp_dir.to_str().unwrap();
2388
2389 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2390 .inline_optimization_enabled(inline_optimization)
2391 .build()
2392 .await
2393 .unwrap();
2394
2395 let mut create_req = CreateNamespaceRequest::new();
2397 create_req.id = Some(vec!["parent".to_string()]);
2398 dir_namespace.create_namespace(create_req).await.unwrap();
2399
2400 let mut create_req = CreateNamespaceRequest::new();
2402 create_req.id = Some(vec!["parent".to_string(), "child".to_string()]);
2403 let result = dir_namespace.create_namespace(create_req).await;
2404 assert!(
2405 result.is_ok(),
2406 "Failed to create nested namespace: {:?}",
2407 result.err()
2408 );
2409
2410 let exists_req = NamespaceExistsRequest {
2412 id: Some(vec!["parent".to_string(), "child".to_string()]),
2413 ..Default::default()
2414 };
2415 let result = dir_namespace.namespace_exists(exists_req).await;
2416 assert!(result.is_ok(), "Nested namespace should exist");
2417
2418 let list_req = ListNamespacesRequest {
2420 id: Some(vec!["parent".to_string()]),
2421 page_token: None,
2422 limit: None,
2423 ..Default::default()
2424 };
2425 let result = dir_namespace.list_namespaces(list_req).await;
2426 assert!(result.is_ok());
2427 let namespaces = result.unwrap();
2428 assert_eq!(namespaces.namespaces.len(), 1);
2429 assert_eq!(namespaces.namespaces[0], "child");
2430 }
2431
2432 #[rstest]
2433 #[case::with_optimization(true)]
2434 #[case::without_optimization(false)]
2435 #[tokio::test]
2436 async fn test_create_namespace_without_parent_fails(#[case] inline_optimization: bool) {
2437 use lance_namespace::models::CreateNamespaceRequest;
2438
2439 let temp_dir = TempStdDir::default();
2440 let temp_path = temp_dir.to_str().unwrap();
2441
2442 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2443 .inline_optimization_enabled(inline_optimization)
2444 .build()
2445 .await
2446 .unwrap();
2447
2448 let mut create_req = CreateNamespaceRequest::new();
2450 create_req.id = Some(vec!["nonexistent_parent".to_string(), "child".to_string()]);
2451 let result = dir_namespace.create_namespace(create_req).await;
2452 assert!(result.is_err(), "Should fail when parent doesn't exist");
2453 }
2454
2455 #[rstest]
2456 #[case::with_optimization(true)]
2457 #[case::without_optimization(false)]
2458 #[tokio::test]
2459 async fn test_drop_child_namespace(#[case] inline_optimization: bool) {
2460 use lance_namespace::models::{
2461 CreateNamespaceRequest, DropNamespaceRequest, NamespaceExistsRequest,
2462 };
2463
2464 let temp_dir = TempStdDir::default();
2465 let temp_path = temp_dir.to_str().unwrap();
2466
2467 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2468 .inline_optimization_enabled(inline_optimization)
2469 .build()
2470 .await
2471 .unwrap();
2472
2473 let mut create_req = CreateNamespaceRequest::new();
2475 create_req.id = Some(vec!["ns1".to_string()]);
2476 dir_namespace.create_namespace(create_req).await.unwrap();
2477
2478 let mut drop_req = DropNamespaceRequest::new();
2480 drop_req.id = Some(vec!["ns1".to_string()]);
2481 let result = dir_namespace.drop_namespace(drop_req).await;
2482 assert!(
2483 result.is_ok(),
2484 "Failed to drop namespace: {:?}",
2485 result.err()
2486 );
2487
2488 let exists_req = NamespaceExistsRequest {
2490 id: Some(vec!["ns1".to_string()]),
2491 ..Default::default()
2492 };
2493 let result = dir_namespace.namespace_exists(exists_req).await;
2494 assert!(result.is_err(), "Namespace should not exist after drop");
2495 }
2496
2497 #[rstest]
2498 #[case::with_optimization(true)]
2499 #[case::without_optimization(false)]
2500 #[tokio::test]
2501 async fn test_drop_namespace_with_children_fails(#[case] inline_optimization: bool) {
2502 use lance_namespace::models::{CreateNamespaceRequest, DropNamespaceRequest};
2503
2504 let temp_dir = TempStdDir::default();
2505 let temp_path = temp_dir.to_str().unwrap();
2506
2507 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2508 .inline_optimization_enabled(inline_optimization)
2509 .build()
2510 .await
2511 .unwrap();
2512
2513 let mut create_req = CreateNamespaceRequest::new();
2515 create_req.id = Some(vec!["parent".to_string()]);
2516 dir_namespace.create_namespace(create_req).await.unwrap();
2517
2518 let mut create_req = CreateNamespaceRequest::new();
2519 create_req.id = Some(vec!["parent".to_string(), "child".to_string()]);
2520 dir_namespace.create_namespace(create_req).await.unwrap();
2521
2522 let mut drop_req = DropNamespaceRequest::new();
2524 drop_req.id = Some(vec!["parent".to_string()]);
2525 let result = dir_namespace.drop_namespace(drop_req).await;
2526 assert!(result.is_err(), "Should fail when namespace has children");
2527 }
2528
2529 #[rstest]
2530 #[case::with_optimization(true)]
2531 #[case::without_optimization(false)]
2532 #[tokio::test]
2533 async fn test_create_table_in_child_namespace(#[case] inline_optimization: bool) {
2534 use lance_namespace::models::{
2535 CreateNamespaceRequest, CreateTableRequest, ListTablesRequest,
2536 };
2537
2538 let temp_dir = TempStdDir::default();
2539 let temp_path = temp_dir.to_str().unwrap();
2540
2541 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2542 .inline_optimization_enabled(inline_optimization)
2543 .build()
2544 .await
2545 .unwrap();
2546
2547 let mut create_ns_req = CreateNamespaceRequest::new();
2549 create_ns_req.id = Some(vec!["ns1".to_string()]);
2550 dir_namespace.create_namespace(create_ns_req).await.unwrap();
2551
2552 let buffer = create_test_ipc_data();
2554 let mut create_table_req = CreateTableRequest::new();
2555 create_table_req.id = Some(vec!["ns1".to_string(), "table1".to_string()]);
2556 let result = dir_namespace
2557 .create_table(create_table_req, Bytes::from(buffer))
2558 .await;
2559 assert!(
2560 result.is_ok(),
2561 "Failed to create table in child namespace: {:?}",
2562 result.err()
2563 );
2564
2565 let list_req = ListTablesRequest {
2567 id: Some(vec!["ns1".to_string()]),
2568 page_token: None,
2569 limit: None,
2570 ..Default::default()
2571 };
2572 let result = dir_namespace.list_tables(list_req).await;
2573 assert!(result.is_ok());
2574 let tables = result.unwrap();
2575 assert_eq!(tables.tables.len(), 1);
2576 assert_eq!(tables.tables[0], "table1");
2577 }
2578
2579 #[rstest]
2580 #[case::with_optimization(true)]
2581 #[case::without_optimization(false)]
2582 #[tokio::test]
2583 async fn test_describe_child_namespace(#[case] inline_optimization: bool) {
2584 use lance_namespace::models::{CreateNamespaceRequest, DescribeNamespaceRequest};
2585
2586 let temp_dir = TempStdDir::default();
2587 let temp_path = temp_dir.to_str().unwrap();
2588
2589 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2590 .inline_optimization_enabled(inline_optimization)
2591 .build()
2592 .await
2593 .unwrap();
2594
2595 let mut properties = std::collections::HashMap::new();
2597 properties.insert("key1".to_string(), "value1".to_string());
2598
2599 let mut create_req = CreateNamespaceRequest::new();
2600 create_req.id = Some(vec!["ns1".to_string()]);
2601 create_req.properties = Some(properties.clone());
2602 dir_namespace.create_namespace(create_req).await.unwrap();
2603
2604 let describe_req = DescribeNamespaceRequest {
2606 id: Some(vec!["ns1".to_string()]),
2607 ..Default::default()
2608 };
2609 let result = dir_namespace.describe_namespace(describe_req).await;
2610 assert!(
2611 result.is_ok(),
2612 "Failed to describe namespace: {:?}",
2613 result.err()
2614 );
2615 let response = result.unwrap();
2616 assert!(response.properties.is_some());
2617 assert_eq!(
2618 response.properties.unwrap().get("key1"),
2619 Some(&"value1".to_string())
2620 );
2621 }
2622
2623 #[rstest]
2624 #[case::with_optimization(true)]
2625 #[case::without_optimization(false)]
2626 #[tokio::test]
2627 async fn test_concurrent_create_and_drop_single_instance(#[case] inline_optimization: bool) {
2628 use futures::future::join_all;
2629 use std::sync::Arc;
2630
2631 let temp_dir = TempStdDir::default();
2632 let temp_path = temp_dir.to_str().unwrap();
2633
2634 let dir_namespace = Arc::new(
2635 DirectoryNamespaceBuilder::new(temp_path)
2636 .inline_optimization_enabled(inline_optimization)
2637 .build()
2638 .await
2639 .unwrap(),
2640 );
2641
2642 let mut create_ns_request = CreateNamespaceRequest::new();
2645 create_ns_request.id = Some(vec!["test_ns".to_string()]);
2646 dir_namespace
2647 .create_namespace(create_ns_request)
2648 .await
2649 .unwrap();
2650
2651 let num_tables = 10;
2652 let mut handles = Vec::new();
2653
2654 for i in 0..num_tables {
2655 let ns = dir_namespace.clone();
2656 let handle = async move {
2657 let table_name = format!("concurrent_table_{}", i);
2658 let table_id = vec!["test_ns".to_string(), table_name.clone()];
2659 let buffer = create_test_ipc_data();
2660
2661 let mut create_request = CreateTableRequest::new();
2663 create_request.id = Some(table_id.clone());
2664 ns.create_table(create_request, Bytes::from(buffer))
2665 .await
2666 .unwrap_or_else(|e| panic!("Failed to create table {}: {}", table_name, e));
2667
2668 let mut drop_request = DropTableRequest::new();
2670 drop_request.id = Some(table_id);
2671 ns.drop_table(drop_request)
2672 .await
2673 .unwrap_or_else(|e| panic!("Failed to drop table {}: {}", table_name, e));
2674
2675 Ok::<_, lance_core::Error>(())
2676 };
2677 handles.push(handle);
2678 }
2679
2680 let results = join_all(handles).await;
2681 for result in results {
2682 assert!(result.is_ok(), "All concurrent operations should succeed");
2683 }
2684
2685 let mut request = ListTablesRequest::new();
2687 request.id = Some(vec!["test_ns".to_string()]);
2688 let response = dir_namespace.list_tables(request).await.unwrap();
2689 assert_eq!(response.tables.len(), 0, "All tables should be dropped");
2690 }
2691
2692 #[rstest]
2693 #[case::with_optimization(true)]
2694 #[case::without_optimization(false)]
2695 #[tokio::test]
2696 async fn test_concurrent_create_and_drop_multiple_instances(#[case] inline_optimization: bool) {
2697 use futures::future::join_all;
2698
2699 let temp_dir = TempStdDir::default();
2700 let temp_path = temp_dir.to_str().unwrap().to_string();
2701
2702 let init_ns = DirectoryNamespaceBuilder::new(&temp_path)
2705 .inline_optimization_enabled(inline_optimization)
2706 .build()
2707 .await
2708 .unwrap();
2709 let mut create_ns_request = CreateNamespaceRequest::new();
2710 create_ns_request.id = Some(vec!["test_ns".to_string()]);
2711 init_ns.create_namespace(create_ns_request).await.unwrap();
2712
2713 let num_tables = 10;
2714 let mut handles = Vec::new();
2715
2716 for i in 0..num_tables {
2717 let path = temp_path.clone();
2718 let handle = async move {
2719 let ns = DirectoryNamespaceBuilder::new(&path)
2721 .inline_optimization_enabled(inline_optimization)
2722 .build()
2723 .await
2724 .unwrap();
2725
2726 let table_name = format!("multi_ns_table_{}", i);
2727 let table_id = vec!["test_ns".to_string(), table_name.clone()];
2728 let buffer = create_test_ipc_data();
2729
2730 let mut create_request = CreateTableRequest::new();
2732 create_request.id = Some(table_id.clone());
2733 ns.create_table(create_request, Bytes::from(buffer))
2734 .await
2735 .unwrap_or_else(|e| panic!("Failed to create table {}: {}", table_name, e));
2736
2737 let mut drop_request = DropTableRequest::new();
2739 drop_request.id = Some(table_id);
2740 ns.drop_table(drop_request)
2741 .await
2742 .unwrap_or_else(|e| panic!("Failed to drop table {}: {}", table_name, e));
2743
2744 Ok::<_, lance_core::Error>(())
2745 };
2746 handles.push(handle);
2747 }
2748
2749 let results = join_all(handles).await;
2750 for result in results {
2751 assert!(result.is_ok(), "All concurrent operations should succeed");
2752 }
2753
2754 let verify_ns = DirectoryNamespaceBuilder::new(&temp_path)
2756 .inline_optimization_enabled(inline_optimization)
2757 .build()
2758 .await
2759 .unwrap();
2760
2761 let mut request = ListTablesRequest::new();
2762 request.id = Some(vec!["test_ns".to_string()]);
2763 let response = verify_ns.list_tables(request).await.unwrap();
2764 assert_eq!(response.tables.len(), 0, "All tables should be dropped");
2765 }
2766
2767 #[rstest]
2768 #[case::with_optimization(true)]
2769 #[case::without_optimization(false)]
2770 #[tokio::test]
2771 async fn test_concurrent_create_then_drop_from_different_instance(
2772 #[case] inline_optimization: bool,
2773 ) {
2774 use futures::future::join_all;
2775
2776 let temp_dir = TempStdDir::default();
2777 let temp_path = temp_dir.to_str().unwrap().to_string();
2778
2779 let init_ns = DirectoryNamespaceBuilder::new(&temp_path)
2782 .inline_optimization_enabled(inline_optimization)
2783 .build()
2784 .await
2785 .unwrap();
2786 let mut create_ns_request = CreateNamespaceRequest::new();
2787 create_ns_request.id = Some(vec!["test_ns".to_string()]);
2788 init_ns.create_namespace(create_ns_request).await.unwrap();
2789
2790 let num_tables = 10;
2791
2792 let mut create_handles = Vec::new();
2794 for i in 0..num_tables {
2795 let path = temp_path.clone();
2796 let handle = async move {
2797 let ns = DirectoryNamespaceBuilder::new(&path)
2798 .inline_optimization_enabled(inline_optimization)
2799 .build()
2800 .await
2801 .unwrap();
2802
2803 let table_name = format!("cross_instance_table_{}", i);
2804 let table_id = vec!["test_ns".to_string(), table_name.clone()];
2805 let buffer = create_test_ipc_data();
2806
2807 let mut create_request = CreateTableRequest::new();
2808 create_request.id = Some(table_id);
2809 ns.create_table(create_request, Bytes::from(buffer))
2810 .await
2811 .unwrap_or_else(|e| panic!("Failed to create table {}: {}", table_name, e));
2812
2813 Ok::<_, lance_core::Error>(())
2814 };
2815 create_handles.push(handle);
2816 }
2817
2818 let create_results = join_all(create_handles).await;
2819 for result in create_results {
2820 assert!(result.is_ok(), "All create operations should succeed");
2821 }
2822
2823 let mut drop_handles = Vec::new();
2825 for i in 0..num_tables {
2826 let path = temp_path.clone();
2827 let handle = async move {
2828 let ns = DirectoryNamespaceBuilder::new(&path)
2829 .inline_optimization_enabled(inline_optimization)
2830 .build()
2831 .await
2832 .unwrap();
2833
2834 let table_name = format!("cross_instance_table_{}", i);
2835 let table_id = vec!["test_ns".to_string(), table_name.clone()];
2836
2837 let mut drop_request = DropTableRequest::new();
2838 drop_request.id = Some(table_id);
2839 ns.drop_table(drop_request)
2840 .await
2841 .unwrap_or_else(|e| panic!("Failed to drop table {}: {}", table_name, e));
2842
2843 Ok::<_, lance_core::Error>(())
2844 };
2845 drop_handles.push(handle);
2846 }
2847
2848 let drop_results = join_all(drop_handles).await;
2849 for result in drop_results {
2850 assert!(result.is_ok(), "All drop operations should succeed");
2851 }
2852
2853 let verify_ns = DirectoryNamespaceBuilder::new(&temp_path)
2855 .inline_optimization_enabled(inline_optimization)
2856 .build()
2857 .await
2858 .unwrap();
2859
2860 let mut request = ListTablesRequest::new();
2861 request.id = Some(vec!["test_ns".to_string()]);
2862 let response = verify_ns.list_tables(request).await.unwrap();
2863 assert_eq!(response.tables.len(), 0, "All tables should be dropped");
2864 }
2865
2866 #[test]
2867 fn test_construct_full_uri_with_cloud_urls() {
2868 let s3_result =
2870 ManifestNamespace::construct_full_uri("s3://bucket/path/subdir", "table.lance")
2871 .unwrap();
2872 assert_eq!(
2873 s3_result, "s3://bucket/path/subdir/table.lance",
2874 "S3 URL should correctly append table name to nested path"
2875 );
2876
2877 let az_result =
2879 ManifestNamespace::construct_full_uri("az://container/path/subdir", "table.lance")
2880 .unwrap();
2881 assert_eq!(
2882 az_result, "az://container/path/subdir/table.lance",
2883 "Azure URL should correctly append table name to nested path"
2884 );
2885
2886 let gs_result =
2888 ManifestNamespace::construct_full_uri("gs://bucket/path/subdir", "table.lance")
2889 .unwrap();
2890 assert_eq!(
2891 gs_result, "gs://bucket/path/subdir/table.lance",
2892 "GCS URL should correctly append table name to nested path"
2893 );
2894
2895 let deep_result =
2897 ManifestNamespace::construct_full_uri("s3://bucket/a/b/c/d", "my_table.lance").unwrap();
2898 assert_eq!(
2899 deep_result, "s3://bucket/a/b/c/d/my_table.lance",
2900 "Deeply nested path should work correctly"
2901 );
2902
2903 let shallow_result =
2905 ManifestNamespace::construct_full_uri("s3://bucket", "table.lance").unwrap();
2906 assert_eq!(
2907 shallow_result, "s3://bucket/table.lance",
2908 "Single-level nested path should work correctly"
2909 );
2910
2911 let trailing_slash_result =
2913 ManifestNamespace::construct_full_uri("s3://bucket/path/subdir/", "table.lance")
2914 .unwrap();
2915 assert_eq!(
2916 trailing_slash_result, "s3://bucket/path/subdir/table.lance",
2917 "URL with existing trailing slash should still work"
2918 );
2919 }
2920
2921 #[tokio::test]
2928 async fn test_concurrent_create_table_no_duplicates() {
2929 let temp_dir = TempStdDir::default();
2930 let temp_path = temp_dir.to_str().unwrap();
2931
2932 let ns1 = DirectoryNamespaceBuilder::new(temp_path)
2935 .inline_optimization_enabled(false)
2936 .build()
2937 .await
2938 .unwrap();
2939 let ns2 = DirectoryNamespaceBuilder::new(temp_path)
2940 .inline_optimization_enabled(false)
2941 .build()
2942 .await
2943 .unwrap();
2944
2945 let buffer = create_test_ipc_data();
2946
2947 let mut req1 = CreateTableRequest::new();
2948 req1.id = Some(vec!["race_table".to_string()]);
2949 let mut req2 = CreateTableRequest::new();
2950 req2.id = Some(vec!["race_table".to_string()]);
2951
2952 let (result1, result2) = tokio::join!(
2954 ns1.create_table(req1, Bytes::from(buffer.clone())),
2955 ns2.create_table(req2, Bytes::from(buffer.clone())),
2956 );
2957
2958 let success_count = [&result1, &result2].iter().filter(|r| r.is_ok()).count();
2960 let failure_count = [&result1, &result2].iter().filter(|r| r.is_err()).count();
2961 assert_eq!(
2962 success_count, 1,
2963 "Exactly one create should succeed, got: result1={:?}, result2={:?}",
2964 result1, result2
2965 );
2966 assert_eq!(
2967 failure_count, 1,
2968 "Exactly one create should fail, got: result1={:?}, result2={:?}",
2969 result1, result2
2970 );
2971
2972 let ns_check = DirectoryNamespaceBuilder::new(temp_path)
2974 .inline_optimization_enabled(false)
2975 .build()
2976 .await
2977 .unwrap();
2978 let mut list_request = ListTablesRequest::new();
2979 list_request.id = Some(vec![]);
2980 let response = ns_check.list_tables(list_request).await.unwrap();
2981 assert_eq!(
2982 response.tables.len(),
2983 1,
2984 "Should have exactly 1 table, found: {:?}",
2985 response.tables
2986 );
2987 assert_eq!(response.tables[0], "race_table");
2988
2989 let mut describe_request = DescribeTableRequest::new();
2991 describe_request.id = Some(vec!["race_table".to_string()]);
2992 let describe_result = ns_check.describe_table(describe_request).await;
2993 assert!(
2994 describe_result.is_ok(),
2995 "describe_table should not fail with duplicate entries: {:?}",
2996 describe_result
2997 );
2998 }
2999}