1use arrow::array::builder::{ListBuilder, StringBuilder};
10use arrow::array::{Array, RecordBatch, RecordBatchIterator, StringArray};
11use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
12use arrow_ipc::reader::StreamReader;
13use async_trait::async_trait;
14use bytes::Bytes;
15use futures::{FutureExt, TryStreamExt, stream::StreamExt};
16use lance::dataset::optimize::{CompactionOptions, compact_files};
17use lance::dataset::{
18 DeleteBuilder, MergeInsertBuilder, ReadParams, WhenMatched, WhenNotMatched, WriteMode,
19 WriteParams, builder::DatasetBuilder,
20};
21use lance::index::DatasetIndexExt;
22use lance::session::Session;
23use lance::{Dataset, dataset::scanner::Scanner};
24use lance_core::Error as LanceError;
25use lance_core::datatypes::LANCE_UNENFORCED_PRIMARY_KEY_POSITION;
26use lance_core::{Error, Result};
27use lance_index::IndexType;
28use lance_index::optimize::OptimizeOptions;
29use lance_index::scalar::{BuiltinIndexType, ScalarIndexParams};
30use lance_io::object_store::{ObjectStore, ObjectStoreParams};
31use lance_namespace::LanceNamespace;
32use lance_namespace::error::NamespaceError;
33use lance_namespace::models::{
34 CreateNamespaceRequest, CreateNamespaceResponse, CreateTableRequest, CreateTableResponse,
35 DeclareTableRequest, DeclareTableResponse, DeregisterTableRequest, DeregisterTableResponse,
36 DescribeNamespaceRequest, DescribeNamespaceResponse, DescribeTableRequest,
37 DescribeTableResponse, DescribeTableVersionResponse, DropNamespaceRequest,
38 DropNamespaceResponse, DropTableRequest, DropTableResponse, ListNamespacesRequest,
39 ListNamespacesResponse, ListTableVersionsResponse, ListTablesRequest, ListTablesResponse,
40 NamespaceExistsRequest, RegisterTableRequest, RegisterTableResponse, TableExistsRequest,
41 TableVersion,
42};
43use lance_namespace::schema::arrow_schema_to_json;
44use object_store::{Error as ObjectStoreError, path::Path};
45use std::io::Cursor;
46use std::{
47 collections::HashMap,
48 hash::{DefaultHasher, Hash, Hasher},
49 ops::{Deref, DerefMut},
50 sync::Arc,
51};
52use tokio::sync::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
53
/// Name of the manifest table; log messages in this file refer to it as the `__manifest` table.
const MANIFEST_TABLE_NAME: &str = "__manifest";
/// Separator placed between namespace segments and the object name when building object ids.
const DELIMITER: &str = "$";
// NOTE(review): not referenced in the visible part of this file; presumably bounds the
// concurrency of a declared-table filter elsewhere in the crate — confirm at the call site.
pub(crate) const DECLARED_FILTER_CONCURRENCY: usize = 16;

