1use arrow::array::builder::{ListBuilder, StringBuilder};
10use arrow::array::{Array, RecordBatch, RecordBatchIterator, StringArray};
11use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
12use arrow_ipc::reader::StreamReader;
13use async_trait::async_trait;
14use bytes::Bytes;
15use futures::{FutureExt, stream::StreamExt};
16use lance::dataset::optimize::{CompactionOptions, compact_files};
17use lance::dataset::{
18 DeleteBuilder, MergeInsertBuilder, ReadParams, WhenMatched, WhenNotMatched, WriteParams,
19 builder::DatasetBuilder,
20};
21use lance::session::Session;
22use lance::{Dataset, dataset::scanner::Scanner};
23use lance_core::Error as LanceError;
24use lance_core::datatypes::LANCE_UNENFORCED_PRIMARY_KEY_POSITION;
25use lance_core::{Error, Result, box_error};
26use lance_index::IndexType;
27use lance_index::optimize::OptimizeOptions;
28use lance_index::scalar::{BuiltinIndexType, ScalarIndexParams};
29use lance_index::traits::DatasetIndexExt;
30use lance_io::object_store::{ObjectStore, ObjectStoreParams};
31use lance_namespace::LanceNamespace;
32use lance_namespace::error::NamespaceError;
33use lance_namespace::models::{
34 CreateNamespaceRequest, CreateNamespaceResponse, CreateTableRequest, CreateTableResponse,
35 DeclareTableRequest, DeclareTableResponse, DeregisterTableRequest, DeregisterTableResponse,
36 DescribeNamespaceRequest, DescribeNamespaceResponse, DescribeTableRequest,
37 DescribeTableResponse, DescribeTableVersionResponse, DropNamespaceRequest,
38 DropNamespaceResponse, DropTableRequest, DropTableResponse, ListNamespacesRequest,
39 ListNamespacesResponse, ListTableVersionsResponse, ListTablesRequest, ListTablesResponse,
40 NamespaceExistsRequest, RegisterTableRequest, RegisterTableResponse, TableExistsRequest,
41 TableVersion,
42};
43use lance_namespace::schema::arrow_schema_to_json;
44use object_store::path::Path;
45use std::io::Cursor;
46use std::{
47 collections::HashMap,
48 hash::{DefaultHasher, Hash, Hasher},
49 ops::{Deref, DerefMut},
50 sync::Arc,
51};
52use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
53
/// Name of the Lance table that stores every namespace/table/version entry.
const MANIFEST_TABLE_NAME: &str = "__manifest";
/// Separator placed between components of a manifest `object_id`.
const DELIMITER: &str = "$";

