1use arrow::array::builder::{ListBuilder, StringBuilder};
10use arrow::array::{Array, RecordBatch, RecordBatchIterator, StringArray};
11use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
12use arrow_ipc::reader::StreamReader;
13use async_trait::async_trait;
14use bytes::Bytes;
15use futures::{FutureExt, TryStreamExt, stream::StreamExt};
16use lance::dataset::optimize::{CompactionOptions, compact_files};
17use lance::dataset::{
18 DeleteBuilder, MergeInsertBuilder, ReadParams, WhenMatched, WhenNotMatched, WriteMode,
19 WriteParams, builder::DatasetBuilder,
20};
21use lance::index::DatasetIndexExt;
22use lance::session::Session;
23use lance::{Dataset, dataset::scanner::Scanner};
24use lance_core::Error as LanceError;
25use lance_core::datatypes::LANCE_UNENFORCED_PRIMARY_KEY_POSITION;
26use lance_core::{Error, Result};
27use lance_index::IndexType;
28use lance_index::optimize::OptimizeOptions;
29use lance_index::scalar::{BuiltinIndexType, ScalarIndexParams};
30use lance_io::object_store::{ObjectStore, ObjectStoreParams};
31use lance_namespace::LanceNamespace;
32use lance_namespace::error::NamespaceError;
33use lance_namespace::models::{
34 CreateNamespaceRequest, CreateNamespaceResponse, CreateTableRequest, CreateTableResponse,
35 DeclareTableRequest, DeclareTableResponse, DeregisterTableRequest, DeregisterTableResponse,
36 DescribeNamespaceRequest, DescribeNamespaceResponse, DescribeTableRequest,
37 DescribeTableResponse, DescribeTableVersionResponse, DropNamespaceRequest,
38 DropNamespaceResponse, DropTableRequest, DropTableResponse, ListNamespacesRequest,
39 ListNamespacesResponse, ListTableVersionsResponse, ListTablesRequest, ListTablesResponse,
40 NamespaceExistsRequest, RegisterTableRequest, RegisterTableResponse, TableExistsRequest,
41 TableVersion,
42};
43use lance_namespace::schema::arrow_schema_to_json;
44use object_store::{Error as ObjectStoreError, path::Path};
45use std::io::Cursor;
46use std::{
47 collections::HashMap,
48 hash::{DefaultHasher, Hash, Hasher},
49 ops::{Deref, DerefMut},
50 sync::Arc,
51};
52use tokio::sync::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
53
/// Name of the manifest table.
// NOTE(review): not referenced in this part of the file; presumably the dataset
// directory name under the namespace root — confirm against the code that opens it.
const MANIFEST_TABLE_NAME: &str = "__manifest";
/// Separator placed between namespace levels and the object name inside an object id.
const DELIMITER: &str = "$";
/// Concurrency limit used elsewhere in the crate.
// NOTE(review): not referenced in this part of the file — confirm what it bounds.
pub(crate) const DECLARED_FILTER_CONCURRENCY: usize = 16;