/// Name given to the BTree index created on the manifest's `object_id` column.
const OBJECT_ID_INDEX_NAME: &str = "object_id_btree";
/// Name given to the Bitmap index created on the manifest's `object_type` column.
const OBJECT_TYPE_INDEX_NAME: &str = "object_type_bitmap";
/// Name given to the LabelList index created on the manifest's `base_objects` column.
const BASE_OBJECTS_INDEX_NAME: &str = "base_objects_label_list";
/// Fragment count at or above which inline optimization compacts files and optimizes indices.
const MANIFEST_INLINE_OPTIMIZATION_FRAGMENT_THRESHOLD: usize = 8;
70
71#[derive(Debug, Clone, Copy, PartialEq, Eq)]
73pub enum ObjectType {
74 Namespace,
75 Table,
76 TableVersion,
77}
78
79impl ObjectType {
80 pub fn as_str(&self) -> &str {
81 match self {
82 Self::Namespace => "namespace",
83 Self::Table => "table",
84 Self::TableVersion => "table_version",
85 }
86 }
87
88 pub fn parse(s: &str) -> Result<Self> {
89 match s {
90 "namespace" => Ok(Self::Namespace),
91 "table" => Ok(Self::Table),
92 "table_version" => Ok(Self::TableVersion),
93 _ => Err(NamespaceError::Internal {
94 message: format!("Invalid object type: {}", s),
95 }
96 .into()),
97 }
98 }
99}
100
101#[derive(Debug, Clone, Copy, PartialEq, Eq)]
102enum CreateTableMode {
103 Create,
104 ExistOk,
105 Overwrite,
106}
107
108impl CreateTableMode {
109 fn parse(mode: Option<&str>) -> Result<Self> {
110 match mode {
111 None => Ok(Self::Create),
112 Some(mode) if mode.eq_ignore_ascii_case("create") => Ok(Self::Create),
113 Some(mode)
114 if mode.eq_ignore_ascii_case("existok")
115 || mode.eq_ignore_ascii_case("exist_ok") =>
116 {
117 Ok(Self::ExistOk)
118 }
119 Some(mode) if mode.eq_ignore_ascii_case("overwrite") => Ok(Self::Overwrite),
120 Some(mode) => Err(NamespaceError::InvalidInput {
121 message: format!(
122 "Unsupported create_table mode '{}'. Supported modes are: 'Create', 'ExistOk', 'Overwrite'",
123 mode
124 ),
125 }
126 .into()),
127 }
128 }
129
130 fn write_mode(self) -> WriteMode {
131 match self {
132 Self::Overwrite => WriteMode::Overwrite,
133 Self::Create | Self::ExistOk => WriteMode::Create,
134 }
135 }
136}
137
/// Description of a table as read back from a manifest row.
#[derive(Debug, Clone)]
pub struct TableInfo {
    /// Namespace segments parsed from the row's object id.
    pub namespace: Vec<String>,
    /// Final segment of the object id, i.e. the table name.
    pub name: String,
    /// Value of the row's `location` column.
    pub location: String,
    /// Key/value metadata decoded from the row's `metadata` column, if that column is non-null.
    pub metadata: Option<HashMap<String, String>>,
}
146
/// One row to be written to the manifest table.
#[derive(Debug, Clone)]
pub struct ManifestEntry {
    /// Value for the `object_id` column (the merge key used when writing).
    pub object_id: String,
    /// Value for the `object_type` column, stored via [`ObjectType::as_str`].
    pub object_type: ObjectType,
    /// Value for the nullable `location` column.
    pub location: Option<String>,
    /// Value for the nullable `metadata` column.
    // NOTE(review): readers in this file decode this column as a JSON string map — confirm
    // that all writers store JSON here.
    pub metadata: Option<String>,
}
162
/// Description of a namespace.
// NOTE(review): not constructed in the visible part of this file; field meanings below are
// presumed to mirror `TableInfo` — confirm against the code that builds it.
#[derive(Debug, Clone)]
pub struct NamespaceInfo {
    /// Parent namespace segments (presumably parsed from the object id).
    pub namespace: Vec<String>,
    /// Name of the namespace itself.
    pub name: String,
    /// Optional key/value metadata attached to the namespace.
    pub metadata: Option<HashMap<String, String>>,
}
170
171#[derive(Debug, Clone)]
176pub struct DatasetConsistencyWrapper(Arc<RwLock<Dataset>>);
177
178impl DatasetConsistencyWrapper {
179 pub fn new(dataset: Dataset) -> Self {
181 Self(Arc::new(RwLock::new(dataset)))
182 }
183
184 pub async fn get(&self) -> Result<DatasetReadGuard<'_>> {
187 self.reload().await?;
188 Ok(DatasetReadGuard {
189 guard: self.0.read().await,
190 })
191 }
192
193 pub async fn get_mut(&self) -> Result<DatasetWriteGuard<'_>> {
196 self.reload().await?;
197 Ok(DatasetWriteGuard {
198 guard: self.0.write().await,
199 })
200 }
201
202 pub async fn set_latest(&self, dataset: Dataset) {
207 let mut write_guard = self.0.write().await;
208 if dataset.manifest().version > write_guard.manifest().version {
209 *write_guard = dataset;
210 }
211 }
212
213 async fn reload(&self) -> Result<()> {
215 let read_guard = self.0.read().await;
217 let dataset_uri = read_guard.uri().to_string();
218 let current_version = read_guard.version().version;
219 log::debug!(
220 "Reload starting for uri={}, current_version={}",
221 dataset_uri,
222 current_version
223 );
224 let latest_version = read_guard.latest_version_id().await.map_err(|e| {
225 lance_core::Error::from(NamespaceError::Internal {
226 message: format!("Failed to get latest version: {:?}", e),
227 })
228 })?;
229 log::debug!(
230 "Reload got latest_version={} for uri={}, current_version={}",
231 latest_version,
232 dataset_uri,
233 current_version
234 );
235 drop(read_guard);
236
237 if latest_version == current_version {
239 log::debug!("Already up-to-date for uri={}", dataset_uri);
240 return Ok(());
241 }
242
243 let mut write_guard = self.0.write().await;
245
246 let latest_version = write_guard.latest_version_id().await.map_err(|e| {
248 lance_core::Error::from(NamespaceError::Internal {
249 message: format!("Failed to get latest version: {:?}", e),
250 })
251 })?;
252
253 if latest_version != write_guard.version().version {
254 write_guard.checkout_latest().await.map_err(|e| {
255 lance_core::Error::from(NamespaceError::Internal {
256 message: format!("Failed to checkout latest: {:?}", e),
257 })
258 })?;
259 }
260
261 Ok(())
262 }
263}
264
265pub struct DatasetReadGuard<'a> {
266 guard: RwLockReadGuard<'a, Dataset>,
267}
268
269impl Deref for DatasetReadGuard<'_> {
270 type Target = Dataset;
271
272 fn deref(&self) -> &Self::Target {
273 &self.guard
274 }
275}
276
277pub struct DatasetWriteGuard<'a> {
278 guard: RwLockWriteGuard<'a, Dataset>,
279}
280
281impl Deref for DatasetWriteGuard<'_> {
282 type Target = Dataset;
283
284 fn deref(&self) -> &Self::Target {
285 &self.guard
286 }
287}
288
289impl DerefMut for DatasetWriteGuard<'_> {
290 fn deref_mut(&mut self) -> &mut Self::Target {
291 &mut self.guard
292 }
293}
294
/// Namespace implementation backed by a manifest table dataset.
pub struct ManifestNamespace {
    /// Root URI the namespace was created with.
    root: String,
    /// Storage options the namespace was created with, if any.
    storage_options: Option<HashMap<String, String>>,
    /// Optional Lance session supplied at construction.
    session: Option<Arc<Session>>,
    /// Object store used for listing table paths.
    object_store: Arc<ObjectStore>,
    /// Base path that table locations are joined onto when probing the object store.
    base_path: Path,
    /// Handle to the manifest table dataset.
    manifest_dataset: DatasetConsistencyWrapper,
    // NOTE(review): not read in the visible part of this file; presumably enables falling
    // back to directory listing — confirm where it is consumed.
    dir_listing_enabled: bool,
    /// When false, `run_inline_optimization` returns immediately without doing any work.
    inline_optimization_enabled: bool,
    /// When set, forwarded to the merge-insert builder's `commit_retries`.
    commit_retries: Option<u32>,
    /// Held while a merge-insert or delete against the manifest is in progress.
    manifest_mutation_lock: Arc<Mutex<()>>,
}
319
320impl std::fmt::Debug for ManifestNamespace {
321 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
322 f.debug_struct("ManifestNamespace")
323 .field("root", &self.root)
324 .field("storage_options", &self.storage_options)
325 .field("dir_listing_enabled", &self.dir_listing_enabled)
326 .field(
327 "inline_optimization_enabled",
328 &self.inline_optimization_enabled,
329 )
330 .finish()
331 }
332}
333
334fn convert_lance_commit_error(e: &LanceError, operation: &str, object_id: Option<&str>) -> Error {
343 match e {
344 LanceError::CommitConflict { .. } => NamespaceError::Throttling {
346 message: format!("Too many concurrent writes, please retry later: {:?}", e),
347 }
348 .into(),
349 LanceError::TooMuchWriteContention { .. } | LanceError::IncompatibleTransaction { .. } => {
352 let message = if let Some(id) = object_id {
353 format!(
354 "Object '{}' was concurrently modified by another operation: {:?}",
355 id, e
356 )
357 } else {
358 format!(
359 "Object was concurrently modified by another operation: {:?}",
360 e
361 )
362 };
363 NamespaceError::ConcurrentModification { message }.into()
364 }
365 _ => {
367 let error_msg = e.to_string();
368 if error_msg.contains("matched")
369 || error_msg.contains("duplicate")
370 || error_msg.contains("already exists")
371 {
372 let message = if let Some(id) = object_id {
373 format!(
374 "Object '{}' was concurrently created by another operation: {:?}",
375 id, e
376 )
377 } else {
378 format!(
379 "Object was concurrently created by another operation: {:?}",
380 e
381 )
382 };
383 return NamespaceError::ConcurrentModification { message }.into();
384 }
385 lance_core::Error::from(NamespaceError::Internal {
386 message: format!("{}: {:?}", operation, e),
387 })
388 }
389 }
390}
391
392impl ManifestNamespace {
393 #[allow(clippy::too_many_arguments)]
395 pub async fn from_directory(
396 root: String,
397 storage_options: Option<HashMap<String, String>>,
398 session: Option<Arc<Session>>,
399 object_store: Arc<ObjectStore>,
400 base_path: Path,
401 dir_listing_enabled: bool,
402 inline_optimization_enabled: bool,
403 commit_retries: Option<u32>,
404 table_version_storage_enabled: bool,
405 ) -> Result<Self> {
406 let manifest_dataset = Self::ensure_manifest_table_up_to_date(
407 &root,
408 &storage_options,
409 session.clone(),
410 table_version_storage_enabled,
411 )
412 .await?;
413
414 Ok(Self {
415 root,
416 storage_options,
417 session,
418 object_store,
419 base_path,
420 manifest_dataset,
421 dir_listing_enabled,
422 inline_optimization_enabled,
423 commit_retries,
424 manifest_mutation_lock: Arc::new(Mutex::new(())),
425 })
426 }
427
428 pub fn build_object_id(namespace: &[String], name: &str) -> String {
430 if namespace.is_empty() {
431 name.to_string()
432 } else {
433 let mut id = namespace.join(DELIMITER);
434 id.push_str(DELIMITER);
435 id.push_str(name);
436 id
437 }
438 }
439
440 pub fn parse_object_id(object_id: &str) -> (Vec<String>, String) {
442 let parts: Vec<&str> = object_id.split(DELIMITER).collect();
443 if parts.len() == 1 {
444 (Vec::new(), parts[0].to_string())
445 } else {
446 let namespace = parts[..parts.len() - 1]
447 .iter()
448 .map(|s| s.to_string())
449 .collect();
450 let name = parts[parts.len() - 1].to_string();
451 (namespace, name)
452 }
453 }
454
/// Splits a segmented object id into `(namespace segments, name)`.
///
/// The last segment is the name and everything before it is the namespace. An empty slice
/// yields an empty namespace and an empty name instead of panicking on the `len() - 1`
/// underflow.
pub fn split_object_id(object_id: &[String]) -> (Vec<String>, String) {
    match object_id.split_last() {
        Some((name, namespace)) => (namespace.to_vec(), name.clone()),
        None => (Vec::new(), String::new()),
    }
}
466
/// Joins the segments of `object_id` with `DELIMITER` into a single string id.
pub fn str_object_id(object_id: &[String]) -> String {
    object_id.join(DELIMITER)
}
471
472 fn format_table_id(table_id: &[String]) -> String {
473 format!("table id '{}'", Self::str_object_id(table_id))
474 }
475
/// Formats a version number zero-padded to a width of 20 characters.
pub fn format_table_version(version: i64) -> String {
    format!("{version:020}")
}
483
484 pub fn build_version_object_id(table_object_id: &str, version: i64) -> String {
488 format!(
489 "{}{}{}",
490 table_object_id,
491 DELIMITER,
492 Self::format_table_version(version)
493 )
494 }
495
496 pub fn parse_version_from_object_id(object_id: &str) -> Option<i64> {
500 let (_namespace, name) = Self::parse_object_id(object_id);
501 name.parse::<i64>().ok()
502 }
503
504 pub fn generate_dir_name(object_id: &str) -> String {
511 let random_num: u64 = rand::random();
513
514 let mut hasher = DefaultHasher::new();
516 random_num.hash(&mut hasher);
517 object_id.hash(&mut hasher);
518 let hash = hasher.finish();
519
520 format!("{:08x}_{}", (hash & 0xFFFFFFFF) as u32, object_id)
522 }
523
524 pub(crate) fn construct_full_uri(root: &str, relative_location: &str) -> Result<String> {
526 let mut base_url = lance_io::object_store::uri_to_url(root)?;
527
528 if !base_url.path().ends_with('/') {
533 base_url.set_path(&format!("{}/", base_url.path()));
534 }
535
536 let mut full_url = base_url.clone();
537 full_url
538 .path_segments_mut()
539 .map_err(|_| {
540 lance_core::Error::from(NamespaceError::InvalidInput {
541 message: format!("Cannot modify path segments for URI '{}'", root),
542 })
543 })?
544 .pop_if_empty()
545 .extend(
546 relative_location
547 .split('/')
548 .filter(|segment| !segment.is_empty()),
549 );
550
551 full_url.set_query(None);
555
556 Ok(full_url.to_string())
557 }
558
559 async fn run_inline_optimization(&self) -> Result<()> {
571 if !self.inline_optimization_enabled {
572 return Ok(());
573 }
574
575 let mut dataset_guard = self.manifest_dataset.get_mut().await?;
577 let dataset: &mut Dataset = &mut dataset_guard;
578
579 let indices = dataset.load_indices().await?;
581
582 let has_object_id_index = indices.iter().any(|idx| idx.name == OBJECT_ID_INDEX_NAME);
584 let has_object_type_index = indices.iter().any(|idx| idx.name == OBJECT_TYPE_INDEX_NAME);
585 let has_base_objects_index = indices
586 .iter()
587 .any(|idx| idx.name == BASE_OBJECTS_INDEX_NAME);
588
589 if !has_object_id_index {
591 log::debug!(
592 "Creating BTREE index '{}' on object_id for __manifest table",
593 OBJECT_ID_INDEX_NAME
594 );
595 let params = ScalarIndexParams::for_builtin(BuiltinIndexType::BTree);
596 if let Err(e) = dataset
597 .create_index(
598 &["object_id"],
599 IndexType::BTree,
600 Some(OBJECT_ID_INDEX_NAME.to_string()),
601 ¶ms,
602 true,
603 )
604 .await
605 {
606 log::warn!(
607 "Failed to create BTREE index on object_id for __manifest table: {:?}. Query performance may be impacted.",
608 e
609 );
610 } else {
611 log::info!(
612 "Created BTREE index '{}' on object_id for __manifest table",
613 OBJECT_ID_INDEX_NAME
614 );
615 }
616 }
617
618 if !has_object_type_index {
620 log::debug!(
621 "Creating Bitmap index '{}' on object_type for __manifest table",
622 OBJECT_TYPE_INDEX_NAME
623 );
624 let params = ScalarIndexParams::default();
625 if let Err(e) = dataset
626 .create_index(
627 &["object_type"],
628 IndexType::Bitmap,
629 Some(OBJECT_TYPE_INDEX_NAME.to_string()),
630 ¶ms,
631 true,
632 )
633 .await
634 {
635 log::warn!(
636 "Failed to create Bitmap index on object_type for __manifest table: {:?}. Query performance may be impacted.",
637 e
638 );
639 } else {
640 log::info!(
641 "Created Bitmap index '{}' on object_type for __manifest table",
642 OBJECT_TYPE_INDEX_NAME
643 );
644 }
645 }
646
647 if !has_base_objects_index {
649 log::debug!(
650 "Creating LabelList index '{}' on base_objects for __manifest table",
651 BASE_OBJECTS_INDEX_NAME
652 );
653 let params = ScalarIndexParams::default();
654 if let Err(e) = dataset
655 .create_index(
656 &["base_objects"],
657 IndexType::LabelList,
658 Some(BASE_OBJECTS_INDEX_NAME.to_string()),
659 ¶ms,
660 true,
661 )
662 .await
663 {
664 log::warn!(
665 "Failed to create LabelList index on base_objects for __manifest table: {:?}. Query performance may be impacted.",
666 e
667 );
668 } else {
669 log::info!(
670 "Created LabelList index '{}' on base_objects for __manifest table",
671 BASE_OBJECTS_INDEX_NAME
672 );
673 }
674 }
675
676 let should_compact_and_optimize =
677 dataset.count_fragments() >= MANIFEST_INLINE_OPTIMIZATION_FRAGMENT_THRESHOLD;
678
679 if !should_compact_and_optimize {
680 return Ok(());
681 }
682
683 log::debug!("Running file compaction on __manifest table");
685 match compact_files(dataset, CompactionOptions::default(), None).await {
686 Ok(compaction_metrics) => {
687 if compaction_metrics.fragments_removed > 0 {
688 log::info!(
689 "Compacted __manifest table: removed {} fragments, added {} fragments",
690 compaction_metrics.fragments_removed,
691 compaction_metrics.fragments_added
692 );
693 }
694 }
695 Err(e) => {
696 log::warn!(
697 "Failed to compact files for __manifest table: {:?}. Continuing with optimization.",
698 e
699 );
700 }
701 }
702
703 log::debug!("Optimizing indices on __manifest table");
705 match dataset.optimize_indices(&OptimizeOptions::default()).await {
706 Ok(_) => {
707 log::info!("Successfully optimized indices on __manifest table");
708 }
709 Err(e) => {
710 log::warn!(
711 "Failed to optimize indices on __manifest table: {:?}. Continuing anyway.",
712 e
713 );
714 }
715 }
716
717 Ok(())
718 }
719
720 fn manifest_schema() -> Arc<ArrowSchema> {
722 Arc::new(ArrowSchema::new(vec![
723 Field::new("object_id", DataType::Utf8, false).with_metadata(
725 [(
726 LANCE_UNENFORCED_PRIMARY_KEY_POSITION.to_string(),
727 "0".to_string(),
728 )]
729 .into_iter()
730 .collect(),
731 ),
732 Field::new("object_type", DataType::Utf8, false),
733 Field::new("location", DataType::Utf8, true),
734 Field::new("metadata", DataType::Utf8, true),
735 Field::new(
736 "base_objects",
737 DataType::List(Arc::new(Field::new("object_id", DataType::Utf8, true))),
738 true,
739 ),
740 ]))
741 }
742
743 async fn manifest_scanner(&self) -> Result<Scanner> {
745 let dataset_guard = self.manifest_dataset.get().await?;
746 Ok(dataset_guard.scan())
747 }
748
749 async fn execute_scanner(scanner: Scanner) -> Result<Vec<RecordBatch>> {
751 let mut stream = scanner.try_into_stream().await.map_err(|e| {
752 lance_core::Error::from(NamespaceError::Internal {
753 message: format!("Failed to create stream: {:?}", e),
754 })
755 })?;
756
757 let mut batches = Vec::new();
758 while let Some(batch) = stream.next().await {
759 batches.push(batch.map_err(|e| {
760 lance_core::Error::from(NamespaceError::Internal {
761 message: format!("Failed to read batch: {:?}", e),
762 })
763 })?);
764 }
765
766 Ok(batches)
767 }
768
769 fn get_string_column<'a>(batch: &'a RecordBatch, column_name: &str) -> Result<&'a StringArray> {
771 let column = batch.column_by_name(column_name).ok_or_else(|| {
772 lance_core::Error::from(NamespaceError::Internal {
773 message: format!("Column '{}' not found", column_name),
774 })
775 })?;
776 column
777 .as_any()
778 .downcast_ref::<StringArray>()
779 .ok_or_else(|| {
780 lance_core::Error::from(NamespaceError::Internal {
781 message: format!("Column '{}' is not a string array", column_name),
782 })
783 })
784 }
785
786 async fn manifest_contains_object(&self, object_id: &str) -> Result<bool> {
788 let escaped_id = object_id.replace('\'', "''");
789 let filter = format!("object_id = '{}'", escaped_id);
790
791 let dataset_guard = self.manifest_dataset.get().await?;
792 let mut scanner = dataset_guard.scan();
793
794 scanner.filter(&filter).map_err(|e| {
795 lance_core::Error::from(NamespaceError::Internal {
796 message: format!("Failed to filter: {:?}", e),
797 })
798 })?;
799
800 scanner.project::<&str>(&[]).map_err(|e| {
802 lance_core::Error::from(NamespaceError::Internal {
803 message: format!("Failed to project: {:?}", e),
804 })
805 })?;
806
807 scanner.with_row_id();
808
809 let count = scanner.count_rows().await.map_err(|e| {
810 lance_core::Error::from(NamespaceError::Internal {
811 message: format!("Failed to count rows: {:?}", e),
812 })
813 })?;
814
815 Ok(count > 0)
816 }
817
818 async fn query_manifest_for_table(&self, object_id: &str) -> Result<Option<TableInfo>> {
820 let escaped_id = object_id.replace('\'', "''");
821 let filter = format!("object_id = '{}' AND object_type = 'table'", escaped_id);
822 let mut scanner = self.manifest_scanner().await?;
823 scanner.filter(&filter).map_err(|e| {
824 lance_core::Error::from(NamespaceError::Internal {
825 message: format!("Failed to filter: {:?}", e),
826 })
827 })?;
828 scanner
829 .project(&["object_id", "location", "metadata"])
830 .map_err(|e| {
831 lance_core::Error::from(NamespaceError::Internal {
832 message: format!("Failed to project: {:?}", e),
833 })
834 })?;
835 let batches = Self::execute_scanner(scanner).await?;
836
837 let mut found_result: Option<TableInfo> = None;
838 let mut total_rows = 0;
839
840 for batch in batches {
841 if batch.num_rows() == 0 {
842 continue;
843 }
844
845 total_rows += batch.num_rows();
846 if total_rows > 1 {
847 return Err(NamespaceError::Internal {
848 message: format!(
849 "Expected exactly 1 table with id '{}', found {}",
850 object_id, total_rows
851 ),
852 }
853 .into());
854 }
855
856 let object_id_array = Self::get_string_column(&batch, "object_id")?;
857 let location_array = Self::get_string_column(&batch, "location")?;
858 let metadata_array = Self::get_string_column(&batch, "metadata")?;
859 let location = location_array.value(0).to_string();
860 let metadata = if !metadata_array.is_null(0) {
861 let metadata_str = metadata_array.value(0);
862 match serde_json::from_str::<HashMap<String, String>>(metadata_str) {
863 Ok(map) => Some(map),
864 Err(e) => {
865 return Err(NamespaceError::Internal {
866 message: format!(
867 "Failed to deserialize metadata for table '{}': {}",
868 object_id, e
869 ),
870 }
871 .into());
872 }
873 }
874 } else {
875 None
876 };
877 let (namespace, name) = Self::parse_object_id(object_id_array.value(0));
878 found_result = Some(TableInfo {
879 namespace,
880 name,
881 location,
882 metadata,
883 });
884 }
885
886 Ok(found_result)
887 }
888
889 fn serialize_metadata(
890 properties: Option<&HashMap<String, String>>,
891 object_type: &str,
892 object_id: &str,
893 ) -> Result<Option<String>> {
894 match properties {
895 Some(properties) if !properties.is_empty() => {
896 serde_json::to_string(properties).map(Some).map_err(|e| {
897 LanceError::from(NamespaceError::Internal {
898 message: format!(
899 "Failed to serialize {} metadata for '{}': {}",
900 object_type, object_id, e
901 ),
902 })
903 })
904 }
905 _ => Ok(None),
906 }
907 }
908
909 pub(crate) async fn path_has_actual_manifests(
910 object_store: &ObjectStore,
911 table_path: &Path,
912 ) -> Result<bool> {
913 let versions_path = table_path
914 .clone()
915 .join(lance_table::io::commit::VERSIONS_DIR);
916 Ok(object_store
919 .list(Some(versions_path))
920 .try_next()
921 .await?
922 .is_some())
923 }
924
925 async fn location_has_actual_manifests(&self, location: &str) -> Result<bool> {
926 Self::path_has_actual_manifests(&self.object_store, &self.base_path.clone().join(location))
927 .await
928 }
929
930 pub(crate) fn is_not_found_load_error(err: &LanceError) -> bool {
931 match err {
932 LanceError::NotFound { .. } => true,
933 LanceError::IO { source, .. } => source
934 .downcast_ref::<ObjectStoreError>()
935 .is_some_and(|source| matches!(source, ObjectStoreError::NotFound { .. })),
936 LanceError::DatasetNotFound { source, .. } => {
937 source
938 .downcast_ref::<LanceError>()
939 .is_some_and(|source| matches!(source, LanceError::NotFound { .. }))
940 || source
941 .downcast_ref::<ObjectStoreError>()
942 .is_some_and(|source| matches!(source, ObjectStoreError::NotFound { .. }))
943 }
944 _ => false,
945 }
946 }
947
948 pub async fn list_manifest_table_locations(&self) -> Result<std::collections::HashSet<String>> {
951 let filter = "object_type = 'table' AND NOT contains(object_id, '$')";
952 let mut scanner = self.manifest_scanner().await?;
953 scanner.filter(filter).map_err(|e| {
954 lance_core::Error::from(NamespaceError::Internal {
955 message: format!("Failed to filter: {:?}", e),
956 })
957 })?;
958 scanner.project(&["location"]).map_err(|e| {
959 lance_core::Error::from(NamespaceError::Internal {
960 message: format!("Failed to project: {:?}", e),
961 })
962 })?;
963
964 let batches = Self::execute_scanner(scanner).await?;
965 let mut locations = std::collections::HashSet::new();
966
967 for batch in batches {
968 if batch.num_rows() == 0 {
969 continue;
970 }
971 let location_array = Self::get_string_column(&batch, "location")?;
972 for i in 0..location_array.len() {
973 locations.insert(location_array.value(i).to_string());
974 }
975 }
976
977 Ok(locations)
978 }
979
980 async fn insert_into_manifest(
982 &self,
983 object_id: String,
984 object_type: ObjectType,
985 location: Option<String>,
986 ) -> Result<()> {
987 self.insert_into_manifest_with_metadata(
988 vec![ManifestEntry {
989 object_id,
990 object_type,
991 location,
992 metadata: None,
993 }],
994 None,
995 )
996 .await
997 }
998
999 pub async fn insert_into_manifest_with_metadata(
1005 &self,
1006 entries: Vec<ManifestEntry>,
1007 base_objects: Option<Vec<String>>,
1008 ) -> Result<()> {
1009 self.merge_into_manifest_with_metadata(entries, base_objects, WhenMatched::Fail)
1010 .await
1011 }
1012
1013 async fn upsert_into_manifest_with_metadata(
1014 &self,
1015 entries: Vec<ManifestEntry>,
1016 base_objects: Option<Vec<String>>,
1017 ) -> Result<()> {
1018 self.merge_into_manifest_with_metadata(entries, base_objects, WhenMatched::UpdateAll)
1019 .await
1020 }
1021
1022 async fn merge_into_manifest_with_metadata(
1023 &self,
1024 entries: Vec<ManifestEntry>,
1025 base_objects: Option<Vec<String>>,
1026 when_matched: WhenMatched,
1027 ) -> Result<()> {
1028 if entries.is_empty() {
1029 return Ok(());
1030 }
1031
1032 let schema = Self::manifest_schema();
1033
1034 let mut object_ids = Vec::with_capacity(entries.len());
1035 let mut object_types = Vec::with_capacity(entries.len());
1036 let mut locations: Vec<Option<String>> = Vec::with_capacity(entries.len());
1037 let mut metadatas: Vec<Option<String>> = Vec::with_capacity(entries.len());
1038
1039 let string_builder = StringBuilder::new();
1040 let mut list_builder = ListBuilder::new(string_builder).with_field(Arc::new(Field::new(
1041 "object_id",
1042 DataType::Utf8,
1043 true,
1044 )));
1045
1046 for (i, entry) in entries.iter().enumerate() {
1047 object_ids.push(entry.object_id.as_str());
1048 object_types.push(entry.object_type.as_str());
1049 locations.push(entry.location.clone());
1050 metadatas.push(entry.metadata.clone());
1051
1052 if i == 0 {
1055 match &base_objects {
1056 Some(objects) => {
1057 for obj in objects {
1058 list_builder.values().append_value(obj);
1059 }
1060 list_builder.append(true);
1061 }
1062 None => {
1063 list_builder.append_null();
1064 }
1065 }
1066 } else {
1067 list_builder.append_null();
1068 }
1069 }
1070
1071 let base_objects_array = list_builder.finish();
1072
1073 let location_array: Arc<dyn Array> = Arc::new(StringArray::from(
1074 locations.iter().map(|l| l.as_deref()).collect::<Vec<_>>(),
1075 ));
1076
1077 let metadata_array: Arc<dyn Array> = Arc::new(StringArray::from(
1078 metadatas.iter().map(|m| m.as_deref()).collect::<Vec<_>>(),
1079 ));
1080
1081 let batch = RecordBatch::try_new(
1082 schema.clone(),
1083 vec![
1084 Arc::new(StringArray::from(object_ids)),
1085 Arc::new(StringArray::from(object_types.to_vec())),
1086 location_array,
1087 metadata_array,
1088 Arc::new(base_objects_array),
1089 ],
1090 )
1091 .map_err(|e| {
1092 lance_core::Error::from(NamespaceError::Internal {
1093 message: format!("Failed to create manifest entries: {:?}", e),
1094 })
1095 })?;
1096
1097 let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
1098
1099 let _mutation_guard = self.manifest_mutation_lock.lock().await;
1101 let dataset_guard = self.manifest_dataset.get().await?;
1102 let dataset_arc = Arc::new(dataset_guard.clone());
1103 drop(dataset_guard); let mut merge_builder =
1106 MergeInsertBuilder::try_new(dataset_arc, vec!["object_id".to_string()]).map_err(
1107 |e| {
1108 lance_core::Error::from(NamespaceError::Internal {
1109 message: format!("Failed to create merge builder: {:?}", e),
1110 })
1111 },
1112 )?;
1113 merge_builder.when_matched(when_matched);
1114 merge_builder.when_not_matched(WhenNotMatched::InsertAll);
1115 merge_builder.conflict_retries(5);
1117 merge_builder.use_index(false);
1122 if let Some(retries) = self.commit_retries {
1123 merge_builder.commit_retries(retries);
1124 }
1125
1126 let (new_dataset_arc, _merge_stats) = merge_builder
1127 .try_build()
1128 .map_err(|e| {
1129 lance_core::Error::from(NamespaceError::Internal {
1130 message: format!("Failed to build merge: {:?}", e),
1131 })
1132 })?
1133 .execute_reader(Box::new(reader))
1134 .await
1135 .map_err(|e| {
1136 convert_lance_commit_error(&e, "Failed to execute merge insert into manifest", None)
1137 })?;
1138
1139 let new_dataset = Arc::try_unwrap(new_dataset_arc).unwrap_or_else(|arc| (*arc).clone());
1140 self.manifest_dataset.set_latest(new_dataset).await;
1141
1142 if let Err(e) = self.run_inline_optimization().await {
1144 log::warn!(
1145 "Unexpected failure when running inline optimization: {:?}",
1146 e
1147 );
1148 }
1149
1150 Ok(())
1151 }
1152
1153 pub async fn delete_from_manifest(&self, object_id: &str) -> Result<()> {
1155 let predicate = format!("object_id = '{}'", object_id);
1156
1157 let _mutation_guard = self.manifest_mutation_lock.lock().await;
1159 let dataset_guard = self.manifest_dataset.get().await?;
1160 let dataset = Arc::new(dataset_guard.clone());
1161 drop(dataset_guard); let new_dataset = DeleteBuilder::new(dataset, &predicate)
1164 .execute()
1165 .await
1166 .map_err(|e| convert_lance_commit_error(&e, "Failed to delete", None))?;
1167
1168 self.manifest_dataset
1170 .set_latest(
1171 Arc::try_unwrap(new_dataset.new_dataset).unwrap_or_else(|arc| (*arc).clone()),
1172 )
1173 .await;
1174
1175 if let Err(e) = self.run_inline_optimization().await {
1177 log::warn!(
1178 "Unexpected failure when running inline optimization: {:?}",
1179 e
1180 );
1181 }
1182
1183 Ok(())
1184 }
1185
1186 pub async fn query_table_versions(
1196 &self,
1197 object_id: &str,
1198 descending: bool,
1199 limit: Option<i32>,
1200 ) -> Result<Vec<(i64, String)>> {
1201 let escaped_id = object_id.replace('\'', "''");
1202 let filter = format!(
1204 "object_type = 'table_version' AND starts_with(object_id, '{}{}')",
1205 escaped_id, DELIMITER
1206 );
1207 let mut scanner = self.manifest_scanner().await?;
1208 scanner.filter(&filter).map_err(|e| {
1209 lance_core::Error::from(NamespaceError::Internal {
1210 message: format!("Failed to filter: {:?}", e),
1211 })
1212 })?;
1213 scanner.project(&["object_id", "metadata"]).map_err(|e| {
1214 lance_core::Error::from(NamespaceError::Internal {
1215 message: format!("Failed to project: {:?}", e),
1216 })
1217 })?;
1218 let batches = Self::execute_scanner(scanner).await?;
1219
1220 let mut versions: Vec<(i64, String)> = Vec::new();
1221 for batch in batches {
1222 if batch.num_rows() == 0 {
1223 continue;
1224 }
1225 let object_id_array = Self::get_string_column(&batch, "object_id")?;
1226 let metadata_array = Self::get_string_column(&batch, "metadata")?;
1227 for i in 0..batch.num_rows() {
1228 let oid = object_id_array.value(i);
1229 if let Some(version) = Self::parse_version_from_object_id(oid) {
1231 let metadata_str = metadata_array.value(i).to_string();
1232 versions.push((version, metadata_str));
1233 }
1234 }
1235 }
1236
1237 if descending {
1238 versions.sort_by(|a, b| b.0.cmp(&a.0));
1239 } else {
1240 versions.sort_by(|a, b| a.0.cmp(&b.0));
1241 }
1242
1243 if let Some(limit) = limit {
1244 versions.truncate(limit as usize);
1245 }
1246
1247 Ok(versions)
1248 }
1249
1250 pub async fn query_table_version(
1256 &self,
1257 object_id: &str,
1258 version: i64,
1259 ) -> Result<Option<String>> {
1260 let version_object_id = Self::build_version_object_id(object_id, version);
1261 self.query_table_version_by_object_id(&version_object_id)
1262 .await
1263 }
1264
1265 async fn query_table_version_by_object_id(
1267 &self,
1268 version_object_id: &str,
1269 ) -> Result<Option<String>> {
1270 let escaped_id = version_object_id.replace('\'', "''");
1271 let filter = format!(
1272 "object_id = '{}' AND object_type = 'table_version'",
1273 escaped_id
1274 );
1275 let mut scanner = self.manifest_scanner().await?;
1276 scanner.filter(&filter).map_err(|e| {
1277 lance_core::Error::from(NamespaceError::Internal {
1278 message: format!("Failed to filter: {:?}", e),
1279 })
1280 })?;
1281 scanner.project(&["metadata"]).map_err(|e| {
1282 lance_core::Error::from(NamespaceError::Internal {
1283 message: format!("Failed to project: {:?}", e),
1284 })
1285 })?;
1286 let batches = Self::execute_scanner(scanner).await?;
1287
1288 for batch in batches {
1289 if batch.num_rows() == 0 {
1290 continue;
1291 }
1292 let metadata_array = Self::get_string_column(&batch, "metadata")?;
1293 return Ok(Some(metadata_array.value(0).to_string()));
1294 }
1295
1296 Ok(None)
1297 }
1298
1299 pub async fn delete_table_versions(
1308 &self,
1309 object_id: &str,
1310 ranges: &[(i64, i64)],
1311 ) -> Result<i64> {
1312 if ranges.is_empty() {
1313 return Ok(0);
1314 }
1315
1316 let mut object_id_conditions: Vec<String> = Vec::new();
1318 for (start, end) in ranges {
1319 for version in *start..=*end {
1320 let oid = Self::build_version_object_id(object_id, version);
1321 let escaped = oid.replace('\'', "''");
1322 object_id_conditions.push(format!("'{}'", escaped));
1323 }
1324 }
1325
1326 if object_id_conditions.is_empty() {
1327 return Ok(0);
1328 }
1329
1330 let in_list = object_id_conditions.join(", ");
1332 let filter = format!(
1333 "object_type = 'table_version' AND object_id IN ({})",
1334 in_list
1335 );
1336
1337 let mut scanner = self.manifest_scanner().await?;
1338 scanner.filter(&filter).map_err(|e| {
1339 lance_core::Error::from(NamespaceError::Internal {
1340 message: format!("Failed to filter: {:?}", e),
1341 })
1342 })?;
1343 scanner.project(&["object_id", "location"]).map_err(|e| {
1344 lance_core::Error::from(NamespaceError::Internal {
1345 message: format!("Failed to project: {:?}", e),
1346 })
1347 })?;
1348 let batches = Self::execute_scanner(scanner).await?;
1349 let deleted_count: i64 = batches.iter().map(|b| b.num_rows() as i64).sum();
1350
1351 if deleted_count == 0 {
1352 return Ok(0);
1353 }
1354
1355 let _mutation_guard = self.manifest_mutation_lock.lock().await;
1357 let dataset_guard = self.manifest_dataset.get().await?;
1358 let dataset = Arc::new(dataset_guard.clone());
1359 drop(dataset_guard);
1360
1361 let new_dataset = DeleteBuilder::new(dataset, &filter)
1362 .execute()
1363 .await
1364 .map_err(|e| {
1365 convert_lance_commit_error(&e, "Failed to batch delete table versions", None)
1366 })?;
1367
1368 self.manifest_dataset
1369 .set_latest(
1370 Arc::try_unwrap(new_dataset.new_dataset).unwrap_or_else(|arc| (*arc).clone()),
1371 )
1372 .await;
1373
1374 if let Err(e) = self.run_inline_optimization().await {
1375 log::warn!(
1376 "Unexpected failure when running inline optimization: {:?}",
1377 e
1378 );
1379 }
1380
1381 Ok(deleted_count)
1382 }
1383
1384 pub async fn batch_delete_table_versions_by_object_ids(
1392 &self,
1393 object_ids: &[String],
1394 ) -> Result<i64> {
1395 if object_ids.is_empty() {
1396 return Ok(0);
1397 }
1398
1399 let in_list: String = object_ids
1400 .iter()
1401 .map(|oid| {
1402 let escaped = oid.replace('\'', "''");
1403 format!("'{}'", escaped)
1404 })
1405 .collect::<Vec<_>>()
1406 .join(", ");
1407
1408 let filter = format!(
1409 "object_type = 'table_version' AND object_id IN ({})",
1410 in_list
1411 );
1412
1413 let mut scanner = self.manifest_scanner().await?;
1415 scanner.filter(&filter).map_err(|e| {
1416 lance_core::Error::from(NamespaceError::Internal {
1417 message: format!("Failed to filter: {:?}", e),
1418 })
1419 })?;
1420 scanner.project(&["object_id", "location"]).map_err(|e| {
1421 lance_core::Error::from(NamespaceError::Internal {
1422 message: format!("Failed to project: {:?}", e),
1423 })
1424 })?;
1425 let batches = Self::execute_scanner(scanner).await?;
1426 let deleted_count: i64 = batches.iter().map(|b| b.num_rows() as i64).sum();
1427
1428 if deleted_count == 0 {
1429 return Ok(0);
1430 }
1431
1432 let _mutation_guard = self.manifest_mutation_lock.lock().await;
1434 let dataset_guard = self.manifest_dataset.get().await?;
1435 let dataset = Arc::new(dataset_guard.clone());
1436 drop(dataset_guard);
1437
1438 let new_dataset = DeleteBuilder::new(dataset, &filter)
1439 .execute()
1440 .await
1441 .map_err(|e| {
1442 convert_lance_commit_error(
1443 &e,
1444 "Failed to batch delete table versions across multiple tables",
1445 None,
1446 )
1447 })?;
1448
1449 self.manifest_dataset
1450 .set_latest(
1451 Arc::try_unwrap(new_dataset.new_dataset).unwrap_or_else(|arc| (*arc).clone()),
1452 )
1453 .await;
1454
1455 if let Err(e) = self.run_inline_optimization().await {
1456 log::warn!(
1457 "Unexpected failure when running inline optimization: {:?}",
1458 e
1459 );
1460 }
1461
1462 Ok(deleted_count)
1463 }
1464
1465 pub async fn set_property(&self, name: &str, value: &str) -> Result<()> {
1471 let _mutation_guard = self.manifest_mutation_lock.lock().await;
1472 let dataset_guard = self.manifest_dataset.get().await?;
1473 if dataset_guard.metadata().get(name) == Some(&value.to_string()) {
1474 return Ok(());
1475 }
1476 drop(dataset_guard);
1477
1478 let mut dataset_guard = self.manifest_dataset.get_mut().await?;
1479 dataset_guard
1480 .update_metadata([(name, value)])
1481 .await
1482 .map_err(|e| {
1483 lance_core::Error::from(NamespaceError::Internal {
1484 message: format!(
1485 "Failed to set property '{}' in __manifest metadata: {}",
1486 name, e
1487 ),
1488 })
1489 })?;
1490 Ok(())
1491 }
1492
1493 pub async fn has_property(&self, name: &str) -> Result<bool> {
1495 let dataset_guard = self.manifest_dataset.get().await?;
1496 Ok(dataset_guard.metadata().contains_key(name))
1497 }
1498
1499 fn parse_table_version(version: i64, metadata_str: &str) -> Option<TableVersion> {
1503 let meta: serde_json::Value = match serde_json::from_str(metadata_str) {
1504 Ok(v) => v,
1505 Err(e) => {
1506 log::warn!(
1507 "Skipping version {} due to invalid metadata JSON: {}",
1508 version,
1509 e
1510 );
1511 return None;
1512 }
1513 };
1514 let manifest_path = match meta.get("manifest_path").and_then(|v| v.as_str()) {
1515 Some(p) => p.to_string(),
1516 None => {
1517 log::warn!(
1518 "Skipping version {} due to missing 'manifest_path' in metadata — \
1519 this may indicate data corruption",
1520 version
1521 );
1522 return None;
1523 }
1524 };
1525 let manifest_size = meta.get("manifest_size").and_then(|v| v.as_i64());
1526 let e_tag = meta
1527 .get("e_tag")
1528 .and_then(|v| v.as_str())
1529 .map(|s| s.to_string());
1530 Some(TableVersion {
1531 version,
1532 manifest_path,
1533 manifest_size,
1534 e_tag,
1535 timestamp_millis: None,
1536 metadata: None,
1537 })
1538 }
1539
1540 pub async fn list_table_versions(
1545 &self,
1546 table_id: &[String],
1547 descending: bool,
1548 limit: Option<i32>,
1549 ) -> Result<ListTableVersionsResponse> {
1550 let object_id = Self::str_object_id(table_id);
1551 let manifest_versions = self
1552 .query_table_versions(&object_id, descending, limit)
1553 .await?;
1554
1555 let table_versions: Vec<TableVersion> = manifest_versions
1556 .into_iter()
1557 .filter_map(|(version, metadata_str)| Self::parse_table_version(version, &metadata_str))
1558 .collect();
1559
1560 Ok(ListTableVersionsResponse {
1561 versions: table_versions,
1562 page_token: None,
1563 })
1564 }
1565
1566 pub async fn describe_table_version(
1571 &self,
1572 table_id: &[String],
1573 version: i64,
1574 ) -> Result<DescribeTableVersionResponse> {
1575 let object_id = Self::str_object_id(table_id);
1576 if let Some(metadata_str) = self.query_table_version(&object_id, version).await?
1577 && let Some(tv) = Self::parse_table_version(version, &metadata_str)
1578 {
1579 return Ok(DescribeTableVersionResponse {
1580 version: Box::new(tv),
1581 });
1582 }
1583 Err(NamespaceError::TableVersionNotFound {
1584 message: format!("version {} for table {:?}", version, table_id),
1585 }
1586 .into())
1587 }
1588
1589 pub async fn register_table(&self, name: &str, location: String) -> Result<()> {
1591 let object_id = Self::build_object_id(&[], name);
1592 if self.manifest_contains_object(&object_id).await? {
1593 return Err(NamespaceError::Internal {
1594 message: format!("Table '{}' already exists", name),
1595 }
1596 .into());
1597 }
1598
1599 self.insert_into_manifest(object_id, ObjectType::Table, Some(location))
1600 .await
1601 }
1602
1603 async fn validate_namespace_levels_exist(&self, namespace_path: &[String]) -> Result<()> {
1605 for i in 1..=namespace_path.len() {
1606 let partial_path = &namespace_path[..i];
1607 let object_id = partial_path.join(DELIMITER);
1608 if !self.manifest_contains_object(&object_id).await? {
1609 return Err(NamespaceError::NamespaceNotFound {
1610 message: format!("parent namespace '{}'", object_id),
1611 }
1612 .into());
1613 }
1614 }
1615 Ok(())
1616 }
1617
1618 async fn query_manifest_for_namespace(&self, object_id: &str) -> Result<Option<NamespaceInfo>> {
1620 let escaped_id = object_id.replace('\'', "''");
1621 let filter = format!("object_id = '{}' AND object_type = 'namespace'", escaped_id);
1622 let mut scanner = self.manifest_scanner().await?;
1623 scanner.filter(&filter).map_err(|e| {
1624 lance_core::Error::from(NamespaceError::Internal {
1625 message: format!("Failed to filter: {:?}", e),
1626 })
1627 })?;
1628 scanner.project(&["object_id", "metadata"]).map_err(|e| {
1629 lance_core::Error::from(NamespaceError::Internal {
1630 message: format!("Failed to project: {:?}", e),
1631 })
1632 })?;
1633 let batches = Self::execute_scanner(scanner).await?;
1634
1635 let mut found_result: Option<NamespaceInfo> = None;
1636 let mut total_rows = 0;
1637
1638 for batch in batches {
1639 if batch.num_rows() == 0 {
1640 continue;
1641 }
1642
1643 total_rows += batch.num_rows();
1644 if total_rows > 1 {
1645 return Err(NamespaceError::Internal {
1646 message: format!(
1647 "Expected exactly 1 namespace with id '{}', found {}",
1648 object_id, total_rows
1649 ),
1650 }
1651 .into());
1652 }
1653
1654 let object_id_array = Self::get_string_column(&batch, "object_id")?;
1655 let metadata_array = Self::get_string_column(&batch, "metadata")?;
1656
1657 let object_id_str = object_id_array.value(0);
1658 let metadata = if !metadata_array.is_null(0) {
1659 let metadata_str = metadata_array.value(0);
1660 match serde_json::from_str::<HashMap<String, String>>(metadata_str) {
1661 Ok(map) => Some(map),
1662 Err(e) => {
1663 return Err(NamespaceError::Internal {
1664 message: format!(
1665 "Failed to deserialize metadata for namespace '{}': {}",
1666 object_id, e
1667 ),
1668 }
1669 .into());
1670 }
1671 }
1672 } else {
1673 None
1674 };
1675
1676 let (namespace, name) = Self::parse_object_id(object_id_str);
1677 found_result = Some(NamespaceInfo {
1678 namespace,
1679 name,
1680 metadata,
1681 });
1682 }
1683
1684 Ok(found_result)
1685 }
1686
1687 async fn ensure_manifest_table_up_to_date(
1695 root: &str,
1696 storage_options: &Option<HashMap<String, String>>,
1697 session: Option<Arc<Session>>,
1698 table_version_storage_enabled: bool,
1699 ) -> Result<DatasetConsistencyWrapper> {
1700 let manifest_path = format!("{}/{}", root, MANIFEST_TABLE_NAME);
1701 log::debug!("Attempting to load manifest from {}", manifest_path);
1702 let store_options = ObjectStoreParams {
1703 storage_options_accessor: storage_options.as_ref().map(|opts| {
1704 Arc::new(
1705 lance_io::object_store::StorageOptionsAccessor::with_static_options(
1706 opts.clone(),
1707 ),
1708 )
1709 }),
1710 ..Default::default()
1711 };
1712 let read_params = ReadParams {
1713 session: session.clone(),
1714 store_options: Some(store_options.clone()),
1715 ..Default::default()
1716 };
1717 let dataset_result = DatasetBuilder::from_uri(&manifest_path)
1718 .with_read_params(read_params)
1719 .load()
1720 .await;
1721 if let Ok(mut dataset) = dataset_result {
1722 let needs_pk_migration = dataset
1724 .schema()
1725 .field("object_id")
1726 .map(|f| {
1727 !f.metadata
1728 .contains_key(LANCE_UNENFORCED_PRIMARY_KEY_POSITION)
1729 })
1730 .unwrap_or(false);
1731
1732 if needs_pk_migration {
1733 log::info!("Migrating __manifest table to add primary key metadata on object_id");
1734 dataset
1735 .update_field_metadata()
1736 .update("object_id", [(LANCE_UNENFORCED_PRIMARY_KEY_POSITION, "0")])
1737 .map_err(|e| {
1738 lance_core::Error::from(NamespaceError::Internal {
1739 message: format!(
1740 "Failed to find object_id field for migration: {:?}",
1741 e
1742 ),
1743 })
1744 })?
1745 .await
1746 .map_err(|e| {
1747 lance_core::Error::from(NamespaceError::Internal {
1748 message: format!("Failed to migrate primary key metadata: {:?}", e),
1749 })
1750 })?;
1751 }
1752
1753 if table_version_storage_enabled {
1756 let needs_flag = dataset
1757 .metadata()
1758 .get("table_version_storage_enabled")
1759 .map(|v| v != "true")
1760 .unwrap_or(true);
1761
1762 if needs_flag
1763 && let Err(e) = dataset
1764 .update_metadata([("table_version_storage_enabled", "true")])
1765 .await
1766 {
1767 log::warn!(
1768 "Failed to persist table_version_storage_enabled flag in __manifest: {:?}",
1769 e
1770 );
1771 }
1772 }
1773
1774 Ok(DatasetConsistencyWrapper::new(dataset))
1775 } else {
1776 log::info!("Creating new manifest table at {}", manifest_path);
1777 let schema = Self::manifest_schema();
1778 let empty_batch = RecordBatch::new_empty(schema.clone());
1779 let reader = RecordBatchIterator::new(vec![Ok(empty_batch)], schema.clone());
1780
1781 let store_params = ObjectStoreParams {
1782 storage_options_accessor: storage_options.as_ref().map(|opts| {
1783 Arc::new(
1784 lance_io::object_store::StorageOptionsAccessor::with_static_options(
1785 opts.clone(),
1786 ),
1787 )
1788 }),
1789 ..Default::default()
1790 };
1791 let write_params = WriteParams {
1792 session: session.clone(),
1793 store_params: Some(store_params),
1794 ..Default::default()
1795 };
1796
1797 let dataset =
1798 Dataset::write(Box::new(reader), &manifest_path, Some(write_params)).await;
1799
1800 match dataset {
1802 Ok(dataset) => {
1803 log::info!(
1804 "Successfully created manifest table at {}, version={}, uri={}",
1805 manifest_path,
1806 dataset.version().version,
1807 dataset.uri()
1808 );
1809 Ok(DatasetConsistencyWrapper::new(dataset))
1810 }
1811 Err(ref e)
1812 if matches!(
1813 e,
1814 LanceError::DatasetAlreadyExists { .. }
1815 | LanceError::CommitConflict { .. }
1816 | LanceError::IncompatibleTransaction { .. }
1817 | LanceError::RetryableCommitConflict { .. }
1818 ) =>
1819 {
1820 log::info!(
1822 "Manifest table was created by another process, loading it: {}",
1823 manifest_path
1824 );
1825 let recovery_store_options = ObjectStoreParams {
1826 storage_options_accessor: storage_options.as_ref().map(|opts| {
1827 Arc::new(
1828 lance_io::object_store::StorageOptionsAccessor::with_static_options(
1829 opts.clone(),
1830 ),
1831 )
1832 }),
1833 ..Default::default()
1834 };
1835 let recovery_read_params = ReadParams {
1836 session,
1837 store_options: Some(recovery_store_options),
1838 ..Default::default()
1839 };
1840 let dataset = DatasetBuilder::from_uri(&manifest_path)
1841 .with_read_params(recovery_read_params)
1842 .load()
1843 .await
1844 .map_err(|e| {
1845 lance_core::Error::from(NamespaceError::Internal {
1846 message: format!(
1847 "Failed to load manifest dataset after creation conflict: {}",
1848 e
1849 ),
1850 })
1851 })?;
1852 Ok(DatasetConsistencyWrapper::new(dataset))
1853 }
1854 Err(e) => Err(lance_core::Error::from(NamespaceError::Internal {
1855 message: format!("Failed to create manifest dataset: {:?}", e),
1856 })),
1857 }
1858 }
1859 }
1860
1861 fn apply_pagination(
1866 names: &mut Vec<String>,
1867 page_token: Option<String>,
1868 limit: Option<i32>,
1869 ) -> Option<String> {
1870 names.sort();
1871
1872 if let Some(start_after) = page_token {
1873 if let Some(index) = names
1874 .iter()
1875 .position(|name| name.as_str() > start_after.as_str())
1876 {
1877 names.drain(0..index);
1878 } else {
1879 names.clear();
1880 }
1881 }
1882
1883 if let Some(limit) = limit
1884 && limit >= 0
1885 {
1886 let limit = limit as usize;
1887 if names.len() > limit {
1888 let next_page_token = if limit > 0 {
1889 Some(names[limit - 1].clone())
1890 } else {
1891 None
1892 };
1893 names.truncate(limit);
1894 return next_page_token;
1895 }
1896 }
1897
1898 None
1899 }
1900}
1901
1902#[async_trait]
1903impl LanceNamespace for ManifestNamespace {
1904 fn namespace_id(&self) -> String {
1905 self.root.clone()
1906 }
1907
1908 async fn list_tables(&self, request: ListTablesRequest) -> Result<ListTablesResponse> {
1909 let namespace_id = request.id.as_ref().ok_or_else(|| {
1910 lance_core::Error::from(NamespaceError::InvalidInput {
1911 message: "Namespace ID is required".to_string(),
1912 })
1913 })?;
1914
1915 let filter = if namespace_id.is_empty() {
1917 "object_type = 'table' AND NOT contains(object_id, '$')".to_string()
1919 } else {
1920 let prefix = namespace_id.join(DELIMITER);
1922 format!(
1923 "object_type = 'table' AND starts_with(object_id, '{}{}') AND NOT contains(substring(object_id, {}), '$')",
1924 prefix,
1925 DELIMITER,
1926 prefix.len() + 2
1927 )
1928 };
1929
1930 let mut scanner = self.manifest_scanner().await?;
1931 scanner.filter(&filter).map_err(|e| {
1932 lance_core::Error::from(NamespaceError::Internal {
1933 message: format!("Failed to filter: {:?}", e),
1934 })
1935 })?;
1936 scanner.project(&["object_id", "location"]).map_err(|e| {
1937 lance_core::Error::from(NamespaceError::Internal {
1938 message: format!("Failed to project: {:?}", e),
1939 })
1940 })?;
1941
1942 let batches = Self::execute_scanner(scanner).await?;
1943
1944 let mut table_entries = Vec::new();
1945 for batch in batches {
1946 if batch.num_rows() == 0 {
1947 continue;
1948 }
1949
1950 let object_id_array = Self::get_string_column(&batch, "object_id")?;
1951 let location_array = Self::get_string_column(&batch, "location")?;
1952 for i in 0..batch.num_rows() {
1953 let object_id = object_id_array.value(i);
1954 let location = location_array.value(i);
1955 let (_namespace, name) = Self::parse_object_id(object_id);
1956 table_entries.push((name, location.to_string()));
1957 }
1958 }
1959
1960 let mut tables: Vec<String> = if request.include_declared.unwrap_or(true) {
1961 table_entries.into_iter().map(|(name, _)| name).collect()
1962 } else {
1963 let mut stream = futures::stream::iter(table_entries.into_iter().map(
1964 |(name, location)| async move {
1965 if self.location_has_actual_manifests(&location).await? {
1970 Ok::<Option<String>, Error>(Some(name))
1971 } else {
1972 Ok::<Option<String>, Error>(None)
1973 }
1974 },
1975 ))
1976 .buffered(DECLARED_FILTER_CONCURRENCY);
1977
1978 let mut filtered = Vec::new();
1979 while let Some(result) = stream.next().await {
1980 if let Some(name) = result? {
1981 filtered.push(name);
1982 }
1983 }
1984 filtered
1985 };
1986
1987 let next_page_token =
1988 Self::apply_pagination(&mut tables, request.page_token, request.limit);
1989 let mut response = ListTablesResponse::new(tables);
1990 response.page_token = next_page_token;
1991 Ok(response)
1992 }
1993
1994 async fn describe_table(&self, request: DescribeTableRequest) -> Result<DescribeTableResponse> {
1995 let table_id = request.id.as_ref().ok_or_else(|| {
1996 lance_core::Error::from(NamespaceError::InvalidInput {
1997 message: "Table ID is required".to_string(),
1998 })
1999 })?;
2000
2001 if table_id.is_empty() {
2002 return Err(NamespaceError::InvalidInput {
2003 message: "Table ID cannot be empty".to_string(),
2004 }
2005 .into());
2006 }
2007
2008 let object_id = Self::str_object_id(table_id);
2009 let table_info = self.query_manifest_for_table(&object_id).boxed().await?;
2010
2011 let table_name = table_id.last().cloned().unwrap_or_default();
2013 let namespace_id: Vec<String> = if table_id.len() > 1 {
2014 table_id[..table_id.len() - 1].to_vec()
2015 } else {
2016 vec![]
2017 };
2018
2019 let load_detailed_metadata = request.load_detailed_metadata.unwrap_or(false);
2020 let should_check_declared =
2021 load_detailed_metadata || request.check_declared.unwrap_or(false);
2022 let vend_credentials = request.vend_credentials.unwrap_or(true);
2024
2025 match table_info {
2026 Some(info) => {
2027 let table_uri = Self::construct_full_uri(&self.root, &info.location)?;
2029
2030 let storage_options = if vend_credentials {
2031 self.storage_options.clone()
2032 } else {
2033 None
2034 };
2035 let is_only_declared = if should_check_declared {
2036 Some(!self.location_has_actual_manifests(&info.location).await?)
2037 } else {
2038 None
2039 };
2040
2041 if !load_detailed_metadata {
2042 return Ok(DescribeTableResponse {
2043 table: Some(table_name),
2044 namespace: Some(namespace_id),
2045 location: Some(table_uri.clone()),
2046 table_uri: Some(table_uri),
2047 storage_options,
2048 properties: info.metadata,
2049 is_only_declared,
2050 ..Default::default()
2051 });
2052 }
2053
2054 if is_only_declared == Some(true) {
2055 return Ok(DescribeTableResponse {
2056 table: Some(table_name),
2057 namespace: Some(namespace_id),
2058 location: Some(table_uri.clone()),
2059 table_uri: Some(table_uri),
2060 storage_options,
2061 properties: info.metadata,
2062 is_only_declared,
2063 ..Default::default()
2064 });
2065 }
2066
2067 let mut builder = DatasetBuilder::from_uri(&table_uri);
2068 if let Some(opts) = &self.storage_options {
2069 builder = builder.with_storage_options(opts.clone());
2070 }
2071 if let Some(session) = &self.session {
2072 builder = builder.with_session(session.clone());
2073 }
2074
2075 match builder.load().await {
2076 Ok(mut dataset) => {
2077 if let Some(requested_version) = request.version {
2079 dataset = dataset.checkout_version(requested_version as u64).await?;
2080 }
2081
2082 let version = dataset.version().version;
2083 let lance_schema = dataset.schema();
2084 let arrow_schema: arrow_schema::Schema = lance_schema.into();
2085 let json_schema = arrow_schema_to_json(&arrow_schema)?;
2086
2087 Ok(DescribeTableResponse {
2088 table: Some(table_name.clone()),
2089 namespace: Some(namespace_id.clone()),
2090 version: Some(version as i64),
2091 location: Some(table_uri.clone()),
2092 table_uri: Some(table_uri),
2093 schema: Some(Box::new(json_schema)),
2094 storage_options,
2095 properties: info.metadata.clone(),
2096 is_only_declared,
2097 ..Default::default()
2098 })
2099 }
2100 Err(err) => Err(NamespaceError::Internal {
2101 message: format!(
2102 "Table exists in manifest but failed to load dataset '{}': {}",
2103 object_id, err
2104 ),
2105 }
2106 .into()),
2107 }
2108 }
2109 None => Err(NamespaceError::TableNotFound {
2110 message: Self::format_table_id(table_id),
2111 }
2112 .into()),
2113 }
2114 }
2115
2116 async fn table_exists(&self, request: TableExistsRequest) -> Result<()> {
2117 let table_id = request.id.as_ref().ok_or_else(|| {
2118 lance_core::Error::from(NamespaceError::InvalidInput {
2119 message: "Table ID is required".to_string(),
2120 })
2121 })?;
2122
2123 if table_id.is_empty() {
2124 return Err(NamespaceError::InvalidInput {
2125 message: "Table ID cannot be empty".to_string(),
2126 }
2127 .into());
2128 }
2129
2130 let object_id = Self::str_object_id(table_id);
2131 let exists = self.manifest_contains_object(&object_id).await?;
2132 if exists {
2133 Ok(())
2134 } else {
2135 Err(NamespaceError::TableNotFound {
2136 message: Self::format_table_id(table_id),
2137 }
2138 .into())
2139 }
2140 }
2141
2142 async fn create_table(
2143 &self,
2144 request: CreateTableRequest,
2145 data: Bytes,
2146 ) -> Result<CreateTableResponse> {
2147 let table_id = request.id.as_ref().ok_or_else(|| {
2148 lance_core::Error::from(NamespaceError::InvalidInput {
2149 message: "Table ID is required".to_string(),
2150 })
2151 })?;
2152
2153 if table_id.is_empty() {
2154 return Err(NamespaceError::InvalidInput {
2155 message: "Table ID cannot be empty".to_string(),
2156 }
2157 .into());
2158 }
2159
2160 let (namespace, table_name) = Self::split_object_id(table_id);
2161 let object_id = Self::build_object_id(&namespace, &table_name);
2162
2163 let existing_table = self.query_manifest_for_table(&object_id).await?;
2164 let existing_has_manifests = if let Some(existing_table) = &existing_table {
2165 Some(
2166 self.location_has_actual_manifests(&existing_table.location)
2167 .await?,
2168 )
2169 } else {
2170 None
2171 };
2172
2173 if existing_has_manifests == Some(false)
2174 && request
2175 .properties
2176 .as_ref()
2177 .is_some_and(|properties| !properties.is_empty())
2178 {
2179 return Err(NamespaceError::InvalidInput {
2180 message: format!(
2181 "create_table cannot set properties for already declared table '{}'",
2182 object_id
2183 ),
2184 }
2185 .into());
2186 }
2187
2188 let create_mode = if existing_has_manifests == Some(false) {
2189 CreateTableMode::Create
2190 } else {
2191 CreateTableMode::parse(request.mode.as_deref())?
2192 };
2193 let dir_name = if let Some(existing_table) = &existing_table {
2194 existing_table.location.clone()
2195 } else if namespace.is_empty() && self.dir_listing_enabled {
2196 format!("{}.lance", table_name)
2197 } else {
2198 Self::generate_dir_name(&object_id)
2199 };
2200 let table_uri = Self::construct_full_uri(&self.root, &dir_name)?;
2201 let overwriting_existing_table =
2202 existing_has_manifests == Some(true) && create_mode == CreateTableMode::Overwrite;
2203
2204 if existing_has_manifests == Some(true) {
2205 match create_mode {
2206 CreateTableMode::Create => {
2207 return Err(NamespaceError::TableAlreadyExists {
2208 message: table_name.clone(),
2209 }
2210 .into());
2211 }
2212 CreateTableMode::ExistOk => {
2213 let properties = existing_table
2214 .as_ref()
2215 .and_then(|table| table.metadata.clone());
2216 return Ok(CreateTableResponse {
2217 location: Some(table_uri),
2218 storage_options: self.storage_options.clone(),
2219 properties,
2220 ..Default::default()
2221 });
2222 }
2223 CreateTableMode::Overwrite => {}
2224 }
2225 }
2226
2227 if data.is_empty() {
2229 return Err(NamespaceError::InvalidInput {
2230 message: "Request data (Arrow IPC stream) is required for create_table".to_string(),
2231 }
2232 .into());
2233 }
2234
2235 let cursor = Cursor::new(data.to_vec());
2237 let stream_reader = StreamReader::try_new(cursor, None).map_err(|e| {
2238 lance_core::Error::from(NamespaceError::Internal {
2239 message: format!("Failed to read IPC stream: {:?}", e),
2240 })
2241 })?;
2242
2243 let batches: Vec<RecordBatch> = stream_reader
2244 .collect::<std::result::Result<Vec<_>, _>>()
2245 .map_err(|e| {
2246 lance_core::Error::from(NamespaceError::Internal {
2247 message: format!("Failed to collect batches: {:?}", e),
2248 })
2249 })?;
2250
2251 if batches.is_empty() {
2252 return Err(NamespaceError::Internal {
2253 message: "No data provided for table creation".to_string(),
2254 }
2255 .into());
2256 }
2257
2258 let schema = batches[0].schema();
2259 let batch_results: Vec<std::result::Result<RecordBatch, arrow_schema::ArrowError>> =
2260 batches.into_iter().map(Ok).collect();
2261 let reader = RecordBatchIterator::new(batch_results, schema);
2262
2263 let mut write_storage_options = self.storage_options.clone().unwrap_or_default();
2264 if let Some(request_storage_options) = request.storage_options.as_ref() {
2265 write_storage_options.extend(request_storage_options.clone());
2266 }
2267
2268 let store_params = ObjectStoreParams {
2269 storage_options_accessor: (!write_storage_options.is_empty()).then(|| {
2270 Arc::new(
2271 lance_io::object_store::StorageOptionsAccessor::with_static_options(
2272 write_storage_options,
2273 ),
2274 )
2275 }),
2276 ..Default::default()
2277 };
2278 let write_params = WriteParams {
2279 mode: create_mode.write_mode(),
2280 session: self.session.clone(),
2281 store_params: Some(store_params),
2282 ..Default::default()
2283 };
2284 let dataset = Dataset::write(Box::new(reader), &table_uri, Some(write_params))
2285 .await
2286 .map_err(|e| {
2287 lance_core::Error::from(NamespaceError::Internal {
2288 message: format!("Failed to write dataset: {:?}", e),
2289 })
2290 })?;
2291 let version = dataset.version().version as i64;
2292
2293 if overwriting_existing_table {
2294 let metadata =
2295 Self::serialize_metadata(request.properties.as_ref(), "table", &object_id)?;
2296 self.upsert_into_manifest_with_metadata(
2297 vec![ManifestEntry {
2298 object_id,
2299 object_type: ObjectType::Table,
2300 location: Some(dir_name),
2301 metadata,
2302 }],
2303 None,
2304 )
2305 .await?;
2306
2307 Ok(CreateTableResponse {
2308 version: Some(version),
2309 location: Some(table_uri),
2310 storage_options: self.storage_options.clone(),
2311 properties: request.properties,
2312 ..Default::default()
2313 })
2314 } else {
2315 match existing_table {
2316 Some(existing_table) => Ok(CreateTableResponse {
2317 version: Some(version),
2318 location: Some(table_uri),
2319 storage_options: self.storage_options.clone(),
2320 properties: existing_table.metadata,
2321 ..Default::default()
2322 }),
2323 None => {
2324 let metadata =
2325 Self::serialize_metadata(request.properties.as_ref(), "table", &object_id)?;
2326 self.insert_into_manifest_with_metadata(
2328 vec![ManifestEntry {
2329 object_id,
2330 object_type: ObjectType::Table,
2331 location: Some(dir_name.clone()),
2332 metadata,
2333 }],
2334 None,
2335 )
2336 .await?;
2337
2338 Ok(CreateTableResponse {
2339 version: Some(version),
2340 location: Some(table_uri),
2341 storage_options: self.storage_options.clone(),
2342 properties: request.properties,
2343 ..Default::default()
2344 })
2345 }
2346 }
2347 }
2348 }
2349
2350 async fn drop_table(&self, request: DropTableRequest) -> Result<DropTableResponse> {
2351 let table_id = request.id.as_ref().ok_or_else(|| {
2352 lance_core::Error::from(NamespaceError::InvalidInput {
2353 message: "Table ID is required".to_string(),
2354 })
2355 })?;
2356
2357 if table_id.is_empty() {
2358 return Err(NamespaceError::InvalidInput {
2359 message: "Table ID cannot be empty".to_string(),
2360 }
2361 .into());
2362 }
2363
2364 let (namespace, table_name) = Self::split_object_id(table_id);
2365 let object_id = Self::build_object_id(&namespace, &table_name);
2366
2367 let table_info = self.query_manifest_for_table(&object_id).boxed().await?;
2369
2370 match table_info {
2371 Some(info) => {
2372 self.delete_from_manifest(&object_id).boxed().await?;
2374
2375 let table_path = self.base_path.clone().join(info.location.as_str());
2377 let table_uri = Self::construct_full_uri(&self.root, &info.location)?;
2378
2379 self.object_store
2381 .remove_dir_all(table_path)
2382 .boxed()
2383 .await
2384 .map_err(|e| {
2385 lance_core::Error::from(NamespaceError::Internal {
2386 message: format!("Failed to delete table directory: {:?}", e),
2387 })
2388 })?;
2389
2390 Ok(DropTableResponse {
2391 id: request.id.clone(),
2392 location: Some(table_uri),
2393 ..Default::default()
2394 })
2395 }
2396 None => Err(NamespaceError::TableNotFound {
2397 message: table_name.to_string(),
2398 }
2399 .into()),
2400 }
2401 }
2402
2403 async fn list_namespaces(
2404 &self,
2405 request: ListNamespacesRequest,
2406 ) -> Result<ListNamespacesResponse> {
2407 let parent_namespace = request.id.as_ref().ok_or_else(|| {
2408 lance_core::Error::from(NamespaceError::InvalidInput {
2409 message: "Namespace ID is required".to_string(),
2410 })
2411 })?;
2412
2413 let filter = if parent_namespace.is_empty() {
2415 "object_type = 'namespace' AND NOT contains(object_id, '$')".to_string()
2417 } else {
2418 let prefix = parent_namespace.join(DELIMITER);
2420 format!(
2421 "object_type = 'namespace' AND starts_with(object_id, '{}{}') AND NOT contains(substring(object_id, {}), '$')",
2422 prefix,
2423 DELIMITER,
2424 prefix.len() + 2
2425 )
2426 };
2427
2428 let mut scanner = self.manifest_scanner().await?;
2429 scanner.filter(&filter).map_err(|e| {
2430 lance_core::Error::from(NamespaceError::Internal {
2431 message: format!("Failed to filter: {:?}", e),
2432 })
2433 })?;
2434 scanner.project(&["object_id"]).map_err(|e| {
2435 lance_core::Error::from(NamespaceError::Internal {
2436 message: format!("Failed to project: {:?}", e),
2437 })
2438 })?;
2439
2440 let batches = Self::execute_scanner(scanner).await?;
2441 let mut namespaces = Vec::new();
2442
2443 for batch in batches {
2444 if batch.num_rows() == 0 {
2445 continue;
2446 }
2447
2448 let object_id_array = Self::get_string_column(&batch, "object_id")?;
2449 for i in 0..batch.num_rows() {
2450 let object_id = object_id_array.value(i);
2451 let (_namespace, name) = Self::parse_object_id(object_id);
2452 namespaces.push(name);
2453 }
2454 }
2455
2456 let next_page_token =
2457 Self::apply_pagination(&mut namespaces, request.page_token, request.limit);
2458 let mut response = ListNamespacesResponse::new(namespaces);
2459 response.page_token = next_page_token;
2460 Ok(response)
2461 }
2462
2463 async fn describe_namespace(
2464 &self,
2465 request: DescribeNamespaceRequest,
2466 ) -> Result<DescribeNamespaceResponse> {
2467 let namespace_id = request.id.as_ref().ok_or_else(|| {
2468 lance_core::Error::from(NamespaceError::InvalidInput {
2469 message: "Namespace ID is required".to_string(),
2470 })
2471 })?;
2472
2473 if namespace_id.is_empty() {
2475 #[allow(clippy::needless_update)]
2476 return Ok(DescribeNamespaceResponse {
2477 properties: Some(HashMap::new()),
2478 ..Default::default()
2479 });
2480 }
2481
2482 let object_id = namespace_id.join(DELIMITER);
2484 let namespace_info = self.query_manifest_for_namespace(&object_id).await?;
2485
2486 match namespace_info {
2487 #[allow(clippy::needless_update)]
2488 Some(info) => Ok(DescribeNamespaceResponse {
2489 properties: info.metadata,
2490 ..Default::default()
2491 }),
2492 None => Err(NamespaceError::NamespaceNotFound {
2493 message: object_id.to_string(),
2494 }
2495 .into()),
2496 }
2497 }
2498
2499 async fn create_namespace(
2500 &self,
2501 request: CreateNamespaceRequest,
2502 ) -> Result<CreateNamespaceResponse> {
2503 let namespace_id = request.id.as_ref().ok_or_else(|| {
2504 lance_core::Error::from(NamespaceError::InvalidInput {
2505 message: "Namespace ID is required".to_string(),
2506 })
2507 })?;
2508
2509 if namespace_id.is_empty() {
2511 return Err(NamespaceError::NamespaceAlreadyExists {
2512 message: "root namespace".to_string(),
2513 }
2514 .into());
2515 }
2516
2517 if namespace_id.len() > 1 {
2519 self.validate_namespace_levels_exist(&namespace_id[..namespace_id.len() - 1])
2520 .await?;
2521 }
2522
2523 let object_id = namespace_id.join(DELIMITER);
2524 if self.manifest_contains_object(&object_id).await? {
2525 return Err(NamespaceError::NamespaceAlreadyExists {
2526 message: object_id.to_string(),
2527 }
2528 .into());
2529 }
2530
2531 let metadata =
2532 Self::serialize_metadata(request.properties.as_ref(), "namespace", &object_id)?;
2533
2534 self.insert_into_manifest_with_metadata(
2535 vec![ManifestEntry {
2536 object_id,
2537 object_type: ObjectType::Namespace,
2538 location: None,
2539 metadata,
2540 }],
2541 None,
2542 )
2543 .await?;
2544
2545 Ok(CreateNamespaceResponse {
2546 properties: request.properties,
2547 ..Default::default()
2548 })
2549 }
2550
2551 async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<DropNamespaceResponse> {
2552 let namespace_id = request.id.as_ref().ok_or_else(|| {
2553 lance_core::Error::from(NamespaceError::InvalidInput {
2554 message: "Namespace ID is required".to_string(),
2555 })
2556 })?;
2557
2558 if namespace_id.is_empty() {
2560 return Err(NamespaceError::InvalidInput {
2561 message: "Root namespace cannot be dropped".to_string(),
2562 }
2563 .into());
2564 }
2565
2566 let object_id = namespace_id.join(DELIMITER);
2567
2568 if !self.manifest_contains_object(&object_id).boxed().await? {
2570 return Err(NamespaceError::NamespaceNotFound {
2571 message: object_id.to_string(),
2572 }
2573 .into());
2574 }
2575
2576 let escaped_id = object_id.replace('\'', "''");
2578 let prefix = format!("{}{}", escaped_id, DELIMITER);
2579 let filter = format!("starts_with(object_id, '{}')", prefix);
2580 let mut scanner = self.manifest_scanner().boxed().await?;
2581 scanner.filter(&filter).map_err(|e| {
2582 lance_core::Error::from(NamespaceError::Internal {
2583 message: format!("Failed to filter: {:?}", e),
2584 })
2585 })?;
2586 scanner.project::<&str>(&[]).map_err(|e| {
2587 lance_core::Error::from(NamespaceError::Internal {
2588 message: format!("Failed to project: {:?}", e),
2589 })
2590 })?;
2591 scanner.with_row_id();
2592 let count = scanner.count_rows().boxed().await.map_err(|e| {
2593 lance_core::Error::from(NamespaceError::Internal {
2594 message: format!("Failed to count rows: {:?}", e),
2595 })
2596 })?;
2597
2598 if count > 0 {
2599 return Err(NamespaceError::NamespaceNotEmpty {
2600 message: format!("'{}' (contains {} child objects)", object_id, count),
2601 }
2602 .into());
2603 }
2604
2605 self.delete_from_manifest(&object_id).boxed().await?;
2606
2607 Ok(DropNamespaceResponse::default())
2608 }
2609
2610 async fn namespace_exists(&self, request: NamespaceExistsRequest) -> Result<()> {
2611 let namespace_id = request.id.as_ref().ok_or_else(|| {
2612 lance_core::Error::from(NamespaceError::InvalidInput {
2613 message: "Namespace ID is required".to_string(),
2614 })
2615 })?;
2616
2617 if namespace_id.is_empty() {
2619 return Ok(());
2620 }
2621
2622 let object_id = namespace_id.join(DELIMITER);
2623 if self.manifest_contains_object(&object_id).await? {
2624 Ok(())
2625 } else {
2626 Err(NamespaceError::NamespaceNotFound {
2627 message: object_id.to_string(),
2628 }
2629 .into())
2630 }
2631 }
2632
2633 async fn declare_table(&self, request: DeclareTableRequest) -> Result<DeclareTableResponse> {
2634 let table_id = request.id.as_ref().ok_or_else(|| {
2635 lance_core::Error::from(NamespaceError::InvalidInput {
2636 message: "Table ID is required".to_string(),
2637 })
2638 })?;
2639
2640 if table_id.is_empty() {
2641 return Err(NamespaceError::InvalidInput {
2642 message: "Table ID cannot be empty".to_string(),
2643 }
2644 .into());
2645 }
2646
2647 let (namespace, table_name) = Self::split_object_id(table_id);
2648 let object_id = Self::build_object_id(&namespace, &table_name);
2649
2650 let existing = self.query_manifest_for_table(&object_id).await?;
2652 if existing.is_some() {
2653 return Err(NamespaceError::TableAlreadyExists {
2654 message: table_name.to_string(),
2655 }
2656 .into());
2657 }
2658
2659 let dir_name = if namespace.is_empty() && self.dir_listing_enabled {
2663 format!("{}.lance", table_name)
2665 } else {
2666 Self::generate_dir_name(&object_id)
2668 };
2669 let table_path = self.base_path.clone().join(dir_name.as_str());
2670 let table_uri = Self::construct_full_uri(&self.root, &dir_name)?;
2671
2672 if let Some(req_location) = &request.location {
2674 let req_location = req_location.trim_end_matches('/');
2675 if req_location != table_uri {
2676 return Err(NamespaceError::InvalidInput {
2677 message: format!(
2678 "Cannot declare table {} at location {}, must be at location {}",
2679 table_name, req_location, table_uri
2680 ),
2681 }
2682 .into());
2683 }
2684 }
2685
2686 let reserved_file_path = table_path.clone().join(".lance-reserved");
2688
2689 self.object_store
2690 .create(&reserved_file_path)
2691 .await
2692 .map_err(|e| {
2693 lance_core::Error::from(NamespaceError::Internal {
2694 message: format!(
2695 "Failed to create .lance-reserved file for table {}: {}",
2696 table_name, e
2697 ),
2698 })
2699 })?
2700 .shutdown()
2701 .await
2702 .map_err(|e| {
2703 lance_core::Error::from(NamespaceError::Internal {
2704 message: format!(
2705 "Failed to finalize .lance-reserved file for table {}: {}",
2706 table_name, e
2707 ),
2708 })
2709 })?;
2710
2711 let metadata = Self::serialize_metadata(request.properties.as_ref(), "table", &object_id)?;
2712
2713 self.insert_into_manifest_with_metadata(
2715 vec![ManifestEntry {
2716 object_id,
2717 object_type: ObjectType::Table,
2718 location: Some(dir_name),
2719 metadata,
2720 }],
2721 None,
2722 )
2723 .await?;
2724
2725 log::info!(
2726 "Declared table '{}' in manifest at {}",
2727 table_name,
2728 table_uri
2729 );
2730
2731 let vend_credentials = request.vend_credentials.unwrap_or(true);
2733 let storage_options = if vend_credentials {
2734 self.storage_options.clone()
2735 } else {
2736 None
2737 };
2738
2739 Ok(DeclareTableResponse {
2740 location: Some(table_uri),
2741 storage_options,
2742 properties: request.properties,
2743 ..Default::default()
2744 })
2745 }
2746
2747 async fn register_table(&self, request: RegisterTableRequest) -> Result<RegisterTableResponse> {
2748 let table_id = request.id.as_ref().ok_or_else(|| {
2749 lance_core::Error::from(NamespaceError::InvalidInput {
2750 message: "Table ID is required".to_string(),
2751 })
2752 })?;
2753
2754 if table_id.is_empty() {
2755 return Err(NamespaceError::InvalidInput {
2756 message: "Table ID cannot be empty".to_string(),
2757 }
2758 .into());
2759 }
2760
2761 let location = request.location.clone();
2762
2763 if location.contains("://") {
2766 return Err(NamespaceError::InvalidInput {
2767 message: format!(
2768 "Absolute URIs are not allowed for register_table. Location must be a relative path within the root directory: {}",
2769 location
2770 ),
2771 }
2772 .into());
2773 }
2774
2775 if location.starts_with('/') {
2776 return Err(NamespaceError::InvalidInput {
2777 message: format!(
2778 "Absolute paths are not allowed for register_table. Location must be a relative path within the root directory: {}",
2779 location
2780 ),
2781 }
2782 .into());
2783 }
2784
2785 if location.contains("..") {
2787 return Err(NamespaceError::InvalidInput {
2788 message: format!(
2789 "Path traversal is not allowed. Location must be a relative path within the root directory: {}",
2790 location
2791 ),
2792 }
2793 .into());
2794 }
2795
2796 let (namespace, table_name) = Self::split_object_id(table_id);
2797 let object_id = Self::build_object_id(&namespace, &table_name);
2798
2799 if !namespace.is_empty() {
2801 self.validate_namespace_levels_exist(&namespace).await?;
2802 }
2803
2804 if self.manifest_contains_object(&object_id).await? {
2806 return Err(NamespaceError::TableAlreadyExists {
2807 message: object_id.to_string(),
2808 }
2809 .into());
2810 }
2811
2812 self.insert_into_manifest(object_id, ObjectType::Table, Some(location.clone()))
2814 .await?;
2815
2816 Ok(RegisterTableResponse {
2817 location: Some(location),
2818 ..Default::default()
2819 })
2820 }
2821
2822 async fn deregister_table(
2823 &self,
2824 request: DeregisterTableRequest,
2825 ) -> Result<DeregisterTableResponse> {
2826 let table_id = request.id.as_ref().ok_or_else(|| {
2827 lance_core::Error::from(NamespaceError::InvalidInput {
2828 message: "Table ID is required".to_string(),
2829 })
2830 })?;
2831
2832 if table_id.is_empty() {
2833 return Err(NamespaceError::InvalidInput {
2834 message: "Table ID cannot be empty".to_string(),
2835 }
2836 .into());
2837 }
2838
2839 let (namespace, table_name) = Self::split_object_id(table_id);
2840 let object_id = Self::build_object_id(&namespace, &table_name);
2841
2842 let table_info = self.query_manifest_for_table(&object_id).await?;
2844
2845 let table_uri = match table_info {
2846 Some(info) => {
2847 self.delete_from_manifest(&object_id).boxed().await?;
2849 Self::construct_full_uri(&self.root, &info.location)?
2850 }
2851 None => {
2852 return Err(NamespaceError::TableNotFound {
2853 message: object_id.to_string(),
2854 }
2855 .into());
2856 }
2857 };
2858
2859 Ok(DeregisterTableResponse {
2860 id: request.id.clone(),
2861 location: Some(table_uri),
2862 ..Default::default()
2863 })
2864 }
2865}
2866
2867#[cfg(test)]
2868mod tests {
2869 use crate::{DirectoryNamespaceBuilder, ManifestNamespace};
2870 use bytes::Bytes;
2871 use lance_core::utils::tempfile::TempStdDir;
2872 use lance_namespace::LanceNamespace;
2873 use lance_namespace::models::{
2874 CreateNamespaceRequest, CreateTableRequest, DescribeTableRequest, DropTableRequest,
2875 ListTablesRequest, TableExistsRequest,
2876 };
2877 use rstest::rstest;
2878
2879 fn create_test_ipc_data() -> Vec<u8> {
2880 use arrow::array::{Int32Array, StringArray};
2881 use arrow::datatypes::{DataType, Field, Schema};
2882 use arrow::ipc::writer::StreamWriter;
2883 use arrow::record_batch::RecordBatch;
2884 use std::sync::Arc;
2885
2886 let schema = Arc::new(Schema::new(vec![
2887 Field::new("id", DataType::Int32, false),
2888 Field::new("name", DataType::Utf8, false),
2889 ]));
2890
2891 let batch = RecordBatch::try_new(
2892 schema.clone(),
2893 vec![
2894 Arc::new(Int32Array::from(vec![1, 2, 3])),
2895 Arc::new(StringArray::from(vec!["a", "b", "c"])),
2896 ],
2897 )
2898 .unwrap();
2899
2900 let mut buffer = Vec::new();
2901 {
2902 let mut writer = StreamWriter::try_new(&mut buffer, &schema).unwrap();
2903 writer.write(&batch).unwrap();
2904 writer.finish().unwrap();
2905 }
2906 buffer
2907 }
2908
2909 #[rstest]
2910 #[case::with_optimization(true)]
2911 #[case::without_optimization(false)]
2912 #[tokio::test]
2913 async fn test_manifest_namespace_basic_create_and_list(#[case] inline_optimization: bool) {
2914 let temp_dir = TempStdDir::default();
2915 let temp_path = temp_dir.to_str().unwrap();
2916
2917 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2919 .inline_optimization_enabled(inline_optimization)
2920 .build()
2921 .await
2922 .unwrap();
2923
2924 let mut request = ListTablesRequest::new();
2926 request.id = Some(vec![]);
2927 let response = dir_namespace.list_tables(request).await.unwrap();
2928 assert_eq!(response.tables.len(), 0);
2929
2930 let buffer = create_test_ipc_data();
2932 let mut create_request = CreateTableRequest::new();
2933 create_request.id = Some(vec!["test_table".to_string()]);
2934
2935 let _response = dir_namespace
2936 .create_table(create_request, Bytes::from(buffer))
2937 .await
2938 .unwrap();
2939
2940 let mut request = ListTablesRequest::new();
2942 request.id = Some(vec![]);
2943 let response = dir_namespace.list_tables(request).await.unwrap();
2944 assert_eq!(response.tables.len(), 1);
2945 assert_eq!(response.tables[0], "test_table");
2946 }
2947
2948 #[rstest]
2949 #[case::with_optimization(true)]
2950 #[case::without_optimization(false)]
2951 #[tokio::test]
2952 async fn test_manifest_namespace_table_exists(#[case] inline_optimization: bool) {
2953 let temp_dir = TempStdDir::default();
2954 let temp_path = temp_dir.to_str().unwrap();
2955
2956 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2957 .inline_optimization_enabled(inline_optimization)
2958 .build()
2959 .await
2960 .unwrap();
2961
2962 let mut request = TableExistsRequest::new();
2964 request.id = Some(vec!["nonexistent".to_string()]);
2965 let result = dir_namespace.table_exists(request).await;
2966 assert!(result.is_err());
2967
2968 let buffer = create_test_ipc_data();
2970 let mut create_request = CreateTableRequest::new();
2971 create_request.id = Some(vec!["test_table".to_string()]);
2972 dir_namespace
2973 .create_table(create_request, Bytes::from(buffer))
2974 .await
2975 .unwrap();
2976
2977 let mut request = TableExistsRequest::new();
2979 request.id = Some(vec!["test_table".to_string()]);
2980 let result = dir_namespace.table_exists(request).await;
2981 assert!(result.is_ok());
2982 }
2983
2984 #[rstest]
2985 #[case::with_optimization(true)]
2986 #[case::without_optimization(false)]
2987 #[tokio::test]
2988 async fn test_manifest_namespace_describe_table(#[case] inline_optimization: bool) {
2989 let temp_dir = TempStdDir::default();
2990 let temp_path = temp_dir.to_str().unwrap();
2991
2992 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2993 .inline_optimization_enabled(inline_optimization)
2994 .build()
2995 .await
2996 .unwrap();
2997
2998 let mut request = DescribeTableRequest::new();
3000 request.id = Some(vec!["nonexistent".to_string()]);
3001 let result = dir_namespace.describe_table(request).await;
3002 assert!(result.is_err());
3003
3004 let buffer = create_test_ipc_data();
3006 let mut create_request = CreateTableRequest::new();
3007 create_request.id = Some(vec!["test_table".to_string()]);
3008 dir_namespace
3009 .create_table(create_request, Bytes::from(buffer))
3010 .await
3011 .unwrap();
3012
3013 let mut request = DescribeTableRequest::new();
3015 request.id = Some(vec!["test_table".to_string()]);
3016 let response = dir_namespace.describe_table(request).await.unwrap();
3017 assert!(response.location.is_some());
3018 assert!(response.location.unwrap().contains("test_table"));
3019 }
3020
3021 #[rstest]
3022 #[case::with_optimization(true)]
3023 #[case::without_optimization(false)]
3024 #[tokio::test]
3025 async fn test_manifest_namespace_drop_table(#[case] inline_optimization: bool) {
3026 let temp_dir = TempStdDir::default();
3027 let temp_path = temp_dir.to_str().unwrap();
3028
3029 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3030 .inline_optimization_enabled(inline_optimization)
3031 .build()
3032 .await
3033 .unwrap();
3034
3035 let buffer = create_test_ipc_data();
3037 let mut create_request = CreateTableRequest::new();
3038 create_request.id = Some(vec!["test_table".to_string()]);
3039 dir_namespace
3040 .create_table(create_request, Bytes::from(buffer))
3041 .await
3042 .unwrap();
3043
3044 let mut request = ListTablesRequest::new();
3046 request.id = Some(vec![]);
3047 let response = dir_namespace.list_tables(request).await.unwrap();
3048 assert_eq!(response.tables.len(), 1);
3049
3050 let mut drop_request = DropTableRequest::new();
3052 drop_request.id = Some(vec!["test_table".to_string()]);
3053 let _response = dir_namespace.drop_table(drop_request).await.unwrap();
3054
3055 let mut request = ListTablesRequest::new();
3057 request.id = Some(vec![]);
3058 let response = dir_namespace.list_tables(request).await.unwrap();
3059 assert_eq!(response.tables.len(), 0);
3060 }
3061
3062 #[tokio::test]
3063 async fn test_list_tables_pagination_limit_zero() {
3064 let temp_dir = TempStdDir::default();
3065 let temp_path = temp_dir.to_str().unwrap();
3066
3067 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3068 .build()
3069 .await
3070 .unwrap();
3071
3072 let buffer = create_test_ipc_data();
3073 let mut create_request = CreateTableRequest::new();
3074 create_request.id = Some(vec!["alpha".to_string()]);
3075 dir_namespace
3076 .create_table(create_request, Bytes::from(buffer))
3077 .await
3078 .unwrap();
3079
3080 let response = dir_namespace
3081 .list_tables(ListTablesRequest {
3082 id: Some(vec![]),
3083 limit: Some(0),
3084 ..Default::default()
3085 })
3086 .await
3087 .unwrap();
3088
3089 assert!(response.tables.is_empty());
3090 assert!(response.page_token.is_none());
3091 }
3092
3093 #[rstest]
3094 #[case::with_optimization(true)]
3095 #[case::without_optimization(false)]
3096 #[tokio::test]
3097 async fn test_manifest_namespace_multiple_tables(#[case] inline_optimization: bool) {
3098 let temp_dir = TempStdDir::default();
3099 let temp_path = temp_dir.to_str().unwrap();
3100
3101 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3102 .inline_optimization_enabled(inline_optimization)
3103 .build()
3104 .await
3105 .unwrap();
3106
3107 let buffer = create_test_ipc_data();
3109 for i in 1..=3 {
3110 let mut create_request = CreateTableRequest::new();
3111 create_request.id = Some(vec![format!("table{}", i)]);
3112 dir_namespace
3113 .create_table(create_request, Bytes::from(buffer.clone()))
3114 .await
3115 .unwrap();
3116 }
3117
3118 let mut request = ListTablesRequest::new();
3120 request.id = Some(vec![]);
3121 let response = dir_namespace.list_tables(request).await.unwrap();
3122 assert_eq!(response.tables.len(), 3);
3123 assert!(response.tables.contains(&"table1".to_string()));
3124 assert!(response.tables.contains(&"table2".to_string()));
3125 assert!(response.tables.contains(&"table3".to_string()));
3126 }
3127
3128 #[rstest]
3129 #[case::with_optimization(true)]
3130 #[case::without_optimization(false)]
3131 #[tokio::test]
3132 async fn test_directory_only_mode(#[case] inline_optimization: bool) {
3133 let temp_dir = TempStdDir::default();
3134 let temp_path = temp_dir.to_str().unwrap();
3135
3136 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3138 .manifest_enabled(false)
3139 .inline_optimization_enabled(inline_optimization)
3140 .build()
3141 .await
3142 .unwrap();
3143
3144 let mut request = ListTablesRequest::new();
3146 request.id = Some(vec![]);
3147 let response = dir_namespace.list_tables(request).await.unwrap();
3148 assert_eq!(response.tables.len(), 0);
3149
3150 let buffer = create_test_ipc_data();
3152 let mut create_request = CreateTableRequest::new();
3153 create_request.id = Some(vec!["test_table".to_string()]);
3154
3155 let _response = dir_namespace
3157 .create_table(create_request, Bytes::from(buffer))
3158 .await
3159 .unwrap();
3160
3161 let mut request = ListTablesRequest::new();
3163 request.id = Some(vec![]);
3164 let response = dir_namespace.list_tables(request).await.unwrap();
3165 assert_eq!(response.tables.len(), 1);
3166 assert_eq!(response.tables[0], "test_table");
3167 }
3168
3169 #[rstest]
3170 #[case::with_optimization(true)]
3171 #[case::without_optimization(false)]
3172 #[tokio::test]
3173 async fn test_dual_mode_merge(#[case] inline_optimization: bool) {
3174 let temp_dir = TempStdDir::default();
3175 let temp_path = temp_dir.to_str().unwrap();
3176
3177 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3179 .manifest_enabled(true)
3180 .dir_listing_enabled(true)
3181 .inline_optimization_enabled(inline_optimization)
3182 .build()
3183 .await
3184 .unwrap();
3185
3186 let buffer = create_test_ipc_data();
3188 let mut create_request = CreateTableRequest::new();
3189 create_request.id = Some(vec!["table1".to_string()]);
3190 dir_namespace
3191 .create_table(create_request, Bytes::from(buffer))
3192 .await
3193 .unwrap();
3194
3195 let mut request = ListTablesRequest::new();
3197 request.id = Some(vec![]);
3198 let response = dir_namespace.list_tables(request).await.unwrap();
3199 assert_eq!(response.tables.len(), 1);
3200 assert_eq!(response.tables[0], "table1");
3201 }
3202
3203 #[rstest]
3204 #[case::with_optimization(true)]
3205 #[case::without_optimization(false)]
3206 #[tokio::test]
3207 async fn test_manifest_only_mode(#[case] inline_optimization: bool) {
3208 let temp_dir = TempStdDir::default();
3209 let temp_path = temp_dir.to_str().unwrap();
3210
3211 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3213 .manifest_enabled(true)
3214 .dir_listing_enabled(false)
3215 .inline_optimization_enabled(inline_optimization)
3216 .build()
3217 .await
3218 .unwrap();
3219
3220 let buffer = create_test_ipc_data();
3222 let mut create_request = CreateTableRequest::new();
3223 create_request.id = Some(vec!["test_table".to_string()]);
3224 dir_namespace
3225 .create_table(create_request, Bytes::from(buffer))
3226 .await
3227 .unwrap();
3228
3229 let mut request = ListTablesRequest::new();
3231 request.id = Some(vec![]);
3232 let response = dir_namespace.list_tables(request).await.unwrap();
3233 assert_eq!(response.tables.len(), 1);
3234 assert_eq!(response.tables[0], "test_table");
3235 }
3236
3237 #[rstest]
3238 #[case::with_optimization(true)]
3239 #[case::without_optimization(false)]
3240 #[tokio::test]
3241 async fn test_drop_nonexistent_table(#[case] inline_optimization: bool) {
3242 let temp_dir = TempStdDir::default();
3243 let temp_path = temp_dir.to_str().unwrap();
3244
3245 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3246 .inline_optimization_enabled(inline_optimization)
3247 .build()
3248 .await
3249 .unwrap();
3250
3251 let mut drop_request = DropTableRequest::new();
3253 drop_request.id = Some(vec!["nonexistent".to_string()]);
3254 let result = dir_namespace.drop_table(drop_request).await;
3255 assert!(result.is_err());
3256 }
3257
3258 #[rstest]
3259 #[case::with_optimization(true)]
3260 #[case::without_optimization(false)]
3261 #[tokio::test]
3262 async fn test_create_duplicate_table_fails(#[case] inline_optimization: bool) {
3263 let temp_dir = TempStdDir::default();
3264 let temp_path = temp_dir.to_str().unwrap();
3265
3266 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3267 .inline_optimization_enabled(inline_optimization)
3268 .build()
3269 .await
3270 .unwrap();
3271
3272 let buffer = create_test_ipc_data();
3274 let mut create_request = CreateTableRequest::new();
3275 create_request.id = Some(vec!["test_table".to_string()]);
3276 dir_namespace
3277 .create_table(create_request, Bytes::from(buffer.clone()))
3278 .await
3279 .unwrap();
3280
3281 let mut create_request = CreateTableRequest::new();
3283 create_request.id = Some(vec!["test_table".to_string()]);
3284 let result = dir_namespace
3285 .create_table(create_request, Bytes::from(buffer))
3286 .await;
3287 assert!(result.is_err());
3288 }
3289
3290 #[rstest]
3291 #[case::with_optimization(true)]
3292 #[case::without_optimization(false)]
3293 #[tokio::test]
3294 async fn test_create_child_namespace(#[case] inline_optimization: bool) {
3295 use lance_namespace::models::{
3296 CreateNamespaceRequest, ListNamespacesRequest, NamespaceExistsRequest,
3297 };
3298
3299 let temp_dir = TempStdDir::default();
3300 let temp_path = temp_dir.to_str().unwrap();
3301
3302 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3303 .inline_optimization_enabled(inline_optimization)
3304 .build()
3305 .await
3306 .unwrap();
3307
3308 let mut create_req = CreateNamespaceRequest::new();
3310 create_req.id = Some(vec!["ns1".to_string()]);
3311 let result = dir_namespace.create_namespace(create_req).await;
3312 assert!(
3313 result.is_ok(),
3314 "Failed to create child namespace: {:?}",
3315 result.err()
3316 );
3317
3318 let exists_req = NamespaceExistsRequest {
3320 id: Some(vec!["ns1".to_string()]),
3321 ..Default::default()
3322 };
3323 let result = dir_namespace.namespace_exists(exists_req).await;
3324 assert!(result.is_ok(), "Namespace should exist");
3325
3326 let list_req = ListNamespacesRequest {
3328 id: Some(vec![]),
3329 page_token: None,
3330 limit: None,
3331 ..Default::default()
3332 };
3333 let result = dir_namespace.list_namespaces(list_req).await;
3334 assert!(result.is_ok());
3335 let namespaces = result.unwrap();
3336 assert_eq!(namespaces.namespaces.len(), 1);
3337 assert_eq!(namespaces.namespaces[0], "ns1");
3338 }
3339
3340 #[rstest]
3341 #[case::with_optimization(true)]
3342 #[case::without_optimization(false)]
3343 #[tokio::test]
3344 async fn test_create_nested_namespace(#[case] inline_optimization: bool) {
3345 use lance_namespace::models::{
3346 CreateNamespaceRequest, ListNamespacesRequest, NamespaceExistsRequest,
3347 };
3348
3349 let temp_dir = TempStdDir::default();
3350 let temp_path = temp_dir.to_str().unwrap();
3351
3352 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3353 .inline_optimization_enabled(inline_optimization)
3354 .build()
3355 .await
3356 .unwrap();
3357
3358 let mut create_req = CreateNamespaceRequest::new();
3360 create_req.id = Some(vec!["parent".to_string()]);
3361 dir_namespace.create_namespace(create_req).await.unwrap();
3362
3363 let mut create_req = CreateNamespaceRequest::new();
3365 create_req.id = Some(vec!["parent".to_string(), "child".to_string()]);
3366 let result = dir_namespace.create_namespace(create_req).await;
3367 assert!(
3368 result.is_ok(),
3369 "Failed to create nested namespace: {:?}",
3370 result.err()
3371 );
3372
3373 let exists_req = NamespaceExistsRequest {
3375 id: Some(vec!["parent".to_string(), "child".to_string()]),
3376 ..Default::default()
3377 };
3378 let result = dir_namespace.namespace_exists(exists_req).await;
3379 assert!(result.is_ok(), "Nested namespace should exist");
3380
3381 let list_req = ListNamespacesRequest {
3383 id: Some(vec!["parent".to_string()]),
3384 page_token: None,
3385 limit: None,
3386 ..Default::default()
3387 };
3388 let result = dir_namespace.list_namespaces(list_req).await;
3389 assert!(result.is_ok());
3390 let namespaces = result.unwrap();
3391 assert_eq!(namespaces.namespaces.len(), 1);
3392 assert_eq!(namespaces.namespaces[0], "child");
3393 }
3394
3395 #[rstest]
3396 #[case::with_optimization(true)]
3397 #[case::without_optimization(false)]
3398 #[tokio::test]
3399 async fn test_create_namespace_without_parent_fails(#[case] inline_optimization: bool) {
3400 use lance_namespace::models::CreateNamespaceRequest;
3401
3402 let temp_dir = TempStdDir::default();
3403 let temp_path = temp_dir.to_str().unwrap();
3404
3405 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3406 .inline_optimization_enabled(inline_optimization)
3407 .build()
3408 .await
3409 .unwrap();
3410
3411 let mut create_req = CreateNamespaceRequest::new();
3413 create_req.id = Some(vec!["nonexistent_parent".to_string(), "child".to_string()]);
3414 let result = dir_namespace.create_namespace(create_req).await;
3415 assert!(result.is_err(), "Should fail when parent doesn't exist");
3416 }
3417
3418 #[rstest]
3419 #[case::with_optimization(true)]
3420 #[case::without_optimization(false)]
3421 #[tokio::test]
3422 async fn test_drop_child_namespace(#[case] inline_optimization: bool) {
3423 use lance_namespace::models::{
3424 CreateNamespaceRequest, DropNamespaceRequest, NamespaceExistsRequest,
3425 };
3426
3427 let temp_dir = TempStdDir::default();
3428 let temp_path = temp_dir.to_str().unwrap();
3429
3430 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3431 .inline_optimization_enabled(inline_optimization)
3432 .build()
3433 .await
3434 .unwrap();
3435
3436 let mut create_req = CreateNamespaceRequest::new();
3438 create_req.id = Some(vec!["ns1".to_string()]);
3439 dir_namespace.create_namespace(create_req).await.unwrap();
3440
3441 let mut drop_req = DropNamespaceRequest::new();
3443 drop_req.id = Some(vec!["ns1".to_string()]);
3444 let result = dir_namespace.drop_namespace(drop_req).await;
3445 assert!(
3446 result.is_ok(),
3447 "Failed to drop namespace: {:?}",
3448 result.err()
3449 );
3450
3451 let exists_req = NamespaceExistsRequest {
3453 id: Some(vec!["ns1".to_string()]),
3454 ..Default::default()
3455 };
3456 let result = dir_namespace.namespace_exists(exists_req).await;
3457 assert!(result.is_err(), "Namespace should not exist after drop");
3458 }
3459
3460 #[rstest]
3461 #[case::with_optimization(true)]
3462 #[case::without_optimization(false)]
3463 #[tokio::test]
3464 async fn test_drop_namespace_with_children_fails(#[case] inline_optimization: bool) {
3465 use lance_namespace::models::{CreateNamespaceRequest, DropNamespaceRequest};
3466
3467 let temp_dir = TempStdDir::default();
3468 let temp_path = temp_dir.to_str().unwrap();
3469
3470 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3471 .inline_optimization_enabled(inline_optimization)
3472 .build()
3473 .await
3474 .unwrap();
3475
3476 let mut create_req = CreateNamespaceRequest::new();
3478 create_req.id = Some(vec!["parent".to_string()]);
3479 dir_namespace.create_namespace(create_req).await.unwrap();
3480
3481 let mut create_req = CreateNamespaceRequest::new();
3482 create_req.id = Some(vec!["parent".to_string(), "child".to_string()]);
3483 dir_namespace.create_namespace(create_req).await.unwrap();
3484
3485 let mut drop_req = DropNamespaceRequest::new();
3487 drop_req.id = Some(vec!["parent".to_string()]);
3488 let result = dir_namespace.drop_namespace(drop_req).await;
3489 assert!(result.is_err(), "Should fail when namespace has children");
3490 }
3491
3492 #[rstest]
3493 #[case::with_optimization(true)]
3494 #[case::without_optimization(false)]
3495 #[tokio::test]
3496 async fn test_create_table_in_child_namespace(#[case] inline_optimization: bool) {
3497 use lance_namespace::models::{
3498 CreateNamespaceRequest, CreateTableRequest, ListTablesRequest,
3499 };
3500
3501 let temp_dir = TempStdDir::default();
3502 let temp_path = temp_dir.to_str().unwrap();
3503
3504 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3505 .inline_optimization_enabled(inline_optimization)
3506 .build()
3507 .await
3508 .unwrap();
3509
3510 let mut create_ns_req = CreateNamespaceRequest::new();
3512 create_ns_req.id = Some(vec!["ns1".to_string()]);
3513 dir_namespace.create_namespace(create_ns_req).await.unwrap();
3514
3515 let buffer = create_test_ipc_data();
3517 let mut create_table_req = CreateTableRequest::new();
3518 create_table_req.id = Some(vec!["ns1".to_string(), "table1".to_string()]);
3519 let result = dir_namespace
3520 .create_table(create_table_req, Bytes::from(buffer))
3521 .await;
3522 assert!(
3523 result.is_ok(),
3524 "Failed to create table in child namespace: {:?}",
3525 result.err()
3526 );
3527
3528 let list_req = ListTablesRequest {
3530 id: Some(vec!["ns1".to_string()]),
3531 page_token: None,
3532 limit: None,
3533 ..Default::default()
3534 };
3535 let result = dir_namespace.list_tables(list_req).await;
3536 assert!(result.is_ok());
3537 let tables = result.unwrap();
3538 assert_eq!(tables.tables.len(), 1);
3539 assert_eq!(tables.tables[0], "table1");
3540 }
3541
3542 #[rstest]
3543 #[case::with_optimization(true)]
3544 #[case::without_optimization(false)]
3545 #[tokio::test]
3546 async fn test_describe_child_namespace(#[case] inline_optimization: bool) {
3547 use lance_namespace::models::{CreateNamespaceRequest, DescribeNamespaceRequest};
3548
3549 let temp_dir = TempStdDir::default();
3550 let temp_path = temp_dir.to_str().unwrap();
3551
3552 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3553 .inline_optimization_enabled(inline_optimization)
3554 .build()
3555 .await
3556 .unwrap();
3557
3558 let mut properties = std::collections::HashMap::new();
3560 properties.insert("key1".to_string(), "value1".to_string());
3561
3562 let mut create_req = CreateNamespaceRequest::new();
3563 create_req.id = Some(vec!["ns1".to_string()]);
3564 create_req.properties = Some(properties.clone());
3565 dir_namespace.create_namespace(create_req).await.unwrap();
3566
3567 let describe_req = DescribeNamespaceRequest {
3569 id: Some(vec!["ns1".to_string()]),
3570 ..Default::default()
3571 };
3572 let result = dir_namespace.describe_namespace(describe_req).await;
3573 assert!(
3574 result.is_ok(),
3575 "Failed to describe namespace: {:?}",
3576 result.err()
3577 );
3578 let response = result.unwrap();
3579 assert!(response.properties.is_some());
3580 assert_eq!(
3581 response.properties.unwrap().get("key1"),
3582 Some(&"value1".to_string())
3583 );
3584 }
3585
3586 #[rstest]
3587 #[case::with_optimization(true)]
3588 #[case::without_optimization(false)]
3589 #[tokio::test]
3590 async fn test_concurrent_create_and_drop_single_instance(#[case] inline_optimization: bool) {
3591 use futures::future::join_all;
3592 use std::sync::Arc;
3593
3594 let temp_dir = TempStdDir::default();
3595 let temp_path = temp_dir.to_str().unwrap();
3596
3597 let dir_namespace = Arc::new(
3598 DirectoryNamespaceBuilder::new(temp_path)
3599 .inline_optimization_enabled(inline_optimization)
3600 .build()
3601 .await
3602 .unwrap(),
3603 );
3604
3605 let mut create_ns_request = CreateNamespaceRequest::new();
3608 create_ns_request.id = Some(vec!["test_ns".to_string()]);
3609 dir_namespace
3610 .create_namespace(create_ns_request)
3611 .await
3612 .unwrap();
3613
3614 let num_tables = 10;
3615 let mut handles = Vec::new();
3616
3617 for i in 0..num_tables {
3618 let ns = dir_namespace.clone();
3619 let handle = async move {
3620 let table_name = format!("concurrent_table_{}", i);
3621 let table_id = vec!["test_ns".to_string(), table_name.clone()];
3622 let buffer = create_test_ipc_data();
3623
3624 let mut create_request = CreateTableRequest::new();
3626 create_request.id = Some(table_id.clone());
3627 ns.create_table(create_request, Bytes::from(buffer))
3628 .await
3629 .unwrap_or_else(|e| panic!("Failed to create table {}: {}", table_name, e));
3630
3631 let mut drop_request = DropTableRequest::new();
3633 drop_request.id = Some(table_id);
3634 ns.drop_table(drop_request)
3635 .await
3636 .unwrap_or_else(|e| panic!("Failed to drop table {}: {}", table_name, e));
3637
3638 Ok::<_, lance_core::Error>(())
3639 };
3640 handles.push(handle);
3641 }
3642
3643 let results = join_all(handles).await;
3644 for result in results {
3645 assert!(result.is_ok(), "All concurrent operations should succeed");
3646 }
3647
3648 let mut request = ListTablesRequest::new();
3650 request.id = Some(vec!["test_ns".to_string()]);
3651 let response = dir_namespace.list_tables(request).await.unwrap();
3652 assert_eq!(response.tables.len(), 0, "All tables should be dropped");
3653 }
3654
3655 #[rstest]
3656 #[case::with_optimization(true)]
3657 #[case::without_optimization(false)]
3658 #[tokio::test]
3659 async fn test_concurrent_create_and_drop_multiple_instances(#[case] inline_optimization: bool) {
3660 use futures::future::join_all;
3661
3662 let temp_dir = TempStdDir::default();
3663 let temp_path = temp_dir.to_str().unwrap().to_string();
3664
3665 let init_ns = DirectoryNamespaceBuilder::new(&temp_path)
3668 .inline_optimization_enabled(inline_optimization)
3669 .build()
3670 .await
3671 .unwrap();
3672 let mut create_ns_request = CreateNamespaceRequest::new();
3673 create_ns_request.id = Some(vec!["test_ns".to_string()]);
3674 init_ns.create_namespace(create_ns_request).await.unwrap();
3675
3676 let num_tables = 10;
3677 let mut handles = Vec::new();
3678
3679 for i in 0..num_tables {
3680 let path = temp_path.clone();
3681 let handle = async move {
3682 let ns = DirectoryNamespaceBuilder::new(&path)
3684 .inline_optimization_enabled(inline_optimization)
3685 .build()
3686 .await
3687 .unwrap();
3688
3689 let table_name = format!("multi_ns_table_{}", i);
3690 let table_id = vec!["test_ns".to_string(), table_name.clone()];
3691 let buffer = create_test_ipc_data();
3692
3693 let mut create_request = CreateTableRequest::new();
3695 create_request.id = Some(table_id.clone());
3696 ns.create_table(create_request, Bytes::from(buffer))
3697 .await
3698 .unwrap_or_else(|e| panic!("Failed to create table {}: {}", table_name, e));
3699
3700 let mut drop_request = DropTableRequest::new();
3702 drop_request.id = Some(table_id);
3703 ns.drop_table(drop_request)
3704 .await
3705 .unwrap_or_else(|e| panic!("Failed to drop table {}: {}", table_name, e));
3706
3707 Ok::<_, lance_core::Error>(())
3708 };
3709 handles.push(handle);
3710 }
3711
3712 let results = join_all(handles).await;
3713 for result in results {
3714 assert!(result.is_ok(), "All concurrent operations should succeed");
3715 }
3716
3717 let verify_ns = DirectoryNamespaceBuilder::new(&temp_path)
3719 .inline_optimization_enabled(inline_optimization)
3720 .build()
3721 .await
3722 .unwrap();
3723
3724 let mut request = ListTablesRequest::new();
3725 request.id = Some(vec!["test_ns".to_string()]);
3726 let response = verify_ns.list_tables(request).await.unwrap();
3727 assert_eq!(response.tables.len(), 0, "All tables should be dropped");
3728 }
3729
3730 #[rstest]
3731 #[case::with_optimization(true)]
3732 #[case::without_optimization(false)]
3733 #[tokio::test]
3734 async fn test_concurrent_create_then_drop_from_different_instance(
3735 #[case] inline_optimization: bool,
3736 ) {
3737 use futures::future::join_all;
3738
3739 let temp_dir = TempStdDir::default();
3740 let temp_path = temp_dir.to_str().unwrap().to_string();
3741
3742 let init_ns = DirectoryNamespaceBuilder::new(&temp_path)
3745 .inline_optimization_enabled(inline_optimization)
3746 .build()
3747 .await
3748 .unwrap();
3749 let mut create_ns_request = CreateNamespaceRequest::new();
3750 create_ns_request.id = Some(vec!["test_ns".to_string()]);
3751 init_ns.create_namespace(create_ns_request).await.unwrap();
3752
3753 let num_tables = 10;
3754
3755 let mut create_handles = Vec::new();
3757 for i in 0..num_tables {
3758 let path = temp_path.clone();
3759 let handle = async move {
3760 let ns = DirectoryNamespaceBuilder::new(&path)
3761 .inline_optimization_enabled(inline_optimization)
3762 .build()
3763 .await
3764 .unwrap();
3765
3766 let table_name = format!("cross_instance_table_{}", i);
3767 let table_id = vec!["test_ns".to_string(), table_name.clone()];
3768 let buffer = create_test_ipc_data();
3769
3770 let mut create_request = CreateTableRequest::new();
3771 create_request.id = Some(table_id);
3772 ns.create_table(create_request, Bytes::from(buffer))
3773 .await
3774 .unwrap_or_else(|e| panic!("Failed to create table {}: {}", table_name, e));
3775
3776 Ok::<_, lance_core::Error>(())
3777 };
3778 create_handles.push(handle);
3779 }
3780
3781 let create_results = join_all(create_handles).await;
3782 for result in create_results {
3783 assert!(result.is_ok(), "All create operations should succeed");
3784 }
3785
3786 let mut drop_handles = Vec::new();
3788 for i in 0..num_tables {
3789 let path = temp_path.clone();
3790 let handle = async move {
3791 let ns = DirectoryNamespaceBuilder::new(&path)
3792 .inline_optimization_enabled(inline_optimization)
3793 .build()
3794 .await
3795 .unwrap();
3796
3797 let table_name = format!("cross_instance_table_{}", i);
3798 let table_id = vec!["test_ns".to_string(), table_name.clone()];
3799
3800 let mut drop_request = DropTableRequest::new();
3801 drop_request.id = Some(table_id);
3802 ns.drop_table(drop_request)
3803 .await
3804 .unwrap_or_else(|e| panic!("Failed to drop table {}: {}", table_name, e));
3805
3806 Ok::<_, lance_core::Error>(())
3807 };
3808 drop_handles.push(handle);
3809 }
3810
3811 let drop_results = join_all(drop_handles).await;
3812 for result in drop_results {
3813 assert!(result.is_ok(), "All drop operations should succeed");
3814 }
3815
3816 let verify_ns = DirectoryNamespaceBuilder::new(&temp_path)
3818 .inline_optimization_enabled(inline_optimization)
3819 .build()
3820 .await
3821 .unwrap();
3822
3823 let mut request = ListTablesRequest::new();
3824 request.id = Some(vec!["test_ns".to_string()]);
3825 let response = verify_ns.list_tables(request).await.unwrap();
3826 assert_eq!(response.tables.len(), 0, "All tables should be dropped");
3827 }
3828
// Table-driven check of `ManifestNamespace::construct_full_uri` for cloud-style
// root URIs: nested paths, a bare bucket, a trailing slash, and query strings.
#[test]
fn test_construct_full_uri_with_cloud_urls() {
    // Each row: (root URI, relative location, expected full URI, failure message).
    let cases = [
        (
            "s3://bucket/path/subdir",
            "table.lance",
            "s3://bucket/path/subdir/table.lance",
            "S3 URL should correctly append table name to nested path",
        ),
        (
            "az://container/path/subdir",
            "table.lance",
            "az://container/path/subdir/table.lance",
            "Azure URL should correctly append table name to nested path",
        ),
        (
            "gs://bucket/path/subdir",
            "table.lance",
            "gs://bucket/path/subdir/table.lance",
            "GCS URL should correctly append table name to nested path",
        ),
        (
            "s3://bucket/a/b/c/d",
            "my_table.lance",
            "s3://bucket/a/b/c/d/my_table.lance",
            "Deeply nested path should work correctly",
        ),
        (
            "s3://bucket",
            "table.lance",
            "s3://bucket/table.lance",
            "Single-level nested path should work correctly",
        ),
        (
            "s3://bucket/path/subdir/",
            "table.lance",
            "s3://bucket/path/subdir/table.lance",
            "URL with existing trailing slash should still work",
        ),
        (
            "s3://bucket/path?",
            "table.lance",
            "s3://bucket/path/table.lance",
            "URL with empty query string should not include trailing '?'",
        ),
        (
            "s3://bucket/path?param=value",
            "table.lance",
            "s3://bucket/path/table.lance",
            "URL with query parameters should have them stripped",
        ),
    ];

    for (root, relative, expected, message) in cases {
        let full_uri = ManifestNamespace::construct_full_uri(root, relative).unwrap();
        assert_eq!(full_uri, expected, "{}", message);
    }
}
3902
// A relative location containing `$` must be appended to a local root unchanged
// and without a doubled slash in front of it.
#[test]
fn test_construct_full_uri_with_dollar_sign() {
    let table_dir = "hash_workspace$test_table";
    let uri = ManifestNamespace::construct_full_uri("/tmp/root", table_dir).unwrap();

    let expected_suffix = "/tmp/root/hash_workspace$test_table";
    assert!(
        uri.ends_with(expected_suffix),
        "local file URI should preserve dollar signs without adding empty path segments: {}",
        uri
    );

    let doubled_slash = "//hash_workspace$test_table";
    assert!(
        !uri.contains(doubled_slash),
        "local file URI should not add a double slash before table directory: {}",
        uri
    );
}
3920
// A relative location that itself contains `/` must keep that separator in the
// resulting URI rather than having it percent-encoded as `%2F`.
#[test]
fn test_construct_full_uri_with_nested_relative_location() {
    let relative = "workspace/physical_table.lance";
    let uri = ManifestNamespace::construct_full_uri("/tmp/root", relative).unwrap();

    let expected_suffix = "/tmp/root/workspace/physical_table.lance";
    assert!(
        uri.ends_with(expected_suffix),
        "nested relative location should preserve path separators: {}",
        uri
    );

    let encoded_separator = "%2Fphysical_table.lance";
    assert!(
        !uri.contains(encoded_separator),
        "nested relative location should not encode path separators: {}",
        uri
    );
}
3938
3939 #[tokio::test]
3946 async fn test_concurrent_create_table_no_duplicates() {
3947 let temp_dir = TempStdDir::default();
3948 let temp_path = temp_dir.to_str().unwrap();
3949
3950 let ns1 = DirectoryNamespaceBuilder::new(temp_path)
3953 .inline_optimization_enabled(false)
3954 .build()
3955 .await
3956 .unwrap();
3957 let ns2 = DirectoryNamespaceBuilder::new(temp_path)
3958 .inline_optimization_enabled(false)
3959 .build()
3960 .await
3961 .unwrap();
3962
3963 let buffer = create_test_ipc_data();
3964
3965 let mut req1 = CreateTableRequest::new();
3966 req1.id = Some(vec!["race_table".to_string()]);
3967 let mut req2 = CreateTableRequest::new();
3968 req2.id = Some(vec!["race_table".to_string()]);
3969
3970 let (result1, result2) = tokio::join!(
3972 ns1.create_table(req1, Bytes::from(buffer.clone())),
3973 ns2.create_table(req2, Bytes::from(buffer.clone())),
3974 );
3975
3976 let success_count = [&result1, &result2].iter().filter(|r| r.is_ok()).count();
3978 let failure_count = [&result1, &result2].iter().filter(|r| r.is_err()).count();
3979 assert_eq!(
3980 success_count, 1,
3981 "Exactly one create should succeed, got: result1={:?}, result2={:?}",
3982 result1, result2
3983 );
3984 assert_eq!(
3985 failure_count, 1,
3986 "Exactly one create should fail, got: result1={:?}, result2={:?}",
3987 result1, result2
3988 );
3989
3990 let ns_check = DirectoryNamespaceBuilder::new(temp_path)
3992 .inline_optimization_enabled(false)
3993 .build()
3994 .await
3995 .unwrap();
3996 let mut list_request = ListTablesRequest::new();
3997 list_request.id = Some(vec![]);
3998 let response = ns_check.list_tables(list_request).await.unwrap();
3999 assert_eq!(
4000 response.tables.len(),
4001 1,
4002 "Should have exactly 1 table, found: {:?}",
4003 response.tables
4004 );
4005 assert_eq!(response.tables[0], "race_table");
4006
4007 let mut describe_request = DescribeTableRequest::new();
4009 describe_request.id = Some(vec!["race_table".to_string()]);
4010 let describe_result = ns_check.describe_table(describe_request).await;
4011 assert!(
4012 describe_result.is_ok(),
4013 "describe_table should not fail with duplicate entries: {:?}",
4014 describe_result
4015 );
4016 }
4017
/// Converts a slice of string slices into an owned `Vec<String>`, preserving order.
fn names(v: &[&str]) -> Vec<String> {
    let mut owned = Vec::with_capacity(v.len());
    for &name in v {
        owned.push(String::from(name));
    }
    owned
}
4023
// With neither a page token nor a limit, the test expects the names to come
// back in sorted order and no continuation token.
#[test]
fn test_apply_pagination_no_token_no_limit() {
    let mut page = names(&["b", "a", "c"]);
    let next_token = ManifestNamespace::apply_pagination(&mut page, None, None);

    let expected = names(&["a", "b", "c"]);
    assert_eq!(page, expected);
    assert!(next_token.is_none());
}
4031
// A limit smaller than the input keeps only the first `limit` sorted names and
// returns the last kept name as the continuation token.
#[test]
fn test_apply_pagination_limit_truncates_and_returns_token() {
    let mut page = names(&["c", "a", "b"]);
    let next_token = ManifestNamespace::apply_pagination(&mut page, None, Some(2));

    let expected = names(&["a", "b"]);
    assert_eq!(page, expected);
    assert_eq!(next_token.as_deref(), Some("b"));
}
4039
// A limit of zero must empty the list and yield no continuation token.
#[test]
fn test_apply_pagination_limit_zero_returns_empty_no_token() {
    let mut page = names(&["a", "b", "c"]);
    let next_token = ManifestNamespace::apply_pagination(&mut page, None, Some(0));

    assert_eq!(page.len(), 0);
    assert!(next_token.is_none());
}
4047
// A page token equal to an existing name keeps only the names after it, and
// with no limit there is no continuation token.
#[test]
fn test_apply_pagination_page_token_in_list() {
    let mut page = names(&["a", "b", "c", "d"]);
    let token = Some("b".to_string());
    let next_token = ManifestNamespace::apply_pagination(&mut page, token, None);

    let expected = names(&["c", "d"]);
    assert_eq!(page, expected);
    assert!(next_token.is_none());
}
4056
// A page token ("z") that sorts after every name leaves an empty page and
// no continuation token.
#[test]
fn test_apply_pagination_page_token_past_all_items() {
    let mut page = names(&["a", "b", "c"]);
    let token = Some("z".to_string());
    let next_token = ManifestNamespace::apply_pagination(&mut page, token, None);

    assert_eq!(page.len(), 0);
    assert!(next_token.is_none());
}
4064
// Token and limit together: skip past "b", keep the next two names, and return
// the last kept name as the continuation token.
#[test]
fn test_apply_pagination_token_and_limit_combined() {
    let mut page = names(&["a", "b", "c", "d", "e"]);
    let token = Some("b".to_string());
    let next_token = ManifestNamespace::apply_pagination(&mut page, token, Some(2));

    let expected = names(&["c", "d"]);
    assert_eq!(page, expected);
    assert_eq!(next_token.as_deref(), Some("d"));
}
4071 }
4072}