/// Name of the BTREE index built on the `object_id` column of `__manifest`.
const OBJECT_ID_INDEX_NAME: &str = "object_id_btree";
/// Name of the Bitmap index built on the `object_type` column of `__manifest`.
const OBJECT_TYPE_INDEX_NAME: &str = "object_type_bitmap";
/// Name of the LabelList index built on the `base_objects` column of `__manifest`.
const BASE_OBJECTS_INDEX_NAME: &str = "base_objects_label_list";
64
65#[derive(Debug, Clone, Copy, PartialEq, Eq)]
67pub enum ObjectType {
68 Namespace,
69 Table,
70 TableVersion,
71}
72
73impl ObjectType {
74 pub fn as_str(&self) -> &str {
75 match self {
76 Self::Namespace => "namespace",
77 Self::Table => "table",
78 Self::TableVersion => "table_version",
79 }
80 }
81
82 pub fn parse(s: &str) -> Result<Self> {
83 match s {
84 "namespace" => Ok(Self::Namespace),
85 "table" => Ok(Self::Table),
86 "table_version" => Ok(Self::TableVersion),
87 _ => Err(Error::io(format!("Invalid object type: {}", s))),
88 }
89 }
90}
91
/// A table found in the manifest, split into its namespace path and name.
#[derive(Debug, Clone)]
pub struct TableInfo {
    /// Namespace components that contain the table (empty for a root-level table).
    pub namespace: Vec<String>,
    /// Final component of the table's object id.
    pub name: String,
    /// Value of the manifest's `location` column for this table.
    pub location: String,
}
99
100#[derive(Debug, Clone)]
105pub struct ManifestEntry {
106 pub object_id: String,
108 pub object_type: ObjectType,
110 pub location: Option<String>,
112 pub metadata: Option<String>,
114}
115
/// A namespace described by its parent path, its own name and optional properties.
#[derive(Debug, Clone)]
pub struct NamespaceInfo {
    /// Parent namespace components.
    pub namespace: Vec<String>,
    /// Name of this namespace.
    pub name: String,
    /// Optional key/value properties attached to the namespace.
    pub metadata: Option<HashMap<String, String>>,
}
123
124#[derive(Debug, Clone)]
129pub struct DatasetConsistencyWrapper(Arc<RwLock<Dataset>>);
130
131impl DatasetConsistencyWrapper {
132 pub fn new(dataset: Dataset) -> Self {
134 Self(Arc::new(RwLock::new(dataset)))
135 }
136
137 pub async fn get(&self) -> Result<DatasetReadGuard<'_>> {
140 self.reload().await?;
141 Ok(DatasetReadGuard {
142 guard: self.0.read().await,
143 })
144 }
145
146 pub async fn get_mut(&self) -> Result<DatasetWriteGuard<'_>> {
149 self.reload().await?;
150 Ok(DatasetWriteGuard {
151 guard: self.0.write().await,
152 })
153 }
154
155 pub async fn set_latest(&self, dataset: Dataset) {
160 let mut write_guard = self.0.write().await;
161 if dataset.manifest().version > write_guard.manifest().version {
162 *write_guard = dataset;
163 }
164 }
165
166 async fn reload(&self) -> Result<()> {
168 let read_guard = self.0.read().await;
170 let dataset_uri = read_guard.uri().to_string();
171 let current_version = read_guard.version().version;
172 log::debug!(
173 "Reload starting for uri={}, current_version={}",
174 dataset_uri,
175 current_version
176 );
177 let latest_version = read_guard.latest_version_id().await.map_err(|e| {
178 Error::io_source(box_error(std::io::Error::other(format!(
179 "Failed to get latest version: {}",
180 e
181 ))))
182 })?;
183 log::debug!(
184 "Reload got latest_version={} for uri={}, current_version={}",
185 latest_version,
186 dataset_uri,
187 current_version
188 );
189 drop(read_guard);
190
191 if latest_version == current_version {
193 log::debug!("Already up-to-date for uri={}", dataset_uri);
194 return Ok(());
195 }
196
197 let mut write_guard = self.0.write().await;
199
200 let latest_version = write_guard.latest_version_id().await.map_err(|e| {
202 Error::io_source(box_error(std::io::Error::other(format!(
203 "Failed to get latest version: {}",
204 e
205 ))))
206 })?;
207
208 if latest_version != write_guard.version().version {
209 write_guard.checkout_latest().await.map_err(|e| {
210 Error::io_source(box_error(std::io::Error::other(format!(
211 "Failed to checkout latest: {}",
212 e
213 ))))
214 })?;
215 }
216
217 Ok(())
218 }
219}
220
221pub struct DatasetReadGuard<'a> {
222 guard: RwLockReadGuard<'a, Dataset>,
223}
224
225impl Deref for DatasetReadGuard<'_> {
226 type Target = Dataset;
227
228 fn deref(&self) -> &Self::Target {
229 &self.guard
230 }
231}
232
233pub struct DatasetWriteGuard<'a> {
234 guard: RwLockWriteGuard<'a, Dataset>,
235}
236
237impl Deref for DatasetWriteGuard<'_> {
238 type Target = Dataset;
239
240 fn deref(&self) -> &Self::Target {
241 &self.guard
242 }
243}
244
245impl DerefMut for DatasetWriteGuard<'_> {
246 fn deref_mut(&mut self) -> &mut Self::Target {
247 &mut self.guard
248 }
249}
250
251pub struct ManifestNamespace {
255 root: String,
256 storage_options: Option<HashMap<String, String>>,
257 #[allow(dead_code)]
258 session: Option<Arc<Session>>,
259 #[allow(dead_code)]
260 object_store: Arc<ObjectStore>,
261 #[allow(dead_code)]
262 base_path: Path,
263 manifest_dataset: DatasetConsistencyWrapper,
264 dir_listing_enabled: bool,
268 inline_optimization_enabled: bool,
271 commit_retries: Option<u32>,
274}
275
276impl std::fmt::Debug for ManifestNamespace {
277 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
278 f.debug_struct("ManifestNamespace")
279 .field("root", &self.root)
280 .field("storage_options", &self.storage_options)
281 .field("dir_listing_enabled", &self.dir_listing_enabled)
282 .field(
283 "inline_optimization_enabled",
284 &self.inline_optimization_enabled,
285 )
286 .finish()
287 }
288}
289
290fn convert_lance_commit_error(e: &LanceError, operation: &str, object_id: Option<&str>) -> Error {
299 match e {
300 LanceError::CommitConflict { .. } => NamespaceError::Throttled {
302 message: format!("Too many concurrent writes, please retry later: {:?}", e),
303 }
304 .into(),
305 LanceError::TooMuchWriteContention { .. } | LanceError::IncompatibleTransaction { .. } => {
308 let message = if let Some(id) = object_id {
309 format!(
310 "Object '{}' was concurrently modified by another operation: {:?}",
311 id, e
312 )
313 } else {
314 format!(
315 "Object was concurrently modified by another operation: {:?}",
316 e
317 )
318 };
319 NamespaceError::ConcurrentModification { message }.into()
320 }
321 _ => {
323 let error_msg = e.to_string();
324 if error_msg.contains("matched")
325 || error_msg.contains("duplicate")
326 || error_msg.contains("already exists")
327 {
328 let message = if let Some(id) = object_id {
329 format!(
330 "Object '{}' was concurrently created by another operation: {:?}",
331 id, e
332 )
333 } else {
334 format!(
335 "Object was concurrently created by another operation: {:?}",
336 e
337 )
338 };
339 return NamespaceError::ConcurrentModification { message }.into();
340 }
341 Error::io_source(box_error(std::io::Error::other(format!(
342 "{}: {:?}",
343 operation, e
344 ))))
345 }
346 }
347}
348
349impl ManifestNamespace {
350 #[allow(clippy::too_many_arguments)]
352 pub async fn from_directory(
353 root: String,
354 storage_options: Option<HashMap<String, String>>,
355 session: Option<Arc<Session>>,
356 object_store: Arc<ObjectStore>,
357 base_path: Path,
358 dir_listing_enabled: bool,
359 inline_optimization_enabled: bool,
360 commit_retries: Option<u32>,
361 table_version_storage_enabled: bool,
362 ) -> Result<Self> {
363 let manifest_dataset = Self::ensure_manifest_table_up_to_date(
364 &root,
365 &storage_options,
366 session.clone(),
367 table_version_storage_enabled,
368 )
369 .await?;
370
371 Ok(Self {
372 root,
373 storage_options,
374 session,
375 object_store,
376 base_path,
377 manifest_dataset,
378 dir_listing_enabled,
379 inline_optimization_enabled,
380 commit_retries,
381 })
382 }
383
384 pub fn build_object_id(namespace: &[String], name: &str) -> String {
386 if namespace.is_empty() {
387 name.to_string()
388 } else {
389 let mut id = namespace.join(DELIMITER);
390 id.push_str(DELIMITER);
391 id.push_str(name);
392 id
393 }
394 }
395
396 pub fn parse_object_id(object_id: &str) -> (Vec<String>, String) {
398 let parts: Vec<&str> = object_id.split(DELIMITER).collect();
399 if parts.len() == 1 {
400 (Vec::new(), parts[0].to_string())
401 } else {
402 let namespace = parts[..parts.len() - 1]
403 .iter()
404 .map(|s| s.to_string())
405 .collect();
406 let name = parts[parts.len() - 1].to_string();
407 (namespace, name)
408 }
409 }
410
/// Splits an already-componentized id into `(namespace, name)`: `name` is the last
/// component and `namespace` is everything before it.
///
/// An empty slice yields `(vec![], String::new())` rather than panicking on the
/// `len() - 1` underflow the previous implementation hit.
pub fn split_object_id(object_id: &[String]) -> (Vec<String>, String) {
    match object_id.split_last() {
        Some((name, namespace)) => (namespace.to_vec(), name.clone()),
        None => (Vec::new(), String::new()),
    }
}
422
423 pub fn str_object_id(object_id: &[String]) -> String {
425 object_id.join(DELIMITER)
426 }
427
/// Renders `version` zero-padded to 20 characters so version ids sort lexicographically.
/// The width includes a leading `-` for negative values.
pub fn format_table_version(version: i64) -> String {
    format!("{version:020}")
}
435
436 pub fn build_version_object_id(table_object_id: &str, version: i64) -> String {
440 format!(
441 "{}{}{}",
442 table_object_id,
443 DELIMITER,
444 Self::format_table_version(version)
445 )
446 }
447
448 pub fn parse_version_from_object_id(object_id: &str) -> Option<i64> {
452 let (_namespace, name) = Self::parse_object_id(object_id);
453 name.parse::<i64>().ok()
454 }
455
456 pub fn generate_dir_name(object_id: &str) -> String {
463 let random_num: u64 = rand::random();
465
466 let mut hasher = DefaultHasher::new();
468 random_num.hash(&mut hasher);
469 object_id.hash(&mut hasher);
470 let hash = hasher.finish();
471
472 format!("{:08x}_{}", (hash & 0xFFFFFFFF) as u32, object_id)
474 }
475
476 pub(crate) fn construct_full_uri(root: &str, relative_location: &str) -> Result<String> {
478 let mut base_url = lance_io::object_store::uri_to_url(root)?;
479
480 if !base_url.path().ends_with('/') {
485 base_url.set_path(&format!("{}/", base_url.path()));
486 }
487
488 let full_url = base_url.join(relative_location).map_err(|e| {
489 Error::invalid_input_source(
490 format!(
491 "Failed to join URI '{}' with '{}': {:?}",
492 root, relative_location, e
493 )
494 .into(),
495 )
496 })?;
497
498 Ok(full_url.to_string())
499 }
500
501 async fn run_inline_optimization(&self) -> Result<()> {
513 if !self.inline_optimization_enabled {
514 return Ok(());
515 }
516
517 let mut dataset_guard = self.manifest_dataset.get_mut().await?;
519 let dataset: &mut Dataset = &mut dataset_guard;
520
521 let indices = dataset.load_indices().await?;
523
524 let has_object_id_index = indices.iter().any(|idx| idx.name == OBJECT_ID_INDEX_NAME);
526 let has_object_type_index = indices.iter().any(|idx| idx.name == OBJECT_TYPE_INDEX_NAME);
527 let has_base_objects_index = indices
528 .iter()
529 .any(|idx| idx.name == BASE_OBJECTS_INDEX_NAME);
530
531 if !has_object_id_index {
533 log::debug!(
534 "Creating BTREE index '{}' on object_id for __manifest table",
535 OBJECT_ID_INDEX_NAME
536 );
537 let params = ScalarIndexParams::for_builtin(BuiltinIndexType::BTree);
538 if let Err(e) = dataset
539 .create_index(
540 &["object_id"],
541 IndexType::BTree,
542 Some(OBJECT_ID_INDEX_NAME.to_string()),
543 ¶ms,
544 true,
545 )
546 .await
547 {
548 log::warn!(
549 "Failed to create BTREE index on object_id for __manifest table: {:?}. Query performance may be impacted.",
550 e
551 );
552 } else {
553 log::info!(
554 "Created BTREE index '{}' on object_id for __manifest table",
555 OBJECT_ID_INDEX_NAME
556 );
557 }
558 }
559
560 if !has_object_type_index {
562 log::debug!(
563 "Creating Bitmap index '{}' on object_type for __manifest table",
564 OBJECT_TYPE_INDEX_NAME
565 );
566 let params = ScalarIndexParams::default();
567 if let Err(e) = dataset
568 .create_index(
569 &["object_type"],
570 IndexType::Bitmap,
571 Some(OBJECT_TYPE_INDEX_NAME.to_string()),
572 ¶ms,
573 true,
574 )
575 .await
576 {
577 log::warn!(
578 "Failed to create Bitmap index on object_type for __manifest table: {:?}. Query performance may be impacted.",
579 e
580 );
581 } else {
582 log::info!(
583 "Created Bitmap index '{}' on object_type for __manifest table",
584 OBJECT_TYPE_INDEX_NAME
585 );
586 }
587 }
588
589 if !has_base_objects_index {
591 log::debug!(
592 "Creating LabelList index '{}' on base_objects for __manifest table",
593 BASE_OBJECTS_INDEX_NAME
594 );
595 let params = ScalarIndexParams::default();
596 if let Err(e) = dataset
597 .create_index(
598 &["base_objects"],
599 IndexType::LabelList,
600 Some(BASE_OBJECTS_INDEX_NAME.to_string()),
601 ¶ms,
602 true,
603 )
604 .await
605 {
606 log::warn!(
607 "Failed to create LabelList index on base_objects for __manifest table: {:?}. Query performance may be impacted.",
608 e
609 );
610 } else {
611 log::info!(
612 "Created LabelList index '{}' on base_objects for __manifest table",
613 BASE_OBJECTS_INDEX_NAME
614 );
615 }
616 }
617
618 log::debug!("Running file compaction on __manifest table");
620 match compact_files(dataset, CompactionOptions::default(), None).await {
621 Ok(compaction_metrics) => {
622 if compaction_metrics.fragments_removed > 0 {
623 log::info!(
624 "Compacted __manifest table: removed {} fragments, added {} fragments",
625 compaction_metrics.fragments_removed,
626 compaction_metrics.fragments_added
627 );
628 }
629 }
630 Err(e) => {
631 log::warn!(
632 "Failed to compact files for __manifest table: {:?}. Continuing with optimization.",
633 e
634 );
635 }
636 }
637
638 log::debug!("Optimizing indices on __manifest table");
640 match dataset.optimize_indices(&OptimizeOptions::default()).await {
641 Ok(_) => {
642 log::info!("Successfully optimized indices on __manifest table");
643 }
644 Err(e) => {
645 log::warn!(
646 "Failed to optimize indices on __manifest table: {:?}. Continuing anyway.",
647 e
648 );
649 }
650 }
651
652 Ok(())
653 }
654
655 fn manifest_schema() -> Arc<ArrowSchema> {
657 Arc::new(ArrowSchema::new(vec![
658 Field::new("object_id", DataType::Utf8, false).with_metadata(
660 [(
661 LANCE_UNENFORCED_PRIMARY_KEY_POSITION.to_string(),
662 "0".to_string(),
663 )]
664 .into_iter()
665 .collect(),
666 ),
667 Field::new("object_type", DataType::Utf8, false),
668 Field::new("location", DataType::Utf8, true),
669 Field::new("metadata", DataType::Utf8, true),
670 Field::new(
671 "base_objects",
672 DataType::List(Arc::new(Field::new("object_id", DataType::Utf8, true))),
673 true,
674 ),
675 ]))
676 }
677
678 async fn manifest_scanner(&self) -> Result<Scanner> {
680 let dataset_guard = self.manifest_dataset.get().await?;
681 Ok(dataset_guard.scan())
682 }
683
684 async fn execute_scanner(scanner: Scanner) -> Result<Vec<RecordBatch>> {
686 let mut stream = scanner.try_into_stream().await.map_err(|e| {
687 Error::io_source(box_error(std::io::Error::other(format!(
688 "Failed to create stream: {}",
689 e
690 ))))
691 })?;
692
693 let mut batches = Vec::new();
694 while let Some(batch) = stream.next().await {
695 batches.push(batch.map_err(|e| {
696 Error::io_source(box_error(std::io::Error::other(format!(
697 "Failed to read batch: {}",
698 e
699 ))))
700 })?);
701 }
702
703 Ok(batches)
704 }
705
706 fn get_string_column<'a>(batch: &'a RecordBatch, column_name: &str) -> Result<&'a StringArray> {
708 let column = batch
709 .column_by_name(column_name)
710 .ok_or_else(|| Error::io(format!("Column '{}' not found", column_name)))?;
711 column
712 .as_any()
713 .downcast_ref::<StringArray>()
714 .ok_or_else(|| Error::io(format!("Column '{}' is not a string array", column_name)))
715 }
716
717 async fn manifest_contains_object(&self, object_id: &str) -> Result<bool> {
719 let escaped_id = object_id.replace('\'', "''");
720 let filter = format!("object_id = '{}'", escaped_id);
721
722 let dataset_guard = self.manifest_dataset.get().await?;
723 let mut scanner = dataset_guard.scan();
724
725 scanner.filter(&filter).map_err(|e| {
726 Error::io_source(box_error(std::io::Error::other(format!(
727 "Failed to filter: {}",
728 e
729 ))))
730 })?;
731
732 scanner.project::<&str>(&[]).map_err(|e| {
734 Error::io_source(box_error(std::io::Error::other(format!(
735 "Failed to project: {}",
736 e
737 ))))
738 })?;
739
740 scanner.with_row_id();
741
742 let count = scanner.count_rows().await.map_err(|e| {
743 Error::io_source(box_error(std::io::Error::other(format!(
744 "Failed to count rows: {}",
745 e
746 ))))
747 })?;
748
749 Ok(count > 0)
750 }
751
752 async fn query_manifest_for_table(&self, object_id: &str) -> Result<Option<TableInfo>> {
754 let escaped_id = object_id.replace('\'', "''");
755 let filter = format!("object_id = '{}' AND object_type = 'table'", escaped_id);
756 let mut scanner = self.manifest_scanner().await?;
757 scanner.filter(&filter).map_err(|e| {
758 Error::io_source(box_error(std::io::Error::other(format!(
759 "Failed to filter: {}",
760 e
761 ))))
762 })?;
763 scanner.project(&["object_id", "location"]).map_err(|e| {
764 Error::io_source(box_error(std::io::Error::other(format!(
765 "Failed to project: {}",
766 e
767 ))))
768 })?;
769 let batches = Self::execute_scanner(scanner).await?;
770
771 let mut found_result: Option<TableInfo> = None;
772 let mut total_rows = 0;
773
774 for batch in batches {
775 if batch.num_rows() == 0 {
776 continue;
777 }
778
779 total_rows += batch.num_rows();
780 if total_rows > 1 {
781 return Err(Error::io(format!(
782 "Expected exactly 1 table with id '{}', found {}",
783 object_id, total_rows
784 )));
785 }
786
787 let object_id_array = Self::get_string_column(&batch, "object_id")?;
788 let location_array = Self::get_string_column(&batch, "location")?;
789 let location = location_array.value(0).to_string();
790 let (namespace, name) = Self::parse_object_id(object_id_array.value(0));
791 found_result = Some(TableInfo {
792 namespace,
793 name,
794 location,
795 });
796 }
797
798 Ok(found_result)
799 }
800
801 pub async fn list_manifest_table_locations(&self) -> Result<std::collections::HashSet<String>> {
804 let filter = "object_type = 'table' AND NOT contains(object_id, '$')";
805 let mut scanner = self.manifest_scanner().await?;
806 scanner.filter(filter).map_err(|e| {
807 Error::io_source(box_error(std::io::Error::other(format!(
808 "Failed to filter: {}",
809 e
810 ))))
811 })?;
812 scanner.project(&["location"]).map_err(|e| {
813 Error::io_source(box_error(std::io::Error::other(format!(
814 "Failed to project: {}",
815 e
816 ))))
817 })?;
818
819 let batches = Self::execute_scanner(scanner).await?;
820 let mut locations = std::collections::HashSet::new();
821
822 for batch in batches {
823 if batch.num_rows() == 0 {
824 continue;
825 }
826 let location_array = Self::get_string_column(&batch, "location")?;
827 for i in 0..location_array.len() {
828 locations.insert(location_array.value(i).to_string());
829 }
830 }
831
832 Ok(locations)
833 }
834
835 async fn insert_into_manifest(
837 &self,
838 object_id: String,
839 object_type: ObjectType,
840 location: Option<String>,
841 ) -> Result<()> {
842 self.insert_into_manifest_with_metadata(
843 vec![ManifestEntry {
844 object_id,
845 object_type,
846 location,
847 metadata: None,
848 }],
849 None,
850 )
851 .await
852 }
853
854 pub async fn insert_into_manifest_with_metadata(
860 &self,
861 entries: Vec<ManifestEntry>,
862 base_objects: Option<Vec<String>>,
863 ) -> Result<()> {
864 if entries.is_empty() {
865 return Ok(());
866 }
867
868 let schema = Self::manifest_schema();
869
870 let mut object_ids = Vec::with_capacity(entries.len());
871 let mut object_types = Vec::with_capacity(entries.len());
872 let mut locations: Vec<Option<String>> = Vec::with_capacity(entries.len());
873 let mut metadatas: Vec<Option<String>> = Vec::with_capacity(entries.len());
874
875 let string_builder = StringBuilder::new();
876 let mut list_builder = ListBuilder::new(string_builder).with_field(Arc::new(Field::new(
877 "object_id",
878 DataType::Utf8,
879 true,
880 )));
881
882 for (i, entry) in entries.iter().enumerate() {
883 object_ids.push(entry.object_id.as_str());
884 object_types.push(entry.object_type.as_str());
885 locations.push(entry.location.clone());
886 metadatas.push(entry.metadata.clone());
887
888 if i == 0 {
891 match &base_objects {
892 Some(objects) => {
893 for obj in objects {
894 list_builder.values().append_value(obj);
895 }
896 list_builder.append(true);
897 }
898 None => {
899 list_builder.append_null();
900 }
901 }
902 } else {
903 list_builder.append_null();
904 }
905 }
906
907 let base_objects_array = list_builder.finish();
908
909 let location_array: Arc<dyn Array> = Arc::new(StringArray::from(
910 locations.iter().map(|l| l.as_deref()).collect::<Vec<_>>(),
911 ));
912
913 let metadata_array: Arc<dyn Array> = Arc::new(StringArray::from(
914 metadatas.iter().map(|m| m.as_deref()).collect::<Vec<_>>(),
915 ));
916
917 let batch = RecordBatch::try_new(
918 schema.clone(),
919 vec![
920 Arc::new(StringArray::from(object_ids)),
921 Arc::new(StringArray::from(object_types.to_vec())),
922 location_array,
923 metadata_array,
924 Arc::new(base_objects_array),
925 ],
926 )
927 .map_err(|e| Error::io(format!("Failed to create manifest entries: {}", e)))?;
928
929 let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
930
931 let dataset_guard = self.manifest_dataset.get().await?;
933 let dataset_arc = Arc::new(dataset_guard.clone());
934 drop(dataset_guard); let mut merge_builder =
937 MergeInsertBuilder::try_new(dataset_arc, vec!["object_id".to_string()]).map_err(
938 |e| {
939 Error::io_source(box_error(std::io::Error::other(format!(
940 "Failed to create merge builder: {}",
941 e
942 ))))
943 },
944 )?;
945 merge_builder.when_matched(WhenMatched::Fail);
946 merge_builder.when_not_matched(WhenNotMatched::InsertAll);
947 merge_builder.conflict_retries(5);
953 merge_builder.use_index(false);
958 if let Some(retries) = self.commit_retries {
959 merge_builder.commit_retries(retries);
960 }
961
962 let (new_dataset_arc, _merge_stats) = merge_builder
963 .try_build()
964 .map_err(|e| {
965 Error::io_source(box_error(std::io::Error::other(format!(
966 "Failed to build merge: {}",
967 e
968 ))))
969 })?
970 .execute_reader(Box::new(reader))
971 .await
972 .map_err(|e| {
973 convert_lance_commit_error(&e, "Failed to execute merge insert into manifest", None)
974 })?;
975
976 let new_dataset = Arc::try_unwrap(new_dataset_arc).unwrap_or_else(|arc| (*arc).clone());
977 self.manifest_dataset.set_latest(new_dataset).await;
978
979 if let Err(e) = self.run_inline_optimization().await {
981 log::warn!(
982 "Unexpected failure when running inline optimization: {:?}",
983 e
984 );
985 }
986
987 Ok(())
988 }
989
990 pub async fn delete_from_manifest(&self, object_id: &str) -> Result<()> {
992 let predicate = format!("object_id = '{}'", object_id);
993
994 let dataset_guard = self.manifest_dataset.get().await?;
996 let dataset = Arc::new(dataset_guard.clone());
997 drop(dataset_guard); let new_dataset = DeleteBuilder::new(dataset, &predicate)
1000 .execute()
1001 .await
1002 .map_err(|e| convert_lance_commit_error(&e, "Failed to delete", None))?;
1003
1004 self.manifest_dataset
1006 .set_latest(
1007 Arc::try_unwrap(new_dataset.new_dataset).unwrap_or_else(|arc| (*arc).clone()),
1008 )
1009 .await;
1010
1011 if let Err(e) = self.run_inline_optimization().await {
1013 log::warn!(
1014 "Unexpected failure when running inline optimization: {:?}",
1015 e
1016 );
1017 }
1018
1019 Ok(())
1020 }
1021
1022 pub async fn query_table_versions(
1032 &self,
1033 object_id: &str,
1034 descending: bool,
1035 limit: Option<i32>,
1036 ) -> Result<Vec<(i64, String)>> {
1037 let escaped_id = object_id.replace('\'', "''");
1038 let filter = format!(
1040 "object_type = 'table_version' AND starts_with(object_id, '{}{}')",
1041 escaped_id, DELIMITER
1042 );
1043 let mut scanner = self.manifest_scanner().await?;
1044 scanner.filter(&filter).map_err(|e| {
1045 Error::io_source(box_error(std::io::Error::other(format!(
1046 "Failed to filter: {}",
1047 e
1048 ))))
1049 })?;
1050 scanner.project(&["object_id", "metadata"]).map_err(|e| {
1051 Error::io_source(box_error(std::io::Error::other(format!(
1052 "Failed to project: {}",
1053 e
1054 ))))
1055 })?;
1056 let batches = Self::execute_scanner(scanner).await?;
1057
1058 let mut versions: Vec<(i64, String)> = Vec::new();
1059 for batch in batches {
1060 if batch.num_rows() == 0 {
1061 continue;
1062 }
1063 let object_id_array = Self::get_string_column(&batch, "object_id")?;
1064 let metadata_array = Self::get_string_column(&batch, "metadata")?;
1065 for i in 0..batch.num_rows() {
1066 let oid = object_id_array.value(i);
1067 if let Some(version) = Self::parse_version_from_object_id(oid) {
1069 let metadata_str = metadata_array.value(i).to_string();
1070 versions.push((version, metadata_str));
1071 }
1072 }
1073 }
1074
1075 if descending {
1076 versions.sort_by(|a, b| b.0.cmp(&a.0));
1077 } else {
1078 versions.sort_by(|a, b| a.0.cmp(&b.0));
1079 }
1080
1081 if let Some(limit) = limit {
1082 versions.truncate(limit as usize);
1083 }
1084
1085 Ok(versions)
1086 }
1087
1088 pub async fn query_table_version(
1094 &self,
1095 object_id: &str,
1096 version: i64,
1097 ) -> Result<Option<String>> {
1098 let version_object_id = Self::build_version_object_id(object_id, version);
1099 self.query_table_version_by_object_id(&version_object_id)
1100 .await
1101 }
1102
1103 async fn query_table_version_by_object_id(
1105 &self,
1106 version_object_id: &str,
1107 ) -> Result<Option<String>> {
1108 let escaped_id = version_object_id.replace('\'', "''");
1109 let filter = format!(
1110 "object_id = '{}' AND object_type = 'table_version'",
1111 escaped_id
1112 );
1113 let mut scanner = self.manifest_scanner().await?;
1114 scanner.filter(&filter).map_err(|e| {
1115 Error::io_source(box_error(std::io::Error::other(format!(
1116 "Failed to filter: {}",
1117 e
1118 ))))
1119 })?;
1120 scanner.project(&["metadata"]).map_err(|e| {
1121 Error::io_source(box_error(std::io::Error::other(format!(
1122 "Failed to project: {}",
1123 e
1124 ))))
1125 })?;
1126 let batches = Self::execute_scanner(scanner).await?;
1127
1128 for batch in batches {
1129 if batch.num_rows() == 0 {
1130 continue;
1131 }
1132 let metadata_array = Self::get_string_column(&batch, "metadata")?;
1133 return Ok(Some(metadata_array.value(0).to_string()));
1134 }
1135
1136 Ok(None)
1137 }
1138
1139 pub async fn delete_table_versions(
1148 &self,
1149 object_id: &str,
1150 ranges: &[(i64, i64)],
1151 ) -> Result<i64> {
1152 if ranges.is_empty() {
1153 return Ok(0);
1154 }
1155
1156 let mut object_id_conditions: Vec<String> = Vec::new();
1158 for (start, end) in ranges {
1159 for version in *start..=*end {
1160 let oid = Self::build_version_object_id(object_id, version);
1161 let escaped = oid.replace('\'', "''");
1162 object_id_conditions.push(format!("'{}'", escaped));
1163 }
1164 }
1165
1166 if object_id_conditions.is_empty() {
1167 return Ok(0);
1168 }
1169
1170 let in_list = object_id_conditions.join(", ");
1172 let filter = format!(
1173 "object_type = 'table_version' AND object_id IN ({})",
1174 in_list
1175 );
1176
1177 let mut scanner = self.manifest_scanner().await?;
1178 scanner.filter(&filter).map_err(|e| {
1179 Error::io_source(box_error(std::io::Error::other(format!(
1180 "Failed to filter: {}",
1181 e
1182 ))))
1183 })?;
1184 scanner.project(&["object_id"]).map_err(|e| {
1185 Error::io_source(box_error(std::io::Error::other(format!(
1186 "Failed to project: {}",
1187 e
1188 ))))
1189 })?;
1190 let batches = Self::execute_scanner(scanner).await?;
1191 let deleted_count: i64 = batches.iter().map(|b| b.num_rows() as i64).sum();
1192
1193 if deleted_count == 0 {
1194 return Ok(0);
1195 }
1196
1197 let dataset_guard = self.manifest_dataset.get().await?;
1199 let dataset = Arc::new(dataset_guard.clone());
1200 drop(dataset_guard);
1201
1202 let new_dataset = DeleteBuilder::new(dataset, &filter)
1203 .execute()
1204 .await
1205 .map_err(|e| {
1206 convert_lance_commit_error(&e, "Failed to batch delete table versions", None)
1207 })?;
1208
1209 self.manifest_dataset
1210 .set_latest(
1211 Arc::try_unwrap(new_dataset.new_dataset).unwrap_or_else(|arc| (*arc).clone()),
1212 )
1213 .await;
1214
1215 if let Err(e) = self.run_inline_optimization().await {
1216 log::warn!(
1217 "Unexpected failure when running inline optimization: {:?}",
1218 e
1219 );
1220 }
1221
1222 Ok(deleted_count)
1223 }
1224
1225 pub async fn batch_delete_table_versions_by_object_ids(
1233 &self,
1234 object_ids: &[String],
1235 ) -> Result<i64> {
1236 if object_ids.is_empty() {
1237 return Ok(0);
1238 }
1239
1240 let in_list: String = object_ids
1241 .iter()
1242 .map(|oid| {
1243 let escaped = oid.replace('\'', "''");
1244 format!("'{}'", escaped)
1245 })
1246 .collect::<Vec<_>>()
1247 .join(", ");
1248
1249 let filter = format!(
1250 "object_type = 'table_version' AND object_id IN ({})",
1251 in_list
1252 );
1253
1254 let mut scanner = self.manifest_scanner().await?;
1256 scanner.filter(&filter).map_err(|e| {
1257 Error::io_source(box_error(std::io::Error::other(format!(
1258 "Failed to filter: {}",
1259 e
1260 ))))
1261 })?;
1262 scanner.project(&["object_id"]).map_err(|e| {
1263 Error::io_source(box_error(std::io::Error::other(format!(
1264 "Failed to project: {}",
1265 e
1266 ))))
1267 })?;
1268 let batches = Self::execute_scanner(scanner).await?;
1269 let deleted_count: i64 = batches.iter().map(|b| b.num_rows() as i64).sum();
1270
1271 if deleted_count == 0 {
1272 return Ok(0);
1273 }
1274
1275 let dataset_guard = self.manifest_dataset.get().await?;
1277 let dataset = Arc::new(dataset_guard.clone());
1278 drop(dataset_guard);
1279
1280 let new_dataset = DeleteBuilder::new(dataset, &filter)
1281 .execute()
1282 .await
1283 .map_err(|e| {
1284 convert_lance_commit_error(
1285 &e,
1286 "Failed to batch delete table versions across multiple tables",
1287 None,
1288 )
1289 })?;
1290
1291 self.manifest_dataset
1292 .set_latest(
1293 Arc::try_unwrap(new_dataset.new_dataset).unwrap_or_else(|arc| (*arc).clone()),
1294 )
1295 .await;
1296
1297 if let Err(e) = self.run_inline_optimization().await {
1298 log::warn!(
1299 "Unexpected failure when running inline optimization: {:?}",
1300 e
1301 );
1302 }
1303
1304 Ok(deleted_count)
1305 }
1306
1307 pub async fn set_property(&self, name: &str, value: &str) -> Result<()> {
1313 let dataset_guard = self.manifest_dataset.get().await?;
1314 if dataset_guard.metadata().get(name) == Some(&value.to_string()) {
1315 return Ok(());
1316 }
1317 drop(dataset_guard);
1318
1319 let mut dataset_guard = self.manifest_dataset.get_mut().await?;
1320 dataset_guard
1321 .update_metadata([(name, value)])
1322 .await
1323 .map_err(|e| {
1324 Error::io_source(box_error(std::io::Error::other(format!(
1325 "Failed to set property '{}' in __manifest metadata: {}",
1326 name, e
1327 ))))
1328 })?;
1329 Ok(())
1330 }
1331
1332 pub async fn has_property(&self, name: &str) -> Result<bool> {
1334 let dataset_guard = self.manifest_dataset.get().await?;
1335 Ok(dataset_guard.metadata().contains_key(name))
1336 }
1337
1338 fn parse_table_version(version: i64, metadata_str: &str) -> Option<TableVersion> {
1342 let meta: serde_json::Value = match serde_json::from_str(metadata_str) {
1343 Ok(v) => v,
1344 Err(e) => {
1345 log::warn!(
1346 "Skipping version {} due to invalid metadata JSON: {}",
1347 version,
1348 e
1349 );
1350 return None;
1351 }
1352 };
1353 let manifest_path = match meta.get("manifest_path").and_then(|v| v.as_str()) {
1354 Some(p) => p.to_string(),
1355 None => {
1356 log::warn!(
1357 "Skipping version {} due to missing 'manifest_path' in metadata — \
1358 this may indicate data corruption",
1359 version
1360 );
1361 return None;
1362 }
1363 };
1364 let manifest_size = meta.get("manifest_size").and_then(|v| v.as_i64());
1365 let e_tag = meta
1366 .get("e_tag")
1367 .and_then(|v| v.as_str())
1368 .map(|s| s.to_string());
1369 Some(TableVersion {
1370 version,
1371 manifest_path,
1372 manifest_size,
1373 e_tag,
1374 timestamp_millis: None,
1375 metadata: None,
1376 })
1377 }
1378
1379 pub async fn list_table_versions(
1384 &self,
1385 table_id: &[String],
1386 descending: bool,
1387 limit: Option<i32>,
1388 ) -> Result<ListTableVersionsResponse> {
1389 let object_id = Self::str_object_id(table_id);
1390 let manifest_versions = self
1391 .query_table_versions(&object_id, descending, limit)
1392 .await?;
1393
1394 let table_versions: Vec<TableVersion> = manifest_versions
1395 .into_iter()
1396 .filter_map(|(version, metadata_str)| Self::parse_table_version(version, &metadata_str))
1397 .collect();
1398
1399 Ok(ListTableVersionsResponse {
1400 versions: table_versions,
1401 page_token: None,
1402 })
1403 }
1404
1405 pub async fn describe_table_version(
1410 &self,
1411 table_id: &[String],
1412 version: i64,
1413 ) -> Result<DescribeTableVersionResponse> {
1414 let object_id = Self::str_object_id(table_id);
1415 if let Some(metadata_str) = self.query_table_version(&object_id, version).await?
1416 && let Some(tv) = Self::parse_table_version(version, &metadata_str)
1417 {
1418 return Ok(DescribeTableVersionResponse {
1419 version: Box::new(tv),
1420 });
1421 }
1422 Err(Error::namespace_source(
1423 format!(
1424 "Version {} not found in manifest for table {:?}",
1425 version, table_id
1426 )
1427 .into(),
1428 ))
1429 }
1430
1431 pub async fn register_table(&self, name: &str, location: String) -> Result<()> {
1433 let object_id = Self::build_object_id(&[], name);
1434 if self.manifest_contains_object(&object_id).await? {
1435 return Err(Error::io(format!("Table '{}' already exists", name)));
1436 }
1437
1438 self.insert_into_manifest(object_id, ObjectType::Table, Some(location))
1439 .await
1440 }
1441
1442 async fn validate_namespace_levels_exist(&self, namespace_path: &[String]) -> Result<()> {
1444 for i in 1..=namespace_path.len() {
1445 let partial_path = &namespace_path[..i];
1446 let object_id = partial_path.join(DELIMITER);
1447 if !self.manifest_contains_object(&object_id).await? {
1448 return Err(Error::namespace_source(
1449 format!("Parent namespace '{}' does not exist", object_id).into(),
1450 ));
1451 }
1452 }
1453 Ok(())
1454 }
1455
1456 async fn query_manifest_for_namespace(&self, object_id: &str) -> Result<Option<NamespaceInfo>> {
1458 let escaped_id = object_id.replace('\'', "''");
1459 let filter = format!("object_id = '{}' AND object_type = 'namespace'", escaped_id);
1460 let mut scanner = self.manifest_scanner().await?;
1461 scanner.filter(&filter).map_err(|e| {
1462 Error::io_source(box_error(std::io::Error::other(format!(
1463 "Failed to filter: {}",
1464 e
1465 ))))
1466 })?;
1467 scanner.project(&["object_id", "metadata"]).map_err(|e| {
1468 Error::io_source(box_error(std::io::Error::other(format!(
1469 "Failed to project: {}",
1470 e
1471 ))))
1472 })?;
1473 let batches = Self::execute_scanner(scanner).await?;
1474
1475 let mut found_result: Option<NamespaceInfo> = None;
1476 let mut total_rows = 0;
1477
1478 for batch in batches {
1479 if batch.num_rows() == 0 {
1480 continue;
1481 }
1482
1483 total_rows += batch.num_rows();
1484 if total_rows > 1 {
1485 return Err(Error::io(format!(
1486 "Expected exactly 1 namespace with id '{}', found {}",
1487 object_id, total_rows
1488 )));
1489 }
1490
1491 let object_id_array = Self::get_string_column(&batch, "object_id")?;
1492 let metadata_array = Self::get_string_column(&batch, "metadata")?;
1493
1494 let object_id_str = object_id_array.value(0);
1495 let metadata = if !metadata_array.is_null(0) {
1496 let metadata_str = metadata_array.value(0);
1497 match serde_json::from_str::<HashMap<String, String>>(metadata_str) {
1498 Ok(map) => Some(map),
1499 Err(e) => {
1500 return Err(Error::io(format!(
1501 "Failed to deserialize metadata for namespace '{}': {}",
1502 object_id, e
1503 )));
1504 }
1505 }
1506 } else {
1507 None
1508 };
1509
1510 let (namespace, name) = Self::parse_object_id(object_id_str);
1511 found_result = Some(NamespaceInfo {
1512 namespace,
1513 name,
1514 metadata,
1515 });
1516 }
1517
1518 Ok(found_result)
1519 }
1520
1521 async fn ensure_manifest_table_up_to_date(
1529 root: &str,
1530 storage_options: &Option<HashMap<String, String>>,
1531 session: Option<Arc<Session>>,
1532 table_version_storage_enabled: bool,
1533 ) -> Result<DatasetConsistencyWrapper> {
1534 let manifest_path = format!("{}/{}", root, MANIFEST_TABLE_NAME);
1535 log::debug!("Attempting to load manifest from {}", manifest_path);
1536 let store_options = ObjectStoreParams {
1537 storage_options_accessor: storage_options.as_ref().map(|opts| {
1538 Arc::new(
1539 lance_io::object_store::StorageOptionsAccessor::with_static_options(
1540 opts.clone(),
1541 ),
1542 )
1543 }),
1544 ..Default::default()
1545 };
1546 let read_params = ReadParams {
1547 session: session.clone(),
1548 store_options: Some(store_options.clone()),
1549 ..Default::default()
1550 };
1551 let dataset_result = DatasetBuilder::from_uri(&manifest_path)
1552 .with_read_params(read_params)
1553 .load()
1554 .await;
1555 if let Ok(mut dataset) = dataset_result {
1556 let needs_pk_migration = dataset
1558 .schema()
1559 .field("object_id")
1560 .map(|f| {
1561 !f.metadata
1562 .contains_key(LANCE_UNENFORCED_PRIMARY_KEY_POSITION)
1563 })
1564 .unwrap_or(false);
1565
1566 if needs_pk_migration {
1567 log::info!("Migrating __manifest table to add primary key metadata on object_id");
1568 dataset
1569 .update_field_metadata()
1570 .update("object_id", [(LANCE_UNENFORCED_PRIMARY_KEY_POSITION, "0")])
1571 .map_err(|e| {
1572 Error::io_source(box_error(std::io::Error::other(format!(
1573 "Failed to find object_id field for migration: {}",
1574 e
1575 ))))
1576 })?
1577 .await
1578 .map_err(|e| {
1579 Error::io_source(box_error(std::io::Error::other(format!(
1580 "Failed to migrate primary key metadata: {}",
1581 e
1582 ))))
1583 })?;
1584 }
1585
1586 if table_version_storage_enabled {
1589 let needs_flag = dataset
1590 .metadata()
1591 .get("table_version_storage_enabled")
1592 .map(|v| v != "true")
1593 .unwrap_or(true);
1594
1595 if needs_flag
1596 && let Err(e) = dataset
1597 .update_metadata([("table_version_storage_enabled", "true")])
1598 .await
1599 {
1600 log::warn!(
1601 "Failed to persist table_version_storage_enabled flag in __manifest: {:?}",
1602 e
1603 );
1604 }
1605 }
1606
1607 Ok(DatasetConsistencyWrapper::new(dataset))
1608 } else {
1609 log::info!("Creating new manifest table at {}", manifest_path);
1610 let schema = Self::manifest_schema();
1611 let empty_batch = RecordBatch::new_empty(schema.clone());
1612 let reader = RecordBatchIterator::new(vec![Ok(empty_batch)], schema.clone());
1613
1614 let store_params = ObjectStoreParams {
1615 storage_options_accessor: storage_options.as_ref().map(|opts| {
1616 Arc::new(
1617 lance_io::object_store::StorageOptionsAccessor::with_static_options(
1618 opts.clone(),
1619 ),
1620 )
1621 }),
1622 ..Default::default()
1623 };
1624 let write_params = WriteParams {
1625 session: session.clone(),
1626 store_params: Some(store_params),
1627 ..Default::default()
1628 };
1629
1630 let dataset =
1631 Dataset::write(Box::new(reader), &manifest_path, Some(write_params)).await;
1632
1633 match dataset {
1635 Ok(dataset) => {
1636 log::info!(
1637 "Successfully created manifest table at {}, version={}, uri={}",
1638 manifest_path,
1639 dataset.version().version,
1640 dataset.uri()
1641 );
1642 Ok(DatasetConsistencyWrapper::new(dataset))
1643 }
1644 Err(ref e)
1645 if matches!(
1646 e,
1647 LanceError::DatasetAlreadyExists { .. }
1648 | LanceError::CommitConflict { .. }
1649 | LanceError::IncompatibleTransaction { .. }
1650 | LanceError::RetryableCommitConflict { .. }
1651 ) =>
1652 {
1653 log::info!(
1655 "Manifest table was created by another process, loading it: {}",
1656 manifest_path
1657 );
1658 let recovery_store_options = ObjectStoreParams {
1659 storage_options_accessor: storage_options.as_ref().map(|opts| {
1660 Arc::new(
1661 lance_io::object_store::StorageOptionsAccessor::with_static_options(
1662 opts.clone(),
1663 ),
1664 )
1665 }),
1666 ..Default::default()
1667 };
1668 let recovery_read_params = ReadParams {
1669 session,
1670 store_options: Some(recovery_store_options),
1671 ..Default::default()
1672 };
1673 let dataset = DatasetBuilder::from_uri(&manifest_path)
1674 .with_read_params(recovery_read_params)
1675 .load()
1676 .await
1677 .map_err(|e| {
1678 Error::io_source(box_error(std::io::Error::other(format!(
1679 "Failed to load manifest dataset after creation conflict: {}",
1680 e
1681 ))))
1682 })?;
1683 Ok(DatasetConsistencyWrapper::new(dataset))
1684 }
1685 Err(e) => Err(Error::io_source(box_error(std::io::Error::other(format!(
1686 "Failed to create manifest dataset: {}",
1687 e
1688 ))))),
1689 }
1690 }
1691 }
1692}
1693
1694#[async_trait]
1695impl LanceNamespace for ManifestNamespace {
1696 fn namespace_id(&self) -> String {
1697 self.root.clone()
1698 }
1699
1700 async fn list_tables(&self, request: ListTablesRequest) -> Result<ListTablesResponse> {
1701 let namespace_id = request
1702 .id
1703 .as_ref()
1704 .ok_or_else(|| Error::invalid_input_source("Namespace ID is required".into()))?;
1705
1706 let filter = if namespace_id.is_empty() {
1708 "object_type = 'table' AND NOT contains(object_id, '$')".to_string()
1710 } else {
1711 let prefix = namespace_id.join(DELIMITER);
1713 format!(
1714 "object_type = 'table' AND starts_with(object_id, '{}{}') AND NOT contains(substring(object_id, {}), '$')",
1715 prefix,
1716 DELIMITER,
1717 prefix.len() + 2
1718 )
1719 };
1720
1721 let mut scanner = self.manifest_scanner().await?;
1722 scanner.filter(&filter).map_err(|e| {
1723 Error::io_source(box_error(std::io::Error::other(format!(
1724 "Failed to filter: {}",
1725 e
1726 ))))
1727 })?;
1728 scanner.project(&["object_id"]).map_err(|e| {
1729 Error::io_source(box_error(std::io::Error::other(format!(
1730 "Failed to project: {}",
1731 e
1732 ))))
1733 })?;
1734
1735 let batches = Self::execute_scanner(scanner).await?;
1736
1737 let mut tables = Vec::new();
1738 for batch in batches {
1739 if batch.num_rows() == 0 {
1740 continue;
1741 }
1742
1743 let object_id_array = Self::get_string_column(&batch, "object_id")?;
1744 for i in 0..batch.num_rows() {
1745 let object_id = object_id_array.value(i);
1746 let (_namespace, name) = Self::parse_object_id(object_id);
1747 tables.push(name);
1748 }
1749 }
1750
1751 Ok(ListTablesResponse::new(tables))
1752 }
1753
1754 async fn describe_table(&self, request: DescribeTableRequest) -> Result<DescribeTableResponse> {
1755 let table_id = request
1756 .id
1757 .as_ref()
1758 .ok_or_else(|| Error::invalid_input_source("Table ID is required".into()))?;
1759
1760 if table_id.is_empty() {
1761 return Err(Error::invalid_input_source(
1762 "Table ID cannot be empty".into(),
1763 ));
1764 }
1765
1766 let object_id = Self::str_object_id(table_id);
1767 let table_info = self.query_manifest_for_table(&object_id).boxed().await?;
1768
1769 let table_name = table_id.last().cloned().unwrap_or_default();
1771 let namespace_id: Vec<String> = if table_id.len() > 1 {
1772 table_id[..table_id.len() - 1].to_vec()
1773 } else {
1774 vec![]
1775 };
1776
1777 let load_detailed_metadata = request.load_detailed_metadata.unwrap_or(false);
1778 let vend_credentials = request.vend_credentials.unwrap_or(true);
1780
1781 match table_info {
1782 Some(info) => {
1783 let table_uri = Self::construct_full_uri(&self.root, &info.location)?;
1785
1786 let storage_options = if vend_credentials {
1787 self.storage_options.clone()
1788 } else {
1789 None
1790 };
1791
1792 if !load_detailed_metadata {
1794 return Ok(DescribeTableResponse {
1795 table: Some(table_name),
1796 namespace: Some(namespace_id),
1797 location: Some(table_uri.clone()),
1798 table_uri: Some(table_uri),
1799 storage_options,
1800 ..Default::default()
1801 });
1802 }
1803
1804 match Dataset::open(&table_uri).await {
1806 Ok(mut dataset) => {
1807 if let Some(requested_version) = request.version {
1809 dataset = dataset.checkout_version(requested_version as u64).await?;
1810 }
1811
1812 let version = dataset.version().version;
1813 let lance_schema = dataset.schema();
1814 let arrow_schema: arrow_schema::Schema = lance_schema.into();
1815 let json_schema = arrow_schema_to_json(&arrow_schema)?;
1816
1817 Ok(DescribeTableResponse {
1818 table: Some(table_name.clone()),
1819 namespace: Some(namespace_id.clone()),
1820 version: Some(version as i64),
1821 location: Some(table_uri.clone()),
1822 table_uri: Some(table_uri),
1823 schema: Some(Box::new(json_schema)),
1824 storage_options,
1825 ..Default::default()
1826 })
1827 }
1828 Err(_) => {
1829 Ok(DescribeTableResponse {
1831 table: Some(table_name),
1832 namespace: Some(namespace_id),
1833 location: Some(table_uri.clone()),
1834 table_uri: Some(table_uri),
1835 storage_options,
1836 ..Default::default()
1837 })
1838 }
1839 }
1840 }
1841 None => Err(Error::namespace_source(
1842 format!("Table '{}' not found", object_id).into(),
1843 )),
1844 }
1845 }
1846
1847 async fn table_exists(&self, request: TableExistsRequest) -> Result<()> {
1848 let table_id = request
1849 .id
1850 .as_ref()
1851 .ok_or_else(|| Error::invalid_input_source("Table ID is required".into()))?;
1852
1853 if table_id.is_empty() {
1854 return Err(Error::invalid_input_source(
1855 "Table ID cannot be empty".into(),
1856 ));
1857 }
1858
1859 let (namespace, table_name) = Self::split_object_id(table_id);
1860 let object_id = Self::build_object_id(&namespace, &table_name);
1861 let exists = self.manifest_contains_object(&object_id).await?;
1862 if exists {
1863 Ok(())
1864 } else {
1865 Err(Error::namespace_source(
1866 format!("Table '{}' not found", table_name).into(),
1867 ))
1868 }
1869 }
1870
1871 async fn create_table(
1872 &self,
1873 request: CreateTableRequest,
1874 data: Bytes,
1875 ) -> Result<CreateTableResponse> {
1876 let table_id = request
1877 .id
1878 .as_ref()
1879 .ok_or_else(|| Error::invalid_input_source("Table ID is required".into()))?;
1880
1881 if table_id.is_empty() {
1882 return Err(Error::invalid_input_source(
1883 "Table ID cannot be empty".into(),
1884 ));
1885 }
1886
1887 let (namespace, table_name) = Self::split_object_id(table_id);
1888 let object_id = Self::build_object_id(&namespace, &table_name);
1889
1890 if self.manifest_contains_object(&object_id).await? {
1892 return Err(Error::io(format!("Table '{}' already exists", table_name)));
1893 }
1894
1895 let dir_name = if namespace.is_empty() && self.dir_listing_enabled {
1899 format!("{}.lance", table_name)
1901 } else {
1902 Self::generate_dir_name(&object_id)
1904 };
1905 let table_uri = Self::construct_full_uri(&self.root, &dir_name)?;
1906
1907 if data.is_empty() {
1909 return Err(Error::namespace_source(
1910 "Request data (Arrow IPC stream) is required for create_table".into(),
1911 ));
1912 }
1913
1914 let cursor = Cursor::new(data.to_vec());
1916 let stream_reader = StreamReader::try_new(cursor, None)
1917 .map_err(|e| Error::io(format!("Failed to read IPC stream: {}", e)))?;
1918
1919 let batches: Vec<RecordBatch> =
1920 stream_reader
1921 .collect::<std::result::Result<Vec<_>, _>>()
1922 .map_err(|e| Error::io(format!("Failed to collect batches: {}", e)))?;
1923
1924 if batches.is_empty() {
1925 return Err(Error::io("No data provided for table creation"));
1926 }
1927
1928 let schema = batches[0].schema();
1929 let batch_results: Vec<std::result::Result<RecordBatch, arrow_schema::ArrowError>> =
1930 batches.into_iter().map(Ok).collect();
1931 let reader = RecordBatchIterator::new(batch_results, schema);
1932
1933 let store_params = ObjectStoreParams {
1934 storage_options_accessor: self.storage_options.as_ref().map(|opts| {
1935 Arc::new(
1936 lance_io::object_store::StorageOptionsAccessor::with_static_options(
1937 opts.clone(),
1938 ),
1939 )
1940 }),
1941 ..Default::default()
1942 };
1943 let write_params = WriteParams {
1944 session: self.session.clone(),
1945 store_params: Some(store_params),
1946 ..Default::default()
1947 };
1948 let _dataset = Dataset::write(Box::new(reader), &table_uri, Some(write_params))
1949 .await
1950 .map_err(|e| {
1951 Error::io_source(box_error(std::io::Error::other(format!(
1952 "Failed to write dataset: {}",
1953 e
1954 ))))
1955 })?;
1956
1957 self.insert_into_manifest(object_id, ObjectType::Table, Some(dir_name))
1959 .await?;
1960
1961 Ok(CreateTableResponse {
1962 version: Some(1),
1963 location: Some(table_uri),
1964 storage_options: self.storage_options.clone(),
1965 ..Default::default()
1966 })
1967 }
1968
1969 async fn drop_table(&self, request: DropTableRequest) -> Result<DropTableResponse> {
1970 let table_id = request
1971 .id
1972 .as_ref()
1973 .ok_or_else(|| Error::invalid_input_source("Table ID is required".into()))?;
1974
1975 if table_id.is_empty() {
1976 return Err(Error::invalid_input_source(
1977 "Table ID cannot be empty".into(),
1978 ));
1979 }
1980
1981 let (namespace, table_name) = Self::split_object_id(table_id);
1982 let object_id = Self::build_object_id(&namespace, &table_name);
1983
1984 let table_info = self.query_manifest_for_table(&object_id).boxed().await?;
1986
1987 match table_info {
1988 Some(info) => {
1989 self.delete_from_manifest(&object_id).boxed().await?;
1991
1992 let table_path = self.base_path.child(info.location.as_str());
1994 let table_uri = Self::construct_full_uri(&self.root, &info.location)?;
1995
1996 self.object_store
1998 .remove_dir_all(table_path)
1999 .boxed()
2000 .await
2001 .map_err(|e| {
2002 Error::namespace_source(
2003 format!("Failed to delete table directory: {}", e).into(),
2004 )
2005 })?;
2006
2007 Ok(DropTableResponse {
2008 id: request.id.clone(),
2009 location: Some(table_uri),
2010 ..Default::default()
2011 })
2012 }
2013 None => Err(Error::namespace_source(
2014 format!("Table '{}' not found", table_name).into(),
2015 )),
2016 }
2017 }
2018
2019 async fn list_namespaces(
2020 &self,
2021 request: ListNamespacesRequest,
2022 ) -> Result<ListNamespacesResponse> {
2023 let parent_namespace = request
2024 .id
2025 .as_ref()
2026 .ok_or_else(|| Error::invalid_input_source("Namespace ID is required".into()))?;
2027
2028 let filter = if parent_namespace.is_empty() {
2030 "object_type = 'namespace' AND NOT contains(object_id, '$')".to_string()
2032 } else {
2033 let prefix = parent_namespace.join(DELIMITER);
2035 format!(
2036 "object_type = 'namespace' AND starts_with(object_id, '{}{}') AND NOT contains(substring(object_id, {}), '$')",
2037 prefix,
2038 DELIMITER,
2039 prefix.len() + 2
2040 )
2041 };
2042
2043 let mut scanner = self.manifest_scanner().await?;
2044 scanner.filter(&filter).map_err(|e| {
2045 Error::io_source(box_error(std::io::Error::other(format!(
2046 "Failed to filter: {}",
2047 e
2048 ))))
2049 })?;
2050 scanner.project(&["object_id"]).map_err(|e| {
2051 Error::io_source(box_error(std::io::Error::other(format!(
2052 "Failed to project: {}",
2053 e
2054 ))))
2055 })?;
2056
2057 let batches = Self::execute_scanner(scanner).await?;
2058 let mut namespaces = Vec::new();
2059
2060 for batch in batches {
2061 if batch.num_rows() == 0 {
2062 continue;
2063 }
2064
2065 let object_id_array = Self::get_string_column(&batch, "object_id")?;
2066 for i in 0..batch.num_rows() {
2067 let object_id = object_id_array.value(i);
2068 let (_namespace, name) = Self::parse_object_id(object_id);
2069 namespaces.push(name);
2070 }
2071 }
2072
2073 Ok(ListNamespacesResponse::new(namespaces))
2074 }
2075
2076 async fn describe_namespace(
2077 &self,
2078 request: DescribeNamespaceRequest,
2079 ) -> Result<DescribeNamespaceResponse> {
2080 let namespace_id = request
2081 .id
2082 .as_ref()
2083 .ok_or_else(|| Error::invalid_input_source("Namespace ID is required".into()))?;
2084
2085 if namespace_id.is_empty() {
2087 #[allow(clippy::needless_update)]
2088 return Ok(DescribeNamespaceResponse {
2089 properties: Some(HashMap::new()),
2090 ..Default::default()
2091 });
2092 }
2093
2094 let object_id = namespace_id.join(DELIMITER);
2096 let namespace_info = self.query_manifest_for_namespace(&object_id).await?;
2097
2098 match namespace_info {
2099 #[allow(clippy::needless_update)]
2100 Some(info) => Ok(DescribeNamespaceResponse {
2101 properties: info.metadata,
2102 ..Default::default()
2103 }),
2104 None => Err(Error::namespace_source(
2105 format!("Namespace '{}' not found", object_id).into(),
2106 )),
2107 }
2108 }
2109
2110 async fn create_namespace(
2111 &self,
2112 request: CreateNamespaceRequest,
2113 ) -> Result<CreateNamespaceResponse> {
2114 let namespace_id = request
2115 .id
2116 .as_ref()
2117 .ok_or_else(|| Error::invalid_input_source("Namespace ID is required".into()))?;
2118
2119 if namespace_id.is_empty() {
2121 return Err(Error::namespace_source(
2122 "Root namespace already exists and cannot be created".into(),
2123 ));
2124 }
2125
2126 if namespace_id.len() > 1 {
2128 self.validate_namespace_levels_exist(&namespace_id[..namespace_id.len() - 1])
2129 .await?;
2130 }
2131
2132 let object_id = namespace_id.join(DELIMITER);
2133 if self.manifest_contains_object(&object_id).await? {
2134 return Err(Error::namespace_source(
2135 format!("Namespace '{}' already exists", object_id).into(),
2136 ));
2137 }
2138
2139 let metadata = request.properties.as_ref().and_then(|props| {
2141 if props.is_empty() {
2142 None
2143 } else {
2144 Some(serde_json::to_string(props).ok()?)
2145 }
2146 });
2147
2148 self.insert_into_manifest_with_metadata(
2149 vec![ManifestEntry {
2150 object_id,
2151 object_type: ObjectType::Namespace,
2152 location: None,
2153 metadata,
2154 }],
2155 None,
2156 )
2157 .await?;
2158
2159 Ok(CreateNamespaceResponse {
2160 properties: request.properties,
2161 ..Default::default()
2162 })
2163 }
2164
2165 async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<DropNamespaceResponse> {
2166 let namespace_id = request
2167 .id
2168 .as_ref()
2169 .ok_or_else(|| Error::invalid_input_source("Namespace ID is required".into()))?;
2170
2171 if namespace_id.is_empty() {
2173 return Err(Error::namespace_source(
2174 "Root namespace cannot be dropped".into(),
2175 ));
2176 }
2177
2178 let object_id = namespace_id.join(DELIMITER);
2179
2180 if !self.manifest_contains_object(&object_id).boxed().await? {
2182 return Err(Error::namespace_source(
2183 format!("Namespace '{}' not found", object_id).into(),
2184 ));
2185 }
2186
2187 let escaped_id = object_id.replace('\'', "''");
2189 let prefix = format!("{}{}", escaped_id, DELIMITER);
2190 let filter = format!("starts_with(object_id, '{}')", prefix);
2191 let mut scanner = self.manifest_scanner().boxed().await?;
2192 scanner.filter(&filter).map_err(|e| {
2193 Error::io_source(box_error(std::io::Error::other(format!(
2194 "Failed to filter: {}",
2195 e
2196 ))))
2197 })?;
2198 scanner.project::<&str>(&[]).map_err(|e| {
2199 Error::io_source(box_error(std::io::Error::other(format!(
2200 "Failed to project: {}",
2201 e
2202 ))))
2203 })?;
2204 scanner.with_row_id();
2205 let count = scanner.count_rows().boxed().await.map_err(|e| {
2206 Error::io_source(box_error(std::io::Error::other(format!(
2207 "Failed to count rows: {}",
2208 e
2209 ))))
2210 })?;
2211
2212 if count > 0 {
2213 return Err(Error::namespace_source(
2214 format!(
2215 "Namespace '{}' is not empty (contains {} child objects)",
2216 object_id, count
2217 )
2218 .into(),
2219 ));
2220 }
2221
2222 self.delete_from_manifest(&object_id).boxed().await?;
2223
2224 Ok(DropNamespaceResponse::default())
2225 }
2226
2227 async fn namespace_exists(&self, request: NamespaceExistsRequest) -> Result<()> {
2228 let namespace_id = request
2229 .id
2230 .as_ref()
2231 .ok_or_else(|| Error::invalid_input_source("Namespace ID is required".into()))?;
2232
2233 if namespace_id.is_empty() {
2235 return Ok(());
2236 }
2237
2238 let object_id = namespace_id.join(DELIMITER);
2239 if self.manifest_contains_object(&object_id).await? {
2240 Ok(())
2241 } else {
2242 Err(Error::namespace_source(
2243 format!("Namespace '{}' not found", object_id).into(),
2244 ))
2245 }
2246 }
2247
2248 async fn declare_table(&self, request: DeclareTableRequest) -> Result<DeclareTableResponse> {
2249 let table_id = request
2250 .id
2251 .as_ref()
2252 .ok_or_else(|| Error::invalid_input_source("Table ID is required".into()))?;
2253
2254 if table_id.is_empty() {
2255 return Err(Error::invalid_input_source(
2256 "Table ID cannot be empty".into(),
2257 ));
2258 }
2259
2260 let (namespace, table_name) = Self::split_object_id(table_id);
2261 let object_id = Self::build_object_id(&namespace, &table_name);
2262
2263 let existing = self.query_manifest_for_table(&object_id).await?;
2265 if existing.is_some() {
2266 return Err(Error::namespace_source(
2267 format!("Table '{}' already exists", table_name).into(),
2268 ));
2269 }
2270
2271 let dir_name = if namespace.is_empty() && self.dir_listing_enabled {
2275 format!("{}.lance", table_name)
2277 } else {
2278 Self::generate_dir_name(&object_id)
2280 };
2281 let table_path = self.base_path.child(dir_name.as_str());
2282 let table_uri = Self::construct_full_uri(&self.root, &dir_name)?;
2283
2284 if let Some(req_location) = &request.location {
2286 let req_location = req_location.trim_end_matches('/');
2287 if req_location != table_uri {
2288 return Err(Error::namespace_source(
2289 format!(
2290 "Cannot declare table {} at location {}, must be at location {}",
2291 table_name, req_location, table_uri
2292 )
2293 .into(),
2294 ));
2295 }
2296 }
2297
2298 let reserved_file_path = table_path.child(".lance-reserved");
2300
2301 self.object_store
2302 .create(&reserved_file_path)
2303 .await
2304 .map_err(|e| {
2305 Error::namespace_source(
2306 format!(
2307 "Failed to create .lance-reserved file for table {}: {}",
2308 table_name, e
2309 )
2310 .into(),
2311 )
2312 })?
2313 .shutdown()
2314 .await
2315 .map_err(|e| {
2316 Error::namespace_source(
2317 format!(
2318 "Failed to finalize .lance-reserved file for table {}: {}",
2319 table_name, e
2320 )
2321 .into(),
2322 )
2323 })?;
2324
2325 self.insert_into_manifest(object_id, ObjectType::Table, Some(dir_name))
2327 .await?;
2328
2329 log::info!(
2330 "Declared table '{}' in manifest at {}",
2331 table_name,
2332 table_uri
2333 );
2334
2335 let vend_credentials = request.vend_credentials.unwrap_or(true);
2337 let storage_options = if vend_credentials {
2338 self.storage_options.clone()
2339 } else {
2340 None
2341 };
2342
2343 Ok(DeclareTableResponse {
2344 location: Some(table_uri),
2345 storage_options,
2346 ..Default::default()
2347 })
2348 }
2349
2350 async fn register_table(&self, request: RegisterTableRequest) -> Result<RegisterTableResponse> {
2351 let table_id = request
2352 .id
2353 .as_ref()
2354 .ok_or_else(|| Error::invalid_input_source("Table ID is required".into()))?;
2355
2356 if table_id.is_empty() {
2357 return Err(Error::invalid_input_source(
2358 "Table ID cannot be empty".into(),
2359 ));
2360 }
2361
2362 let location = request.location.clone();
2363
2364 if location.contains("://") {
2367 return Err(Error::invalid_input_source(format!(
2368 "Absolute URIs are not allowed for register_table. Location must be a relative path within the root directory: {}",
2369 location
2370 ).into()));
2371 }
2372
2373 if location.starts_with('/') {
2374 return Err(Error::invalid_input_source(format!(
2375 "Absolute paths are not allowed for register_table. Location must be a relative path within the root directory: {}",
2376 location
2377 ).into()));
2378 }
2379
2380 if location.contains("..") {
2382 return Err(Error::invalid_input_source(format!(
2383 "Path traversal is not allowed. Location must be a relative path within the root directory: {}",
2384 location
2385 ).into()));
2386 }
2387
2388 let (namespace, table_name) = Self::split_object_id(table_id);
2389 let object_id = Self::build_object_id(&namespace, &table_name);
2390
2391 if !namespace.is_empty() {
2393 self.validate_namespace_levels_exist(&namespace).await?;
2394 }
2395
2396 if self.manifest_contains_object(&object_id).await? {
2398 return Err(Error::namespace_source(
2399 format!("Table '{}' already exists", object_id).into(),
2400 ));
2401 }
2402
2403 self.insert_into_manifest(object_id, ObjectType::Table, Some(location.clone()))
2405 .await?;
2406
2407 Ok(RegisterTableResponse {
2408 location: Some(location),
2409 ..Default::default()
2410 })
2411 }
2412
2413 async fn deregister_table(
2414 &self,
2415 request: DeregisterTableRequest,
2416 ) -> Result<DeregisterTableResponse> {
2417 let table_id = request
2418 .id
2419 .as_ref()
2420 .ok_or_else(|| Error::invalid_input_source("Table ID is required".into()))?;
2421
2422 if table_id.is_empty() {
2423 return Err(Error::invalid_input_source(
2424 "Table ID cannot be empty".into(),
2425 ));
2426 }
2427
2428 let (namespace, table_name) = Self::split_object_id(table_id);
2429 let object_id = Self::build_object_id(&namespace, &table_name);
2430
2431 let table_info = self.query_manifest_for_table(&object_id).await?;
2433
2434 let table_uri = match table_info {
2435 Some(info) => {
2436 self.delete_from_manifest(&object_id).boxed().await?;
2438 Self::construct_full_uri(&self.root, &info.location)?
2439 }
2440 None => {
2441 return Err(Error::namespace_source(
2442 format!("Table '{}' not found", object_id).into(),
2443 ));
2444 }
2445 };
2446
2447 Ok(DeregisterTableResponse {
2448 id: request.id.clone(),
2449 location: Some(table_uri),
2450 ..Default::default()
2451 })
2452 }
2453}
2454
#[cfg(test)]
mod tests {
    use crate::{DirectoryNamespaceBuilder, ManifestNamespace};
    use bytes::Bytes;
    use futures::future::join_all;
    use lance_core::utils::tempfile::TempStdDir;
    use lance_namespace::LanceNamespace;
    use lance_namespace::models::{
        CreateNamespaceRequest, CreateTableRequest, DescribeNamespaceRequest,
        DescribeTableRequest, DropNamespaceRequest, DropTableRequest, ListNamespacesRequest,
        ListTablesRequest, NamespaceExistsRequest, TableExistsRequest,
    };
    use rstest::rstest;
    use std::collections::HashMap;
    use std::sync::Arc;