/// Name of the BTREE index created on the manifest's `object_id` column.
const OBJECT_ID_INDEX_NAME: &str = "object_id_btree";
/// Name of the Bitmap index created on the manifest's `object_type` column.
const OBJECT_TYPE_INDEX_NAME: &str = "object_type_bitmap";
/// Name of the LabelList index created on the manifest's `base_objects` column.
const BASE_OBJECTS_INDEX_NAME: &str = "base_objects_label_list";
/// Minimum fragment count at which inline optimization compacts the manifest
/// and optimizes its indices.
const MANIFEST_INLINE_OPTIMIZATION_FRAGMENT_THRESHOLD: usize = 8;
70
/// Kind of object recorded in a manifest row (the `object_type` column).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ObjectType {
    /// A namespace; stored as `"namespace"`.
    Namespace,
    /// A table; stored as `"table"`.
    Table,
    /// A single version of a table; stored as `"table_version"`.
    TableVersion,
}
78
79impl ObjectType {
80 pub fn as_str(&self) -> &str {
81 match self {
82 Self::Namespace => "namespace",
83 Self::Table => "table",
84 Self::TableVersion => "table_version",
85 }
86 }
87
88 pub fn parse(s: &str) -> Result<Self> {
89 match s {
90 "namespace" => Ok(Self::Namespace),
91 "table" => Ok(Self::Table),
92 "table_version" => Ok(Self::TableVersion),
93 _ => Err(NamespaceError::Internal {
94 message: format!("Invalid object type: {}", s),
95 }
96 .into()),
97 }
98 }
99}
100
/// Behavior requested for a create-table call, as parsed from the request's mode string.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CreateTableMode {
    /// Default mode, also used when no mode string is given.
    Create,
    /// Parsed from `"existok"` / `"exist_ok"`; writes with `WriteMode::Create`.
    ExistOk,
    /// Parsed from `"overwrite"`; writes with `WriteMode::Overwrite`.
    Overwrite,
}
107
108impl CreateTableMode {
109 fn parse(mode: Option<&str>) -> Result<Self> {
110 match mode {
111 None => Ok(Self::Create),
112 Some(mode) if mode.eq_ignore_ascii_case("create") => Ok(Self::Create),
113 Some(mode)
114 if mode.eq_ignore_ascii_case("existok")
115 || mode.eq_ignore_ascii_case("exist_ok") =>
116 {
117 Ok(Self::ExistOk)
118 }
119 Some(mode) if mode.eq_ignore_ascii_case("overwrite") => Ok(Self::Overwrite),
120 Some(mode) => Err(NamespaceError::InvalidInput {
121 message: format!(
122 "Unsupported create_table mode '{}'. Supported modes are: 'Create', 'ExistOk', 'Overwrite'",
123 mode
124 ),
125 }
126 .into()),
127 }
128 }
129
130 fn write_mode(self) -> WriteMode {
131 match self {
132 Self::Overwrite => WriteMode::Overwrite,
133 Self::Create | Self::ExistOk => WriteMode::Create,
134 }
135 }
136}
137
/// A table row read back from the manifest.
#[derive(Debug, Clone)]
pub struct TableInfo {
    /// Namespace levels parsed from the row's `object_id` (empty for a root-level table).
    pub namespace: Vec<String>,
    /// Table name: the last segment of the row's `object_id`.
    pub name: String,
    /// Value of the row's `location` column.
    pub location: String,
    /// Key/value properties decoded from the row's JSON `metadata` column, if it is non-null.
    pub metadata: Option<HashMap<String, String>>,
}
146
/// One row to be written into the manifest table.
#[derive(Debug, Clone)]
pub struct ManifestEntry {
    /// Value for the `object_id` column (the merge key).
    pub object_id: String,
    /// Kind of object; written to the `object_type` column via `ObjectType::as_str`.
    pub object_type: ObjectType,
    /// Value for the nullable `location` column.
    pub location: Option<String>,
    /// Value for the nullable `metadata` column, written as given.
    // NOTE(review): readers in this file decode table metadata as a JSON object of
    // strings — confirm every writer serializes it that way.
    pub metadata: Option<String>,
}
162
/// Description of a namespace.
// NOTE(review): not constructed in this part of the file; field meanings below are
// presumed to mirror `TableInfo` — confirm against the code that builds it.
#[derive(Debug, Clone)]
pub struct NamespaceInfo {
    /// Parent namespace levels (presumably empty for a root-level namespace).
    pub namespace: Vec<String>,
    /// Name of the namespace itself.
    pub name: String,
    /// Optional key/value properties.
    pub metadata: Option<HashMap<String, String>>,
}
170
/// Shared handle to a [`Dataset`] behind an async read/write lock.
///
/// Cloning the wrapper clones the `Arc`, so all clones see the same dataset. The
/// accessors reload the dataset to its latest version before handing out a guard.
#[derive(Debug, Clone)]
pub struct DatasetConsistencyWrapper(Arc<RwLock<Dataset>>);
177
178impl DatasetConsistencyWrapper {
179 pub fn new(dataset: Dataset) -> Self {
181 Self(Arc::new(RwLock::new(dataset)))
182 }
183
184 pub async fn get(&self) -> Result<DatasetReadGuard<'_>> {
187 self.reload().await?;
188 Ok(DatasetReadGuard {
189 guard: self.0.read().await,
190 })
191 }
192
193 pub async fn get_mut(&self) -> Result<DatasetWriteGuard<'_>> {
196 self.reload().await?;
197 Ok(DatasetWriteGuard {
198 guard: self.0.write().await,
199 })
200 }
201
202 pub async fn set_latest(&self, dataset: Dataset) {
207 let mut write_guard = self.0.write().await;
208 if dataset.manifest().version > write_guard.manifest().version {
209 *write_guard = dataset;
210 }
211 }
212
213 async fn reload(&self) -> Result<()> {
215 let read_guard = self.0.read().await;
217 let dataset_uri = read_guard.uri().to_string();
218 let current_version = read_guard.version().version;
219 log::debug!(
220 "Reload starting for uri={}, current_version={}",
221 dataset_uri,
222 current_version
223 );
224 let latest_version = read_guard.latest_version_id().await.map_err(|e| {
225 lance_core::Error::from(NamespaceError::Internal {
226 message: format!("Failed to get latest version: {:?}", e),
227 })
228 })?;
229 log::debug!(
230 "Reload got latest_version={} for uri={}, current_version={}",
231 latest_version,
232 dataset_uri,
233 current_version
234 );
235 drop(read_guard);
236
237 if latest_version == current_version {
239 log::debug!("Already up-to-date for uri={}", dataset_uri);
240 return Ok(());
241 }
242
243 let mut write_guard = self.0.write().await;
245
246 let latest_version = write_guard.latest_version_id().await.map_err(|e| {
248 lance_core::Error::from(NamespaceError::Internal {
249 message: format!("Failed to get latest version: {:?}", e),
250 })
251 })?;
252
253 if latest_version != write_guard.version().version {
254 write_guard.checkout_latest().await.map_err(|e| {
255 lance_core::Error::from(NamespaceError::Internal {
256 message: format!("Failed to checkout latest: {:?}", e),
257 })
258 })?;
259 }
260
261 Ok(())
262 }
263}
264
/// Read guard over the wrapped [`Dataset`]; dereferences to `&Dataset`.
pub struct DatasetReadGuard<'a> {
    /// Underlying read-lock guard; the read lock is held for as long as this value lives.
    guard: RwLockReadGuard<'a, Dataset>,
}
268
269impl Deref for DatasetReadGuard<'_> {
270 type Target = Dataset;
271
272 fn deref(&self) -> &Self::Target {
273 &self.guard
274 }
275}
276
/// Write guard over the wrapped [`Dataset`]; dereferences to `&Dataset` and `&mut Dataset`.
pub struct DatasetWriteGuard<'a> {
    /// Underlying write-lock guard; the write lock is held for as long as this value lives.
    guard: RwLockWriteGuard<'a, Dataset>,
}
280
281impl Deref for DatasetWriteGuard<'_> {
282 type Target = Dataset;
283
284 fn deref(&self) -> &Self::Target {
285 &self.guard
286 }
287}
288
289impl DerefMut for DatasetWriteGuard<'_> {
290 fn deref_mut(&mut self) -> &mut Self::Target {
291 &mut self.guard
292 }
293}
294
/// Namespace implementation whose catalog is stored in a Lance manifest dataset.
pub struct ManifestNamespace {
    /// Root URI the namespace was opened with.
    root: String,
    /// Storage options supplied at construction.
    // NOTE(review): presumably forwarded to object-store/dataset opens — confirm.
    storage_options: Option<HashMap<String, String>>,
    /// Optional shared Lance session supplied at construction.
    session: Option<Arc<Session>>,
    /// Object store used to list files under `base_path`.
    object_store: Arc<ObjectStore>,
    /// Base path within `object_store`; table locations are resolved as its children.
    base_path: Path,
    /// The manifest dataset, reloaded to the latest version on each access.
    manifest_dataset: DatasetConsistencyWrapper,
    /// Flag supplied at construction.
    // NOTE(review): not read in this part of the file; presumably enables a
    // directory-listing fallback — confirm.
    dir_listing_enabled: bool,
    /// When `false`, `run_inline_optimization` returns immediately without doing anything.
    inline_optimization_enabled: bool,
    /// Optional commit-retry count applied to manifest merge-inserts.
    commit_retries: Option<u32>,
    /// Serializes manifest mutations (merge-insert and delete) issued through this instance.
    manifest_mutation_lock: Arc<Mutex<()>>,
}
319
320impl std::fmt::Debug for ManifestNamespace {
321 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
322 f.debug_struct("ManifestNamespace")
323 .field("root", &self.root)
324 .field("storage_options", &self.storage_options)
325 .field("dir_listing_enabled", &self.dir_listing_enabled)
326 .field(
327 "inline_optimization_enabled",
328 &self.inline_optimization_enabled,
329 )
330 .finish()
331 }
332}
333
334fn convert_lance_commit_error(e: &LanceError, operation: &str, object_id: Option<&str>) -> Error {
343 match e {
344 LanceError::CommitConflict { .. } => NamespaceError::Throttling {
346 message: format!("Too many concurrent writes, please retry later: {:?}", e),
347 }
348 .into(),
349 LanceError::TooMuchWriteContention { .. } | LanceError::IncompatibleTransaction { .. } => {
352 let message = if let Some(id) = object_id {
353 format!(
354 "Object '{}' was concurrently modified by another operation: {:?}",
355 id, e
356 )
357 } else {
358 format!(
359 "Object was concurrently modified by another operation: {:?}",
360 e
361 )
362 };
363 NamespaceError::ConcurrentModification { message }.into()
364 }
365 _ => {
367 let error_msg = e.to_string();
368 if error_msg.contains("matched")
369 || error_msg.contains("duplicate")
370 || error_msg.contains("already exists")
371 {
372 let message = if let Some(id) = object_id {
373 format!(
374 "Object '{}' was concurrently created by another operation: {:?}",
375 id, e
376 )
377 } else {
378 format!(
379 "Object was concurrently created by another operation: {:?}",
380 e
381 )
382 };
383 return NamespaceError::ConcurrentModification { message }.into();
384 }
385 lance_core::Error::from(NamespaceError::Internal {
386 message: format!("{}: {:?}", operation, e),
387 })
388 }
389 }
390}
391
392impl ManifestNamespace {
393 #[allow(clippy::too_many_arguments)]
395 pub async fn from_directory(
396 root: String,
397 storage_options: Option<HashMap<String, String>>,
398 session: Option<Arc<Session>>,
399 object_store: Arc<ObjectStore>,
400 base_path: Path,
401 dir_listing_enabled: bool,
402 inline_optimization_enabled: bool,
403 commit_retries: Option<u32>,
404 table_version_storage_enabled: bool,
405 ) -> Result<Self> {
406 let manifest_dataset = Self::ensure_manifest_table_up_to_date(
407 &root,
408 &storage_options,
409 session.clone(),
410 table_version_storage_enabled,
411 )
412 .await?;
413
414 Ok(Self {
415 root,
416 storage_options,
417 session,
418 object_store,
419 base_path,
420 manifest_dataset,
421 dir_listing_enabled,
422 inline_optimization_enabled,
423 commit_retries,
424 manifest_mutation_lock: Arc::new(Mutex::new(())),
425 })
426 }
427
428 pub fn build_object_id(namespace: &[String], name: &str) -> String {
430 if namespace.is_empty() {
431 name.to_string()
432 } else {
433 let mut id = namespace.join(DELIMITER);
434 id.push_str(DELIMITER);
435 id.push_str(name);
436 id
437 }
438 }
439
440 pub fn parse_object_id(object_id: &str) -> (Vec<String>, String) {
442 let parts: Vec<&str> = object_id.split(DELIMITER).collect();
443 if parts.len() == 1 {
444 (Vec::new(), parts[0].to_string())
445 } else {
446 let namespace = parts[..parts.len() - 1]
447 .iter()
448 .map(|s| s.to_string())
449 .collect();
450 let name = parts[parts.len() - 1].to_string();
451 (namespace, name)
452 }
453 }
454
455 pub fn split_object_id(object_id: &[String]) -> (Vec<String>, String) {
457 if object_id.len() == 1 {
458 (vec![], object_id[0].clone())
459 } else {
460 (
461 object_id[..object_id.len() - 1].to_vec(),
462 object_id[object_id.len() - 1].clone(),
463 )
464 }
465 }
466
467 pub fn str_object_id(object_id: &[String]) -> String {
469 object_id.join(DELIMITER)
470 }
471
472 fn format_table_id(table_id: &[String]) -> String {
473 format!("table id '{}'", Self::str_object_id(table_id))
474 }
475
476 pub fn format_table_version(version: i64) -> String {
481 format!("{:020}", version)
482 }
483
484 pub fn build_version_object_id(table_object_id: &str, version: i64) -> String {
488 format!(
489 "{}{}{}",
490 table_object_id,
491 DELIMITER,
492 Self::format_table_version(version)
493 )
494 }
495
496 pub fn parse_version_from_object_id(object_id: &str) -> Option<i64> {
500 let (_namespace, name) = Self::parse_object_id(object_id);
501 name.parse::<i64>().ok()
502 }
503
504 pub fn generate_dir_name(object_id: &str) -> String {
511 let random_num: u64 = rand::random();
513
514 let mut hasher = DefaultHasher::new();
516 random_num.hash(&mut hasher);
517 object_id.hash(&mut hasher);
518 let hash = hasher.finish();
519
520 format!("{:08x}_{}", (hash & 0xFFFFFFFF) as u32, object_id)
522 }
523
524 pub(crate) fn construct_full_uri(root: &str, relative_location: &str) -> Result<String> {
526 let mut base_url = lance_io::object_store::uri_to_url(root)?;
527
528 if !base_url.path().ends_with('/') {
533 base_url.set_path(&format!("{}/", base_url.path()));
534 }
535
536 let mut full_url = base_url.clone();
537 full_url
538 .path_segments_mut()
539 .map_err(|_| {
540 lance_core::Error::from(NamespaceError::InvalidInput {
541 message: format!("Cannot modify path segments for URI '{}'", root),
542 })
543 })?
544 .pop_if_empty()
545 .extend(
546 relative_location
547 .split('/')
548 .filter(|segment| !segment.is_empty()),
549 );
550
551 full_url.set_query(None);
555
556 Ok(full_url.to_string())
557 }
558
559 async fn run_inline_optimization(&self) -> Result<()> {
571 if !self.inline_optimization_enabled {
572 return Ok(());
573 }
574
575 let mut dataset_guard = self.manifest_dataset.get_mut().await?;
577 let dataset: &mut Dataset = &mut dataset_guard;
578
579 let indices = dataset.load_indices().await?;
581
582 let has_object_id_index = indices.iter().any(|idx| idx.name == OBJECT_ID_INDEX_NAME);
584 let has_object_type_index = indices.iter().any(|idx| idx.name == OBJECT_TYPE_INDEX_NAME);
585 let has_base_objects_index = indices
586 .iter()
587 .any(|idx| idx.name == BASE_OBJECTS_INDEX_NAME);
588
589 if !has_object_id_index {
591 log::debug!(
592 "Creating BTREE index '{}' on object_id for __manifest table",
593 OBJECT_ID_INDEX_NAME
594 );
595 let params = ScalarIndexParams::for_builtin(BuiltinIndexType::BTree);
596 if let Err(e) = dataset
597 .create_index(
598 &["object_id"],
599 IndexType::BTree,
600 Some(OBJECT_ID_INDEX_NAME.to_string()),
601 ¶ms,
602 true,
603 )
604 .await
605 {
606 log::warn!(
607 "Failed to create BTREE index on object_id for __manifest table: {:?}. Query performance may be impacted.",
608 e
609 );
610 } else {
611 log::info!(
612 "Created BTREE index '{}' on object_id for __manifest table",
613 OBJECT_ID_INDEX_NAME
614 );
615 }
616 }
617
618 if !has_object_type_index {
620 log::debug!(
621 "Creating Bitmap index '{}' on object_type for __manifest table",
622 OBJECT_TYPE_INDEX_NAME
623 );
624 let params = ScalarIndexParams::default();
625 if let Err(e) = dataset
626 .create_index(
627 &["object_type"],
628 IndexType::Bitmap,
629 Some(OBJECT_TYPE_INDEX_NAME.to_string()),
630 ¶ms,
631 true,
632 )
633 .await
634 {
635 log::warn!(
636 "Failed to create Bitmap index on object_type for __manifest table: {:?}. Query performance may be impacted.",
637 e
638 );
639 } else {
640 log::info!(
641 "Created Bitmap index '{}' on object_type for __manifest table",
642 OBJECT_TYPE_INDEX_NAME
643 );
644 }
645 }
646
647 if !has_base_objects_index {
649 log::debug!(
650 "Creating LabelList index '{}' on base_objects for __manifest table",
651 BASE_OBJECTS_INDEX_NAME
652 );
653 let params = ScalarIndexParams::default();
654 if let Err(e) = dataset
655 .create_index(
656 &["base_objects"],
657 IndexType::LabelList,
658 Some(BASE_OBJECTS_INDEX_NAME.to_string()),
659 ¶ms,
660 true,
661 )
662 .await
663 {
664 log::warn!(
665 "Failed to create LabelList index on base_objects for __manifest table: {:?}. Query performance may be impacted.",
666 e
667 );
668 } else {
669 log::info!(
670 "Created LabelList index '{}' on base_objects for __manifest table",
671 BASE_OBJECTS_INDEX_NAME
672 );
673 }
674 }
675
676 let should_compact_and_optimize =
677 dataset.count_fragments() >= MANIFEST_INLINE_OPTIMIZATION_FRAGMENT_THRESHOLD;
678
679 if !should_compact_and_optimize {
680 return Ok(());
681 }
682
683 log::debug!("Running file compaction on __manifest table");
685 match compact_files(dataset, CompactionOptions::default(), None).await {
686 Ok(compaction_metrics) => {
687 if compaction_metrics.fragments_removed > 0 {
688 log::info!(
689 "Compacted __manifest table: removed {} fragments, added {} fragments",
690 compaction_metrics.fragments_removed,
691 compaction_metrics.fragments_added
692 );
693 }
694 }
695 Err(e) => {
696 log::warn!(
697 "Failed to compact files for __manifest table: {:?}. Continuing with optimization.",
698 e
699 );
700 }
701 }
702
703 log::debug!("Optimizing indices on __manifest table");
705 match dataset.optimize_indices(&OptimizeOptions::default()).await {
706 Ok(_) => {
707 log::info!("Successfully optimized indices on __manifest table");
708 }
709 Err(e) => {
710 log::warn!(
711 "Failed to optimize indices on __manifest table: {:?}. Continuing anyway.",
712 e
713 );
714 }
715 }
716
717 Ok(())
718 }
719
720 fn manifest_schema() -> Arc<ArrowSchema> {
722 Arc::new(ArrowSchema::new(vec![
723 Field::new("object_id", DataType::Utf8, false).with_metadata(
725 [(
726 LANCE_UNENFORCED_PRIMARY_KEY_POSITION.to_string(),
727 "0".to_string(),
728 )]
729 .into_iter()
730 .collect(),
731 ),
732 Field::new("object_type", DataType::Utf8, false),
733 Field::new("location", DataType::Utf8, true),
734 Field::new("metadata", DataType::Utf8, true),
735 Field::new(
736 "base_objects",
737 DataType::List(Arc::new(Field::new("object_id", DataType::Utf8, true))),
738 true,
739 ),
740 ]))
741 }
742
743 async fn manifest_scanner(&self) -> Result<Scanner> {
745 let dataset_guard = self.manifest_dataset.get().await?;
746 Ok(dataset_guard.scan())
747 }
748
749 async fn execute_scanner(scanner: Scanner) -> Result<Vec<RecordBatch>> {
751 let mut stream = scanner.try_into_stream().await.map_err(|e| {
752 lance_core::Error::from(NamespaceError::Internal {
753 message: format!("Failed to create stream: {:?}", e),
754 })
755 })?;
756
757 let mut batches = Vec::new();
758 while let Some(batch) = stream.next().await {
759 batches.push(batch.map_err(|e| {
760 lance_core::Error::from(NamespaceError::Internal {
761 message: format!("Failed to read batch: {:?}", e),
762 })
763 })?);
764 }
765
766 Ok(batches)
767 }
768
769 fn get_string_column<'a>(batch: &'a RecordBatch, column_name: &str) -> Result<&'a StringArray> {
771 let column = batch.column_by_name(column_name).ok_or_else(|| {
772 lance_core::Error::from(NamespaceError::Internal {
773 message: format!("Column '{}' not found", column_name),
774 })
775 })?;
776 column
777 .as_any()
778 .downcast_ref::<StringArray>()
779 .ok_or_else(|| {
780 lance_core::Error::from(NamespaceError::Internal {
781 message: format!("Column '{}' is not a string array", column_name),
782 })
783 })
784 }
785
786 async fn manifest_contains_object(&self, object_id: &str) -> Result<bool> {
788 let escaped_id = object_id.replace('\'', "''");
789 let filter = format!("object_id = '{}'", escaped_id);
790
791 let dataset_guard = self.manifest_dataset.get().await?;
792 let mut scanner = dataset_guard.scan();
793
794 scanner.filter(&filter).map_err(|e| {
795 lance_core::Error::from(NamespaceError::Internal {
796 message: format!("Failed to filter: {:?}", e),
797 })
798 })?;
799
800 scanner.project::<&str>(&[]).map_err(|e| {
802 lance_core::Error::from(NamespaceError::Internal {
803 message: format!("Failed to project: {:?}", e),
804 })
805 })?;
806
807 scanner.with_row_id();
808
809 let count = scanner.count_rows().await.map_err(|e| {
810 lance_core::Error::from(NamespaceError::Internal {
811 message: format!("Failed to count rows: {:?}", e),
812 })
813 })?;
814
815 Ok(count > 0)
816 }
817
818 async fn query_manifest_for_table(&self, object_id: &str) -> Result<Option<TableInfo>> {
820 let escaped_id = object_id.replace('\'', "''");
821 let filter = format!("object_id = '{}' AND object_type = 'table'", escaped_id);
822 let mut scanner = self.manifest_scanner().await?;
823 scanner.filter(&filter).map_err(|e| {
824 lance_core::Error::from(NamespaceError::Internal {
825 message: format!("Failed to filter: {:?}", e),
826 })
827 })?;
828 scanner
829 .project(&["object_id", "location", "metadata"])
830 .map_err(|e| {
831 lance_core::Error::from(NamespaceError::Internal {
832 message: format!("Failed to project: {:?}", e),
833 })
834 })?;
835 let batches = Self::execute_scanner(scanner).await?;
836
837 let mut found_result: Option<TableInfo> = None;
838 let mut total_rows = 0;
839
840 for batch in batches {
841 if batch.num_rows() == 0 {
842 continue;
843 }
844
845 total_rows += batch.num_rows();
846 if total_rows > 1 {
847 return Err(NamespaceError::Internal {
848 message: format!(
849 "Expected exactly 1 table with id '{}', found {}",
850 object_id, total_rows
851 ),
852 }
853 .into());
854 }
855
856 let object_id_array = Self::get_string_column(&batch, "object_id")?;
857 let location_array = Self::get_string_column(&batch, "location")?;
858 let metadata_array = Self::get_string_column(&batch, "metadata")?;
859 let location = location_array.value(0).to_string();
860 let metadata = if !metadata_array.is_null(0) {
861 let metadata_str = metadata_array.value(0);
862 match serde_json::from_str::<HashMap<String, String>>(metadata_str) {
863 Ok(map) => Some(map),
864 Err(e) => {
865 return Err(NamespaceError::Internal {
866 message: format!(
867 "Failed to deserialize metadata for table '{}': {}",
868 object_id, e
869 ),
870 }
871 .into());
872 }
873 }
874 } else {
875 None
876 };
877 let (namespace, name) = Self::parse_object_id(object_id_array.value(0));
878 found_result = Some(TableInfo {
879 namespace,
880 name,
881 location,
882 metadata,
883 });
884 }
885
886 Ok(found_result)
887 }
888
889 fn serialize_metadata(
890 properties: Option<&HashMap<String, String>>,
891 object_type: &str,
892 object_id: &str,
893 ) -> Result<Option<String>> {
894 match properties {
895 Some(properties) if !properties.is_empty() => {
896 serde_json::to_string(properties).map(Some).map_err(|e| {
897 LanceError::from(NamespaceError::Internal {
898 message: format!(
899 "Failed to serialize {} metadata for '{}': {}",
900 object_type, object_id, e
901 ),
902 })
903 })
904 }
905 _ => Ok(None),
906 }
907 }
908
909 pub(crate) async fn path_has_actual_manifests(
910 object_store: &ObjectStore,
911 table_path: &Path,
912 ) -> Result<bool> {
913 let versions_path = table_path.child(lance_table::io::commit::VERSIONS_DIR);
914 Ok(object_store
917 .list(Some(versions_path))
918 .try_next()
919 .await?
920 .is_some())
921 }
922
923 async fn location_has_actual_manifests(&self, location: &str) -> Result<bool> {
924 Self::path_has_actual_manifests(&self.object_store, &self.base_path.child(location)).await
925 }
926
927 pub(crate) fn is_not_found_load_error(err: &LanceError) -> bool {
928 match err {
929 LanceError::NotFound { .. } => true,
930 LanceError::IO { source, .. } => source
931 .downcast_ref::<ObjectStoreError>()
932 .is_some_and(|source| matches!(source, ObjectStoreError::NotFound { .. })),
933 LanceError::DatasetNotFound { source, .. } => {
934 source
935 .downcast_ref::<LanceError>()
936 .is_some_and(|source| matches!(source, LanceError::NotFound { .. }))
937 || source
938 .downcast_ref::<ObjectStoreError>()
939 .is_some_and(|source| matches!(source, ObjectStoreError::NotFound { .. }))
940 }
941 _ => false,
942 }
943 }
944
945 pub async fn list_manifest_table_locations(&self) -> Result<std::collections::HashSet<String>> {
948 let filter = "object_type = 'table' AND NOT contains(object_id, '$')";
949 let mut scanner = self.manifest_scanner().await?;
950 scanner.filter(filter).map_err(|e| {
951 lance_core::Error::from(NamespaceError::Internal {
952 message: format!("Failed to filter: {:?}", e),
953 })
954 })?;
955 scanner.project(&["location"]).map_err(|e| {
956 lance_core::Error::from(NamespaceError::Internal {
957 message: format!("Failed to project: {:?}", e),
958 })
959 })?;
960
961 let batches = Self::execute_scanner(scanner).await?;
962 let mut locations = std::collections::HashSet::new();
963
964 for batch in batches {
965 if batch.num_rows() == 0 {
966 continue;
967 }
968 let location_array = Self::get_string_column(&batch, "location")?;
969 for i in 0..location_array.len() {
970 locations.insert(location_array.value(i).to_string());
971 }
972 }
973
974 Ok(locations)
975 }
976
977 async fn insert_into_manifest(
979 &self,
980 object_id: String,
981 object_type: ObjectType,
982 location: Option<String>,
983 ) -> Result<()> {
984 self.insert_into_manifest_with_metadata(
985 vec![ManifestEntry {
986 object_id,
987 object_type,
988 location,
989 metadata: None,
990 }],
991 None,
992 )
993 .await
994 }
995
996 pub async fn insert_into_manifest_with_metadata(
1002 &self,
1003 entries: Vec<ManifestEntry>,
1004 base_objects: Option<Vec<String>>,
1005 ) -> Result<()> {
1006 self.merge_into_manifest_with_metadata(entries, base_objects, WhenMatched::Fail)
1007 .await
1008 }
1009
1010 async fn upsert_into_manifest_with_metadata(
1011 &self,
1012 entries: Vec<ManifestEntry>,
1013 base_objects: Option<Vec<String>>,
1014 ) -> Result<()> {
1015 self.merge_into_manifest_with_metadata(entries, base_objects, WhenMatched::UpdateAll)
1016 .await
1017 }
1018
1019 async fn merge_into_manifest_with_metadata(
1020 &self,
1021 entries: Vec<ManifestEntry>,
1022 base_objects: Option<Vec<String>>,
1023 when_matched: WhenMatched,
1024 ) -> Result<()> {
1025 if entries.is_empty() {
1026 return Ok(());
1027 }
1028
1029 let schema = Self::manifest_schema();
1030
1031 let mut object_ids = Vec::with_capacity(entries.len());
1032 let mut object_types = Vec::with_capacity(entries.len());
1033 let mut locations: Vec<Option<String>> = Vec::with_capacity(entries.len());
1034 let mut metadatas: Vec<Option<String>> = Vec::with_capacity(entries.len());
1035
1036 let string_builder = StringBuilder::new();
1037 let mut list_builder = ListBuilder::new(string_builder).with_field(Arc::new(Field::new(
1038 "object_id",
1039 DataType::Utf8,
1040 true,
1041 )));
1042
1043 for (i, entry) in entries.iter().enumerate() {
1044 object_ids.push(entry.object_id.as_str());
1045 object_types.push(entry.object_type.as_str());
1046 locations.push(entry.location.clone());
1047 metadatas.push(entry.metadata.clone());
1048
1049 if i == 0 {
1052 match &base_objects {
1053 Some(objects) => {
1054 for obj in objects {
1055 list_builder.values().append_value(obj);
1056 }
1057 list_builder.append(true);
1058 }
1059 None => {
1060 list_builder.append_null();
1061 }
1062 }
1063 } else {
1064 list_builder.append_null();
1065 }
1066 }
1067
1068 let base_objects_array = list_builder.finish();
1069
1070 let location_array: Arc<dyn Array> = Arc::new(StringArray::from(
1071 locations.iter().map(|l| l.as_deref()).collect::<Vec<_>>(),
1072 ));
1073
1074 let metadata_array: Arc<dyn Array> = Arc::new(StringArray::from(
1075 metadatas.iter().map(|m| m.as_deref()).collect::<Vec<_>>(),
1076 ));
1077
1078 let batch = RecordBatch::try_new(
1079 schema.clone(),
1080 vec![
1081 Arc::new(StringArray::from(object_ids)),
1082 Arc::new(StringArray::from(object_types.to_vec())),
1083 location_array,
1084 metadata_array,
1085 Arc::new(base_objects_array),
1086 ],
1087 )
1088 .map_err(|e| {
1089 lance_core::Error::from(NamespaceError::Internal {
1090 message: format!("Failed to create manifest entries: {:?}", e),
1091 })
1092 })?;
1093
1094 let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
1095
1096 let _mutation_guard = self.manifest_mutation_lock.lock().await;
1098 let dataset_guard = self.manifest_dataset.get().await?;
1099 let dataset_arc = Arc::new(dataset_guard.clone());
1100 drop(dataset_guard); let mut merge_builder =
1103 MergeInsertBuilder::try_new(dataset_arc, vec!["object_id".to_string()]).map_err(
1104 |e| {
1105 lance_core::Error::from(NamespaceError::Internal {
1106 message: format!("Failed to create merge builder: {:?}", e),
1107 })
1108 },
1109 )?;
1110 merge_builder.when_matched(when_matched);
1111 merge_builder.when_not_matched(WhenNotMatched::InsertAll);
1112 merge_builder.conflict_retries(5);
1114 merge_builder.use_index(false);
1119 if let Some(retries) = self.commit_retries {
1120 merge_builder.commit_retries(retries);
1121 }
1122
1123 let (new_dataset_arc, _merge_stats) = merge_builder
1124 .try_build()
1125 .map_err(|e| {
1126 lance_core::Error::from(NamespaceError::Internal {
1127 message: format!("Failed to build merge: {:?}", e),
1128 })
1129 })?
1130 .execute_reader(Box::new(reader))
1131 .await
1132 .map_err(|e| {
1133 convert_lance_commit_error(&e, "Failed to execute merge insert into manifest", None)
1134 })?;
1135
1136 let new_dataset = Arc::try_unwrap(new_dataset_arc).unwrap_or_else(|arc| (*arc).clone());
1137 self.manifest_dataset.set_latest(new_dataset).await;
1138
1139 if let Err(e) = self.run_inline_optimization().await {
1141 log::warn!(
1142 "Unexpected failure when running inline optimization: {:?}",
1143 e
1144 );
1145 }
1146
1147 Ok(())
1148 }
1149
1150 pub async fn delete_from_manifest(&self, object_id: &str) -> Result<()> {
1152 let predicate = format!("object_id = '{}'", object_id);
1153
1154 let _mutation_guard = self.manifest_mutation_lock.lock().await;
1156 let dataset_guard = self.manifest_dataset.get().await?;
1157 let dataset = Arc::new(dataset_guard.clone());
1158 drop(dataset_guard); let new_dataset = DeleteBuilder::new(dataset, &predicate)
1161 .execute()
1162 .await
1163 .map_err(|e| convert_lance_commit_error(&e, "Failed to delete", None))?;
1164
1165 self.manifest_dataset
1167 .set_latest(
1168 Arc::try_unwrap(new_dataset.new_dataset).unwrap_or_else(|arc| (*arc).clone()),
1169 )
1170 .await;
1171
1172 if let Err(e) = self.run_inline_optimization().await {
1174 log::warn!(
1175 "Unexpected failure when running inline optimization: {:?}",
1176 e
1177 );
1178 }
1179
1180 Ok(())
1181 }
1182
1183 pub async fn query_table_versions(
1193 &self,
1194 object_id: &str,
1195 descending: bool,
1196 limit: Option<i32>,
1197 ) -> Result<Vec<(i64, String)>> {
1198 let escaped_id = object_id.replace('\'', "''");
1199 let filter = format!(
1201 "object_type = 'table_version' AND starts_with(object_id, '{}{}')",
1202 escaped_id, DELIMITER
1203 );
1204 let mut scanner = self.manifest_scanner().await?;
1205 scanner.filter(&filter).map_err(|e| {
1206 lance_core::Error::from(NamespaceError::Internal {
1207 message: format!("Failed to filter: {:?}", e),
1208 })
1209 })?;
1210 scanner.project(&["object_id", "metadata"]).map_err(|e| {
1211 lance_core::Error::from(NamespaceError::Internal {
1212 message: format!("Failed to project: {:?}", e),
1213 })
1214 })?;
1215 let batches = Self::execute_scanner(scanner).await?;
1216
1217 let mut versions: Vec<(i64, String)> = Vec::new();
1218 for batch in batches {
1219 if batch.num_rows() == 0 {
1220 continue;
1221 }
1222 let object_id_array = Self::get_string_column(&batch, "object_id")?;
1223 let metadata_array = Self::get_string_column(&batch, "metadata")?;
1224 for i in 0..batch.num_rows() {
1225 let oid = object_id_array.value(i);
1226 if let Some(version) = Self::parse_version_from_object_id(oid) {
1228 let metadata_str = metadata_array.value(i).to_string();
1229 versions.push((version, metadata_str));
1230 }
1231 }
1232 }
1233
1234 if descending {
1235 versions.sort_by(|a, b| b.0.cmp(&a.0));
1236 } else {
1237 versions.sort_by(|a, b| a.0.cmp(&b.0));
1238 }
1239
1240 if let Some(limit) = limit {
1241 versions.truncate(limit as usize);
1242 }
1243
1244 Ok(versions)
1245 }
1246
1247 pub async fn query_table_version(
1253 &self,
1254 object_id: &str,
1255 version: i64,
1256 ) -> Result<Option<String>> {
1257 let version_object_id = Self::build_version_object_id(object_id, version);
1258 self.query_table_version_by_object_id(&version_object_id)
1259 .await
1260 }
1261
1262 async fn query_table_version_by_object_id(
1264 &self,
1265 version_object_id: &str,
1266 ) -> Result<Option<String>> {
1267 let escaped_id = version_object_id.replace('\'', "''");
1268 let filter = format!(
1269 "object_id = '{}' AND object_type = 'table_version'",
1270 escaped_id
1271 );
1272 let mut scanner = self.manifest_scanner().await?;
1273 scanner.filter(&filter).map_err(|e| {
1274 lance_core::Error::from(NamespaceError::Internal {
1275 message: format!("Failed to filter: {:?}", e),
1276 })
1277 })?;
1278 scanner.project(&["metadata"]).map_err(|e| {
1279 lance_core::Error::from(NamespaceError::Internal {
1280 message: format!("Failed to project: {:?}", e),
1281 })
1282 })?;
1283 let batches = Self::execute_scanner(scanner).await?;
1284
1285 for batch in batches {
1286 if batch.num_rows() == 0 {
1287 continue;
1288 }
1289 let metadata_array = Self::get_string_column(&batch, "metadata")?;
1290 return Ok(Some(metadata_array.value(0).to_string()));
1291 }
1292
1293 Ok(None)
1294 }
1295
1296 pub async fn delete_table_versions(
1305 &self,
1306 object_id: &str,
1307 ranges: &[(i64, i64)],
1308 ) -> Result<i64> {
1309 if ranges.is_empty() {
1310 return Ok(0);
1311 }
1312
1313 let mut object_id_conditions: Vec<String> = Vec::new();
1315 for (start, end) in ranges {
1316 for version in *start..=*end {
1317 let oid = Self::build_version_object_id(object_id, version);
1318 let escaped = oid.replace('\'', "''");
1319 object_id_conditions.push(format!("'{}'", escaped));
1320 }
1321 }
1322
1323 if object_id_conditions.is_empty() {
1324 return Ok(0);
1325 }
1326
1327 let in_list = object_id_conditions.join(", ");
1329 let filter = format!(
1330 "object_type = 'table_version' AND object_id IN ({})",
1331 in_list
1332 );
1333
1334 let mut scanner = self.manifest_scanner().await?;
1335 scanner.filter(&filter).map_err(|e| {
1336 lance_core::Error::from(NamespaceError::Internal {
1337 message: format!("Failed to filter: {:?}", e),
1338 })
1339 })?;
1340 scanner.project(&["object_id", "location"]).map_err(|e| {
1341 lance_core::Error::from(NamespaceError::Internal {
1342 message: format!("Failed to project: {:?}", e),
1343 })
1344 })?;
1345 let batches = Self::execute_scanner(scanner).await?;
1346 let deleted_count: i64 = batches.iter().map(|b| b.num_rows() as i64).sum();
1347
1348 if deleted_count == 0 {
1349 return Ok(0);
1350 }
1351
1352 let _mutation_guard = self.manifest_mutation_lock.lock().await;
1354 let dataset_guard = self.manifest_dataset.get().await?;
1355 let dataset = Arc::new(dataset_guard.clone());
1356 drop(dataset_guard);
1357
1358 let new_dataset = DeleteBuilder::new(dataset, &filter)
1359 .execute()
1360 .await
1361 .map_err(|e| {
1362 convert_lance_commit_error(&e, "Failed to batch delete table versions", None)
1363 })?;
1364
1365 self.manifest_dataset
1366 .set_latest(
1367 Arc::try_unwrap(new_dataset.new_dataset).unwrap_or_else(|arc| (*arc).clone()),
1368 )
1369 .await;
1370
1371 if let Err(e) = self.run_inline_optimization().await {
1372 log::warn!(
1373 "Unexpected failure when running inline optimization: {:?}",
1374 e
1375 );
1376 }
1377
1378 Ok(deleted_count)
1379 }
1380
1381 pub async fn batch_delete_table_versions_by_object_ids(
1389 &self,
1390 object_ids: &[String],
1391 ) -> Result<i64> {
1392 if object_ids.is_empty() {
1393 return Ok(0);
1394 }
1395
1396 let in_list: String = object_ids
1397 .iter()
1398 .map(|oid| {
1399 let escaped = oid.replace('\'', "''");
1400 format!("'{}'", escaped)
1401 })
1402 .collect::<Vec<_>>()
1403 .join(", ");
1404
1405 let filter = format!(
1406 "object_type = 'table_version' AND object_id IN ({})",
1407 in_list
1408 );
1409
1410 let mut scanner = self.manifest_scanner().await?;
1412 scanner.filter(&filter).map_err(|e| {
1413 lance_core::Error::from(NamespaceError::Internal {
1414 message: format!("Failed to filter: {:?}", e),
1415 })
1416 })?;
1417 scanner.project(&["object_id", "location"]).map_err(|e| {
1418 lance_core::Error::from(NamespaceError::Internal {
1419 message: format!("Failed to project: {:?}", e),
1420 })
1421 })?;
1422 let batches = Self::execute_scanner(scanner).await?;
1423 let deleted_count: i64 = batches.iter().map(|b| b.num_rows() as i64).sum();
1424
1425 if deleted_count == 0 {
1426 return Ok(0);
1427 }
1428
1429 let _mutation_guard = self.manifest_mutation_lock.lock().await;
1431 let dataset_guard = self.manifest_dataset.get().await?;
1432 let dataset = Arc::new(dataset_guard.clone());
1433 drop(dataset_guard);
1434
1435 let new_dataset = DeleteBuilder::new(dataset, &filter)
1436 .execute()
1437 .await
1438 .map_err(|e| {
1439 convert_lance_commit_error(
1440 &e,
1441 "Failed to batch delete table versions across multiple tables",
1442 None,
1443 )
1444 })?;
1445
1446 self.manifest_dataset
1447 .set_latest(
1448 Arc::try_unwrap(new_dataset.new_dataset).unwrap_or_else(|arc| (*arc).clone()),
1449 )
1450 .await;
1451
1452 if let Err(e) = self.run_inline_optimization().await {
1453 log::warn!(
1454 "Unexpected failure when running inline optimization: {:?}",
1455 e
1456 );
1457 }
1458
1459 Ok(deleted_count)
1460 }
1461
1462 pub async fn set_property(&self, name: &str, value: &str) -> Result<()> {
1468 let _mutation_guard = self.manifest_mutation_lock.lock().await;
1469 let dataset_guard = self.manifest_dataset.get().await?;
1470 if dataset_guard.metadata().get(name) == Some(&value.to_string()) {
1471 return Ok(());
1472 }
1473 drop(dataset_guard);
1474
1475 let mut dataset_guard = self.manifest_dataset.get_mut().await?;
1476 dataset_guard
1477 .update_metadata([(name, value)])
1478 .await
1479 .map_err(|e| {
1480 lance_core::Error::from(NamespaceError::Internal {
1481 message: format!(
1482 "Failed to set property '{}' in __manifest metadata: {}",
1483 name, e
1484 ),
1485 })
1486 })?;
1487 Ok(())
1488 }
1489
1490 pub async fn has_property(&self, name: &str) -> Result<bool> {
1492 let dataset_guard = self.manifest_dataset.get().await?;
1493 Ok(dataset_guard.metadata().contains_key(name))
1494 }
1495
1496 fn parse_table_version(version: i64, metadata_str: &str) -> Option<TableVersion> {
1500 let meta: serde_json::Value = match serde_json::from_str(metadata_str) {
1501 Ok(v) => v,
1502 Err(e) => {
1503 log::warn!(
1504 "Skipping version {} due to invalid metadata JSON: {}",
1505 version,
1506 e
1507 );
1508 return None;
1509 }
1510 };
1511 let manifest_path = match meta.get("manifest_path").and_then(|v| v.as_str()) {
1512 Some(p) => p.to_string(),
1513 None => {
1514 log::warn!(
1515 "Skipping version {} due to missing 'manifest_path' in metadata — \
1516 this may indicate data corruption",
1517 version
1518 );
1519 return None;
1520 }
1521 };
1522 let manifest_size = meta.get("manifest_size").and_then(|v| v.as_i64());
1523 let e_tag = meta
1524 .get("e_tag")
1525 .and_then(|v| v.as_str())
1526 .map(|s| s.to_string());
1527 Some(TableVersion {
1528 version,
1529 manifest_path,
1530 manifest_size,
1531 e_tag,
1532 timestamp_millis: None,
1533 metadata: None,
1534 })
1535 }
1536
1537 pub async fn list_table_versions(
1542 &self,
1543 table_id: &[String],
1544 descending: bool,
1545 limit: Option<i32>,
1546 ) -> Result<ListTableVersionsResponse> {
1547 let object_id = Self::str_object_id(table_id);
1548 let manifest_versions = self
1549 .query_table_versions(&object_id, descending, limit)
1550 .await?;
1551
1552 let table_versions: Vec<TableVersion> = manifest_versions
1553 .into_iter()
1554 .filter_map(|(version, metadata_str)| Self::parse_table_version(version, &metadata_str))
1555 .collect();
1556
1557 Ok(ListTableVersionsResponse {
1558 versions: table_versions,
1559 page_token: None,
1560 })
1561 }
1562
1563 pub async fn describe_table_version(
1568 &self,
1569 table_id: &[String],
1570 version: i64,
1571 ) -> Result<DescribeTableVersionResponse> {
1572 let object_id = Self::str_object_id(table_id);
1573 if let Some(metadata_str) = self.query_table_version(&object_id, version).await?
1574 && let Some(tv) = Self::parse_table_version(version, &metadata_str)
1575 {
1576 return Ok(DescribeTableVersionResponse {
1577 version: Box::new(tv),
1578 });
1579 }
1580 Err(NamespaceError::TableVersionNotFound {
1581 message: format!("version {} for table {:?}", version, table_id),
1582 }
1583 .into())
1584 }
1585
1586 pub async fn register_table(&self, name: &str, location: String) -> Result<()> {
1588 let object_id = Self::build_object_id(&[], name);
1589 if self.manifest_contains_object(&object_id).await? {
1590 return Err(NamespaceError::Internal {
1591 message: format!("Table '{}' already exists", name),
1592 }
1593 .into());
1594 }
1595
1596 self.insert_into_manifest(object_id, ObjectType::Table, Some(location))
1597 .await
1598 }
1599
1600 async fn validate_namespace_levels_exist(&self, namespace_path: &[String]) -> Result<()> {
1602 for i in 1..=namespace_path.len() {
1603 let partial_path = &namespace_path[..i];
1604 let object_id = partial_path.join(DELIMITER);
1605 if !self.manifest_contains_object(&object_id).await? {
1606 return Err(NamespaceError::NamespaceNotFound {
1607 message: format!("parent namespace '{}'", object_id),
1608 }
1609 .into());
1610 }
1611 }
1612 Ok(())
1613 }
1614
1615 async fn query_manifest_for_namespace(&self, object_id: &str) -> Result<Option<NamespaceInfo>> {
1617 let escaped_id = object_id.replace('\'', "''");
1618 let filter = format!("object_id = '{}' AND object_type = 'namespace'", escaped_id);
1619 let mut scanner = self.manifest_scanner().await?;
1620 scanner.filter(&filter).map_err(|e| {
1621 lance_core::Error::from(NamespaceError::Internal {
1622 message: format!("Failed to filter: {:?}", e),
1623 })
1624 })?;
1625 scanner.project(&["object_id", "metadata"]).map_err(|e| {
1626 lance_core::Error::from(NamespaceError::Internal {
1627 message: format!("Failed to project: {:?}", e),
1628 })
1629 })?;
1630 let batches = Self::execute_scanner(scanner).await?;
1631
1632 let mut found_result: Option<NamespaceInfo> = None;
1633 let mut total_rows = 0;
1634
1635 for batch in batches {
1636 if batch.num_rows() == 0 {
1637 continue;
1638 }
1639
1640 total_rows += batch.num_rows();
1641 if total_rows > 1 {
1642 return Err(NamespaceError::Internal {
1643 message: format!(
1644 "Expected exactly 1 namespace with id '{}', found {}",
1645 object_id, total_rows
1646 ),
1647 }
1648 .into());
1649 }
1650
1651 let object_id_array = Self::get_string_column(&batch, "object_id")?;
1652 let metadata_array = Self::get_string_column(&batch, "metadata")?;
1653
1654 let object_id_str = object_id_array.value(0);
1655 let metadata = if !metadata_array.is_null(0) {
1656 let metadata_str = metadata_array.value(0);
1657 match serde_json::from_str::<HashMap<String, String>>(metadata_str) {
1658 Ok(map) => Some(map),
1659 Err(e) => {
1660 return Err(NamespaceError::Internal {
1661 message: format!(
1662 "Failed to deserialize metadata for namespace '{}': {}",
1663 object_id, e
1664 ),
1665 }
1666 .into());
1667 }
1668 }
1669 } else {
1670 None
1671 };
1672
1673 let (namespace, name) = Self::parse_object_id(object_id_str);
1674 found_result = Some(NamespaceInfo {
1675 namespace,
1676 name,
1677 metadata,
1678 });
1679 }
1680
1681 Ok(found_result)
1682 }
1683
1684 async fn ensure_manifest_table_up_to_date(
1692 root: &str,
1693 storage_options: &Option<HashMap<String, String>>,
1694 session: Option<Arc<Session>>,
1695 table_version_storage_enabled: bool,
1696 ) -> Result<DatasetConsistencyWrapper> {
1697 let manifest_path = format!("{}/{}", root, MANIFEST_TABLE_NAME);
1698 log::debug!("Attempting to load manifest from {}", manifest_path);
1699 let store_options = ObjectStoreParams {
1700 storage_options_accessor: storage_options.as_ref().map(|opts| {
1701 Arc::new(
1702 lance_io::object_store::StorageOptionsAccessor::with_static_options(
1703 opts.clone(),
1704 ),
1705 )
1706 }),
1707 ..Default::default()
1708 };
1709 let read_params = ReadParams {
1710 session: session.clone(),
1711 store_options: Some(store_options.clone()),
1712 ..Default::default()
1713 };
1714 let dataset_result = DatasetBuilder::from_uri(&manifest_path)
1715 .with_read_params(read_params)
1716 .load()
1717 .await;
1718 if let Ok(mut dataset) = dataset_result {
1719 let needs_pk_migration = dataset
1721 .schema()
1722 .field("object_id")
1723 .map(|f| {
1724 !f.metadata
1725 .contains_key(LANCE_UNENFORCED_PRIMARY_KEY_POSITION)
1726 })
1727 .unwrap_or(false);
1728
1729 if needs_pk_migration {
1730 log::info!("Migrating __manifest table to add primary key metadata on object_id");
1731 dataset
1732 .update_field_metadata()
1733 .update("object_id", [(LANCE_UNENFORCED_PRIMARY_KEY_POSITION, "0")])
1734 .map_err(|e| {
1735 lance_core::Error::from(NamespaceError::Internal {
1736 message: format!(
1737 "Failed to find object_id field for migration: {:?}",
1738 e
1739 ),
1740 })
1741 })?
1742 .await
1743 .map_err(|e| {
1744 lance_core::Error::from(NamespaceError::Internal {
1745 message: format!("Failed to migrate primary key metadata: {:?}", e),
1746 })
1747 })?;
1748 }
1749
1750 if table_version_storage_enabled {
1753 let needs_flag = dataset
1754 .metadata()
1755 .get("table_version_storage_enabled")
1756 .map(|v| v != "true")
1757 .unwrap_or(true);
1758
1759 if needs_flag
1760 && let Err(e) = dataset
1761 .update_metadata([("table_version_storage_enabled", "true")])
1762 .await
1763 {
1764 log::warn!(
1765 "Failed to persist table_version_storage_enabled flag in __manifest: {:?}",
1766 e
1767 );
1768 }
1769 }
1770
1771 Ok(DatasetConsistencyWrapper::new(dataset))
1772 } else {
1773 log::info!("Creating new manifest table at {}", manifest_path);
1774 let schema = Self::manifest_schema();
1775 let empty_batch = RecordBatch::new_empty(schema.clone());
1776 let reader = RecordBatchIterator::new(vec![Ok(empty_batch)], schema.clone());
1777
1778 let store_params = ObjectStoreParams {
1779 storage_options_accessor: storage_options.as_ref().map(|opts| {
1780 Arc::new(
1781 lance_io::object_store::StorageOptionsAccessor::with_static_options(
1782 opts.clone(),
1783 ),
1784 )
1785 }),
1786 ..Default::default()
1787 };
1788 let write_params = WriteParams {
1789 session: session.clone(),
1790 store_params: Some(store_params),
1791 ..Default::default()
1792 };
1793
1794 let dataset =
1795 Dataset::write(Box::new(reader), &manifest_path, Some(write_params)).await;
1796
1797 match dataset {
1799 Ok(dataset) => {
1800 log::info!(
1801 "Successfully created manifest table at {}, version={}, uri={}",
1802 manifest_path,
1803 dataset.version().version,
1804 dataset.uri()
1805 );
1806 Ok(DatasetConsistencyWrapper::new(dataset))
1807 }
1808 Err(ref e)
1809 if matches!(
1810 e,
1811 LanceError::DatasetAlreadyExists { .. }
1812 | LanceError::CommitConflict { .. }
1813 | LanceError::IncompatibleTransaction { .. }
1814 | LanceError::RetryableCommitConflict { .. }
1815 ) =>
1816 {
1817 log::info!(
1819 "Manifest table was created by another process, loading it: {}",
1820 manifest_path
1821 );
1822 let recovery_store_options = ObjectStoreParams {
1823 storage_options_accessor: storage_options.as_ref().map(|opts| {
1824 Arc::new(
1825 lance_io::object_store::StorageOptionsAccessor::with_static_options(
1826 opts.clone(),
1827 ),
1828 )
1829 }),
1830 ..Default::default()
1831 };
1832 let recovery_read_params = ReadParams {
1833 session,
1834 store_options: Some(recovery_store_options),
1835 ..Default::default()
1836 };
1837 let dataset = DatasetBuilder::from_uri(&manifest_path)
1838 .with_read_params(recovery_read_params)
1839 .load()
1840 .await
1841 .map_err(|e| {
1842 lance_core::Error::from(NamespaceError::Internal {
1843 message: format!(
1844 "Failed to load manifest dataset after creation conflict: {}",
1845 e
1846 ),
1847 })
1848 })?;
1849 Ok(DatasetConsistencyWrapper::new(dataset))
1850 }
1851 Err(e) => Err(lance_core::Error::from(NamespaceError::Internal {
1852 message: format!("Failed to create manifest dataset: {:?}", e),
1853 })),
1854 }
1855 }
1856 }
1857
1858 fn apply_pagination(
1863 names: &mut Vec<String>,
1864 page_token: Option<String>,
1865 limit: Option<i32>,
1866 ) -> Option<String> {
1867 names.sort();
1868
1869 if let Some(start_after) = page_token {
1870 if let Some(index) = names
1871 .iter()
1872 .position(|name| name.as_str() > start_after.as_str())
1873 {
1874 names.drain(0..index);
1875 } else {
1876 names.clear();
1877 }
1878 }
1879
1880 if let Some(limit) = limit
1881 && limit >= 0
1882 {
1883 let limit = limit as usize;
1884 if names.len() > limit {
1885 let next_page_token = if limit > 0 {
1886 Some(names[limit - 1].clone())
1887 } else {
1888 None
1889 };
1890 names.truncate(limit);
1891 return next_page_token;
1892 }
1893 }
1894
1895 None
1896 }
1897}
1898
1899#[async_trait]
1900impl LanceNamespace for ManifestNamespace {
1901 fn namespace_id(&self) -> String {
1902 self.root.clone()
1903 }
1904
1905 async fn list_tables(&self, request: ListTablesRequest) -> Result<ListTablesResponse> {
1906 let namespace_id = request.id.as_ref().ok_or_else(|| {
1907 lance_core::Error::from(NamespaceError::InvalidInput {
1908 message: "Namespace ID is required".to_string(),
1909 })
1910 })?;
1911
1912 let filter = if namespace_id.is_empty() {
1914 "object_type = 'table' AND NOT contains(object_id, '$')".to_string()
1916 } else {
1917 let prefix = namespace_id.join(DELIMITER);
1919 format!(
1920 "object_type = 'table' AND starts_with(object_id, '{}{}') AND NOT contains(substring(object_id, {}), '$')",
1921 prefix,
1922 DELIMITER,
1923 prefix.len() + 2
1924 )
1925 };
1926
1927 let mut scanner = self.manifest_scanner().await?;
1928 scanner.filter(&filter).map_err(|e| {
1929 lance_core::Error::from(NamespaceError::Internal {
1930 message: format!("Failed to filter: {:?}", e),
1931 })
1932 })?;
1933 scanner.project(&["object_id", "location"]).map_err(|e| {
1934 lance_core::Error::from(NamespaceError::Internal {
1935 message: format!("Failed to project: {:?}", e),
1936 })
1937 })?;
1938
1939 let batches = Self::execute_scanner(scanner).await?;
1940
1941 let mut table_entries = Vec::new();
1942 for batch in batches {
1943 if batch.num_rows() == 0 {
1944 continue;
1945 }
1946
1947 let object_id_array = Self::get_string_column(&batch, "object_id")?;
1948 let location_array = Self::get_string_column(&batch, "location")?;
1949 for i in 0..batch.num_rows() {
1950 let object_id = object_id_array.value(i);
1951 let location = location_array.value(i);
1952 let (_namespace, name) = Self::parse_object_id(object_id);
1953 table_entries.push((name, location.to_string()));
1954 }
1955 }
1956
1957 let mut tables: Vec<String> = if request.include_declared.unwrap_or(true) {
1958 table_entries.into_iter().map(|(name, _)| name).collect()
1959 } else {
1960 let mut stream = futures::stream::iter(table_entries.into_iter().map(
1961 |(name, location)| async move {
1962 if self.location_has_actual_manifests(&location).await? {
1967 Ok::<Option<String>, Error>(Some(name))
1968 } else {
1969 Ok::<Option<String>, Error>(None)
1970 }
1971 },
1972 ))
1973 .buffered(DECLARED_FILTER_CONCURRENCY);
1974
1975 let mut filtered = Vec::new();
1976 while let Some(result) = stream.next().await {
1977 if let Some(name) = result? {
1978 filtered.push(name);
1979 }
1980 }
1981 filtered
1982 };
1983
1984 let next_page_token =
1985 Self::apply_pagination(&mut tables, request.page_token, request.limit);
1986 let mut response = ListTablesResponse::new(tables);
1987 response.page_token = next_page_token;
1988 Ok(response)
1989 }
1990
1991 async fn describe_table(&self, request: DescribeTableRequest) -> Result<DescribeTableResponse> {
1992 let table_id = request.id.as_ref().ok_or_else(|| {
1993 lance_core::Error::from(NamespaceError::InvalidInput {
1994 message: "Table ID is required".to_string(),
1995 })
1996 })?;
1997
1998 if table_id.is_empty() {
1999 return Err(NamespaceError::InvalidInput {
2000 message: "Table ID cannot be empty".to_string(),
2001 }
2002 .into());
2003 }
2004
2005 let object_id = Self::str_object_id(table_id);
2006 let table_info = self.query_manifest_for_table(&object_id).boxed().await?;
2007
2008 let table_name = table_id.last().cloned().unwrap_or_default();
2010 let namespace_id: Vec<String> = if table_id.len() > 1 {
2011 table_id[..table_id.len() - 1].to_vec()
2012 } else {
2013 vec![]
2014 };
2015
2016 let load_detailed_metadata = request.load_detailed_metadata.unwrap_or(false);
2017 let should_check_declared =
2018 load_detailed_metadata || request.check_declared.unwrap_or(false);
2019 let vend_credentials = request.vend_credentials.unwrap_or(true);
2021
2022 match table_info {
2023 Some(info) => {
2024 let table_uri = Self::construct_full_uri(&self.root, &info.location)?;
2026
2027 let storage_options = if vend_credentials {
2028 self.storage_options.clone()
2029 } else {
2030 None
2031 };
2032 let is_only_declared = if should_check_declared {
2033 Some(!self.location_has_actual_manifests(&info.location).await?)
2034 } else {
2035 None
2036 };
2037
2038 if !load_detailed_metadata {
2039 return Ok(DescribeTableResponse {
2040 table: Some(table_name),
2041 namespace: Some(namespace_id),
2042 location: Some(table_uri.clone()),
2043 table_uri: Some(table_uri),
2044 storage_options,
2045 properties: info.metadata,
2046 is_only_declared,
2047 ..Default::default()
2048 });
2049 }
2050
2051 if is_only_declared == Some(true) {
2052 return Ok(DescribeTableResponse {
2053 table: Some(table_name),
2054 namespace: Some(namespace_id),
2055 location: Some(table_uri.clone()),
2056 table_uri: Some(table_uri),
2057 storage_options,
2058 properties: info.metadata,
2059 is_only_declared,
2060 ..Default::default()
2061 });
2062 }
2063
2064 let mut builder = DatasetBuilder::from_uri(&table_uri);
2065 if let Some(opts) = &self.storage_options {
2066 builder = builder.with_storage_options(opts.clone());
2067 }
2068 if let Some(session) = &self.session {
2069 builder = builder.with_session(session.clone());
2070 }
2071
2072 match builder.load().await {
2073 Ok(mut dataset) => {
2074 if let Some(requested_version) = request.version {
2076 dataset = dataset.checkout_version(requested_version as u64).await?;
2077 }
2078
2079 let version = dataset.version().version;
2080 let lance_schema = dataset.schema();
2081 let arrow_schema: arrow_schema::Schema = lance_schema.into();
2082 let json_schema = arrow_schema_to_json(&arrow_schema)?;
2083
2084 Ok(DescribeTableResponse {
2085 table: Some(table_name.clone()),
2086 namespace: Some(namespace_id.clone()),
2087 version: Some(version as i64),
2088 location: Some(table_uri.clone()),
2089 table_uri: Some(table_uri),
2090 schema: Some(Box::new(json_schema)),
2091 storage_options,
2092 properties: info.metadata.clone(),
2093 is_only_declared,
2094 ..Default::default()
2095 })
2096 }
2097 Err(err) => Err(NamespaceError::Internal {
2098 message: format!(
2099 "Table exists in manifest but failed to load dataset '{}': {}",
2100 object_id, err
2101 ),
2102 }
2103 .into()),
2104 }
2105 }
2106 None => Err(NamespaceError::TableNotFound {
2107 message: Self::format_table_id(table_id),
2108 }
2109 .into()),
2110 }
2111 }
2112
2113 async fn table_exists(&self, request: TableExistsRequest) -> Result<()> {
2114 let table_id = request.id.as_ref().ok_or_else(|| {
2115 lance_core::Error::from(NamespaceError::InvalidInput {
2116 message: "Table ID is required".to_string(),
2117 })
2118 })?;
2119
2120 if table_id.is_empty() {
2121 return Err(NamespaceError::InvalidInput {
2122 message: "Table ID cannot be empty".to_string(),
2123 }
2124 .into());
2125 }
2126
2127 let object_id = Self::str_object_id(table_id);
2128 let exists = self.manifest_contains_object(&object_id).await?;
2129 if exists {
2130 Ok(())
2131 } else {
2132 Err(NamespaceError::TableNotFound {
2133 message: Self::format_table_id(table_id),
2134 }
2135 .into())
2136 }
2137 }
2138
2139 async fn create_table(
2140 &self,
2141 request: CreateTableRequest,
2142 data: Bytes,
2143 ) -> Result<CreateTableResponse> {
2144 let table_id = request.id.as_ref().ok_or_else(|| {
2145 lance_core::Error::from(NamespaceError::InvalidInput {
2146 message: "Table ID is required".to_string(),
2147 })
2148 })?;
2149
2150 if table_id.is_empty() {
2151 return Err(NamespaceError::InvalidInput {
2152 message: "Table ID cannot be empty".to_string(),
2153 }
2154 .into());
2155 }
2156
2157 let (namespace, table_name) = Self::split_object_id(table_id);
2158 let object_id = Self::build_object_id(&namespace, &table_name);
2159
2160 let existing_table = self.query_manifest_for_table(&object_id).await?;
2161 let existing_has_manifests = if let Some(existing_table) = &existing_table {
2162 Some(
2163 self.location_has_actual_manifests(&existing_table.location)
2164 .await?,
2165 )
2166 } else {
2167 None
2168 };
2169
2170 if existing_has_manifests == Some(false)
2171 && request
2172 .properties
2173 .as_ref()
2174 .is_some_and(|properties| !properties.is_empty())
2175 {
2176 return Err(NamespaceError::InvalidInput {
2177 message: format!(
2178 "create_table cannot set properties for already declared table '{}'",
2179 object_id
2180 ),
2181 }
2182 .into());
2183 }
2184
2185 let create_mode = if existing_has_manifests == Some(false) {
2186 CreateTableMode::Create
2187 } else {
2188 CreateTableMode::parse(request.mode.as_deref())?
2189 };
2190 let dir_name = if let Some(existing_table) = &existing_table {
2191 existing_table.location.clone()
2192 } else if namespace.is_empty() && self.dir_listing_enabled {
2193 format!("{}.lance", table_name)
2194 } else {
2195 Self::generate_dir_name(&object_id)
2196 };
2197 let table_uri = Self::construct_full_uri(&self.root, &dir_name)?;
2198 let overwriting_existing_table =
2199 existing_has_manifests == Some(true) && create_mode == CreateTableMode::Overwrite;
2200
2201 if existing_has_manifests == Some(true) {
2202 match create_mode {
2203 CreateTableMode::Create => {
2204 return Err(NamespaceError::TableAlreadyExists {
2205 message: table_name.clone(),
2206 }
2207 .into());
2208 }
2209 CreateTableMode::ExistOk => {
2210 let properties = existing_table
2211 .as_ref()
2212 .and_then(|table| table.metadata.clone());
2213 return Ok(CreateTableResponse {
2214 location: Some(table_uri),
2215 storage_options: self.storage_options.clone(),
2216 properties,
2217 ..Default::default()
2218 });
2219 }
2220 CreateTableMode::Overwrite => {}
2221 }
2222 }
2223
2224 if data.is_empty() {
2226 return Err(NamespaceError::InvalidInput {
2227 message: "Request data (Arrow IPC stream) is required for create_table".to_string(),
2228 }
2229 .into());
2230 }
2231
2232 let cursor = Cursor::new(data.to_vec());
2234 let stream_reader = StreamReader::try_new(cursor, None).map_err(|e| {
2235 lance_core::Error::from(NamespaceError::Internal {
2236 message: format!("Failed to read IPC stream: {:?}", e),
2237 })
2238 })?;
2239
2240 let batches: Vec<RecordBatch> = stream_reader
2241 .collect::<std::result::Result<Vec<_>, _>>()
2242 .map_err(|e| {
2243 lance_core::Error::from(NamespaceError::Internal {
2244 message: format!("Failed to collect batches: {:?}", e),
2245 })
2246 })?;
2247
2248 if batches.is_empty() {
2249 return Err(NamespaceError::Internal {
2250 message: "No data provided for table creation".to_string(),
2251 }
2252 .into());
2253 }
2254
2255 let schema = batches[0].schema();
2256 let batch_results: Vec<std::result::Result<RecordBatch, arrow_schema::ArrowError>> =
2257 batches.into_iter().map(Ok).collect();
2258 let reader = RecordBatchIterator::new(batch_results, schema);
2259
2260 let mut write_storage_options = self.storage_options.clone().unwrap_or_default();
2261 if let Some(request_storage_options) = request.storage_options.as_ref() {
2262 write_storage_options.extend(request_storage_options.clone());
2263 }
2264
2265 let store_params = ObjectStoreParams {
2266 storage_options_accessor: (!write_storage_options.is_empty()).then(|| {
2267 Arc::new(
2268 lance_io::object_store::StorageOptionsAccessor::with_static_options(
2269 write_storage_options,
2270 ),
2271 )
2272 }),
2273 ..Default::default()
2274 };
2275 let write_params = WriteParams {
2276 mode: create_mode.write_mode(),
2277 session: self.session.clone(),
2278 store_params: Some(store_params),
2279 ..Default::default()
2280 };
2281 let dataset = Dataset::write(Box::new(reader), &table_uri, Some(write_params))
2282 .await
2283 .map_err(|e| {
2284 lance_core::Error::from(NamespaceError::Internal {
2285 message: format!("Failed to write dataset: {:?}", e),
2286 })
2287 })?;
2288 let version = dataset.version().version as i64;
2289
2290 if overwriting_existing_table {
2291 let metadata =
2292 Self::serialize_metadata(request.properties.as_ref(), "table", &object_id)?;
2293 self.upsert_into_manifest_with_metadata(
2294 vec![ManifestEntry {
2295 object_id,
2296 object_type: ObjectType::Table,
2297 location: Some(dir_name),
2298 metadata,
2299 }],
2300 None,
2301 )
2302 .await?;
2303
2304 Ok(CreateTableResponse {
2305 version: Some(version),
2306 location: Some(table_uri),
2307 storage_options: self.storage_options.clone(),
2308 properties: request.properties,
2309 ..Default::default()
2310 })
2311 } else {
2312 match existing_table {
2313 Some(existing_table) => Ok(CreateTableResponse {
2314 version: Some(version),
2315 location: Some(table_uri),
2316 storage_options: self.storage_options.clone(),
2317 properties: existing_table.metadata,
2318 ..Default::default()
2319 }),
2320 None => {
2321 let metadata =
2322 Self::serialize_metadata(request.properties.as_ref(), "table", &object_id)?;
2323 self.insert_into_manifest_with_metadata(
2325 vec![ManifestEntry {
2326 object_id,
2327 object_type: ObjectType::Table,
2328 location: Some(dir_name.clone()),
2329 metadata,
2330 }],
2331 None,
2332 )
2333 .await?;
2334
2335 Ok(CreateTableResponse {
2336 version: Some(version),
2337 location: Some(table_uri),
2338 storage_options: self.storage_options.clone(),
2339 properties: request.properties,
2340 ..Default::default()
2341 })
2342 }
2343 }
2344 }
2345 }
2346
2347 async fn drop_table(&self, request: DropTableRequest) -> Result<DropTableResponse> {
2348 let table_id = request.id.as_ref().ok_or_else(|| {
2349 lance_core::Error::from(NamespaceError::InvalidInput {
2350 message: "Table ID is required".to_string(),
2351 })
2352 })?;
2353
2354 if table_id.is_empty() {
2355 return Err(NamespaceError::InvalidInput {
2356 message: "Table ID cannot be empty".to_string(),
2357 }
2358 .into());
2359 }
2360
2361 let (namespace, table_name) = Self::split_object_id(table_id);
2362 let object_id = Self::build_object_id(&namespace, &table_name);
2363
2364 let table_info = self.query_manifest_for_table(&object_id).boxed().await?;
2366
2367 match table_info {
2368 Some(info) => {
2369 self.delete_from_manifest(&object_id).boxed().await?;
2371
2372 let table_path = self.base_path.child(info.location.as_str());
2374 let table_uri = Self::construct_full_uri(&self.root, &info.location)?;
2375
2376 self.object_store
2378 .remove_dir_all(table_path)
2379 .boxed()
2380 .await
2381 .map_err(|e| {
2382 lance_core::Error::from(NamespaceError::Internal {
2383 message: format!("Failed to delete table directory: {:?}", e),
2384 })
2385 })?;
2386
2387 Ok(DropTableResponse {
2388 id: request.id.clone(),
2389 location: Some(table_uri),
2390 ..Default::default()
2391 })
2392 }
2393 None => Err(NamespaceError::TableNotFound {
2394 message: table_name.to_string(),
2395 }
2396 .into()),
2397 }
2398 }
2399
2400 async fn list_namespaces(
2401 &self,
2402 request: ListNamespacesRequest,
2403 ) -> Result<ListNamespacesResponse> {
2404 let parent_namespace = request.id.as_ref().ok_or_else(|| {
2405 lance_core::Error::from(NamespaceError::InvalidInput {
2406 message: "Namespace ID is required".to_string(),
2407 })
2408 })?;
2409
2410 let filter = if parent_namespace.is_empty() {
2412 "object_type = 'namespace' AND NOT contains(object_id, '$')".to_string()
2414 } else {
2415 let prefix = parent_namespace.join(DELIMITER);
2417 format!(
2418 "object_type = 'namespace' AND starts_with(object_id, '{}{}') AND NOT contains(substring(object_id, {}), '$')",
2419 prefix,
2420 DELIMITER,
2421 prefix.len() + 2
2422 )
2423 };
2424
2425 let mut scanner = self.manifest_scanner().await?;
2426 scanner.filter(&filter).map_err(|e| {
2427 lance_core::Error::from(NamespaceError::Internal {
2428 message: format!("Failed to filter: {:?}", e),
2429 })
2430 })?;
2431 scanner.project(&["object_id"]).map_err(|e| {
2432 lance_core::Error::from(NamespaceError::Internal {
2433 message: format!("Failed to project: {:?}", e),
2434 })
2435 })?;
2436
2437 let batches = Self::execute_scanner(scanner).await?;
2438 let mut namespaces = Vec::new();
2439
2440 for batch in batches {
2441 if batch.num_rows() == 0 {
2442 continue;
2443 }
2444
2445 let object_id_array = Self::get_string_column(&batch, "object_id")?;
2446 for i in 0..batch.num_rows() {
2447 let object_id = object_id_array.value(i);
2448 let (_namespace, name) = Self::parse_object_id(object_id);
2449 namespaces.push(name);
2450 }
2451 }
2452
2453 let next_page_token =
2454 Self::apply_pagination(&mut namespaces, request.page_token, request.limit);
2455 let mut response = ListNamespacesResponse::new(namespaces);
2456 response.page_token = next_page_token;
2457 Ok(response)
2458 }
2459
2460 async fn describe_namespace(
2461 &self,
2462 request: DescribeNamespaceRequest,
2463 ) -> Result<DescribeNamespaceResponse> {
2464 let namespace_id = request.id.as_ref().ok_or_else(|| {
2465 lance_core::Error::from(NamespaceError::InvalidInput {
2466 message: "Namespace ID is required".to_string(),
2467 })
2468 })?;
2469
2470 if namespace_id.is_empty() {
2472 #[allow(clippy::needless_update)]
2473 return Ok(DescribeNamespaceResponse {
2474 properties: Some(HashMap::new()),
2475 ..Default::default()
2476 });
2477 }
2478
2479 let object_id = namespace_id.join(DELIMITER);
2481 let namespace_info = self.query_manifest_for_namespace(&object_id).await?;
2482
2483 match namespace_info {
2484 #[allow(clippy::needless_update)]
2485 Some(info) => Ok(DescribeNamespaceResponse {
2486 properties: info.metadata,
2487 ..Default::default()
2488 }),
2489 None => Err(NamespaceError::NamespaceNotFound {
2490 message: object_id.to_string(),
2491 }
2492 .into()),
2493 }
2494 }
2495
2496 async fn create_namespace(
2497 &self,
2498 request: CreateNamespaceRequest,
2499 ) -> Result<CreateNamespaceResponse> {
2500 let namespace_id = request.id.as_ref().ok_or_else(|| {
2501 lance_core::Error::from(NamespaceError::InvalidInput {
2502 message: "Namespace ID is required".to_string(),
2503 })
2504 })?;
2505
2506 if namespace_id.is_empty() {
2508 return Err(NamespaceError::NamespaceAlreadyExists {
2509 message: "root namespace".to_string(),
2510 }
2511 .into());
2512 }
2513
2514 if namespace_id.len() > 1 {
2516 self.validate_namespace_levels_exist(&namespace_id[..namespace_id.len() - 1])
2517 .await?;
2518 }
2519
2520 let object_id = namespace_id.join(DELIMITER);
2521 if self.manifest_contains_object(&object_id).await? {
2522 return Err(NamespaceError::NamespaceAlreadyExists {
2523 message: object_id.to_string(),
2524 }
2525 .into());
2526 }
2527
2528 let metadata =
2529 Self::serialize_metadata(request.properties.as_ref(), "namespace", &object_id)?;
2530
2531 self.insert_into_manifest_with_metadata(
2532 vec![ManifestEntry {
2533 object_id,
2534 object_type: ObjectType::Namespace,
2535 location: None,
2536 metadata,
2537 }],
2538 None,
2539 )
2540 .await?;
2541
2542 Ok(CreateNamespaceResponse {
2543 properties: request.properties,
2544 ..Default::default()
2545 })
2546 }
2547
2548 async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<DropNamespaceResponse> {
2549 let namespace_id = request.id.as_ref().ok_or_else(|| {
2550 lance_core::Error::from(NamespaceError::InvalidInput {
2551 message: "Namespace ID is required".to_string(),
2552 })
2553 })?;
2554
2555 if namespace_id.is_empty() {
2557 return Err(NamespaceError::InvalidInput {
2558 message: "Root namespace cannot be dropped".to_string(),
2559 }
2560 .into());
2561 }
2562
2563 let object_id = namespace_id.join(DELIMITER);
2564
2565 if !self.manifest_contains_object(&object_id).boxed().await? {
2567 return Err(NamespaceError::NamespaceNotFound {
2568 message: object_id.to_string(),
2569 }
2570 .into());
2571 }
2572
2573 let escaped_id = object_id.replace('\'', "''");
2575 let prefix = format!("{}{}", escaped_id, DELIMITER);
2576 let filter = format!("starts_with(object_id, '{}')", prefix);
2577 let mut scanner = self.manifest_scanner().boxed().await?;
2578 scanner.filter(&filter).map_err(|e| {
2579 lance_core::Error::from(NamespaceError::Internal {
2580 message: format!("Failed to filter: {:?}", e),
2581 })
2582 })?;
2583 scanner.project::<&str>(&[]).map_err(|e| {
2584 lance_core::Error::from(NamespaceError::Internal {
2585 message: format!("Failed to project: {:?}", e),
2586 })
2587 })?;
2588 scanner.with_row_id();
2589 let count = scanner.count_rows().boxed().await.map_err(|e| {
2590 lance_core::Error::from(NamespaceError::Internal {
2591 message: format!("Failed to count rows: {:?}", e),
2592 })
2593 })?;
2594
2595 if count > 0 {
2596 return Err(NamespaceError::NamespaceNotEmpty {
2597 message: format!("'{}' (contains {} child objects)", object_id, count),
2598 }
2599 .into());
2600 }
2601
2602 self.delete_from_manifest(&object_id).boxed().await?;
2603
2604 Ok(DropNamespaceResponse::default())
2605 }
2606
2607 async fn namespace_exists(&self, request: NamespaceExistsRequest) -> Result<()> {
2608 let namespace_id = request.id.as_ref().ok_or_else(|| {
2609 lance_core::Error::from(NamespaceError::InvalidInput {
2610 message: "Namespace ID is required".to_string(),
2611 })
2612 })?;
2613
2614 if namespace_id.is_empty() {
2616 return Ok(());
2617 }
2618
2619 let object_id = namespace_id.join(DELIMITER);
2620 if self.manifest_contains_object(&object_id).await? {
2621 Ok(())
2622 } else {
2623 Err(NamespaceError::NamespaceNotFound {
2624 message: object_id.to_string(),
2625 }
2626 .into())
2627 }
2628 }
2629
2630 async fn declare_table(&self, request: DeclareTableRequest) -> Result<DeclareTableResponse> {
2631 let table_id = request.id.as_ref().ok_or_else(|| {
2632 lance_core::Error::from(NamespaceError::InvalidInput {
2633 message: "Table ID is required".to_string(),
2634 })
2635 })?;
2636
2637 if table_id.is_empty() {
2638 return Err(NamespaceError::InvalidInput {
2639 message: "Table ID cannot be empty".to_string(),
2640 }
2641 .into());
2642 }
2643
2644 let (namespace, table_name) = Self::split_object_id(table_id);
2645 let object_id = Self::build_object_id(&namespace, &table_name);
2646
2647 let existing = self.query_manifest_for_table(&object_id).await?;
2649 if existing.is_some() {
2650 return Err(NamespaceError::TableAlreadyExists {
2651 message: table_name.to_string(),
2652 }
2653 .into());
2654 }
2655
2656 let dir_name = if namespace.is_empty() && self.dir_listing_enabled {
2660 format!("{}.lance", table_name)
2662 } else {
2663 Self::generate_dir_name(&object_id)
2665 };
2666 let table_path = self.base_path.child(dir_name.as_str());
2667 let table_uri = Self::construct_full_uri(&self.root, &dir_name)?;
2668
2669 if let Some(req_location) = &request.location {
2671 let req_location = req_location.trim_end_matches('/');
2672 if req_location != table_uri {
2673 return Err(NamespaceError::InvalidInput {
2674 message: format!(
2675 "Cannot declare table {} at location {}, must be at location {}",
2676 table_name, req_location, table_uri
2677 ),
2678 }
2679 .into());
2680 }
2681 }
2682
2683 let reserved_file_path = table_path.child(".lance-reserved");
2685
2686 self.object_store
2687 .create(&reserved_file_path)
2688 .await
2689 .map_err(|e| {
2690 lance_core::Error::from(NamespaceError::Internal {
2691 message: format!(
2692 "Failed to create .lance-reserved file for table {}: {}",
2693 table_name, e
2694 ),
2695 })
2696 })?
2697 .shutdown()
2698 .await
2699 .map_err(|e| {
2700 lance_core::Error::from(NamespaceError::Internal {
2701 message: format!(
2702 "Failed to finalize .lance-reserved file for table {}: {}",
2703 table_name, e
2704 ),
2705 })
2706 })?;
2707
2708 let metadata = Self::serialize_metadata(request.properties.as_ref(), "table", &object_id)?;
2709
2710 self.insert_into_manifest_with_metadata(
2712 vec![ManifestEntry {
2713 object_id,
2714 object_type: ObjectType::Table,
2715 location: Some(dir_name),
2716 metadata,
2717 }],
2718 None,
2719 )
2720 .await?;
2721
2722 log::info!(
2723 "Declared table '{}' in manifest at {}",
2724 table_name,
2725 table_uri
2726 );
2727
2728 let vend_credentials = request.vend_credentials.unwrap_or(true);
2730 let storage_options = if vend_credentials {
2731 self.storage_options.clone()
2732 } else {
2733 None
2734 };
2735
2736 Ok(DeclareTableResponse {
2737 location: Some(table_uri),
2738 storage_options,
2739 properties: request.properties,
2740 ..Default::default()
2741 })
2742 }
2743
2744 async fn register_table(&self, request: RegisterTableRequest) -> Result<RegisterTableResponse> {
2745 let table_id = request.id.as_ref().ok_or_else(|| {
2746 lance_core::Error::from(NamespaceError::InvalidInput {
2747 message: "Table ID is required".to_string(),
2748 })
2749 })?;
2750
2751 if table_id.is_empty() {
2752 return Err(NamespaceError::InvalidInput {
2753 message: "Table ID cannot be empty".to_string(),
2754 }
2755 .into());
2756 }
2757
2758 let location = request.location.clone();
2759
2760 if location.contains("://") {
2763 return Err(NamespaceError::InvalidInput {
2764 message: format!(
2765 "Absolute URIs are not allowed for register_table. Location must be a relative path within the root directory: {}",
2766 location
2767 ),
2768 }
2769 .into());
2770 }
2771
2772 if location.starts_with('/') {
2773 return Err(NamespaceError::InvalidInput {
2774 message: format!(
2775 "Absolute paths are not allowed for register_table. Location must be a relative path within the root directory: {}",
2776 location
2777 ),
2778 }
2779 .into());
2780 }
2781
2782 if location.contains("..") {
2784 return Err(NamespaceError::InvalidInput {
2785 message: format!(
2786 "Path traversal is not allowed. Location must be a relative path within the root directory: {}",
2787 location
2788 ),
2789 }
2790 .into());
2791 }
2792
2793 let (namespace, table_name) = Self::split_object_id(table_id);
2794 let object_id = Self::build_object_id(&namespace, &table_name);
2795
2796 if !namespace.is_empty() {
2798 self.validate_namespace_levels_exist(&namespace).await?;
2799 }
2800
2801 if self.manifest_contains_object(&object_id).await? {
2803 return Err(NamespaceError::TableAlreadyExists {
2804 message: object_id.to_string(),
2805 }
2806 .into());
2807 }
2808
2809 self.insert_into_manifest(object_id, ObjectType::Table, Some(location.clone()))
2811 .await?;
2812
2813 Ok(RegisterTableResponse {
2814 location: Some(location),
2815 ..Default::default()
2816 })
2817 }
2818
2819 async fn deregister_table(
2820 &self,
2821 request: DeregisterTableRequest,
2822 ) -> Result<DeregisterTableResponse> {
2823 let table_id = request.id.as_ref().ok_or_else(|| {
2824 lance_core::Error::from(NamespaceError::InvalidInput {
2825 message: "Table ID is required".to_string(),
2826 })
2827 })?;
2828
2829 if table_id.is_empty() {
2830 return Err(NamespaceError::InvalidInput {
2831 message: "Table ID cannot be empty".to_string(),
2832 }
2833 .into());
2834 }
2835
2836 let (namespace, table_name) = Self::split_object_id(table_id);
2837 let object_id = Self::build_object_id(&namespace, &table_name);
2838
2839 let table_info = self.query_manifest_for_table(&object_id).await?;
2841
2842 let table_uri = match table_info {
2843 Some(info) => {
2844 self.delete_from_manifest(&object_id).boxed().await?;
2846 Self::construct_full_uri(&self.root, &info.location)?
2847 }
2848 None => {
2849 return Err(NamespaceError::TableNotFound {
2850 message: object_id.to_string(),
2851 }
2852 .into());
2853 }
2854 };
2855
2856 Ok(DeregisterTableResponse {
2857 id: request.id.clone(),
2858 location: Some(table_uri),
2859 ..Default::default()
2860 })
2861 }
2862}
2863
2864#[cfg(test)]
2865mod tests {
2866 use crate::{DirectoryNamespaceBuilder, ManifestNamespace};
2867 use bytes::Bytes;
2868 use lance_core::utils::tempfile::TempStdDir;
2869 use lance_namespace::LanceNamespace;
2870 use lance_namespace::models::{
2871 CreateNamespaceRequest, CreateTableRequest, DescribeTableRequest, DropTableRequest,
2872 ListTablesRequest, TableExistsRequest,
2873 };
2874 use rstest::rstest;
2875
2876 fn create_test_ipc_data() -> Vec<u8> {
2877 use arrow::array::{Int32Array, StringArray};
2878 use arrow::datatypes::{DataType, Field, Schema};
2879 use arrow::ipc::writer::StreamWriter;
2880 use arrow::record_batch::RecordBatch;
2881 use std::sync::Arc;
2882
2883 let schema = Arc::new(Schema::new(vec![
2884 Field::new("id", DataType::Int32, false),
2885 Field::new("name", DataType::Utf8, false),
2886 ]));
2887
2888 let batch = RecordBatch::try_new(
2889 schema.clone(),
2890 vec![
2891 Arc::new(Int32Array::from(vec![1, 2, 3])),
2892 Arc::new(StringArray::from(vec!["a", "b", "c"])),
2893 ],
2894 )
2895 .unwrap();
2896
2897 let mut buffer = Vec::new();
2898 {
2899 let mut writer = StreamWriter::try_new(&mut buffer, &schema).unwrap();
2900 writer.write(&batch).unwrap();
2901 writer.finish().unwrap();
2902 }
2903 buffer
2904 }
2905
2906 #[rstest]
2907 #[case::with_optimization(true)]
2908 #[case::without_optimization(false)]
2909 #[tokio::test]
2910 async fn test_manifest_namespace_basic_create_and_list(#[case] inline_optimization: bool) {
2911 let temp_dir = TempStdDir::default();
2912 let temp_path = temp_dir.to_str().unwrap();
2913
2914 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2916 .inline_optimization_enabled(inline_optimization)
2917 .build()
2918 .await
2919 .unwrap();
2920
2921 let mut request = ListTablesRequest::new();
2923 request.id = Some(vec![]);
2924 let response = dir_namespace.list_tables(request).await.unwrap();
2925 assert_eq!(response.tables.len(), 0);
2926
2927 let buffer = create_test_ipc_data();
2929 let mut create_request = CreateTableRequest::new();
2930 create_request.id = Some(vec!["test_table".to_string()]);
2931
2932 let _response = dir_namespace
2933 .create_table(create_request, Bytes::from(buffer))
2934 .await
2935 .unwrap();
2936
2937 let mut request = ListTablesRequest::new();
2939 request.id = Some(vec![]);
2940 let response = dir_namespace.list_tables(request).await.unwrap();
2941 assert_eq!(response.tables.len(), 1);
2942 assert_eq!(response.tables[0], "test_table");
2943 }
2944
2945 #[rstest]
2946 #[case::with_optimization(true)]
2947 #[case::without_optimization(false)]
2948 #[tokio::test]
2949 async fn test_manifest_namespace_table_exists(#[case] inline_optimization: bool) {
2950 let temp_dir = TempStdDir::default();
2951 let temp_path = temp_dir.to_str().unwrap();
2952
2953 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2954 .inline_optimization_enabled(inline_optimization)
2955 .build()
2956 .await
2957 .unwrap();
2958
2959 let mut request = TableExistsRequest::new();
2961 request.id = Some(vec!["nonexistent".to_string()]);
2962 let result = dir_namespace.table_exists(request).await;
2963 assert!(result.is_err());
2964
2965 let buffer = create_test_ipc_data();
2967 let mut create_request = CreateTableRequest::new();
2968 create_request.id = Some(vec!["test_table".to_string()]);
2969 dir_namespace
2970 .create_table(create_request, Bytes::from(buffer))
2971 .await
2972 .unwrap();
2973
2974 let mut request = TableExistsRequest::new();
2976 request.id = Some(vec!["test_table".to_string()]);
2977 let result = dir_namespace.table_exists(request).await;
2978 assert!(result.is_ok());
2979 }
2980
2981 #[rstest]
2982 #[case::with_optimization(true)]
2983 #[case::without_optimization(false)]
2984 #[tokio::test]
2985 async fn test_manifest_namespace_describe_table(#[case] inline_optimization: bool) {
2986 let temp_dir = TempStdDir::default();
2987 let temp_path = temp_dir.to_str().unwrap();
2988
2989 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
2990 .inline_optimization_enabled(inline_optimization)
2991 .build()
2992 .await
2993 .unwrap();
2994
2995 let mut request = DescribeTableRequest::new();
2997 request.id = Some(vec!["nonexistent".to_string()]);
2998 let result = dir_namespace.describe_table(request).await;
2999 assert!(result.is_err());
3000
3001 let buffer = create_test_ipc_data();
3003 let mut create_request = CreateTableRequest::new();
3004 create_request.id = Some(vec!["test_table".to_string()]);
3005 dir_namespace
3006 .create_table(create_request, Bytes::from(buffer))
3007 .await
3008 .unwrap();
3009
3010 let mut request = DescribeTableRequest::new();
3012 request.id = Some(vec!["test_table".to_string()]);
3013 let response = dir_namespace.describe_table(request).await.unwrap();
3014 assert!(response.location.is_some());
3015 assert!(response.location.unwrap().contains("test_table"));
3016 }
3017
3018 #[rstest]
3019 #[case::with_optimization(true)]
3020 #[case::without_optimization(false)]
3021 #[tokio::test]
3022 async fn test_manifest_namespace_drop_table(#[case] inline_optimization: bool) {
3023 let temp_dir = TempStdDir::default();
3024 let temp_path = temp_dir.to_str().unwrap();
3025
3026 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3027 .inline_optimization_enabled(inline_optimization)
3028 .build()
3029 .await
3030 .unwrap();
3031
3032 let buffer = create_test_ipc_data();
3034 let mut create_request = CreateTableRequest::new();
3035 create_request.id = Some(vec!["test_table".to_string()]);
3036 dir_namespace
3037 .create_table(create_request, Bytes::from(buffer))
3038 .await
3039 .unwrap();
3040
3041 let mut request = ListTablesRequest::new();
3043 request.id = Some(vec![]);
3044 let response = dir_namespace.list_tables(request).await.unwrap();
3045 assert_eq!(response.tables.len(), 1);
3046
3047 let mut drop_request = DropTableRequest::new();
3049 drop_request.id = Some(vec!["test_table".to_string()]);
3050 let _response = dir_namespace.drop_table(drop_request).await.unwrap();
3051
3052 let mut request = ListTablesRequest::new();
3054 request.id = Some(vec![]);
3055 let response = dir_namespace.list_tables(request).await.unwrap();
3056 assert_eq!(response.tables.len(), 0);
3057 }
3058
3059 #[tokio::test]
3060 async fn test_list_tables_pagination_limit_zero() {
3061 let temp_dir = TempStdDir::default();
3062 let temp_path = temp_dir.to_str().unwrap();
3063
3064 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3065 .build()
3066 .await
3067 .unwrap();
3068
3069 let buffer = create_test_ipc_data();
3070 let mut create_request = CreateTableRequest::new();
3071 create_request.id = Some(vec!["alpha".to_string()]);
3072 dir_namespace
3073 .create_table(create_request, Bytes::from(buffer))
3074 .await
3075 .unwrap();
3076
3077 let response = dir_namespace
3078 .list_tables(ListTablesRequest {
3079 id: Some(vec![]),
3080 limit: Some(0),
3081 ..Default::default()
3082 })
3083 .await
3084 .unwrap();
3085
3086 assert!(response.tables.is_empty());
3087 assert!(response.page_token.is_none());
3088 }
3089
3090 #[rstest]
3091 #[case::with_optimization(true)]
3092 #[case::without_optimization(false)]
3093 #[tokio::test]
3094 async fn test_manifest_namespace_multiple_tables(#[case] inline_optimization: bool) {
3095 let temp_dir = TempStdDir::default();
3096 let temp_path = temp_dir.to_str().unwrap();
3097
3098 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3099 .inline_optimization_enabled(inline_optimization)
3100 .build()
3101 .await
3102 .unwrap();
3103
3104 let buffer = create_test_ipc_data();
3106 for i in 1..=3 {
3107 let mut create_request = CreateTableRequest::new();
3108 create_request.id = Some(vec![format!("table{}", i)]);
3109 dir_namespace
3110 .create_table(create_request, Bytes::from(buffer.clone()))
3111 .await
3112 .unwrap();
3113 }
3114
3115 let mut request = ListTablesRequest::new();
3117 request.id = Some(vec![]);
3118 let response = dir_namespace.list_tables(request).await.unwrap();
3119 assert_eq!(response.tables.len(), 3);
3120 assert!(response.tables.contains(&"table1".to_string()));
3121 assert!(response.tables.contains(&"table2".to_string()));
3122 assert!(response.tables.contains(&"table3".to_string()));
3123 }
3124
3125 #[rstest]
3126 #[case::with_optimization(true)]
3127 #[case::without_optimization(false)]
3128 #[tokio::test]
3129 async fn test_directory_only_mode(#[case] inline_optimization: bool) {
3130 let temp_dir = TempStdDir::default();
3131 let temp_path = temp_dir.to_str().unwrap();
3132
3133 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3135 .manifest_enabled(false)
3136 .inline_optimization_enabled(inline_optimization)
3137 .build()
3138 .await
3139 .unwrap();
3140
3141 let mut request = ListTablesRequest::new();
3143 request.id = Some(vec![]);
3144 let response = dir_namespace.list_tables(request).await.unwrap();
3145 assert_eq!(response.tables.len(), 0);
3146
3147 let buffer = create_test_ipc_data();
3149 let mut create_request = CreateTableRequest::new();
3150 create_request.id = Some(vec!["test_table".to_string()]);
3151
3152 let _response = dir_namespace
3154 .create_table(create_request, Bytes::from(buffer))
3155 .await
3156 .unwrap();
3157
3158 let mut request = ListTablesRequest::new();
3160 request.id = Some(vec![]);
3161 let response = dir_namespace.list_tables(request).await.unwrap();
3162 assert_eq!(response.tables.len(), 1);
3163 assert_eq!(response.tables[0], "test_table");
3164 }
3165
3166 #[rstest]
3167 #[case::with_optimization(true)]
3168 #[case::without_optimization(false)]
3169 #[tokio::test]
3170 async fn test_dual_mode_merge(#[case] inline_optimization: bool) {
3171 let temp_dir = TempStdDir::default();
3172 let temp_path = temp_dir.to_str().unwrap();
3173
3174 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3176 .manifest_enabled(true)
3177 .dir_listing_enabled(true)
3178 .inline_optimization_enabled(inline_optimization)
3179 .build()
3180 .await
3181 .unwrap();
3182
3183 let buffer = create_test_ipc_data();
3185 let mut create_request = CreateTableRequest::new();
3186 create_request.id = Some(vec!["table1".to_string()]);
3187 dir_namespace
3188 .create_table(create_request, Bytes::from(buffer))
3189 .await
3190 .unwrap();
3191
3192 let mut request = ListTablesRequest::new();
3194 request.id = Some(vec![]);
3195 let response = dir_namespace.list_tables(request).await.unwrap();
3196 assert_eq!(response.tables.len(), 1);
3197 assert_eq!(response.tables[0], "table1");
3198 }
3199
3200 #[rstest]
3201 #[case::with_optimization(true)]
3202 #[case::without_optimization(false)]
3203 #[tokio::test]
3204 async fn test_manifest_only_mode(#[case] inline_optimization: bool) {
3205 let temp_dir = TempStdDir::default();
3206 let temp_path = temp_dir.to_str().unwrap();
3207
3208 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3210 .manifest_enabled(true)
3211 .dir_listing_enabled(false)
3212 .inline_optimization_enabled(inline_optimization)
3213 .build()
3214 .await
3215 .unwrap();
3216
3217 let buffer = create_test_ipc_data();
3219 let mut create_request = CreateTableRequest::new();
3220 create_request.id = Some(vec!["test_table".to_string()]);
3221 dir_namespace
3222 .create_table(create_request, Bytes::from(buffer))
3223 .await
3224 .unwrap();
3225
3226 let mut request = ListTablesRequest::new();
3228 request.id = Some(vec![]);
3229 let response = dir_namespace.list_tables(request).await.unwrap();
3230 assert_eq!(response.tables.len(), 1);
3231 assert_eq!(response.tables[0], "test_table");
3232 }
3233
3234 #[rstest]
3235 #[case::with_optimization(true)]
3236 #[case::without_optimization(false)]
3237 #[tokio::test]
3238 async fn test_drop_nonexistent_table(#[case] inline_optimization: bool) {
3239 let temp_dir = TempStdDir::default();
3240 let temp_path = temp_dir.to_str().unwrap();
3241
3242 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3243 .inline_optimization_enabled(inline_optimization)
3244 .build()
3245 .await
3246 .unwrap();
3247
3248 let mut drop_request = DropTableRequest::new();
3250 drop_request.id = Some(vec!["nonexistent".to_string()]);
3251 let result = dir_namespace.drop_table(drop_request).await;
3252 assert!(result.is_err());
3253 }
3254
3255 #[rstest]
3256 #[case::with_optimization(true)]
3257 #[case::without_optimization(false)]
3258 #[tokio::test]
3259 async fn test_create_duplicate_table_fails(#[case] inline_optimization: bool) {
3260 let temp_dir = TempStdDir::default();
3261 let temp_path = temp_dir.to_str().unwrap();
3262
3263 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3264 .inline_optimization_enabled(inline_optimization)
3265 .build()
3266 .await
3267 .unwrap();
3268
3269 let buffer = create_test_ipc_data();
3271 let mut create_request = CreateTableRequest::new();
3272 create_request.id = Some(vec!["test_table".to_string()]);
3273 dir_namespace
3274 .create_table(create_request, Bytes::from(buffer.clone()))
3275 .await
3276 .unwrap();
3277
3278 let mut create_request = CreateTableRequest::new();
3280 create_request.id = Some(vec!["test_table".to_string()]);
3281 let result = dir_namespace
3282 .create_table(create_request, Bytes::from(buffer))
3283 .await;
3284 assert!(result.is_err());
3285 }
3286
3287 #[rstest]
3288 #[case::with_optimization(true)]
3289 #[case::without_optimization(false)]
3290 #[tokio::test]
3291 async fn test_create_child_namespace(#[case] inline_optimization: bool) {
3292 use lance_namespace::models::{
3293 CreateNamespaceRequest, ListNamespacesRequest, NamespaceExistsRequest,
3294 };
3295
3296 let temp_dir = TempStdDir::default();
3297 let temp_path = temp_dir.to_str().unwrap();
3298
3299 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3300 .inline_optimization_enabled(inline_optimization)
3301 .build()
3302 .await
3303 .unwrap();
3304
3305 let mut create_req = CreateNamespaceRequest::new();
3307 create_req.id = Some(vec!["ns1".to_string()]);
3308 let result = dir_namespace.create_namespace(create_req).await;
3309 assert!(
3310 result.is_ok(),
3311 "Failed to create child namespace: {:?}",
3312 result.err()
3313 );
3314
3315 let exists_req = NamespaceExistsRequest {
3317 id: Some(vec!["ns1".to_string()]),
3318 ..Default::default()
3319 };
3320 let result = dir_namespace.namespace_exists(exists_req).await;
3321 assert!(result.is_ok(), "Namespace should exist");
3322
3323 let list_req = ListNamespacesRequest {
3325 id: Some(vec![]),
3326 page_token: None,
3327 limit: None,
3328 ..Default::default()
3329 };
3330 let result = dir_namespace.list_namespaces(list_req).await;
3331 assert!(result.is_ok());
3332 let namespaces = result.unwrap();
3333 assert_eq!(namespaces.namespaces.len(), 1);
3334 assert_eq!(namespaces.namespaces[0], "ns1");
3335 }
3336
3337 #[rstest]
3338 #[case::with_optimization(true)]
3339 #[case::without_optimization(false)]
3340 #[tokio::test]
3341 async fn test_create_nested_namespace(#[case] inline_optimization: bool) {
3342 use lance_namespace::models::{
3343 CreateNamespaceRequest, ListNamespacesRequest, NamespaceExistsRequest,
3344 };
3345
3346 let temp_dir = TempStdDir::default();
3347 let temp_path = temp_dir.to_str().unwrap();
3348
3349 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3350 .inline_optimization_enabled(inline_optimization)
3351 .build()
3352 .await
3353 .unwrap();
3354
3355 let mut create_req = CreateNamespaceRequest::new();
3357 create_req.id = Some(vec!["parent".to_string()]);
3358 dir_namespace.create_namespace(create_req).await.unwrap();
3359
3360 let mut create_req = CreateNamespaceRequest::new();
3362 create_req.id = Some(vec!["parent".to_string(), "child".to_string()]);
3363 let result = dir_namespace.create_namespace(create_req).await;
3364 assert!(
3365 result.is_ok(),
3366 "Failed to create nested namespace: {:?}",
3367 result.err()
3368 );
3369
3370 let exists_req = NamespaceExistsRequest {
3372 id: Some(vec!["parent".to_string(), "child".to_string()]),
3373 ..Default::default()
3374 };
3375 let result = dir_namespace.namespace_exists(exists_req).await;
3376 assert!(result.is_ok(), "Nested namespace should exist");
3377
3378 let list_req = ListNamespacesRequest {
3380 id: Some(vec!["parent".to_string()]),
3381 page_token: None,
3382 limit: None,
3383 ..Default::default()
3384 };
3385 let result = dir_namespace.list_namespaces(list_req).await;
3386 assert!(result.is_ok());
3387 let namespaces = result.unwrap();
3388 assert_eq!(namespaces.namespaces.len(), 1);
3389 assert_eq!(namespaces.namespaces[0], "child");
3390 }
3391
3392 #[rstest]
3393 #[case::with_optimization(true)]
3394 #[case::without_optimization(false)]
3395 #[tokio::test]
3396 async fn test_create_namespace_without_parent_fails(#[case] inline_optimization: bool) {
3397 use lance_namespace::models::CreateNamespaceRequest;
3398
3399 let temp_dir = TempStdDir::default();
3400 let temp_path = temp_dir.to_str().unwrap();
3401
3402 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3403 .inline_optimization_enabled(inline_optimization)
3404 .build()
3405 .await
3406 .unwrap();
3407
3408 let mut create_req = CreateNamespaceRequest::new();
3410 create_req.id = Some(vec!["nonexistent_parent".to_string(), "child".to_string()]);
3411 let result = dir_namespace.create_namespace(create_req).await;
3412 assert!(result.is_err(), "Should fail when parent doesn't exist");
3413 }
3414
3415 #[rstest]
3416 #[case::with_optimization(true)]
3417 #[case::without_optimization(false)]
3418 #[tokio::test]
3419 async fn test_drop_child_namespace(#[case] inline_optimization: bool) {
3420 use lance_namespace::models::{
3421 CreateNamespaceRequest, DropNamespaceRequest, NamespaceExistsRequest,
3422 };
3423
3424 let temp_dir = TempStdDir::default();
3425 let temp_path = temp_dir.to_str().unwrap();
3426
3427 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3428 .inline_optimization_enabled(inline_optimization)
3429 .build()
3430 .await
3431 .unwrap();
3432
3433 let mut create_req = CreateNamespaceRequest::new();
3435 create_req.id = Some(vec!["ns1".to_string()]);
3436 dir_namespace.create_namespace(create_req).await.unwrap();
3437
3438 let mut drop_req = DropNamespaceRequest::new();
3440 drop_req.id = Some(vec!["ns1".to_string()]);
3441 let result = dir_namespace.drop_namespace(drop_req).await;
3442 assert!(
3443 result.is_ok(),
3444 "Failed to drop namespace: {:?}",
3445 result.err()
3446 );
3447
3448 let exists_req = NamespaceExistsRequest {
3450 id: Some(vec!["ns1".to_string()]),
3451 ..Default::default()
3452 };
3453 let result = dir_namespace.namespace_exists(exists_req).await;
3454 assert!(result.is_err(), "Namespace should not exist after drop");
3455 }
3456
3457 #[rstest]
3458 #[case::with_optimization(true)]
3459 #[case::without_optimization(false)]
3460 #[tokio::test]
3461 async fn test_drop_namespace_with_children_fails(#[case] inline_optimization: bool) {
3462 use lance_namespace::models::{CreateNamespaceRequest, DropNamespaceRequest};
3463
3464 let temp_dir = TempStdDir::default();
3465 let temp_path = temp_dir.to_str().unwrap();
3466
3467 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3468 .inline_optimization_enabled(inline_optimization)
3469 .build()
3470 .await
3471 .unwrap();
3472
3473 let mut create_req = CreateNamespaceRequest::new();
3475 create_req.id = Some(vec!["parent".to_string()]);
3476 dir_namespace.create_namespace(create_req).await.unwrap();
3477
3478 let mut create_req = CreateNamespaceRequest::new();
3479 create_req.id = Some(vec!["parent".to_string(), "child".to_string()]);
3480 dir_namespace.create_namespace(create_req).await.unwrap();
3481
3482 let mut drop_req = DropNamespaceRequest::new();
3484 drop_req.id = Some(vec!["parent".to_string()]);
3485 let result = dir_namespace.drop_namespace(drop_req).await;
3486 assert!(result.is_err(), "Should fail when namespace has children");
3487 }
3488
3489 #[rstest]
3490 #[case::with_optimization(true)]
3491 #[case::without_optimization(false)]
3492 #[tokio::test]
3493 async fn test_create_table_in_child_namespace(#[case] inline_optimization: bool) {
3494 use lance_namespace::models::{
3495 CreateNamespaceRequest, CreateTableRequest, ListTablesRequest,
3496 };
3497
3498 let temp_dir = TempStdDir::default();
3499 let temp_path = temp_dir.to_str().unwrap();
3500
3501 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3502 .inline_optimization_enabled(inline_optimization)
3503 .build()
3504 .await
3505 .unwrap();
3506
3507 let mut create_ns_req = CreateNamespaceRequest::new();
3509 create_ns_req.id = Some(vec!["ns1".to_string()]);
3510 dir_namespace.create_namespace(create_ns_req).await.unwrap();
3511
3512 let buffer = create_test_ipc_data();
3514 let mut create_table_req = CreateTableRequest::new();
3515 create_table_req.id = Some(vec!["ns1".to_string(), "table1".to_string()]);
3516 let result = dir_namespace
3517 .create_table(create_table_req, Bytes::from(buffer))
3518 .await;
3519 assert!(
3520 result.is_ok(),
3521 "Failed to create table in child namespace: {:?}",
3522 result.err()
3523 );
3524
3525 let list_req = ListTablesRequest {
3527 id: Some(vec!["ns1".to_string()]),
3528 page_token: None,
3529 limit: None,
3530 ..Default::default()
3531 };
3532 let result = dir_namespace.list_tables(list_req).await;
3533 assert!(result.is_ok());
3534 let tables = result.unwrap();
3535 assert_eq!(tables.tables.len(), 1);
3536 assert_eq!(tables.tables[0], "table1");
3537 }
3538
3539 #[rstest]
3540 #[case::with_optimization(true)]
3541 #[case::without_optimization(false)]
3542 #[tokio::test]
3543 async fn test_describe_child_namespace(#[case] inline_optimization: bool) {
3544 use lance_namespace::models::{CreateNamespaceRequest, DescribeNamespaceRequest};
3545
3546 let temp_dir = TempStdDir::default();
3547 let temp_path = temp_dir.to_str().unwrap();
3548
3549 let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
3550 .inline_optimization_enabled(inline_optimization)
3551 .build()
3552 .await
3553 .unwrap();
3554
3555 let mut properties = std::collections::HashMap::new();
3557 properties.insert("key1".to_string(), "value1".to_string());
3558
3559 let mut create_req = CreateNamespaceRequest::new();
3560 create_req.id = Some(vec!["ns1".to_string()]);
3561 create_req.properties = Some(properties.clone());
3562 dir_namespace.create_namespace(create_req).await.unwrap();
3563
3564 let describe_req = DescribeNamespaceRequest {
3566 id: Some(vec!["ns1".to_string()]),
3567 ..Default::default()
3568 };
3569 let result = dir_namespace.describe_namespace(describe_req).await;
3570 assert!(
3571 result.is_ok(),
3572 "Failed to describe namespace: {:?}",
3573 result.err()
3574 );
3575 let response = result.unwrap();
3576 assert!(response.properties.is_some());
3577 assert_eq!(
3578 response.properties.unwrap().get("key1"),
3579 Some(&"value1".to_string())
3580 );
3581 }
3582
3583 #[rstest]
3584 #[case::with_optimization(true)]
3585 #[case::without_optimization(false)]
3586 #[tokio::test]
3587 async fn test_concurrent_create_and_drop_single_instance(#[case] inline_optimization: bool) {
3588 use futures::future::join_all;
3589 use std::sync::Arc;
3590
3591 let temp_dir = TempStdDir::default();
3592 let temp_path = temp_dir.to_str().unwrap();
3593
3594 let dir_namespace = Arc::new(
3595 DirectoryNamespaceBuilder::new(temp_path)
3596 .inline_optimization_enabled(inline_optimization)
3597 .build()
3598 .await
3599 .unwrap(),
3600 );
3601
3602 let mut create_ns_request = CreateNamespaceRequest::new();
3605 create_ns_request.id = Some(vec!["test_ns".to_string()]);
3606 dir_namespace
3607 .create_namespace(create_ns_request)
3608 .await
3609 .unwrap();
3610
3611 let num_tables = 10;
3612 let mut handles = Vec::new();
3613
3614 for i in 0..num_tables {
3615 let ns = dir_namespace.clone();
3616 let handle = async move {
3617 let table_name = format!("concurrent_table_{}", i);
3618 let table_id = vec!["test_ns".to_string(), table_name.clone()];
3619 let buffer = create_test_ipc_data();
3620
3621 let mut create_request = CreateTableRequest::new();
3623 create_request.id = Some(table_id.clone());
3624 ns.create_table(create_request, Bytes::from(buffer))
3625 .await
3626 .unwrap_or_else(|e| panic!("Failed to create table {}: {}", table_name, e));
3627
3628 let mut drop_request = DropTableRequest::new();
3630 drop_request.id = Some(table_id);
3631 ns.drop_table(drop_request)
3632 .await
3633 .unwrap_or_else(|e| panic!("Failed to drop table {}: {}", table_name, e));
3634
3635 Ok::<_, lance_core::Error>(())
3636 };
3637 handles.push(handle);
3638 }
3639
3640 let results = join_all(handles).await;
3641 for result in results {
3642 assert!(result.is_ok(), "All concurrent operations should succeed");
3643 }
3644
3645 let mut request = ListTablesRequest::new();
3647 request.id = Some(vec!["test_ns".to_string()]);
3648 let response = dir_namespace.list_tables(request).await.unwrap();
3649 assert_eq!(response.tables.len(), 0, "All tables should be dropped");
3650 }
3651
3652 #[rstest]
3653 #[case::with_optimization(true)]
3654 #[case::without_optimization(false)]
3655 #[tokio::test]
3656 async fn test_concurrent_create_and_drop_multiple_instances(#[case] inline_optimization: bool) {
3657 use futures::future::join_all;
3658
3659 let temp_dir = TempStdDir::default();
3660 let temp_path = temp_dir.to_str().unwrap().to_string();
3661
3662 let init_ns = DirectoryNamespaceBuilder::new(&temp_path)
3665 .inline_optimization_enabled(inline_optimization)
3666 .build()
3667 .await
3668 .unwrap();
3669 let mut create_ns_request = CreateNamespaceRequest::new();
3670 create_ns_request.id = Some(vec!["test_ns".to_string()]);
3671 init_ns.create_namespace(create_ns_request).await.unwrap();
3672
3673 let num_tables = 10;
3674 let mut handles = Vec::new();
3675
3676 for i in 0..num_tables {
3677 let path = temp_path.clone();
3678 let handle = async move {
3679 let ns = DirectoryNamespaceBuilder::new(&path)
3681 .inline_optimization_enabled(inline_optimization)
3682 .build()
3683 .await
3684 .unwrap();
3685
3686 let table_name = format!("multi_ns_table_{}", i);
3687 let table_id = vec!["test_ns".to_string(), table_name.clone()];
3688 let buffer = create_test_ipc_data();
3689
3690 let mut create_request = CreateTableRequest::new();
3692 create_request.id = Some(table_id.clone());
3693 ns.create_table(create_request, Bytes::from(buffer))
3694 .await
3695 .unwrap_or_else(|e| panic!("Failed to create table {}: {}", table_name, e));
3696
3697 let mut drop_request = DropTableRequest::new();
3699 drop_request.id = Some(table_id);
3700 ns.drop_table(drop_request)
3701 .await
3702 .unwrap_or_else(|e| panic!("Failed to drop table {}: {}", table_name, e));
3703
3704 Ok::<_, lance_core::Error>(())
3705 };
3706 handles.push(handle);
3707 }
3708
3709 let results = join_all(handles).await;
3710 for result in results {
3711 assert!(result.is_ok(), "All concurrent operations should succeed");
3712 }
3713
3714 let verify_ns = DirectoryNamespaceBuilder::new(&temp_path)
3716 .inline_optimization_enabled(inline_optimization)
3717 .build()
3718 .await
3719 .unwrap();
3720
3721 let mut request = ListTablesRequest::new();
3722 request.id = Some(vec!["test_ns".to_string()]);
3723 let response = verify_ns.list_tables(request).await.unwrap();
3724 assert_eq!(response.tables.len(), 0, "All tables should be dropped");
3725 }
3726
3727 #[rstest]
3728 #[case::with_optimization(true)]
3729 #[case::without_optimization(false)]
3730 #[tokio::test]
3731 async fn test_concurrent_create_then_drop_from_different_instance(
3732 #[case] inline_optimization: bool,
3733 ) {
3734 use futures::future::join_all;
3735
3736 let temp_dir = TempStdDir::default();
3737 let temp_path = temp_dir.to_str().unwrap().to_string();
3738
3739 let init_ns = DirectoryNamespaceBuilder::new(&temp_path)
3742 .inline_optimization_enabled(inline_optimization)
3743 .build()
3744 .await
3745 .unwrap();
3746 let mut create_ns_request = CreateNamespaceRequest::new();
3747 create_ns_request.id = Some(vec!["test_ns".to_string()]);
3748 init_ns.create_namespace(create_ns_request).await.unwrap();
3749
3750 let num_tables = 10;
3751
3752 let mut create_handles = Vec::new();
3754 for i in 0..num_tables {
3755 let path = temp_path.clone();
3756 let handle = async move {
3757 let ns = DirectoryNamespaceBuilder::new(&path)
3758 .inline_optimization_enabled(inline_optimization)
3759 .build()
3760 .await
3761 .unwrap();
3762
3763 let table_name = format!("cross_instance_table_{}", i);
3764 let table_id = vec!["test_ns".to_string(), table_name.clone()];
3765 let buffer = create_test_ipc_data();
3766
3767 let mut create_request = CreateTableRequest::new();
3768 create_request.id = Some(table_id);
3769 ns.create_table(create_request, Bytes::from(buffer))
3770 .await
3771 .unwrap_or_else(|e| panic!("Failed to create table {}: {}", table_name, e));
3772
3773 Ok::<_, lance_core::Error>(())
3774 };
3775 create_handles.push(handle);
3776 }
3777
3778 let create_results = join_all(create_handles).await;
3779 for result in create_results {
3780 assert!(result.is_ok(), "All create operations should succeed");
3781 }
3782
3783 let mut drop_handles = Vec::new();
3785 for i in 0..num_tables {
3786 let path = temp_path.clone();
3787 let handle = async move {
3788 let ns = DirectoryNamespaceBuilder::new(&path)
3789 .inline_optimization_enabled(inline_optimization)
3790 .build()
3791 .await
3792 .unwrap();
3793
3794 let table_name = format!("cross_instance_table_{}", i);
3795 let table_id = vec!["test_ns".to_string(), table_name.clone()];
3796
3797 let mut drop_request = DropTableRequest::new();
3798 drop_request.id = Some(table_id);
3799 ns.drop_table(drop_request)
3800 .await
3801 .unwrap_or_else(|e| panic!("Failed to drop table {}: {}", table_name, e));
3802
3803 Ok::<_, lance_core::Error>(())
3804 };
3805 drop_handles.push(handle);
3806 }
3807
3808 let drop_results = join_all(drop_handles).await;
3809 for result in drop_results {
3810 assert!(result.is_ok(), "All drop operations should succeed");
3811 }
3812
3813 let verify_ns = DirectoryNamespaceBuilder::new(&temp_path)
3815 .inline_optimization_enabled(inline_optimization)
3816 .build()
3817 .await
3818 .unwrap();
3819
3820 let mut request = ListTablesRequest::new();
3821 request.id = Some(vec!["test_ns".to_string()]);
3822 let response = verify_ns.list_tables(request).await.unwrap();
3823 assert_eq!(response.tables.len(), 0, "All tables should be dropped");
3824 }
3825
/// `construct_full_uri` joins a cloud root URL and a table name with exactly one
/// `/`, and the result carries no query string from the root.
#[test]
fn test_construct_full_uri_with_cloud_urls() {
    // (root, table name, expected URI, failure message), checked in order.
    let cases = [
        (
            "s3://bucket/path/subdir",
            "table.lance",
            "s3://bucket/path/subdir/table.lance",
            "S3 URL should correctly append table name to nested path",
        ),
        (
            "az://container/path/subdir",
            "table.lance",
            "az://container/path/subdir/table.lance",
            "Azure URL should correctly append table name to nested path",
        ),
        (
            "gs://bucket/path/subdir",
            "table.lance",
            "gs://bucket/path/subdir/table.lance",
            "GCS URL should correctly append table name to nested path",
        ),
        (
            "s3://bucket/a/b/c/d",
            "my_table.lance",
            "s3://bucket/a/b/c/d/my_table.lance",
            "Deeply nested path should work correctly",
        ),
        (
            "s3://bucket",
            "table.lance",
            "s3://bucket/table.lance",
            "Single-level nested path should work correctly",
        ),
        (
            "s3://bucket/path/subdir/",
            "table.lance",
            "s3://bucket/path/subdir/table.lance",
            "URL with existing trailing slash should still work",
        ),
        (
            "s3://bucket/path?",
            "table.lance",
            "s3://bucket/path/table.lance",
            "URL with empty query string should not include trailing '?'",
        ),
        (
            "s3://bucket/path?param=value",
            "table.lance",
            "s3://bucket/path/table.lance",
            "URL with query parameters should have them stripped",
        ),
    ];

    for (root, table, expected, message) in cases {
        let actual = ManifestNamespace::construct_full_uri(root, table).unwrap();
        assert_eq!(actual, expected, "{}", message);
    }
}
3899
/// A `$` in the table directory name is kept verbatim in the resulting URI and
/// no double slash appears in front of the directory.
#[test]
fn test_construct_full_uri_with_dollar_sign() {
    let uri = ManifestNamespace::construct_full_uri("/tmp/root", "hash_workspace$test_table")
        .unwrap();

    let keeps_dollar_path = uri.ends_with("/tmp/root/hash_workspace$test_table");
    assert!(
        keeps_dollar_path,
        "local file URI should preserve dollar signs without adding empty path segments: {}",
        uri
    );

    let has_double_slash = uri.contains("//hash_workspace$test_table");
    assert!(
        !has_double_slash,
        "local file URI should not add a double slash before table directory: {}",
        uri
    );
}
3917
/// A relative location containing `/` keeps its separator in the resulting URI
/// instead of having it percent-encoded as `%2F`.
#[test]
fn test_construct_full_uri_with_nested_relative_location() {
    let uri = ManifestNamespace::construct_full_uri("/tmp/root", "workspace/physical_table.lance")
        .unwrap();

    let keeps_nested_path = uri.ends_with("/tmp/root/workspace/physical_table.lance");
    assert!(
        keeps_nested_path,
        "nested relative location should preserve path separators: {}",
        uri
    );

    let has_encoded_separator = uri.contains("%2Fphysical_table.lance");
    assert!(
        !has_encoded_separator,
        "nested relative location should not encode path separators: {}",
        uri
    );
}
3935
3936 #[tokio::test]
3943 async fn test_concurrent_create_table_no_duplicates() {
3944 let temp_dir = TempStdDir::default();
3945 let temp_path = temp_dir.to_str().unwrap();
3946
3947 let ns1 = DirectoryNamespaceBuilder::new(temp_path)
3950 .inline_optimization_enabled(false)
3951 .build()
3952 .await
3953 .unwrap();
3954 let ns2 = DirectoryNamespaceBuilder::new(temp_path)
3955 .inline_optimization_enabled(false)
3956 .build()
3957 .await
3958 .unwrap();
3959
3960 let buffer = create_test_ipc_data();
3961
3962 let mut req1 = CreateTableRequest::new();
3963 req1.id = Some(vec!["race_table".to_string()]);
3964 let mut req2 = CreateTableRequest::new();
3965 req2.id = Some(vec!["race_table".to_string()]);
3966
3967 let (result1, result2) = tokio::join!(
3969 ns1.create_table(req1, Bytes::from(buffer.clone())),
3970 ns2.create_table(req2, Bytes::from(buffer.clone())),
3971 );
3972
3973 let success_count = [&result1, &result2].iter().filter(|r| r.is_ok()).count();
3975 let failure_count = [&result1, &result2].iter().filter(|r| r.is_err()).count();
3976 assert_eq!(
3977 success_count, 1,
3978 "Exactly one create should succeed, got: result1={:?}, result2={:?}",
3979 result1, result2
3980 );
3981 assert_eq!(
3982 failure_count, 1,
3983 "Exactly one create should fail, got: result1={:?}, result2={:?}",
3984 result1, result2
3985 );
3986
3987 let ns_check = DirectoryNamespaceBuilder::new(temp_path)
3989 .inline_optimization_enabled(false)
3990 .build()
3991 .await
3992 .unwrap();
3993 let mut list_request = ListTablesRequest::new();
3994 list_request.id = Some(vec![]);
3995 let response = ns_check.list_tables(list_request).await.unwrap();
3996 assert_eq!(
3997 response.tables.len(),
3998 1,
3999 "Should have exactly 1 table, found: {:?}",
4000 response.tables
4001 );
4002 assert_eq!(response.tables[0], "race_table");
4003
4004 let mut describe_request = DescribeTableRequest::new();
4006 describe_request.id = Some(vec!["race_table".to_string()]);
4007 let describe_result = ns_check.describe_table(describe_request).await;
4008 assert!(
4009 describe_result.is_ok(),
4010 "describe_table should not fail with duplicate entries: {:?}",
4011 describe_result
4012 );
4013 }
4014
/// Converts a slice of string slices into owned `String`s, keeping their order.
fn names(v: &[&str]) -> Vec<String> {
    v.iter().copied().map(String::from).collect()
}
4020
/// With neither a page token nor a limit, the list comes back sorted and no
/// next-page token is returned.
#[test]
fn test_apply_pagination_no_token_no_limit() {
    let mut items = names(&["b", "a", "c"]);
    let next_token = ManifestNamespace::apply_pagination(&mut items, None, None);
    assert_eq!(items, ["a", "b", "c"]);
    assert_eq!(next_token, None);
}
4028
/// A limit of 2 on three items keeps the first two in sorted order and returns
/// the last kept item as the next-page token.
#[test]
fn test_apply_pagination_limit_truncates_and_returns_token() {
    let mut items = names(&["c", "a", "b"]);
    let next_token = ManifestNamespace::apply_pagination(&mut items, None, Some(2));
    assert_eq!(items, ["a", "b"]);
    assert_eq!(next_token.as_deref(), Some("b"));
}
4036
/// A limit of zero empties the list and returns no next-page token.
#[test]
fn test_apply_pagination_limit_zero_returns_empty_no_token() {
    let mut items = names(&["a", "b", "c"]);
    let next_token = ManifestNamespace::apply_pagination(&mut items, None, Some(0));
    assert!(items.is_empty());
    assert_eq!(next_token, None);
}
4044
/// A page token equal to an existing item leaves only the items after it, and
/// without a limit no next-page token is returned.
#[test]
fn test_apply_pagination_page_token_in_list() {
    let mut items = names(&["a", "b", "c", "d"]);
    let page_token = Some("b".to_string());
    let next_token = ManifestNamespace::apply_pagination(&mut items, page_token, None);
    assert_eq!(items, ["c", "d"]);
    assert_eq!(next_token, None);
}
4053
/// A page token that sorts after every item yields an empty page and no
/// next-page token.
#[test]
fn test_apply_pagination_page_token_past_all_items() {
    let mut items = names(&["a", "b", "c"]);
    let page_token = Some("z".to_string());
    let next_token = ManifestNamespace::apply_pagination(&mut items, page_token, None);
    assert!(items.is_empty());
    assert_eq!(next_token, None);
}
4061
/// Token and limit together: items after the token are kept up to the limit,
/// and the last kept item is returned as the next-page token.
#[test]
fn test_apply_pagination_token_and_limit_combined() {
    let mut items = names(&["a", "b", "c", "d", "e"]);
    let page_token = Some("b".to_string());
    let next_token = ManifestNamespace::apply_pagination(&mut items, page_token, Some(2));
    assert_eq!(items, ["c", "d"]);
    assert_eq!(next_token.as_deref(), Some("d"));
}
4069}