    /// Encodes a three-row batch with schema `(id: Int32 NOT NULL, name: Utf8 NOT NULL)`
    /// as an Arrow IPC stream, suitable as the data payload of `create_table`.
    fn create_test_ipc_data() -> Vec<u8> {
        use arrow::array::{Int32Array, StringArray};
        use arrow::datatypes::{DataType, Field, Schema};
        use arrow::ipc::writer::StreamWriter;
        use arrow::record_batch::RecordBatch;

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, false),
        ]));

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int32Array::from(vec![1, 2, 3])),
                Arc::new(StringArray::from(vec!["a", "b", "c"])),
            ],
        )
        .unwrap();

        let mut buffer = Vec::new();
        {
            // The writer mutably borrows `buffer`; scope it so the borrow ends after `finish`.
            let mut writer = StreamWriter::try_new(&mut buffer, &schema).unwrap();
            writer.write(&batch).unwrap();
            writer.finish().unwrap();
        }
        buffer
    }

    /// Builds a `CreateTableRequest` whose `id` is `id` (parent namespace path followed
    /// by the table name), leaving every other field at its `new()` value.
    fn create_table_request(id: &[&str]) -> CreateTableRequest {
        let mut request = CreateTableRequest::new();
        request.id = Some(id.iter().copied().map(String::from).collect());
        request
    }

    /// Lists the table names under the namespace `namespace_id` (an empty slice is
    /// the root namespace), panicking if the listing itself fails.
    async fn list_table_names(ns: &impl LanceNamespace, namespace_id: &[&str]) -> Vec<String> {
        let mut request = ListTablesRequest::new();
        request.id = Some(namespace_id.iter().copied().map(String::from).collect());
        ns.list_tables(request)
            .await
            .expect("list_tables should succeed")
            .tables
    }

    /// Fails the test with every collected error message when any concurrent
    /// operation reported an error, instead of stopping at the first one.
    fn assert_all_succeeded(results: Vec<Result<(), String>>, context: &str) {
        let failures: Vec<String> = results.into_iter().filter_map(Result::err).collect();
        assert!(failures.is_empty(), "{}: {:?}", context, failures);
    }

    /// Starting from an empty root namespace, a created table becomes the only listed table.
    #[rstest]
    #[case::with_optimization(true)]
    #[case::without_optimization(false)]
    #[tokio::test]
    async fn test_manifest_namespace_basic_create_and_list(#[case] inline_optimization: bool) {
        let temp_dir = TempStdDir::default();
        let temp_path = temp_dir.to_str().unwrap();

        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
            .inline_optimization_enabled(inline_optimization)
            .build()
            .await
            .unwrap();

        let tables = list_table_names(&dir_namespace, &[]).await;
        assert!(tables.is_empty(), "expected no tables, found: {:?}", tables);

        dir_namespace
            .create_table(
                create_table_request(&["test_table"]),
                Bytes::from(create_test_ipc_data()),
            )
            .await
            .unwrap();

        assert_eq!(
            list_table_names(&dir_namespace, &[]).await,
            vec!["test_table"]
        );
    }

    /// `table_exists` errors for an unknown table and succeeds once the table is created.
    #[rstest]
    #[case::with_optimization(true)]
    #[case::without_optimization(false)]
    #[tokio::test]
    async fn test_manifest_namespace_table_exists(#[case] inline_optimization: bool) {
        let temp_dir = TempStdDir::default();
        let temp_path = temp_dir.to_str().unwrap();

        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
            .inline_optimization_enabled(inline_optimization)
            .build()
            .await
            .unwrap();

        let mut request = TableExistsRequest::new();
        request.id = Some(vec!["nonexistent".to_string()]);
        let result = dir_namespace.table_exists(request).await;
        assert!(result.is_err(), "nonexistent table should not exist");

        dir_namespace
            .create_table(
                create_table_request(&["test_table"]),
                Bytes::from(create_test_ipc_data()),
            )
            .await
            .unwrap();

        let mut request = TableExistsRequest::new();
        request.id = Some(vec!["test_table".to_string()]);
        let result = dir_namespace.table_exists(request).await;
        assert!(result.is_ok(), "test_table should exist: {:?}", result.err());
    }

    /// `describe_table` errors for an unknown table; for a created table it reports a
    /// location that contains the table name.
    #[rstest]
    #[case::with_optimization(true)]
    #[case::without_optimization(false)]
    #[tokio::test]
    async fn test_manifest_namespace_describe_table(#[case] inline_optimization: bool) {
        let temp_dir = TempStdDir::default();
        let temp_path = temp_dir.to_str().unwrap();

        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
            .inline_optimization_enabled(inline_optimization)
            .build()
            .await
            .unwrap();

        let mut request = DescribeTableRequest::new();
        request.id = Some(vec!["nonexistent".to_string()]);
        let result = dir_namespace.describe_table(request).await;
        assert!(result.is_err(), "describing a nonexistent table should fail");

        dir_namespace
            .create_table(
                create_table_request(&["test_table"]),
                Bytes::from(create_test_ipc_data()),
            )
            .await
            .unwrap();

        let mut request = DescribeTableRequest::new();
        request.id = Some(vec!["test_table".to_string()]);
        let response = dir_namespace.describe_table(request).await.unwrap();
        let location = response
            .location
            .expect("describe_table should report a location");
        assert!(
            location.contains("test_table"),
            "unexpected table location: {}",
            location
        );
    }

    /// Dropping the only table leaves the root namespace listing empty.
    #[rstest]
    #[case::with_optimization(true)]
    #[case::without_optimization(false)]
    #[tokio::test]
    async fn test_manifest_namespace_drop_table(#[case] inline_optimization: bool) {
        let temp_dir = TempStdDir::default();
        let temp_path = temp_dir.to_str().unwrap();

        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
            .inline_optimization_enabled(inline_optimization)
            .build()
            .await
            .unwrap();

        dir_namespace
            .create_table(
                create_table_request(&["test_table"]),
                Bytes::from(create_test_ipc_data()),
            )
            .await
            .unwrap();

        assert_eq!(list_table_names(&dir_namespace, &[]).await.len(), 1);

        let mut drop_request = DropTableRequest::new();
        drop_request.id = Some(vec!["test_table".to_string()]);
        dir_namespace.drop_table(drop_request).await.unwrap();

        let tables = list_table_names(&dir_namespace, &[]).await;
        assert!(
            tables.is_empty(),
            "expected no tables after drop, found: {:?}",
            tables
        );
    }

    /// Several tables created in the root namespace are all listed (order-insensitive).
    #[rstest]
    #[case::with_optimization(true)]
    #[case::without_optimization(false)]
    #[tokio::test]
    async fn test_manifest_namespace_multiple_tables(#[case] inline_optimization: bool) {
        let temp_dir = TempStdDir::default();
        let temp_path = temp_dir.to_str().unwrap();

        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
            .inline_optimization_enabled(inline_optimization)
            .build()
            .await
            .unwrap();

        // `Bytes` clones share one buffer, so every table gets identical data without copying.
        let data = Bytes::from(create_test_ipc_data());
        for i in 1..=3 {
            let table_name = format!("table{}", i);
            dir_namespace
                .create_table(create_table_request(&[table_name.as_str()]), data.clone())
                .await
                .unwrap();
        }

        let mut tables = list_table_names(&dir_namespace, &[]).await;
        tables.sort();
        assert_eq!(tables, vec!["table1", "table2", "table3"]);
    }

    /// With the manifest disabled (`manifest_enabled(false)`), tables can still be created
    /// and listed.
    #[rstest]
    #[case::with_optimization(true)]
    #[case::without_optimization(false)]
    #[tokio::test]
    async fn test_directory_only_mode(#[case] inline_optimization: bool) {
        let temp_dir = TempStdDir::default();
        let temp_path = temp_dir.to_str().unwrap();

        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
            .manifest_enabled(false)
            .inline_optimization_enabled(inline_optimization)
            .build()
            .await
            .unwrap();

        let tables = list_table_names(&dir_namespace, &[]).await;
        assert!(tables.is_empty(), "expected no tables, found: {:?}", tables);

        dir_namespace
            .create_table(
                create_table_request(&["test_table"]),
                Bytes::from(create_test_ipc_data()),
            )
            .await
            .unwrap();

        assert_eq!(
            list_table_names(&dir_namespace, &[]).await,
            vec!["test_table"]
        );
    }

    /// With both the manifest and directory listing enabled, a created table is listed
    /// exactly once (presumably checking the two sources are de-duplicated when merged).
    #[rstest]
    #[case::with_optimization(true)]
    #[case::without_optimization(false)]
    #[tokio::test]
    async fn test_dual_mode_merge(#[case] inline_optimization: bool) {
        let temp_dir = TempStdDir::default();
        let temp_path = temp_dir.to_str().unwrap();

        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
            .manifest_enabled(true)
            .dir_listing_enabled(true)
            .inline_optimization_enabled(inline_optimization)
            .build()
            .await
            .unwrap();

        dir_namespace
            .create_table(
                create_table_request(&["table1"]),
                Bytes::from(create_test_ipc_data()),
            )
            .await
            .unwrap();

        assert_eq!(list_table_names(&dir_namespace, &[]).await, vec!["table1"]);
    }

    /// With directory listing disabled, created tables are still listed (from the manifest).
    #[rstest]
    #[case::with_optimization(true)]
    #[case::without_optimization(false)]
    #[tokio::test]
    async fn test_manifest_only_mode(#[case] inline_optimization: bool) {
        let temp_dir = TempStdDir::default();
        let temp_path = temp_dir.to_str().unwrap();

        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
            .manifest_enabled(true)
            .dir_listing_enabled(false)
            .inline_optimization_enabled(inline_optimization)
            .build()
            .await
            .unwrap();

        dir_namespace
            .create_table(
                create_table_request(&["test_table"]),
                Bytes::from(create_test_ipc_data()),
            )
            .await
            .unwrap();

        assert_eq!(
            list_table_names(&dir_namespace, &[]).await,
            vec!["test_table"]
        );
    }

    /// Dropping a table that was never created is an error.
    #[rstest]
    #[case::with_optimization(true)]
    #[case::without_optimization(false)]
    #[tokio::test]
    async fn test_drop_nonexistent_table(#[case] inline_optimization: bool) {
        let temp_dir = TempStdDir::default();
        let temp_path = temp_dir.to_str().unwrap();

        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
            .inline_optimization_enabled(inline_optimization)
            .build()
            .await
            .unwrap();

        let mut drop_request = DropTableRequest::new();
        drop_request.id = Some(vec!["nonexistent".to_string()]);
        let result = dir_namespace.drop_table(drop_request).await;
        assert!(result.is_err(), "dropping a nonexistent table should fail");
    }

    /// Creating a table whose id already exists is an error.
    #[rstest]
    #[case::with_optimization(true)]
    #[case::without_optimization(false)]
    #[tokio::test]
    async fn test_create_duplicate_table_fails(#[case] inline_optimization: bool) {
        let temp_dir = TempStdDir::default();
        let temp_path = temp_dir.to_str().unwrap();

        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
            .inline_optimization_enabled(inline_optimization)
            .build()
            .await
            .unwrap();

        let data = Bytes::from(create_test_ipc_data());
        dir_namespace
            .create_table(create_table_request(&["test_table"]), data.clone())
            .await
            .unwrap();

        let result = dir_namespace
            .create_table(create_table_request(&["test_table"]), data)
            .await;
        assert!(result.is_err(), "creating a duplicate table should fail");
    }

    /// A child namespace of the root can be created, checked for existence and listed.
    #[rstest]
    #[case::with_optimization(true)]
    #[case::without_optimization(false)]
    #[tokio::test]
    async fn test_create_child_namespace(#[case] inline_optimization: bool) {
        let temp_dir = TempStdDir::default();
        let temp_path = temp_dir.to_str().unwrap();

        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
            .inline_optimization_enabled(inline_optimization)
            .build()
            .await
            .unwrap();

        let mut create_req = CreateNamespaceRequest::new();
        create_req.id = Some(vec!["ns1".to_string()]);
        let result = dir_namespace.create_namespace(create_req).await;
        assert!(
            result.is_ok(),
            "Failed to create child namespace: {:?}",
            result.err()
        );

        let exists_req = NamespaceExistsRequest {
            id: Some(vec!["ns1".to_string()]),
            ..Default::default()
        };
        let result = dir_namespace.namespace_exists(exists_req).await;
        assert!(result.is_ok(), "Namespace should exist: {:?}", result.err());

        let list_req = ListNamespacesRequest {
            id: Some(vec![]),
            page_token: None,
            limit: None,
            ..Default::default()
        };
        let namespaces = dir_namespace
            .list_namespaces(list_req)
            .await
            .expect("list_namespaces on the root should succeed");
        assert_eq!(namespaces.namespaces.len(), 1);
        assert_eq!(namespaces.namespaces[0], "ns1");
    }

    /// A namespace nested under an existing parent can be created, checked and listed
    /// from the parent.
    #[rstest]
    #[case::with_optimization(true)]
    #[case::without_optimization(false)]
    #[tokio::test]
    async fn test_create_nested_namespace(#[case] inline_optimization: bool) {
        let temp_dir = TempStdDir::default();
        let temp_path = temp_dir.to_str().unwrap();

        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
            .inline_optimization_enabled(inline_optimization)
            .build()
            .await
            .unwrap();

        let mut create_req = CreateNamespaceRequest::new();
        create_req.id = Some(vec!["parent".to_string()]);
        dir_namespace.create_namespace(create_req).await.unwrap();

        let mut create_req = CreateNamespaceRequest::new();
        create_req.id = Some(vec!["parent".to_string(), "child".to_string()]);
        let result = dir_namespace.create_namespace(create_req).await;
        assert!(
            result.is_ok(),
            "Failed to create nested namespace: {:?}",
            result.err()
        );

        let exists_req = NamespaceExistsRequest {
            id: Some(vec!["parent".to_string(), "child".to_string()]),
            ..Default::default()
        };
        let result = dir_namespace.namespace_exists(exists_req).await;
        assert!(
            result.is_ok(),
            "Nested namespace should exist: {:?}",
            result.err()
        );

        let list_req = ListNamespacesRequest {
            id: Some(vec!["parent".to_string()]),
            page_token: None,
            limit: None,
            ..Default::default()
        };
        let namespaces = dir_namespace
            .list_namespaces(list_req)
            .await
            .expect("list_namespaces on parent should succeed");
        assert_eq!(namespaces.namespaces.len(), 1);
        assert_eq!(namespaces.namespaces[0], "child");
    }

    /// Creating a namespace under a parent that does not exist is an error.
    #[rstest]
    #[case::with_optimization(true)]
    #[case::without_optimization(false)]
    #[tokio::test]
    async fn test_create_namespace_without_parent_fails(#[case] inline_optimization: bool) {
        let temp_dir = TempStdDir::default();
        let temp_path = temp_dir.to_str().unwrap();

        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
            .inline_optimization_enabled(inline_optimization)
            .build()
            .await
            .unwrap();

        let mut create_req = CreateNamespaceRequest::new();
        create_req.id = Some(vec!["nonexistent_parent".to_string(), "child".to_string()]);
        let result = dir_namespace.create_namespace(create_req).await;
        assert!(result.is_err(), "Should fail when parent doesn't exist");
    }

    /// After dropping an empty child namespace, `namespace_exists` reports an error for it.
    #[rstest]
    #[case::with_optimization(true)]
    #[case::without_optimization(false)]
    #[tokio::test]
    async fn test_drop_child_namespace(#[case] inline_optimization: bool) {
        let temp_dir = TempStdDir::default();
        let temp_path = temp_dir.to_str().unwrap();

        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
            .inline_optimization_enabled(inline_optimization)
            .build()
            .await
            .unwrap();

        let mut create_req = CreateNamespaceRequest::new();
        create_req.id = Some(vec!["ns1".to_string()]);
        dir_namespace.create_namespace(create_req).await.unwrap();

        let mut drop_req = DropNamespaceRequest::new();
        drop_req.id = Some(vec!["ns1".to_string()]);
        let result = dir_namespace.drop_namespace(drop_req).await;
        assert!(
            result.is_ok(),
            "Failed to drop namespace: {:?}",
            result.err()
        );

        let exists_req = NamespaceExistsRequest {
            id: Some(vec!["ns1".to_string()]),
            ..Default::default()
        };
        let result = dir_namespace.namespace_exists(exists_req).await;
        assert!(result.is_err(), "Namespace should not exist after drop");
    }

    /// Dropping a namespace that still contains a child namespace is an error.
    #[rstest]
    #[case::with_optimization(true)]
    #[case::without_optimization(false)]
    #[tokio::test]
    async fn test_drop_namespace_with_children_fails(#[case] inline_optimization: bool) {
        let temp_dir = TempStdDir::default();
        let temp_path = temp_dir.to_str().unwrap();

        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
            .inline_optimization_enabled(inline_optimization)
            .build()
            .await
            .unwrap();

        let mut create_req = CreateNamespaceRequest::new();
        create_req.id = Some(vec!["parent".to_string()]);
        dir_namespace.create_namespace(create_req).await.unwrap();

        let mut create_req = CreateNamespaceRequest::new();
        create_req.id = Some(vec!["parent".to_string(), "child".to_string()]);
        dir_namespace.create_namespace(create_req).await.unwrap();

        let mut drop_req = DropNamespaceRequest::new();
        drop_req.id = Some(vec!["parent".to_string()]);
        let result = dir_namespace.drop_namespace(drop_req).await;
        assert!(result.is_err(), "Should fail when namespace has children");
    }

    /// A table created inside a child namespace is listed under that namespace.
    #[rstest]
    #[case::with_optimization(true)]
    #[case::without_optimization(false)]
    #[tokio::test]
    async fn test_create_table_in_child_namespace(#[case] inline_optimization: bool) {
        let temp_dir = TempStdDir::default();
        let temp_path = temp_dir.to_str().unwrap();

        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
            .inline_optimization_enabled(inline_optimization)
            .build()
            .await
            .unwrap();

        let mut create_ns_req = CreateNamespaceRequest::new();
        create_ns_req.id = Some(vec!["ns1".to_string()]);
        dir_namespace.create_namespace(create_ns_req).await.unwrap();

        let result = dir_namespace
            .create_table(
                create_table_request(&["ns1", "table1"]),
                Bytes::from(create_test_ipc_data()),
            )
            .await;
        assert!(
            result.is_ok(),
            "Failed to create table in child namespace: {:?}",
            result.err()
        );

        let list_req = ListTablesRequest {
            id: Some(vec!["ns1".to_string()]),
            page_token: None,
            limit: None,
            ..Default::default()
        };
        let tables = dir_namespace
            .list_tables(list_req)
            .await
            .expect("list_tables on ns1 should succeed");
        assert_eq!(tables.tables, vec!["table1"]);
    }

    /// Properties supplied when creating a child namespace are returned by
    /// `describe_namespace`.
    #[rstest]
    #[case::with_optimization(true)]
    #[case::without_optimization(false)]
    #[tokio::test]
    async fn test_describe_child_namespace(#[case] inline_optimization: bool) {
        let temp_dir = TempStdDir::default();
        let temp_path = temp_dir.to_str().unwrap();

        let dir_namespace = DirectoryNamespaceBuilder::new(temp_path)
            .inline_optimization_enabled(inline_optimization)
            .build()
            .await
            .unwrap();

        let mut properties = HashMap::new();
        properties.insert("key1".to_string(), "value1".to_string());

        let mut create_req = CreateNamespaceRequest::new();
        create_req.id = Some(vec!["ns1".to_string()]);
        create_req.properties = Some(properties);
        dir_namespace.create_namespace(create_req).await.unwrap();

        let describe_req = DescribeNamespaceRequest {
            id: Some(vec!["ns1".to_string()]),
            ..Default::default()
        };
        let response = dir_namespace
            .describe_namespace(describe_req)
            .await
            .expect("Failed to describe namespace");
        let properties = response
            .properties
            .expect("describe_namespace should return the namespace properties");
        assert_eq!(properties.get("key1"), Some(&"value1".to_string()));
    }

    /// Ten create-then-drop sequences run concurrently (via `join_all`) against one shared
    /// namespace instance; afterwards the namespace must be empty.
    #[rstest]
    #[case::with_optimization(true)]
    #[case::without_optimization(false)]
    #[tokio::test]
    async fn test_concurrent_create_and_drop_single_instance(#[case] inline_optimization: bool) {
        let temp_dir = TempStdDir::default();
        let temp_path = temp_dir.to_str().unwrap();

        let dir_namespace = Arc::new(
            DirectoryNamespaceBuilder::new(temp_path)
                .inline_optimization_enabled(inline_optimization)
                .build()
                .await
                .unwrap(),
        );

        let mut create_ns_request = CreateNamespaceRequest::new();
        create_ns_request.id = Some(vec!["test_ns".to_string()]);
        dir_namespace
            .create_namespace(create_ns_request)
            .await
            .unwrap();

        let num_tables = 10;
        let mut handles = Vec::new();

        for i in 0..num_tables {
            let ns = Arc::clone(&dir_namespace);
            // Errors are returned (not panicked) so that every failure is reported below.
            handles.push(async move {
                let table_name = format!("concurrent_table_{}", i);
                let table_id = vec!["test_ns".to_string(), table_name.clone()];

                let mut create_request = CreateTableRequest::new();
                create_request.id = Some(table_id.clone());
                ns.create_table(create_request, Bytes::from(create_test_ipc_data()))
                    .await
                    .map_err(|e| format!("Failed to create table {}: {}", table_name, e))?;

                let mut drop_request = DropTableRequest::new();
                drop_request.id = Some(table_id);
                ns.drop_table(drop_request)
                    .await
                    .map_err(|e| format!("Failed to drop table {}: {}", table_name, e))?;

                Ok::<(), String>(())
            });
        }

        assert_all_succeeded(
            join_all(handles).await,
            "All concurrent operations should succeed",
        );

        let tables = list_table_names(&*dir_namespace, &["test_ns"]).await;
        assert!(
            tables.is_empty(),
            "All tables should be dropped, found: {:?}",
            tables
        );
    }

    /// Like the single-instance test, but every concurrent create-then-drop sequence
    /// opens its own namespace instance over the same root directory.
    #[rstest]
    #[case::with_optimization(true)]
    #[case::without_optimization(false)]
    #[tokio::test]
    async fn test_concurrent_create_and_drop_multiple_instances(#[case] inline_optimization: bool) {
        let temp_dir = TempStdDir::default();
        let temp_path = temp_dir.to_str().unwrap().to_string();

        let init_ns = DirectoryNamespaceBuilder::new(&temp_path)
            .inline_optimization_enabled(inline_optimization)
            .build()
            .await
            .unwrap();
        let mut create_ns_request = CreateNamespaceRequest::new();
        create_ns_request.id = Some(vec!["test_ns".to_string()]);
        init_ns.create_namespace(create_ns_request).await.unwrap();

        let num_tables = 10;
        let mut handles = Vec::new();

        for i in 0..num_tables {
            let path = temp_path.clone();
            handles.push(async move {
                let ns = DirectoryNamespaceBuilder::new(&path)
                    .inline_optimization_enabled(inline_optimization)
                    .build()
                    .await
                    .map_err(|e| format!("Failed to open namespace instance {}: {}", i, e))?;

                let table_name = format!("multi_ns_table_{}", i);
                let table_id = vec!["test_ns".to_string(), table_name.clone()];

                let mut create_request = CreateTableRequest::new();
                create_request.id = Some(table_id.clone());
                ns.create_table(create_request, Bytes::from(create_test_ipc_data()))
                    .await
                    .map_err(|e| format!("Failed to create table {}: {}", table_name, e))?;

                let mut drop_request = DropTableRequest::new();
                drop_request.id = Some(table_id);
                ns.drop_table(drop_request)
                    .await
                    .map_err(|e| format!("Failed to drop table {}: {}", table_name, e))?;

                Ok::<(), String>(())
            });
        }

        assert_all_succeeded(
            join_all(handles).await,
            "All concurrent operations should succeed",
        );

        // A fresh instance checks the persisted state rather than any per-instance view.
        let verify_ns = DirectoryNamespaceBuilder::new(&temp_path)
            .inline_optimization_enabled(inline_optimization)
            .build()
            .await
            .unwrap();

        let tables = list_table_names(&verify_ns, &["test_ns"]).await;
        assert!(
            tables.is_empty(),
            "All tables should be dropped, found: {:?}",
            tables
        );
    }

    /// Tables are created concurrently from one set of namespace instances, then dropped
    /// concurrently from a second, independent set of instances.
    #[rstest]
    #[case::with_optimization(true)]
    #[case::without_optimization(false)]
    #[tokio::test]
    async fn test_concurrent_create_then_drop_from_different_instance(
        #[case] inline_optimization: bool,
    ) {
        let temp_dir = TempStdDir::default();
        let temp_path = temp_dir.to_str().unwrap().to_string();

        let init_ns = DirectoryNamespaceBuilder::new(&temp_path)
            .inline_optimization_enabled(inline_optimization)
            .build()
            .await
            .unwrap();
        let mut create_ns_request = CreateNamespaceRequest::new();
        create_ns_request.id = Some(vec!["test_ns".to_string()]);
        init_ns.create_namespace(create_ns_request).await.unwrap();

        let num_tables = 10;

        let mut create_handles = Vec::new();
        for i in 0..num_tables {
            let path = temp_path.clone();
            create_handles.push(async move {
                let ns = DirectoryNamespaceBuilder::new(&path)
                    .inline_optimization_enabled(inline_optimization)
                    .build()
                    .await
                    .map_err(|e| format!("Failed to open namespace instance {}: {}", i, e))?;

                let table_name = format!("cross_instance_table_{}", i);
                let table_id = vec!["test_ns".to_string(), table_name.clone()];

                let mut create_request = CreateTableRequest::new();
                create_request.id = Some(table_id);
                ns.create_table(create_request, Bytes::from(create_test_ipc_data()))
                    .await
                    .map_err(|e| format!("Failed to create table {}: {}", table_name, e))?;

                Ok::<(), String>(())
            });
        }

        assert_all_succeeded(
            join_all(create_handles).await,
            "All create operations should succeed",
        );

        let mut drop_handles = Vec::new();
        for i in 0..num_tables {
            let path = temp_path.clone();
            drop_handles.push(async move {
                let ns = DirectoryNamespaceBuilder::new(&path)
                    .inline_optimization_enabled(inline_optimization)
                    .build()
                    .await
                    .map_err(|e| format!("Failed to open namespace instance {}: {}", i, e))?;

                let table_name = format!("cross_instance_table_{}", i);
                let table_id = vec!["test_ns".to_string(), table_name.clone()];

                let mut drop_request = DropTableRequest::new();
                drop_request.id = Some(table_id);
                ns.drop_table(drop_request)
                    .await
                    .map_err(|e| format!("Failed to drop table {}: {}", table_name, e))?;

                Ok::<(), String>(())
            });
        }

        assert_all_succeeded(
            join_all(drop_handles).await,
            "All drop operations should succeed",
        );

        let verify_ns = DirectoryNamespaceBuilder::new(&temp_path)
            .inline_optimization_enabled(inline_optimization)
            .build()
            .await
            .unwrap();

        let tables = list_table_names(&verify_ns, &["test_ns"]).await;
        assert!(
            tables.is_empty(),
            "All tables should be dropped, found: {:?}",
            tables
        );
    }

    /// `construct_full_uri` joins a cloud-storage root and a relative table location with
    /// exactly one `/`, whether or not the root already ends in a slash.
    #[test]
    fn test_construct_full_uri_with_cloud_urls() {
        // (root, relative location, expected URI, what the case covers)
        let cases = [
            (
                "s3://bucket/path/subdir",
                "table.lance",
                "s3://bucket/path/subdir/table.lance",
                "S3 URL should correctly append table name to nested path",
            ),
            (
                "az://container/path/subdir",
                "table.lance",
                "az://container/path/subdir/table.lance",
                "Azure URL should correctly append table name to nested path",
            ),
            (
                "gs://bucket/path/subdir",
                "table.lance",
                "gs://bucket/path/subdir/table.lance",
                "GCS URL should correctly append table name to nested path",
            ),
            (
                "s3://bucket/a/b/c/d",
                "my_table.lance",
                "s3://bucket/a/b/c/d/my_table.lance",
                "Deeply nested path should work correctly",
            ),
            (
                "s3://bucket",
                "table.lance",
                "s3://bucket/table.lance",
                "Bucket-root URL with no path should work correctly",
            ),
            (
                "s3://bucket/path/subdir/",
                "table.lance",
                "s3://bucket/path/subdir/table.lance",
                "URL with existing trailing slash should still work",
            ),
        ];

        for (root, location, expected, description) in cases {
            let uri = ManifestNamespace::construct_full_uri(root, location).unwrap_or_else(|e| {
                panic!(
                    "construct_full_uri({:?}, {:?}) failed: {:?}",
                    root, location, e
                )
            });
            assert_eq!(uri, expected, "{}", description);
        }
    }

    /// Two independent namespace instances race to create the same table. Exactly one
    /// create must win, and afterwards a single table must be listed and `describe_table`
    /// must succeed (i.e. the race must not leave duplicate entries behind).
    #[tokio::test]
    async fn test_concurrent_create_table_no_duplicates() {
        let temp_dir = TempStdDir::default();
        let temp_path = temp_dir.to_str().unwrap();

        let ns1 = DirectoryNamespaceBuilder::new(temp_path)
            .inline_optimization_enabled(false)
            .build()
            .await
            .unwrap();
        let ns2 = DirectoryNamespaceBuilder::new(temp_path)
            .inline_optimization_enabled(false)
            .build()
            .await
            .unwrap();

        let data = Bytes::from(create_test_ipc_data());

        let (result1, result2) = tokio::join!(
            ns1.create_table(create_table_request(&["race_table"]), data.clone()),
            ns2.create_table(create_table_request(&["race_table"]), data.clone()),
        );

        // With two attempts, "exactly one success" also implies "exactly one failure".
        let success_count = [&result1, &result2]
            .iter()
            .filter(|r| r.is_ok())
            .count();
        assert_eq!(
            success_count, 1,
            "Exactly one create should succeed, got: result1={:?}, result2={:?}",
            result1, result2
        );

        let ns_check = DirectoryNamespaceBuilder::new(temp_path)
            .inline_optimization_enabled(false)
            .build()
            .await
            .unwrap();
        let tables = list_table_names(&ns_check, &[]).await;
        assert_eq!(
            tables,
            vec!["race_table"],
            "Should have exactly 1 table, found: {:?}",
            tables
        );

        let mut describe_request = DescribeTableRequest::new();
        describe_request.id = Some(vec!["race_table".to_string()]);
        let describe_result = ns_check.describe_table(describe_request).await;
        assert!(
            describe_result.is_ok(),
            "describe_table should not fail with duplicate entries: {:?}",
            describe_result
        );
    }
}